aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Joiner <anacrolix@gmail.com>2020-09-30 19:13:27 +1000
committerMatt Joiner <anacrolix@gmail.com>2020-09-30 19:13:27 +1000
commita3e788d52494b75b927538c9c2bfee1573aa50e1 (patch)
treef1eed1cda0c25008c51b375799b9d65a33ce080d
parentDon't sleep and only wake watchers if the variable value has changed (diff)
downloadstm-a3e788d52494b75b927538c9c2bfee1573aa50e1.tar.gz
stm-a3e788d52494b75b927538c9c2bfee1573aa50e1.tar.xz
Wake watchers until the var changes again
-rw-r--r--funcs.go6
-rw-r--r--stm_test.go6
-rw-r--r--tx.go27
-rw-r--r--var.go10
4 files changed, 30 insertions, 19 deletions
diff --git a/funcs.go b/funcs.go
index 59c0e74..b8c8464 100644
--- a/funcs.go
+++ b/funcs.go
@@ -74,7 +74,7 @@ retry:
}
// verify the read log
tx.lockAllVars()
- if !tx.verify() {
+ if tx.inputsChanged() {
tx.unlock()
expvars.Add("failed commits", 1)
if profileFailedCommits {
@@ -84,6 +84,10 @@ retry:
}
// commit the write log and broadcast that variables have changed
tx.commit()
+ tx.mu.Lock()
+ tx.completed = true
+ tx.cond.Broadcast()
+ tx.mu.Unlock()
tx.unlock()
expvars.Add("commits", 1)
tx.recycle()
diff --git a/stm_test.go b/stm_test.go
index 8526139..d6e4475 100644
--- a/stm_test.go
+++ b/stm_test.go
@@ -32,7 +32,7 @@ func TestDecrement(t *testing.T) {
}
}
-// read-only transaction aren't exempt from calling tx.verify
+// read-only transaction aren't exempt from calling tx.inputsChanged
func TestReadVerify(t *testing.T) {
read := make(chan struct{})
x, y := NewVar(1), NewVar(2)
@@ -89,7 +89,7 @@ func TestRetry(t *testing.T) {
}
func TestVerify(t *testing.T) {
- // tx.verify should check more than pointer equality
+ // tx.inputsChanged should check more than pointer equality
type foo struct {
i int
}
@@ -119,7 +119,7 @@ func TestVerify(t *testing.T) {
<-read // wait for other tx to complete
}))
if i == 3 {
- t.Fatal("verify did not retry despite modified Var", i)
+ t.Fatal("inputsChanged did not retry despite modified Var", i)
}
}
diff --git a/tx.go b/tx.go
index f3bc617..f613efb 100644
--- a/tx.go
+++ b/tx.go
@@ -9,23 +9,25 @@ import (
// A Tx represents an atomic transaction.
type Tx struct {
- reads map[*Var]VarValue
- writes map[*Var]interface{}
- watching map[*Var]struct{}
- locks txLocks
- mu sync.Mutex
- cond sync.Cond
- tries int
+ reads map[*Var]VarValue
+ writes map[*Var]interface{}
+ watching map[*Var]struct{}
+ locks txLocks
+ mu sync.Mutex
+ cond sync.Cond
+ waiting bool
+ completed bool
+ tries int
}
// Check that none of the logged values have changed since the transaction began.
-func (tx *Tx) verify() bool {
+func (tx *Tx) inputsChanged() bool {
for v, read := range tx.reads {
if read.Changed(v.value.Load().(VarValue)) {
- return false
+ return true
}
}
- return true
+ return false
}
// Writes the values in the transaction log to their respective Vars.
@@ -58,12 +60,15 @@ func (tx *Tx) wait() {
tx.updateWatchers()
tx.mu.Lock()
firstWait := true
- for tx.verify() {
+ for !tx.inputsChanged() {
if !firstWait {
expvars.Add("wakes for unchanged versions", 1)
}
expvars.Add("waits", 1)
+ tx.waiting = true
+ tx.cond.Broadcast()
tx.cond.Wait()
+ tx.waiting = false
firstWait = false
}
tx.mu.Unlock()
diff --git a/var.go b/var.go
index 7e50cac..f3b8d40 100644
--- a/var.go
+++ b/var.go
@@ -17,20 +17,22 @@ func (v *Var) changeValue(new interface{}) {
newVarValue := old.Set(new)
v.value.Store(newVarValue)
if old.Changed(newVarValue) {
- v.wakeWatchers()
+ go v.wakeWatchers(newVarValue)
}
}
-func (v *Var) wakeWatchers() {
+func (v *Var) wakeWatchers(new VarValue) {
v.watchers.Range(func(k, _ interface{}) bool {
tx := k.(*Tx)
-
// We have to lock here to ensure that the Tx is waiting before we signal it. Otherwise we
// could signal it before it goes to sleep and it will miss the notification.
tx.mu.Lock()
tx.cond.Broadcast()
+ for !tx.waiting && !tx.completed {
+ tx.cond.Wait()
+ }
tx.mu.Unlock()
- return true
+ return !v.value.Load().(VarValue).Changed(new)
})
}