aboutsummaryrefslogtreecommitdiff
path: root/tx.go
diff options
context:
space:
mode:
Diffstat (limited to 'tx.go')
-rw-r--r--tx.go34
1 files changed, 24 insertions, 10 deletions
diff --git a/tx.go b/tx.go
index 2d16549..0f8a449 100644
--- a/tx.go
+++ b/tx.go
@@ -9,11 +9,12 @@ import (
// A Tx represents an atomic transaction.
type Tx struct {
- reads map[*Var]uint64
- writes map[*Var]interface{}
- locks txLocks
- mu sync.Mutex
- cond sync.Cond
+ reads map[*Var]uint64
+ writes map[*Var]interface{}
+ watching map[*Var]struct{}
+ locks txLocks
+ mu sync.Mutex
+ cond sync.Cond
}
// Check that none of the logged values have changed since the transaction began.
@@ -39,8 +40,17 @@ func (tx *Tx) wait() {
if len(tx.reads) == 0 {
panic("not waiting on anything")
}
+ for v := range tx.watching {
+ if _, ok := tx.reads[v]; !ok {
+ delete(tx.watching, v)
+ v.watchers.Delete(tx)
+ }
+ }
for v := range tx.reads {
- v.watchers.Store(tx, nil)
+ if _, ok := tx.watching[v]; !ok {
+ v.watchers.Store(tx, nil)
+ tx.watching[v] = struct{}{}
+ }
}
tx.mu.Lock()
firstWait := true
@@ -53,9 +63,9 @@ func (tx *Tx) wait() {
firstWait = false
}
tx.mu.Unlock()
- for v := range tx.reads {
- v.watchers.Delete(tx)
- }
+ //for v := range tx.reads {
+ // v.watchers.Delete(tx)
+ //}
}
// Get returns the value of v as of the start of the transaction.
@@ -104,6 +114,10 @@ func (tx *Tx) reset() {
}
func (tx *Tx) recycle() {
+ for v := range tx.watching {
+ delete(tx.watching, v)
+ v.watchers.Delete(tx)
+ }
txPool.Put(tx)
}
@@ -134,7 +148,7 @@ func (tx *Tx) collectAllLocks() {
}
func (tx *Tx) sortLocks() {
- sort.Sort(tx.locks)
+ sort.Sort(&tx.locks)
}
func (tx *Tx) lock() {