package stm import ( "sync" ) var ( txPool = sync.Pool{New: func() interface{} { expvars.Add("new txs", 1) tx := &Tx{ reads: make(map[*Var]uint64), writes: make(map[*Var]interface{}), } tx.cond.L = &tx.mu return tx }} ) // Atomically executes the atomic function fn. func Atomically(fn func(*Tx)) interface{} { expvars.Add("atomically", 1) // run the transaction tx := txPool.Get().(*Tx) retry: tx.reset() var ret interface{} if func() (retry bool) { defer func() { r := recover() if r == nil { return } if _ret, ok := r.(_return); ok { expvars.Add("explicit returns", 1) ret = _ret.value } else if r == Retry { expvars.Add("retries", 1) // wait for one of the variables we read to change before retrying tx.wait() retry = true } else { panic(r) } }() fn(tx) return false }() { goto retry } // verify the read log tx.lockAllVars() if !tx.verify() { tx.unlock() expvars.Add("failed commits", 1) goto retry } // commit the write log and broadcast that variables have changed tx.commit() tx.unlock() expvars.Add("commits", 1) tx.recycle() return ret } // AtomicGet is a helper function that atomically reads a value. func AtomicGet(v *Var) interface{} { return v.loadState().val } // AtomicSet is a helper function that atomically writes a value. func AtomicSet(v *Var, val interface{}) { v.mu.Lock() v.changeValue(val) v.mu.Unlock() } // Compose is a helper function that composes multiple transactions into a // single transaction. func Compose(fns ...func(*Tx)) func(*Tx) { return func(tx *Tx) { for _, f := range fns { f(tx) } } } // Select runs the supplied functions in order. Execution stops when a // function succeeds without calling Retry. If no functions succeed, the // entire selection will be retried. func Select(fns ...func(*Tx)) func(*Tx) { return func(tx *Tx) { switch len(fns) { case 0: // empty Select blocks forever tx.Retry() case 1: fns[0](tx) default: if catchRetry(fns[0], tx) { Select(fns[1:]...)(tx) } } } }