diff options
author | Matt Joiner <anacrolix@gmail.com> | 2020-09-04 15:44:51 +1000 |
---|---|---|
committer | Matt Joiner <anacrolix@gmail.com> | 2020-09-10 09:22:35 +1000 |
commit | 2a64b89bdbc8fec645dd688bd0322f45c067f295 (patch) | |
tree | 6e1e7d5606d3a16a77ccdd47eb8654296862acf7 | |
parent | Add exponentially longer sleeping between transaction attempts (diff) | |
download | stm-2a64b89bdbc8fec645dd688bd0322f45c067f295.tar.gz stm-2a64b89bdbc8fec645dd688bd0322f45c067f295.tar.xz |
Add custom VarValue and const for sleep backoff
-rw-r--r-- | bench_test.go | 6 | ||||
-rw-r--r-- | funcs.go | 31 | ||||
-rw-r--r-- | stm_test.go | 2 | ||||
-rw-r--r-- | tx.go | 16 | ||||
-rw-r--r-- | var-value.go | 51 | ||||
-rw-r--r-- | var.go | 28 |
6 files changed, 101 insertions, 33 deletions
diff --git a/bench_test.go b/bench_test.go index 08faa96..da2bf97 100644 --- a/bench_test.go +++ b/bench_test.go @@ -199,9 +199,9 @@ func BenchmarkThunderingHerdCondVar(b *testing.B) { func BenchmarkThunderingHerd(b *testing.B) { for i := 0; i < b.N; i++ { - done := NewVar(false) - tokens := NewVar(0) - pending := NewVar(0) + done := NewBuiltinEqVar(false) + tokens := NewBuiltinEqVar(0) + pending := NewBuiltinEqVar(0) for range iter.N(1000) { Atomically(VoidOperation(func(tx *Tx) { tx.Set(pending, tx.Get(pending).(int)+1) @@ -11,7 +11,7 @@ var ( txPool = sync.Pool{New: func() interface{} { expvars.Add("new txs", 1) tx := &Tx{ - reads: make(map[*Var]uint64), + reads: make(map[*Var]VarValue), writes: make(map[*Var]interface{}), watching: make(map[*Var]struct{}), } @@ -21,7 +21,10 @@ var ( failedCommitsProfile *pprof.Profile ) -const profileFailedCommits = false +const ( + profileFailedCommits = false + sleepBetweenRetries = false +) func init() { if profileFailedCommits { @@ -49,16 +52,18 @@ func Atomically(op Operation) interface{} { retry: tx.tries++ tx.reset() - shift := int64(tx.tries - 1) - const maxShift = 30 - if shift > maxShift { - shift = maxShift - } - ns := int64(1) << shift - ns = rand.Int63n(ns) - if ns > 0 { - tx.updateWatchers() - time.Sleep(time.Duration(ns)) + if sleepBetweenRetries { + shift := int64(tx.tries - 1) + const maxShift = 30 + if shift > maxShift { + shift = maxShift + } + ns := int64(1) << shift + ns = rand.Int63n(ns) + if ns > 0 { + tx.updateWatchers() + time.Sleep(time.Duration(ns)) + } } ret, retry := catchRetry(op, tx) if retry { @@ -87,7 +92,7 @@ retry: // AtomicGet is a helper function that atomically reads a value. func AtomicGet(v *Var) interface{} { - return v.loadState().val + return v.value.Load().(VarValue).Get() } // AtomicSet is a helper function that atomically writes a value. diff --git a/stm_test.go b/stm_test.go index 519415a..8526139 100644 --- a/stm_test.go +++ b/stm_test.go @@ -205,7 +205,7 @@ func TestAtomicSetRetry(t *testing.T) { } func testPingPong(t testing.TB, n int, afterHit func(string)) { - ball := NewVar(false) + ball := NewBuiltinEqVar(false) doneVar := NewVar(false) hits := NewVar(0) ready := NewVar(true) // The ball is ready for hitting. @@ -9,7 +9,7 @@ import ( // A Tx represents an atomic transaction. type Tx struct { - reads map[*Var]uint64 + reads map[*Var]VarValue writes map[*Var]interface{} watching map[*Var]struct{} locks txLocks @@ -20,9 +20,8 @@ type Tx struct { // Check that none of the logged values have changed since the transaction began. func (tx *Tx) verify() bool { - for v, version := range tx.reads { - changed := v.loadState().version != version - if changed { + for v, read := range tx.reads { + if read.Changed(v.value.Load().(VarValue)) { return false } } @@ -76,12 +75,13 @@ func (tx *Tx) Get(v *Var) interface{} { if val, ok := tx.writes[v]; ok { return val } - state := v.loadState() // If we haven't previously read v, record its version - if _, ok := tx.reads[v]; !ok { - tx.reads[v] = state.version + vv, ok := tx.reads[v] + if !ok { + vv = v.value.Load().(VarValue) + tx.reads[v] = vv } - return state.val + return vv.Get() } // Set sets the value of a Var for the lifetime of the transaction. diff --git a/var-value.go b/var-value.go new file mode 100644 index 0000000..ff97104 --- /dev/null +++ b/var-value.go @@ -0,0 +1,51 @@ +package stm + +type VarValue interface { + Set(interface{}) VarValue + Get() interface{} + Changed(VarValue) bool +} + +type version uint64 + +type versionedValue struct { + value interface{} + version version +} + +func (me versionedValue) Set(newValue interface{}) VarValue { + return versionedValue{ + value: newValue, + version: me.version + 1, + } +} + +func (me versionedValue) Get() interface{} { + return me.value +} + +func (me versionedValue) Changed(other VarValue) bool { + return me.version != other.(versionedValue).version +} + +type customVarValue struct { + value interface{} + changed func(interface{}, interface{}) bool +} + +var _ VarValue = customVarValue{} + +func (me customVarValue) Changed(other VarValue) bool { + return me.changed(me.value, other.(customVarValue).value) +} + +func (me customVarValue) Set(newValue interface{}) VarValue { + return customVarValue{ + value: newValue, + changed: me.changed, + } +} + +func (me customVarValue) Get() interface{} { + return me.value +} @@ -7,18 +7,13 @@ import ( // Holds an STM variable. type Var struct { - state atomic.Value + value atomic.Value watchers sync.Map mu sync.Mutex } -func (v *Var) loadState() varSnapshot { - return v.state.Load().(varSnapshot) -} - func (v *Var) changeValue(new interface{}) { - version := v.loadState().version - v.state.Store(varSnapshot{version: version + 1, val: new}) + v.value.Store(v.value.Load().(VarValue).Set(new)) v.wakeWatchers() } @@ -40,6 +35,23 @@ type varSnapshot struct { // Returns a new STM variable. func NewVar(val interface{}) *Var { v := &Var{} - v.state.Store(varSnapshot{version: 0, val: val}) + v.value.Store(versionedValue{ + value: val, + }) return v } + +func NewCustomVar(val interface{}, changed func(interface{}, interface{}) bool) *Var { + v := &Var{} + v.value.Store(customVarValue{ + value: val, + changed: changed, + }) + return v +} + +func NewBuiltinEqVar(val interface{}) *Var { + return NewCustomVar(val, func(a, b interface{}) bool { + return a != b + }) +} |