diff options
author | EuAndreh <eu@euandre.org> | 2025-01-22 12:31:30 -0300 |
---|---|---|
committer | EuAndreh <eu@euandre.org> | 2025-01-22 12:31:30 -0300 |
commit | 59d879ef4e654ce53c2450e000ffa435f06c2f0e (patch) | |
tree | 05ae996bf799b1e51f891a5586b3b72fa9bdfe3f /funcs.go | |
parent | Setup Makefile build skeleton (diff) | |
download | stm-59d879ef4e654ce53c2450e000ffa435f06c2f0e.tar.gz stm-59d879ef4e654ce53c2450e000ffa435f06c2f0e.tar.xz |
Unify code into default repo format
Diffstat (limited to 'funcs.go')
-rw-r--r-- | funcs.go | 169 |
1 files changed, 0 insertions, 169 deletions
diff --git a/funcs.go b/funcs.go deleted file mode 100644 index 07d35ec..0000000 --- a/funcs.go +++ /dev/null @@ -1,169 +0,0 @@ -package stm - -import ( - "math/rand" - "runtime/pprof" - "sync" - "time" -) - -var ( - txPool = sync.Pool{New: func() any { - expvars.Add("new txs", 1) - tx := &Tx{ - reads: make(map[txVar]VarValue), - writes: make(map[txVar]any), - watching: make(map[txVar]struct{}), - } - tx.cond.L = &tx.mu - return tx - }} - failedCommitsProfile *pprof.Profile -) - -const ( - profileFailedCommits = false - sleepBetweenRetries = false -) - -func init() { - if profileFailedCommits { - failedCommitsProfile = pprof.NewProfile("stmFailedCommits") - } -} - -func newTx() *Tx { - tx := txPool.Get().(*Tx) - tx.tries = 0 - tx.completed = false - return tx -} - -func WouldBlock[R any](fn Operation[R]) (block bool) { - tx := newTx() - tx.reset() - _, block = catchRetry(fn, tx) - if len(tx.watching) != 0 { - panic("shouldn't have installed any watchers") - } - tx.recycle() - return -} - -// Atomically executes the atomic function fn. -func Atomically[R any](op Operation[R]) R { - expvars.Add("atomically", 1) - // run the transaction - tx := newTx() -retry: - tx.tries++ - tx.reset() - if sleepBetweenRetries { - shift := int64(tx.tries - 1) - const maxShift = 30 - if shift > maxShift { - shift = maxShift - } - ns := int64(1) << shift - d := time.Duration(rand.Int63n(ns)) - if d > 100*time.Microsecond { - tx.updateWatchers() - time.Sleep(time.Duration(ns)) - } - } - tx.mu.Lock() - ret, retry := catchRetry(op, tx) - tx.mu.Unlock() - if retry { - expvars.Add("retries", 1) - // wait for one of the variables we read to change before retrying - tx.wait() - goto retry - } - // verify the read log - tx.lockAllVars() - if tx.inputsChanged() { - tx.unlock() - expvars.Add("failed commits", 1) - if profileFailedCommits { - failedCommitsProfile.Add(new(int), 0) - } - goto retry - } - // commit the write log and broadcast that variables have changed - tx.commit() - tx.mu.Lock() - tx.completed = true - tx.cond.Broadcast() - tx.mu.Unlock() - tx.unlock() - expvars.Add("commits", 1) - tx.recycle() - return ret -} - -// AtomicGet is a helper function that atomically reads a value. -func AtomicGet[T any](v *Var[T]) T { - return v.value.Load().Get().(T) -} - -// AtomicSet is a helper function that atomically writes a value. -func AtomicSet[T any](v *Var[T], val T) { - v.mu.Lock() - v.changeValue(val) - v.mu.Unlock() -} - -// Compose is a helper function that composes multiple transactions into a -// single transaction. -func Compose[R any](fns ...Operation[R]) Operation[struct{}] { - return VoidOperation(func(tx *Tx) { - for _, f := range fns { - f(tx) - } - }) -} - -// Select runs the supplied functions in order. Execution stops when a -// function succeeds without calling Retry. If no functions succeed, the -// entire selection will be retried. -func Select[R any](fns ...Operation[R]) Operation[R] { - return func(tx *Tx) R { - switch len(fns) { - case 0: - // empty Select blocks forever - tx.Retry() - panic("unreachable") - case 1: - return fns[0](tx) - default: - oldWrites := tx.writes - tx.writes = make(map[txVar]any, len(oldWrites)) - for k, v := range oldWrites { - tx.writes[k] = v - } - ret, retry := catchRetry(fns[0], tx) - if retry { - tx.writes = oldWrites - return Select(fns[1:]...)(tx) - } else { - return ret - } - } - } -} - -type Operation[R any] func(*Tx) R - -func VoidOperation(f func(*Tx)) Operation[struct{}] { - return func(tx *Tx) struct{} { - f(tx) - return struct{}{} - } -} - -func AtomicModify[T any](v *Var[T], f func(T) T) { - Atomically(VoidOperation(func(tx *Tx) { - v.Set(tx, f(v.Get(tx))) - })) -} |