aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--funcs.go5
-rw-r--r--tx.go34
2 files changed, 27 insertions, 12 deletions
diff --git a/funcs.go b/funcs.go
index b13a917..aad9c90 100644
--- a/funcs.go
+++ b/funcs.go
@@ -9,8 +9,9 @@ var (
txPool = sync.Pool{New: func() interface{} {
expvars.Add("new txs", 1)
tx := &Tx{
- reads: make(map[*Var]uint64),
- writes: make(map[*Var]interface{}),
+ reads: make(map[*Var]uint64),
+ writes: make(map[*Var]interface{}),
+ watching: make(map[*Var]struct{}),
}
tx.cond.L = &tx.mu
return tx
diff --git a/tx.go b/tx.go
index 2d16549..0f8a449 100644
--- a/tx.go
+++ b/tx.go
@@ -9,11 +9,12 @@ import (
// A Tx represents an atomic transaction.
type Tx struct {
- reads map[*Var]uint64
- writes map[*Var]interface{}
- locks txLocks
- mu sync.Mutex
- cond sync.Cond
+ reads map[*Var]uint64
+ writes map[*Var]interface{}
+ watching map[*Var]struct{}
+ locks txLocks
+ mu sync.Mutex
+ cond sync.Cond
}
// Check that none of the logged values have changed since the transaction began.
@@ -39,8 +40,17 @@ func (tx *Tx) wait() {
if len(tx.reads) == 0 {
panic("not waiting on anything")
}
+ for v := range tx.watching {
+ if _, ok := tx.reads[v]; !ok {
+ delete(tx.watching, v)
+ v.watchers.Delete(tx)
+ }
+ }
for v := range tx.reads {
- v.watchers.Store(tx, nil)
+ if _, ok := tx.watching[v]; !ok {
+ v.watchers.Store(tx, nil)
+ tx.watching[v] = struct{}{}
+ }
}
tx.mu.Lock()
firstWait := true
@@ -53,9 +63,9 @@ func (tx *Tx) wait() {
firstWait = false
}
tx.mu.Unlock()
- for v := range tx.reads {
- v.watchers.Delete(tx)
- }
+ //for v := range tx.reads {
+ // v.watchers.Delete(tx)
+ //}
}
// Get returns the value of v as of the start of the transaction.
@@ -104,6 +114,10 @@ func (tx *Tx) reset() {
}
func (tx *Tx) recycle() {
+ for v := range tx.watching {
+ delete(tx.watching, v)
+ v.watchers.Delete(tx)
+ }
txPool.Put(tx)
}
@@ -134,7 +148,7 @@ func (tx *Tx) collectAllLocks() {
}
func (tx *Tx) sortLocks() {
- sort.Sort(tx.locks)
+ sort.Sort(&tx.locks)
}
func (tx *Tx) lock() {