aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Joiner <anacrolix@gmail.com>2020-09-04 15:44:51 +1000
committerMatt Joiner <anacrolix@gmail.com>2020-09-10 09:22:35 +1000
commit2a64b89bdbc8fec645dd688bd0322f45c067f295 (patch)
tree6e1e7d5606d3a16a77ccdd47eb8654296862acf7
parentAdd exponentially longer sleeping between transaction attempts (diff)
downloadstm-2a64b89bdbc8fec645dd688bd0322f45c067f295.tar.gz
stm-2a64b89bdbc8fec645dd688bd0322f45c067f295.tar.xz
Add custom VarValue and const for sleep backoff
-rw-r--r--bench_test.go6
-rw-r--r--funcs.go31
-rw-r--r--stm_test.go2
-rw-r--r--tx.go16
-rw-r--r--var-value.go51
-rw-r--r--var.go28
6 files changed, 101 insertions, 33 deletions
diff --git a/bench_test.go b/bench_test.go
index 08faa96..da2bf97 100644
--- a/bench_test.go
+++ b/bench_test.go
@@ -199,9 +199,9 @@ func BenchmarkThunderingHerdCondVar(b *testing.B) {
func BenchmarkThunderingHerd(b *testing.B) {
for i := 0; i < b.N; i++ {
- done := NewVar(false)
- tokens := NewVar(0)
- pending := NewVar(0)
+ done := NewBuiltinEqVar(false)
+ tokens := NewBuiltinEqVar(0)
+ pending := NewBuiltinEqVar(0)
for range iter.N(1000) {
Atomically(VoidOperation(func(tx *Tx) {
tx.Set(pending, tx.Get(pending).(int)+1)
diff --git a/funcs.go b/funcs.go
index 650a559..2e2ad26 100644
--- a/funcs.go
+++ b/funcs.go
@@ -11,7 +11,7 @@ var (
txPool = sync.Pool{New: func() interface{} {
expvars.Add("new txs", 1)
tx := &Tx{
- reads: make(map[*Var]uint64),
+ reads: make(map[*Var]VarValue),
writes: make(map[*Var]interface{}),
watching: make(map[*Var]struct{}),
}
@@ -21,7 +21,10 @@ var (
failedCommitsProfile *pprof.Profile
)
-const profileFailedCommits = false
+const (
+ profileFailedCommits = false
+ sleepBetweenRetries = false
+)
func init() {
if profileFailedCommits {
@@ -49,16 +52,18 @@ func Atomically(op Operation) interface{} {
retry:
tx.tries++
tx.reset()
- shift := int64(tx.tries - 1)
- const maxShift = 30
- if shift > maxShift {
- shift = maxShift
- }
- ns := int64(1) << shift
- ns = rand.Int63n(ns)
- if ns > 0 {
- tx.updateWatchers()
- time.Sleep(time.Duration(ns))
+ if sleepBetweenRetries {
+ shift := int64(tx.tries - 1)
+ const maxShift = 30
+ if shift > maxShift {
+ shift = maxShift
+ }
+ ns := int64(1) << shift
+ ns = rand.Int63n(ns)
+ if ns > 0 {
+ tx.updateWatchers()
+ time.Sleep(time.Duration(ns))
+ }
}
ret, retry := catchRetry(op, tx)
if retry {
@@ -87,7 +92,7 @@ retry:
// AtomicGet is a helper function that atomically reads a value.
func AtomicGet(v *Var) interface{} {
- return v.loadState().val
+ return v.value.Load().(VarValue).Get()
}
// AtomicSet is a helper function that atomically writes a value.
diff --git a/stm_test.go b/stm_test.go
index 519415a..8526139 100644
--- a/stm_test.go
+++ b/stm_test.go
@@ -205,7 +205,7 @@ func TestAtomicSetRetry(t *testing.T) {
}
func testPingPong(t testing.TB, n int, afterHit func(string)) {
- ball := NewVar(false)
+ ball := NewBuiltinEqVar(false)
doneVar := NewVar(false)
hits := NewVar(0)
ready := NewVar(true) // The ball is ready for hitting.
diff --git a/tx.go b/tx.go
index 6e8b06f..f3bc617 100644
--- a/tx.go
+++ b/tx.go
@@ -9,7 +9,7 @@ import (
// A Tx represents an atomic transaction.
type Tx struct {
- reads map[*Var]uint64
+ reads map[*Var]VarValue
writes map[*Var]interface{}
watching map[*Var]struct{}
locks txLocks
@@ -20,9 +20,8 @@ type Tx struct {
// Check that none of the logged values have changed since the transaction began.
func (tx *Tx) verify() bool {
- for v, version := range tx.reads {
- changed := v.loadState().version != version
- if changed {
+ for v, read := range tx.reads {
+ if read.Changed(v.value.Load().(VarValue)) {
return false
}
}
@@ -76,12 +75,13 @@ func (tx *Tx) Get(v *Var) interface{} {
if val, ok := tx.writes[v]; ok {
return val
}
- state := v.loadState()
// If we haven't previously read v, record its version
- if _, ok := tx.reads[v]; !ok {
- tx.reads[v] = state.version
+ vv, ok := tx.reads[v]
+ if !ok {
+ vv = v.value.Load().(VarValue)
+ tx.reads[v] = vv
}
- return state.val
+ return vv.Get()
}
// Set sets the value of a Var for the lifetime of the transaction.
diff --git a/var-value.go b/var-value.go
new file mode 100644
index 0000000..ff97104
--- /dev/null
+++ b/var-value.go
@@ -0,0 +1,51 @@
+package stm
+
+type VarValue interface {
+ Set(interface{}) VarValue
+ Get() interface{}
+ Changed(VarValue) bool
+}
+
+type version uint64
+
+type versionedValue struct {
+ value interface{}
+ version version
+}
+
+func (me versionedValue) Set(newValue interface{}) VarValue {
+ return versionedValue{
+ value: newValue,
+ version: me.version + 1,
+ }
+}
+
+func (me versionedValue) Get() interface{} {
+ return me.value
+}
+
+func (me versionedValue) Changed(other VarValue) bool {
+ return me.version != other.(versionedValue).version
+}
+
+type customVarValue struct {
+ value interface{}
+ changed func(interface{}, interface{}) bool
+}
+
+var _ VarValue = customVarValue{}
+
+func (me customVarValue) Changed(other VarValue) bool {
+ return me.changed(me.value, other.(customVarValue).value)
+}
+
+func (me customVarValue) Set(newValue interface{}) VarValue {
+ return customVarValue{
+ value: newValue,
+ changed: me.changed,
+ }
+}
+
+func (me customVarValue) Get() interface{} {
+ return me.value
+}
diff --git a/var.go b/var.go
index facad56..cdf9667 100644
--- a/var.go
+++ b/var.go
@@ -7,18 +7,13 @@ import (
// Holds an STM variable.
type Var struct {
- state atomic.Value
+ value atomic.Value
watchers sync.Map
mu sync.Mutex
}
-func (v *Var) loadState() varSnapshot {
- return v.state.Load().(varSnapshot)
-}
-
func (v *Var) changeValue(new interface{}) {
- version := v.loadState().version
- v.state.Store(varSnapshot{version: version + 1, val: new})
+ v.value.Store(v.value.Load().(VarValue).Set(new))
v.wakeWatchers()
}
@@ -40,6 +35,23 @@ type varSnapshot struct {
// Returns a new STM variable.
func NewVar(val interface{}) *Var {
v := &Var{}
- v.state.Store(varSnapshot{version: 0, val: val})
+ v.value.Store(versionedValue{
+ value: val,
+ })
return v
}
+
+func NewCustomVar(val interface{}, changed func(interface{}, interface{}) bool) *Var {
+ v := &Var{}
+ v.value.Store(customVarValue{
+ value: val,
+ changed: changed,
+ })
+ return v
+}
+
+func NewBuiltinEqVar(val interface{}) *Var {
+ return NewCustomVar(val, func(a, b interface{}) bool {
+ return a != b
+ })
+}