aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--funcs.go15
-rw-r--r--tx.go18
2 files changed, 25 insertions, 8 deletions
diff --git a/funcs.go b/funcs.go
index aad9c90..650a559 100644
--- a/funcs.go
+++ b/funcs.go
@@ -1,8 +1,10 @@
package stm
import (
+ "math/rand"
"runtime/pprof"
"sync"
+ "time"
)
var (
@@ -43,8 +45,21 @@ func Atomically(op Operation) interface{} {
expvars.Add("atomically", 1)
// run the transaction
tx := newTx()
+ tx.tries = 0
retry:
+ tx.tries++
tx.reset()
+ shift := int64(tx.tries - 1)
+ const maxShift = 30
+ if shift > maxShift {
+ shift = maxShift
+ }
+ ns := int64(1) << shift
+ ns = rand.Int63n(ns)
+ if ns > 0 {
+ tx.updateWatchers()
+ time.Sleep(time.Duration(ns))
+ }
ret, retry := catchRetry(op, tx)
if retry {
expvars.Add("retries", 1)
diff --git a/tx.go b/tx.go
index 0f8a449..6e8b06f 100644
--- a/tx.go
+++ b/tx.go
@@ -15,6 +15,7 @@ type Tx struct {
locks txLocks
mu sync.Mutex
cond sync.Cond
+ tries int
}
// Check that none of the logged values have changed since the transaction began.
@@ -35,11 +36,7 @@ func (tx *Tx) commit() {
}
}
-// wait blocks until another transaction modifies any of the Vars read by tx.
-func (tx *Tx) wait() {
- if len(tx.reads) == 0 {
- panic("not waiting on anything")
- }
+func (tx *Tx) updateWatchers() {
for v := range tx.watching {
if _, ok := tx.reads[v]; !ok {
delete(tx.watching, v)
@@ -52,6 +49,14 @@ func (tx *Tx) wait() {
tx.watching[v] = struct{}{}
}
}
+}
+
+// wait blocks until another transaction modifies any of the Vars read by tx.
+func (tx *Tx) wait() {
+ if len(tx.reads) == 0 {
+ panic("not waiting on anything")
+ }
+ tx.updateWatchers()
tx.mu.Lock()
firstWait := true
for tx.verify() {
@@ -63,9 +68,6 @@ func (tx *Tx) wait() {
firstWait = false
}
tx.mu.Unlock()
- //for v := range tx.reads {
- // v.watchers.Delete(tx)
- //}
}
// Get returns the value of v as of the start of the transaction.