diff options
-rw-r--r-- | stm.go | 50 |
1 files changed, 27 insertions, 23 deletions
@@ -69,11 +69,6 @@ behavior. One common way to get around this is to build up a list of impure operations inside the transaction, and then perform them after the transaction completes. -Another caveat: stm uses simple pointer equality to determine whether Vars -have changed during a transaction. For types containing mutable pointers, this -may be insufficient; immutable values should be used instead. Support for -proper versioning is planned. - The stm API tries to mimic that of Haskell's Control.Concurrent.STM, but this is not entirely possible due to Go's type system; we are forced to use interface{} and type assertions. Furthermore, Haskell can enforce at compile @@ -84,7 +79,6 @@ package stm import ( "sync" - "sync/atomic" ) // Retry is a sentinel value. When thrown via panic, it indicates that a @@ -98,19 +92,19 @@ var globalCond = sync.NewCond(&globalLock) // A Var holds an STM variable. type Var struct { - val atomic.Value + val interface{} + version uint64 + mu sync.Mutex } // NewVar returns a new STM variable. func NewVar(val interface{}) *Var { - v := new(Var) - v.val.Store(val) - return v + return &Var{val: val} } // A Tx represents an atomic transaction. type Tx struct { - reads map[*Var]interface{} + reads map[*Var]uint64 writes map[*Var]interface{} } @@ -118,8 +112,11 @@ type Tx struct { // transaction began. // TODO: is pointer equality good enough? probably not, without immutable data func (tx *Tx) verify() bool { - for v, val := range tx.reads { - if v.val.Load() != val { + for v, version := range tx.reads { + v.mu.Lock() + changed := v.version != version + v.mu.Unlock() + if changed { return false } } @@ -129,7 +126,10 @@ func (tx *Tx) verify() bool { // commit writes the values in the transaction log to their respective Vars. func (tx *Tx) commit() { for v, val := range tx.writes { - v.val.Store(val) + v.mu.Lock() + v.val = val + v.version++ + v.mu.Unlock() } } @@ -148,13 +148,13 @@ func (tx *Tx) Get(v *Var) interface{} { if val, ok := tx.writes[v]; ok { return val } - // If we previously read v, it will be in the read log. - if val, ok := tx.reads[v]; ok { - return val + v.mu.Lock() + defer v.mu.Unlock() + // If we haven't previously read v, record its version + if _, ok := tx.reads[v]; !ok { + tx.reads[v] = v.version } - // Otherwise, record and return its current value. - tx.reads[v] = v.val.Load() - return tx.reads[v] + return v.val } // Set sets the value of a Var for the lifetime of the transaction. @@ -193,7 +193,7 @@ func Atomically(fn func(*Tx)) { retry: // run the transaction tx := &Tx{ - reads: make(map[*Var]interface{}), + reads: make(map[*Var]uint64), writes: make(map[*Var]interface{}), } if catchRetry(fn, tx) { @@ -219,7 +219,9 @@ retry: func AtomicGet(v *Var) interface{} { // since we're only doing one operation, we don't need a full transaction globalLock.Lock() - val := v.val.Load() + v.mu.Lock() + val := v.val + v.mu.Unlock() globalLock.Unlock() return val } @@ -228,7 +230,9 @@ func AtomicGet(v *Var) interface{} { func AtomicSet(v *Var, val interface{}) { // since we're only doing one operation, we don't need a full transaction globalLock.Lock() - v.val.Store(val) + v.mu.Lock() + v.val = val + v.mu.Unlock() globalCond.Broadcast() globalLock.Unlock() } |