diff options
author | Matt Joiner <anacrolix@gmail.com> | 2020-09-30 19:13:27 +1000 |
---|---|---|
committer | Matt Joiner <anacrolix@gmail.com> | 2020-09-30 19:13:27 +1000 |
commit | a3e788d52494b75b927538c9c2bfee1573aa50e1 (patch) | |
tree | f1eed1cda0c25008c51b375799b9d65a33ce080d | |
parent | Don't sleep and only wake watchers if the variable value has changed (diff) | |
download | stm-a3e788d52494b75b927538c9c2bfee1573aa50e1.tar.gz stm-a3e788d52494b75b927538c9c2bfee1573aa50e1.tar.xz |
Wake watchers until the var changes again
-rw-r--r-- | funcs.go | 6 | ||||
-rw-r--r-- | stm_test.go | 6 | ||||
-rw-r--r-- | tx.go | 27 | ||||
-rw-r--r-- | var.go | 10 |
4 files changed, 30 insertions, 19 deletions
@@ -74,7 +74,7 @@ retry: } // verify the read log tx.lockAllVars() - if !tx.verify() { + if tx.inputsChanged() { tx.unlock() expvars.Add("failed commits", 1) if profileFailedCommits { @@ -84,6 +84,10 @@ retry: } // commit the write log and broadcast that variables have changed tx.commit() + tx.mu.Lock() + tx.completed = true + tx.cond.Broadcast() + tx.mu.Unlock() tx.unlock() expvars.Add("commits", 1) tx.recycle() diff --git a/stm_test.go b/stm_test.go index 8526139..d6e4475 100644 --- a/stm_test.go +++ b/stm_test.go @@ -32,7 +32,7 @@ func TestDecrement(t *testing.T) { } } -// read-only transaction aren't exempt from calling tx.verify +// read-only transaction aren't exempt from calling tx.inputsChanged func TestReadVerify(t *testing.T) { read := make(chan struct{}) x, y := NewVar(1), NewVar(2) @@ -89,7 +89,7 @@ func TestRetry(t *testing.T) { } func TestVerify(t *testing.T) { - // tx.verify should check more than pointer equality + // tx.inputsChanged should check more than pointer equality type foo struct { i int } @@ -119,7 +119,7 @@ func TestVerify(t *testing.T) { <-read // wait for other tx to complete })) if i == 3 { - t.Fatal("verify did not retry despite modified Var", i) + t.Fatal("inputsChanged did not retry despite modified Var", i) } } @@ -9,23 +9,25 @@ import ( // A Tx represents an atomic transaction. type Tx struct { - reads map[*Var]VarValue - writes map[*Var]interface{} - watching map[*Var]struct{} - locks txLocks - mu sync.Mutex - cond sync.Cond - tries int + reads map[*Var]VarValue + writes map[*Var]interface{} + watching map[*Var]struct{} + locks txLocks + mu sync.Mutex + cond sync.Cond + waiting bool + completed bool + tries int } // Check that none of the logged values have changed since the transaction began. -func (tx *Tx) verify() bool { +func (tx *Tx) inputsChanged() bool { for v, read := range tx.reads { if read.Changed(v.value.Load().(VarValue)) { - return false + return true } } - return true + return false } // Writes the values in the transaction log to their respective Vars. @@ -58,12 +60,15 @@ func (tx *Tx) wait() { tx.updateWatchers() tx.mu.Lock() firstWait := true - for tx.verify() { + for !tx.inputsChanged() { if !firstWait { expvars.Add("wakes for unchanged versions", 1) } expvars.Add("waits", 1) + tx.waiting = true + tx.cond.Broadcast() tx.cond.Wait() + tx.waiting = false firstWait = false } tx.mu.Unlock() @@ -17,20 +17,22 @@ func (v *Var) changeValue(new interface{}) { newVarValue := old.Set(new) v.value.Store(newVarValue) if old.Changed(newVarValue) { - v.wakeWatchers() + go v.wakeWatchers(newVarValue) } } -func (v *Var) wakeWatchers() { +func (v *Var) wakeWatchers(new VarValue) { v.watchers.Range(func(k, _ interface{}) bool { tx := k.(*Tx) - // We have to lock here to ensure that the Tx is waiting before we signal it. Otherwise we // could signal it before it goes to sleep and it will miss the notification. tx.mu.Lock() tx.cond.Broadcast() + for !tx.waiting && !tx.completed { + tx.cond.Wait() + } tx.mu.Unlock() - return true + return !v.value.Load().(VarValue).Changed(new) }) } |