From a3e788d52494b75b927538c9c2bfee1573aa50e1 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 30 Sep 2020 19:13:27 +1000 Subject: Wake watchers until the var changes again --- tx.go | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) (limited to 'tx.go') diff --git a/tx.go b/tx.go index f3bc617..f613efb 100644 --- a/tx.go +++ b/tx.go @@ -9,23 +9,25 @@ import ( // A Tx represents an atomic transaction. type Tx struct { - reads map[*Var]VarValue - writes map[*Var]interface{} - watching map[*Var]struct{} - locks txLocks - mu sync.Mutex - cond sync.Cond - tries int + reads map[*Var]VarValue + writes map[*Var]interface{} + watching map[*Var]struct{} + locks txLocks + mu sync.Mutex + cond sync.Cond + waiting bool + completed bool + tries int } // Check that none of the logged values have changed since the transaction began. -func (tx *Tx) verify() bool { +func (tx *Tx) inputsChanged() bool { for v, read := range tx.reads { if read.Changed(v.value.Load().(VarValue)) { - return false + return true } } - return true + return false } // Writes the values in the transaction log to their respective Vars. @@ -58,12 +60,15 @@ func (tx *Tx) wait() { tx.updateWatchers() tx.mu.Lock() firstWait := true - for tx.verify() { + for !tx.inputsChanged() { if !firstWait { expvars.Add("wakes for unchanged versions", 1) } expvars.Add("waits", 1) + tx.waiting = true + tx.cond.Broadcast() tx.cond.Wait() + tx.waiting = false firstWait = false } tx.mu.Unlock() -- cgit v1.2.3