path: root/tests/stm.go
author     EuAndreh <eu@euandre.org>  2025-01-22 12:31:30 -0300
committer  EuAndreh <eu@euandre.org>  2025-01-22 12:31:30 -0300
commit     59d879ef4e654ce53c2450e000ffa435f06c2f0e (patch)
tree       05ae996bf799b1e51f891a5586b3b72fa9bdfe3f /tests/stm.go
parent     Setup Makefile build skeleton (diff)
download   stm-59d879ef4e654ce53c2450e000ffa435f06c2f0e.tar.gz
           stm-59d879ef4e654ce53c2450e000ffa435f06c2f0e.tar.xz
Unify code into default repo format
Diffstat (limited to 'tests/stm.go')
 -rw-r--r--  tests/stm.go  1177
 1 file changed, 1177 insertions, 0 deletions
diff --git a/tests/stm.go b/tests/stm.go
index a30ac1a..816cdec 100644
--- a/tests/stm.go
+++ b/tests/stm.go
@@ -1,8 +1,1185 @@
package stm
import (
+ "context"
+ "fmt"
+ "math"
+ "runtime"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ g "gobang"
+)
+
+
+
+func TestValue(t *testing.T) {
+ v := New("hello")
+ g.TAssertEqual("hello", v.Load())
+ v.Store("world")
+ g.TAssertEqual("world", v.Load())
+}
+
+func TestValueZeroValue(t *testing.T) {
+ var v Value[string]
+ // assert.Panics(t, func() { v.Load() })
+ v.Store("world")
+ g.TAssertEqual("world", v.Load())
+}
+
+func TestValueSwapZeroValue(t *testing.T) {
+ // var v Value[string]
+ // assert.Panics(t, func() { v.Swap("hello") })
+}
+
+func TestInt32(t *testing.T) {
+ v := NewInt32(0)
+ g.TAssertEqual(0, v.Load())
+ g.TAssertEqual(true, v.CompareAndSwap(0, 10))
+ g.TAssertEqual(false, v.CompareAndSwap(0, 10))
+}
+
+func BenchmarkInt64Add(b *testing.B) {
+ v := NewInt64(0)
+ for i := 0; i < b.N; i++ {
+ v.Add(1)
+ }
+}
+
+func BenchmarkIntInterfaceAdd(b *testing.B) {
+ var v Int[int64] = NewInt64(0)
+ for i := 0; i < b.N; i++ {
+ v.Add(1)
+ }
+}
+
+func BenchmarkStdlibInt64Add(b *testing.B) {
+ var n int64
+ for i := 0; i < b.N; i++ {
+ atomic.AddInt64(&n, 1)
+ }
+}
+
+func BenchmarkInterfaceStore(b *testing.B) {
+ var v Interface[string] = New("hello")
+ for i := 0; i < b.N; i++ {
+ v.Store(fmt.Sprint(i))
+ }
+}
+
+func BenchmarkValueStore(b *testing.B) {
+ v := New("hello")
+ for i := 0; i < b.N; i++ {
+ v.Store(fmt.Sprint(i))
+ }
+}
+
+func BenchmarkStdlibValueStore(b *testing.B) {
+ v := atomic.Value{}
+ for i := 0; i < b.N; i++ {
+ v.Store(fmt.Sprint(i))
+ }
+}
+
+func BenchmarkAtomicGet(b *testing.B) {
+ x := NewVar(0)
+ for i := 0; i < b.N; i++ {
+ AtomicGet(x)
+ }
+}
+
+func BenchmarkAtomicSet(b *testing.B) {
+ x := NewVar(0)
+ for i := 0; i < b.N; i++ {
+ AtomicSet(x, 0)
+ }
+}
+
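+// The three Increment benchmarks below run the same workload -- 1000
+// goroutines incrementing a shared counter until it reaches 1000 -- using
+// STM, a mutex, and a channel, so the results can be compared directly.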
+func BenchmarkIncrementSTM(b *testing.B) {
+ for i := 0; i < b.N; i++ {
+ // spawn 1000 goroutines that each increment x by 1
+ x := NewVar(0)
+ for i := 0; i < 1000; i++ {
+ go Atomically(VoidOperation(func(tx *Tx) {
+ cur := x.Get(tx)
+ x.Set(tx, cur+1)
+ }))
+ }
+ // wait for x to reach 1000
+ Atomically(VoidOperation(func(tx *Tx) {
+ tx.Assert(x.Get(tx) == 1000)
+ }))
+ }
+}
+
+func BenchmarkIncrementMutex(b *testing.B) {
+ for i := 0; i < b.N; i++ {
+ var mu sync.Mutex
+ x := 0
+ for i := 0; i < 1000; i++ {
+ go func() {
+ mu.Lock()
+ x++
+ mu.Unlock()
+ }()
+ }
+ for {
+ mu.Lock()
+ read := x
+ mu.Unlock()
+ if read == 1000 {
+ break
+ }
+ }
+ }
+}
+
+func BenchmarkIncrementChannel(b *testing.B) {
+ for i := 0; i < b.N; i++ {
+ c := make(chan int, 1)
+ c <- 0
+ for i := 0; i < 1000; i++ {
+ go func() {
+ c <- 1 + <-c
+ }()
+ }
+ for {
+ read := <-c
+ if read == 1000 {
+ break
+ }
+ c <- read
+ }
+ }
+}
+
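+// The ReadVar benchmarks make the same comparison for 1000 concurrent
+// readers of a single shared value.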
+func BenchmarkReadVarSTM(b *testing.B) {
+ for i := 0; i < b.N; i++ {
+ var wg sync.WaitGroup
+ wg.Add(1000)
+ x := NewVar(0)
+ for i := 0; i < 1000; i++ {
+ go func() {
+ AtomicGet(x)
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+ }
+}
+
+func BenchmarkReadVarMutex(b *testing.B) {
+ for i := 0; i < b.N; i++ {
+ var mu sync.Mutex
+ var wg sync.WaitGroup
+ wg.Add(1000)
+ x := 0
+ for i := 0; i < 1000; i++ {
+ go func() {
+ mu.Lock()
+ _ = x
+ mu.Unlock()
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+ }
+}
+
+func BenchmarkReadVarChannel(b *testing.B) {
+ for i := 0; i < b.N; i++ {
+ var wg sync.WaitGroup
+ wg.Add(1000)
+ c := make(chan int)
+ close(c)
+ for i := 0; i < 1000; i++ {
+ go func() {
+ <-c
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+ }
+}
+
+func parallelPingPongs(b *testing.B, n int) {
+ var wg sync.WaitGroup
+ wg.Add(n)
+ for i := 0; i < n; i++ {
+ go func() {
+ defer wg.Done()
+ testPingPong(b, b.N, func(string) {})
+ }()
+ }
+ wg.Wait()
+}
+
+func BenchmarkPingPong4(b *testing.B) {
+ b.ReportAllocs()
+ parallelPingPongs(b, 4)
+}
+
+func BenchmarkPingPong(b *testing.B) {
+ b.ReportAllocs()
+ parallelPingPongs(b, 1)
+}
+
+func Example() {
+ // create a shared variable
+ n := NewVar(3)
+
+ // read a variable
+ var v int
+ Atomically(VoidOperation(func(tx *Tx) {
+ v = n.Get(tx)
+ }))
+ // or:
+ v = AtomicGet(n)
+ _ = v
+
+ // write to a variable
+ Atomically(VoidOperation(func(tx *Tx) {
+ n.Set(tx, 12)
+ }))
+ // or:
+ AtomicSet(n, 12)
+
+ // update a variable
+ Atomically(VoidOperation(func(tx *Tx) {
+ cur := n.Get(tx)
+ n.Set(tx, cur-1)
+ }))
+
+ // block until a condition is met
+ Atomically(VoidOperation(func(tx *Tx) {
+ cur := n.Get(tx)
+ if cur != 0 {
+ tx.Retry()
+ }
+ n.Set(tx, 10)
+ }))
+ // or:
+ Atomically(VoidOperation(func(tx *Tx) {
+ cur := n.Get(tx)
+ tx.Assert(cur == 0)
+ n.Set(tx, 10)
+ }))
+
+ // select among multiple (potentially blocking) transactions
+ Atomically(Select(
+ // this function blocks forever, so it will be skipped
+ VoidOperation(func(tx *Tx) { tx.Retry() }),
+
+ // this function will always succeed without blocking
+ VoidOperation(func(tx *Tx) { n.Set(tx, 10) }),
+
+ // this function will never run, because the previous
+ // function succeeded
+ VoidOperation(func(tx *Tx) { n.Set(tx, 11) }),
+ ))
+
+ // since Select is a normal transaction, if the entire select retries
+ // (blocks), it will be retried as a whole:
+ x := 0
+ Atomically(Select(
+ // this function will run twice, and succeed the second time
+ VoidOperation(func(tx *Tx) { tx.Assert(x == 1) }),
+
+ // this function will run once
+ VoidOperation(func(tx *Tx) {
+ x = 1
+ tx.Retry()
+ }),
+ ))
+ // But wait! Transactions are only retried when one of the Vars they read is
+ // updated. Since x isn't an stm Var, this code will actually block forever --
+ // but you get the idea. A sketch of a version that waits on a real Var
+ // follows this example.
+}
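+
+// Sketch, not part of the original example (the helper name is illustrative):
+// the same blocking wait, but on an stm Var, so the retrying transaction is
+// actually woken up once another goroutine updates the Var.
+func exampleBlockOnVar() {
+ y := NewVar(0)
+ go func() {
+ time.Sleep(10 * time.Millisecond)
+ AtomicSet(y, 1) // wakes the transaction blocked below
+ }()
+ Atomically(VoidOperation(func(tx *Tx) {
+ tx.Assert(y.Get(tx) == 1)
+ }))
+}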
+
+const maxTokens = 25
+
+func BenchmarkThunderingHerdCondVar(b *testing.B) {
+ for i := 0; i < b.N; i++ {
+ var mu sync.Mutex
+ consumer := sync.NewCond(&mu)
+ generator := sync.NewCond(&mu)
+ done := false
+ tokens := 0
+ var pending sync.WaitGroup
+ for i := 0; i < 1000; i++ {
+ pending.Add(1)
+ go func() {
+ mu.Lock()
+ for {
+ if tokens > 0 {
+ tokens--
+ generator.Signal()
+ break
+ }
+ consumer.Wait()
+ }
+ mu.Unlock()
+ pending.Done()
+ }()
+ }
+ go func() {
+ mu.Lock()
+ for !done {
+ if tokens < maxTokens {
+ tokens++
+ consumer.Signal()
+ } else {
+ generator.Wait()
+ }
+ }
+ mu.Unlock()
+ }()
+ pending.Wait()
+ mu.Lock()
+ done = true
+ generator.Signal()
+ mu.Unlock()
+ }
+
+}
+
+func BenchmarkThunderingHerd(b *testing.B) {
+ for i := 0; i < b.N; i++ {
+ done := NewBuiltinEqVar(false)
+ tokens := NewBuiltinEqVar(0)
+ pending := NewBuiltinEqVar(0)
+ for i := 0; i < 1000; i++ {
+ Atomically(VoidOperation(func(tx *Tx) {
+ pending.Set(tx, pending.Get(tx)+1)
+ }))
+ go func() {
+ Atomically(VoidOperation(func(tx *Tx) {
+ t := tokens.Get(tx)
+ if t > 0 {
+ tokens.Set(tx, t-1)
+ pending.Set(tx, pending.Get(tx)-1)
+ } else {
+ tx.Retry()
+ }
+ }))
+ }()
+ }
+ go func() {
+ for Atomically(func(tx *Tx) bool {
+ if done.Get(tx) {
+ return false
+ }
+ tx.Assert(tokens.Get(tx) < maxTokens)
+ tokens.Set(tx, tokens.Get(tx)+1)
+ return true
+ }) {
+ }
+ }()
+ Atomically(VoidOperation(func(tx *Tx) {
+ tx.Assert(pending.Get(tx) == 0)
+ }))
+ AtomicSet(done, true)
+ }
+}
+
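+// The inverted variant gives each waiter its own ready Var instead of having
+// all of them block on the shared tokens Var, and a separate goroutine wakes
+// exactly one waiter per token, so a token update no longer wakes the whole
+// herd.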
+func BenchmarkInvertedThunderingHerd(b *testing.B) {
+ for i := 0; i < b.N; i++ {
+ done := NewBuiltinEqVar(false)
+ tokens := NewBuiltinEqVar(0)
+ pending := NewVar(NewSet[*Var[bool]]())
+ for i := 0; i < 1000; i++ {
+ ready := NewVar(false)
+ Atomically(VoidOperation(func(tx *Tx) {
+ pending.Set(tx, pending.Get(tx).Add(ready))
+ }))
+ go func() {
+ Atomically(VoidOperation(func(tx *Tx) {
+ tx.Assert(ready.Get(tx))
+ set := pending.Get(tx)
+ if !set.Contains(ready) {
+ panic("couldn't find ourselves in pending")
+ }
+ pending.Set(tx, set.Delete(ready))
+ }))
+ //b.Log("waiter finished")
+ }()
+ }
+ go func() {
+ for Atomically(func(tx *Tx) bool {
+ if done.Get(tx) {
+ return false
+ }
+ tx.Assert(tokens.Get(tx) < maxTokens)
+ tokens.Set(tx, tokens.Get(tx)+1)
+ return true
+ }) {
+ }
+ }()
+ go func() {
+ for Atomically(func(tx *Tx) bool {
+ tx.Assert(tokens.Get(tx) > 0)
+ tokens.Set(tx, tokens.Get(tx)-1)
+ pending.Get(tx).Range(func(ready *Var[bool]) bool {
+ if !ready.Get(tx) {
+ ready.Set(tx, true)
+ return false
+ }
+ return true
+ })
+ return !done.Get(tx)
+ }) {
+ }
+ }()
+ Atomically(VoidOperation(func(tx *Tx) {
+ tx.Assert(pending.Get(tx).(Lenner).Len() == 0)
+ }))
+ AtomicSet(done, true)
+ }
+}
+
+func TestLimit(t *testing.T) {
+ if Limit(10) == Inf {
+ t.Errorf("Limit(10) == Inf should be false")
+ }
+}
+
+func closeEnough(a, b Limit) bool {
+ // compare the relative error of a against b to a small tolerance
+ return math.Abs(float64(a)/float64(b) - 1.0) < 1e-9
+}
+
+func TestEvery(t *testing.T) {
+ cases := []struct {
+ interval time.Duration
+ lim Limit
+ }{
+ {0, Inf},
+ {-1, Inf},
+ {1 * time.Nanosecond, Limit(1e9)},
+ {1 * time.Microsecond, Limit(1e6)},
+ {1 * time.Millisecond, Limit(1e3)},
+ {10 * time.Millisecond, Limit(100)},
+ {100 * time.Millisecond, Limit(10)},
+ {1 * time.Second, Limit(1)},
+ {2 * time.Second, Limit(0.5)},
+ {time.Duration(2.5 * float64(time.Second)), Limit(0.4)},
+ {4 * time.Second, Limit(0.25)},
+ {10 * time.Second, Limit(0.1)},
+ {time.Duration(math.MaxInt64), Limit(1e9 / float64(math.MaxInt64))},
+ }
+ for _, tc := range cases {
+ lim := Every(tc.interval)
+ if !closeEnough(lim, tc.lim) {
+ t.Errorf("Every(%v) = %v want %v", tc.interval, lim, tc.lim)
+ }
+ }
+}
+
+const (
+ d = 100 * time.Millisecond
+)
+
+var (
+ t0 = time.Now()
+ t1 = t0.Add(time.Duration(1) * d)
+ t2 = t0.Add(time.Duration(2) * d)
+ t3 = t0.Add(time.Duration(3) * d)
+ t4 = t0.Add(time.Duration(4) * d)
+ t5 = t0.Add(time.Duration(5) * d)
+ t9 = t0.Add(time.Duration(9) * d)
)
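+
+// t0 through t9 above are synthetic timestamps spaced d (100ms) apart; the
+// limiter tests express times and expected delays as multiples of d.
+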
+type allow struct {
+ t time.Time
+ n int
+ ok bool
+}
+
+//
+//func run(t *testing.T, lim *Limiter, allows []allow) {
+// for i, allow := range allows {
+// ok := lim.AllowN(allow.t, allow.n)
+// if ok != allow.ok {
+// t.Errorf("step %d: lim.AllowN(%v, %v) = %v want %v",
+// i, allow.t, allow.n, ok, allow.ok)
+// }
+// }
+//}
+//
+//func TestLimiterBurst1(t *testing.T) {
+// run(t, NewLimiter(10, 1), []allow{
+// {t0, 1, true},
+// {t0, 1, false},
+// {t0, 1, false},
+// {t1, 1, true},
+// {t1, 1, false},
+// {t1, 1, false},
+// {t2, 2, false}, // burst size is 1, so n=2 always fails
+// {t2, 1, true},
+// {t2, 1, false},
+// })
+//}
+//
+//func TestLimiterBurst3(t *testing.T) {
+// run(t, NewLimiter(10, 3), []allow{
+// {t0, 2, true},
+// {t0, 2, false},
+// {t0, 1, true},
+// {t0, 1, false},
+// {t1, 4, false},
+// {t2, 1, true},
+// {t3, 1, true},
+// {t4, 1, true},
+// {t4, 1, true},
+// {t4, 1, false},
+// {t4, 1, false},
+// {t9, 3, true},
+// {t9, 0, true},
+// })
+//}
+//
+//func TestLimiterJumpBackwards(t *testing.T) {
+// run(t, NewLimiter(10, 3), []allow{
+// {t1, 1, true}, // start at t1
+// {t0, 1, true}, // jump back to t0, two tokens remain
+// {t0, 1, true},
+// {t0, 1, false},
+// {t0, 1, false},
+// {t1, 1, true}, // got a token
+// {t1, 1, false},
+// {t1, 1, false},
+// {t2, 1, true}, // got another token
+// {t2, 1, false},
+// {t2, 1, false},
+// })
+//}
+
+// Ensure that tokensFromDuration doesn't produce
+// rounding errors by truncating nanoseconds.
+// See golang.org/issues/34861.
+func TestLimiter_noTruncationErrors(t *testing.T) {
+ if !NewLimiter(0.7692307692307693, 1).Allow() {
+ t.Fatal("expected true")
+ }
+}
+
+func TestSimultaneousRequests(t *testing.T) {
+ const (
+ limit = 1
+ burst = 5
+ numRequests = 15
+ )
+ var (
+ wg sync.WaitGroup
+ numOK = uint32(0)
+ )
+
+ // Very slowly replenishing bucket.
+ lim := NewLimiter(limit, burst)
+
+ // Tries to take a token, atomically updates the counter and decreases the wait
+ // group counter.
+ f := func() {
+ defer wg.Done()
+ if ok := lim.Allow(); ok {
+ atomic.AddUint32(&numOK, 1)
+ }
+ }
+
+ wg.Add(numRequests)
+ for i := 0; i < numRequests; i++ {
+ go f()
+ }
+ wg.Wait()
+ if numOK != burst {
+ t.Errorf("numOK = %d, want %d", numOK, burst)
+ }
+}
+
+func TestLongRunningQPS(t *testing.T) {
+ if testing.Short() {
+ t.Skip("skipping in short mode")
+ }
+ if runtime.GOOS == "openbsd" {
+ t.Skip("low resolution time.Sleep invalidates test (golang.org/issue/14183)")
+ return
+ }
+
+ // The test runs for a few seconds executing many requests and then checks
+ // that overall number of requests is reasonable.
+ const (
+ limit = 100
+ burst = 100
+ )
+ var numOK = int32(0)
+
+ lim := NewLimiter(limit, burst)
+
+ var wg sync.WaitGroup
+ f := func() {
+ if ok := lim.Allow(); ok {
+ atomic.AddInt32(&numOK, 1)
+ }
+ wg.Done()
+ }
+
+ start := time.Now()
+ end := start.Add(5 * time.Second)
+ for time.Now().Before(end) {
+ wg.Add(1)
+ go f()
+
+ // This will still offer ~500 requests per second, but won't consume an
+ // outrageous amount of CPU.
+ time.Sleep(2 * time.Millisecond)
+ }
+ wg.Wait()
+ elapsed := time.Since(start)
+ ideal := burst + (limit * float64(elapsed) / float64(time.Second))
+
+ // We should never get more requests than allowed.
+ if want := int32(ideal + 1); numOK > want {
+ t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal)
+ }
+ // We should get very close to the number of requests allowed.
+ if want := int32(0.999 * ideal); numOK < want {
+ t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal)
+ }
+}
+
+type request struct {
+ t time.Time
+ n int
+ act time.Time
+ ok bool
+}
+
+// dFromDuration converts a duration to a multiple of the global constant d
+func dFromDuration(dur time.Duration) int {
+ // Adding a millisecond to be swallowed by the integer division
+ // because we don't care about small inaccuracies
+ return int((dur + time.Millisecond) / d)
+}
+
+// dSince returns multiples of d since t0
+func dSince(t time.Time) int {
+ return dFromDuration(t.Sub(t0))
+}
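+
+// With d = 100ms these work out to, e.g., dSince(t3) == 3 and
+// dFromDuration(250*time.Millisecond) == 2: the extra 50ms (plus the added
+// millisecond) is swallowed by the integer division.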
+
+//
+//func runReserve(t *testing.T, lim *Limiter, req request) *Reservation {
+// return runReserveMax(t, lim, req, InfDuration)
+//}
+//
+//func runReserveMax(t *testing.T, lim *Limiter, req request, maxReserve time.Duration) *Reservation {
+// r := lim.reserveN(req.t, req.n, maxReserve)
+// if r.ok && (dSince(r.timeToAct) != dSince(req.act)) || r.ok != req.ok {
+// t.Errorf("lim.reserveN(t%d, %v, %v) = (t%d, %v) want (t%d, %v)",
+// dSince(req.t), req.n, maxReserve, dSince(r.timeToAct), r.ok, dSince(req.act), req.ok)
+// }
+// return &r
+//}
+//
+//func TestSimpleReserve(t *testing.T) {
+// lim := NewLimiter(10, 2)
+//
+// runReserve(t, lim, request{t0, 2, t0, true})
+// runReserve(t, lim, request{t0, 2, t2, true})
+// runReserve(t, lim, request{t3, 2, t4, true})
+//}
+//
+//func TestMix(t *testing.T) {
+// lim := NewLimiter(10, 2)
+//
+// runReserve(t, lim, request{t0, 3, t1, false}) // should return false because n > Burst
+// runReserve(t, lim, request{t0, 2, t0, true})
+// run(t, lim, []allow{{t1, 2, false}}) // not enough tokens - don't allow
+// runReserve(t, lim, request{t1, 2, t2, true})
+// run(t, lim, []allow{{t1, 1, false}}) // negative tokens - don't allow
+// run(t, lim, []allow{{t3, 1, true}})
+//}
+//
+//func TestCancelInvalid(t *testing.T) {
+// lim := NewLimiter(10, 2)
+//
+// runReserve(t, lim, request{t0, 2, t0, true})
+// r := runReserve(t, lim, request{t0, 3, t3, false})
+// r.CancelAt(t0) // should have no effect
+// runReserve(t, lim, request{t0, 2, t2, true}) // did not get extra tokens
+//}
+//
+//func TestCancelLast(t *testing.T) {
+// lim := NewLimiter(10, 2)
+//
+// runReserve(t, lim, request{t0, 2, t0, true})
+// r := runReserve(t, lim, request{t0, 2, t2, true})
+// r.CancelAt(t1) // got 2 tokens back
+// runReserve(t, lim, request{t1, 2, t2, true})
+//}
+//
+//func TestCancelTooLate(t *testing.T) {
+// lim := NewLimiter(10, 2)
+//
+// runReserve(t, lim, request{t0, 2, t0, true})
+// r := runReserve(t, lim, request{t0, 2, t2, true})
+// r.CancelAt(t3) // too late to cancel - should have no effect
+// runReserve(t, lim, request{t3, 2, t4, true})
+//}
+//
+//func TestCancel0Tokens(t *testing.T) {
+// lim := NewLimiter(10, 2)
+//
+// runReserve(t, lim, request{t0, 2, t0, true})
+// r := runReserve(t, lim, request{t0, 1, t1, true})
+// runReserve(t, lim, request{t0, 1, t2, true})
+// r.CancelAt(t0) // got 0 tokens back
+// runReserve(t, lim, request{t0, 1, t3, true})
+//}
+//
+//func TestCancel1Token(t *testing.T) {
+// lim := NewLimiter(10, 2)
+//
+// runReserve(t, lim, request{t0, 2, t0, true})
+// r := runReserve(t, lim, request{t0, 2, t2, true})
+// runReserve(t, lim, request{t0, 1, t3, true})
+// r.CancelAt(t2) // got 1 token back
+// runReserve(t, lim, request{t2, 2, t4, true})
+//}
+//
+//func TestCancelMulti(t *testing.T) {
+// lim := NewLimiter(10, 4)
+//
+// runReserve(t, lim, request{t0, 4, t0, true})
+// rA := runReserve(t, lim, request{t0, 3, t3, true})
+// runReserve(t, lim, request{t0, 1, t4, true})
+// rC := runReserve(t, lim, request{t0, 1, t5, true})
+// rC.CancelAt(t1) // get 1 token back
+// rA.CancelAt(t1) // get 2 tokens back, as if C was never reserved
+// runReserve(t, lim, request{t1, 3, t5, true})
+//}
+//
+//func TestReserveJumpBack(t *testing.T) {
+// lim := NewLimiter(10, 2)
+//
+// runReserve(t, lim, request{t1, 2, t1, true}) // start at t1
+// runReserve(t, lim, request{t0, 1, t1, true}) // should violate Limit,Burst
+// runReserve(t, lim, request{t2, 2, t3, true})
+//}
+
+//func TestReserveJumpBackCancel(t *testing.T) {
+// lim := NewLimiter(10, 2)
+//
+// runReserve(t, lim, request{t1, 2, t1, true}) // start at t1
+// r := runReserve(t, lim, request{t1, 2, t3, true})
+// runReserve(t, lim, request{t1, 1, t4, true})
+// r.CancelAt(t0) // cancel at t0, get 1 token back
+// runReserve(t, lim, request{t1, 2, t4, true}) // should violate Limit,Burst
+//}
+//
+//func TestReserveSetLimit(t *testing.T) {
+// lim := NewLimiter(5, 2)
+//
+// runReserve(t, lim, request{t0, 2, t0, true})
+// runReserve(t, lim, request{t0, 2, t4, true})
+// lim.SetLimitAt(t2, 10)
+// runReserve(t, lim, request{t2, 1, t4, true}) // violates Limit and Burst
+//}
+//
+//func TestReserveSetBurst(t *testing.T) {
+// lim := NewLimiter(5, 2)
+//
+// runReserve(t, lim, request{t0, 2, t0, true})
+// runReserve(t, lim, request{t0, 2, t4, true})
+// lim.SetBurstAt(t3, 4)
+// runReserve(t, lim, request{t0, 4, t9, true}) // violates Limit and Burst
+//}
+//
+//func TestReserveSetLimitCancel(t *testing.T) {
+// lim := NewLimiter(5, 2)
+//
+// runReserve(t, lim, request{t0, 2, t0, true})
+// r := runReserve(t, lim, request{t0, 2, t4, true})
+// lim.SetLimitAt(t2, 10)
+// r.CancelAt(t2) // 2 tokens back
+// runReserve(t, lim, request{t2, 2, t3, true})
+//}
+//
+//func TestReserveMax(t *testing.T) {
+// lim := NewLimiter(10, 2)
+// maxT := d
+//
+// runReserveMax(t, lim, request{t0, 2, t0, true}, maxT)
+// runReserveMax(t, lim, request{t0, 1, t1, true}, maxT) // reserve for close future
+// runReserveMax(t, lim, request{t0, 1, t2, false}, maxT) // time to act too far in the future
+//}
+
+type wait struct {
+ name string
+ ctx context.Context
+ n int
+ delay int // in multiples of d
+ nilErr bool
+}
+
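+// runWait calls lim.WaitN with w.ctx and w.n, then checks that the error was
+// nil or non-nil as expected (w.nilErr) and that the observed delay, measured
+// in multiples of d, matches w.delay.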
+func runWait(t *testing.T, lim *Limiter, w wait) {
+ t.Helper()
+ start := time.Now()
+ err := lim.WaitN(w.ctx, w.n)
+ delay := time.Since(start)
+ if (w.nilErr && err != nil) || (!w.nilErr && err == nil) || w.delay != dFromDuration(delay) {
+ errString := "<nil>"
+ if !w.nilErr {
+ errString = "<non-nil error>"
+ }
+ t.Errorf("lim.WaitN(%v, lim, %v) = %v with delay %v ; want %v with delay %v",
+ w.name, w.n, err, delay, errString, d*time.Duration(w.delay))
+ }
+}
+
+func TestWaitSimple(t *testing.T) {
+ lim := NewLimiter(10, 3)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+ runWait(t, lim, wait{"already-cancelled", ctx, 1, 0, false})
+
+ runWait(t, lim, wait{"exceed-burst-error", context.Background(), 4, 0, false})
+
+ runWait(t, lim, wait{"act-now", context.Background(), 2, 0, true})
+ runWait(t, lim, wait{"act-later", context.Background(), 3, 2, true})
+}
+
+func TestWaitCancel(t *testing.T) {
+ lim := NewLimiter(10, 3)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ runWait(t, lim, wait{"act-now", ctx, 2, 0, true}) // after this lim.tokens = 1
+ go func() {
+ time.Sleep(d)
+ cancel()
+ }()
+ runWait(t, lim, wait{"will-cancel", ctx, 3, 1, false})
+ // should get 3 tokens back, and have lim.tokens = 2
+ //t.Logf("tokens:%v last:%v lastEvent:%v", lim.tokens, lim.last, lim.lastEvent)
+ runWait(t, lim, wait{"act-now-after-cancel", context.Background(), 2, 0, true})
+}
+
+func TestWaitTimeout(t *testing.T) {
+ lim := NewLimiter(10, 3)
+
+ ctx, cancel := context.WithTimeout(context.Background(), d)
+ defer cancel()
+ runWait(t, lim, wait{"act-now", ctx, 2, 0, true})
+ runWait(t, lim, wait{"w-timeout-err", ctx, 3, 0, false})
+}
+
+func TestWaitInf(t *testing.T) {
+ lim := NewLimiter(Inf, 0)
+
+ runWait(t, lim, wait{"exceed-burst-no-error", context.Background(), 3, 0, true})
+}
+
+func BenchmarkAllowN(b *testing.B) {
+ lim := NewLimiter(Every(1*time.Second), 1)
+ b.ReportAllocs()
+ b.ResetTimer()
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ lim.AllowN(1)
+ }
+ })
+}
+
+func BenchmarkWaitNNoDelay(b *testing.B) {
+ lim := NewLimiter(Limit(b.N), b.N)
+ ctx := context.Background()
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ lim.WaitN(ctx, 1)
+ }
+}
+
+func TestDecrement(t *testing.T) {
+ x := NewVar(1000)
+ for i := 0; i < 500; i++ {
+ go Atomically(VoidOperation(func(tx *Tx) {
+ cur := x.Get(tx)
+ x.Set(tx, cur-1)
+ }))
+ }
+ done := make(chan struct{})
+ go func() {
+ Atomically(VoidOperation(func(tx *Tx) {
+ tx.Assert(x.Get(tx) == 500)
+ }))
+ close(done)
+ }()
+ select {
+ case <-done:
+ case <-time.After(10 * time.Second):
+ t.Fatal("decrement did not complete in time")
+ }
+}
+
+// read-only transactions aren't exempt from calling tx.inputsChanged
+func TestReadVerify(t *testing.T) {
+ read := make(chan struct{})
+ x, y := NewVar(1), NewVar(2)
+
+ // spawn a transaction that writes to x
+ go func() {
+ <-read
+ AtomicSet(x, 3)
+ read <- struct{}{}
+ // other tx should retry, so we need to read/send again
+ read <- <-read
+ }()
+
+ // spawn a transaction that reads x, then y. The other tx will modify x in
+ // between the reads, causing this tx to retry.
+ var x2, y2 int
+ Atomically(VoidOperation(func(tx *Tx) {
+ x2 = x.Get(tx)
+ read <- struct{}{}
+ <-read // wait for other tx to complete
+ y2 = y.Get(tx)
+ }))
+ if x2 == 1 && y2 == 2 {
+ t.Fatal("read was not verified")
+ }
+}
+
+func TestRetry(t *testing.T) {
+ x := NewVar(10)
+ // spawn 10 transactions, one every 10 milliseconds. This will decrement x
+ // to 0 over the course of 100 milliseconds.
+ go func() {
+ for i := 0; i < 10; i++ {
+ time.Sleep(10 * time.Millisecond)
+ Atomically(VoidOperation(func(tx *Tx) {
+ cur := x.Get(tx)
+ x.Set(tx, cur-1)
+ }))
+ }
+ }()
+ // Each time we read x before it reaches 0, we have to retry. x is
+ // decremented 10 times, so the transaction should retry at most 10 times.
+ retry := 0
+ Atomically(VoidOperation(func(tx *Tx) {
+ cur := x.Get(tx)
+ if cur != 0 {
+ retry++
+ tx.Retry()
+ }
+ }))
+ if retry > 10 {
+ t.Fatal("should have retried at most 10 times, got", retry)
+ }
+}
+
+func TestVerify(t *testing.T) {
+ // tx.inputsChanged should check more than pointer equality
+ type foo struct {
+ i int
+ }
+ x := NewVar(&foo{3})
+ read := make(chan struct{})
+
+ // spawn a transaction that modifies x
+ go func() {
+ Atomically(VoidOperation(func(tx *Tx) {
+ <-read
+ rx := x.Get(tx)
+ rx.i = 7
+ x.Set(tx, rx)
+ }))
+ read <- struct{}{}
+ // other tx should retry, so we need to read/send again
+ read <- <-read
+ }()
+
+ // spawn a transaction that reads x and then waits. The other tx writes to x
+ // (with the same pointer value) before this tx commits, which must still
+ // cause this tx to retry.
+ var i int
+ Atomically(VoidOperation(func(tx *Tx) {
+ f := x.Get(tx)
+ i = f.i
+ read <- struct{}{}
+ <-read // wait for other tx to complete
+ }))
+ if i == 3 {
+ t.Fatal("inputsChanged did not retry despite modified Var", i)
+ }
+}
+
+func TestSelect(t *testing.T) {
+ // empty Select should panic
+ // require.Panics(t, func() { Atomically(Select[struct{}]()) })
+
+ // with one arg, Select adds no effect
+ x := NewVar(2)
+ Atomically(Select(VoidOperation(func(tx *Tx) {
+ tx.Assert(x.Get(tx) == 2)
+ })))
+
+ picked := Atomically(Select(
+ // always blocks; should never be selected
+ func(tx *Tx) int {
+ tx.Retry()
+ panic("unreachable")
+ },
+ // always succeeds; should always be selected
+ func(tx *Tx) int {
+ return 2
+ },
+ // always succeeds; should never be selected
+ func(tx *Tx) int {
+ return 3
+ },
+ ))
+ g.TAssertEqual(2, picked)
+}
+
+func TestCompose(t *testing.T) {
+ nums := make([]int, 100)
+ fns := make([]Operation[struct{}], 100)
+ for i := range fns {
+ fns[i] = func(x int) Operation[struct{}] {
+ return VoidOperation(func(*Tx) { nums[x] = x })
+ }(i) // capture loop var
+ }
+ Atomically(Compose(fns...))
+ for i := range nums {
+ if nums[i] != i {
+ t.Error("Compose failed:", nums[i], i)
+ }
+ }
+}
+
+func TestPanic(t *testing.T) {
+ // normal panics should escape Atomically
+ /*
+ assert.PanicsWithValue(t, "foo", func() {
+ Atomically(func(*Tx) any {
+ panic("foo")
+ })
+ })
+ */
+}
+
+func TestReadWritten(t *testing.T) {
+ // reading a variable written in the same transaction should return the
+ // previously written value
+ x := NewVar(3)
+ Atomically(VoidOperation(func(tx *Tx) {
+ x.Set(tx, 5)
+ tx.Assert(x.Get(tx) == 5)
+ }))
+}
+
+func TestAtomicSetRetry(t *testing.T) {
+ // AtomicSet should cause waiting transactions to retry
+ x := NewVar(3)
+ done := make(chan struct{})
+ go func() {
+ Atomically(VoidOperation(func(tx *Tx) {
+ tx.Assert(x.Get(tx) == 5)
+ }))
+ done <- struct{}{}
+ }()
+ time.Sleep(10 * time.Millisecond)
+ AtomicSet(x, 5)
+ select {
+ case <-done:
+ case <-time.After(time.Second):
+ t.Fatal("AtomicSet did not wake up a waiting transaction")
+ }
+}
+
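+// testPingPong bounces a ball between two bat goroutines until it has been
+// hit n times: each bat waits until the ball is ready and headed its way,
+// hits it back, and reports the hit via afterHit, while the main transaction
+// waits for the hit count to reach n and then tells both bats to stop.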
+func testPingPong(t testing.TB, n int, afterHit func(string)) {
+ ball := NewBuiltinEqVar(false)
+ doneVar := NewVar(false)
+ hits := NewVar(0)
+ ready := NewVar(true) // The ball is ready for hitting.
+ var wg sync.WaitGroup
+ bat := func(from, to bool, noise string) {
+ defer wg.Done()
+ for !Atomically(func(tx *Tx) any {
+ if doneVar.Get(tx) {
+ return true
+ }
+ tx.Assert(ready.Get(tx))
+ if ball.Get(tx) == from {
+ ball.Set(tx, to)
+ hits.Set(tx, hits.Get(tx)+1)
+ ready.Set(tx, false)
+ return false
+ }
+ return tx.Retry()
+ }).(bool) {
+ afterHit(noise)
+ AtomicSet(ready, true)
+ }
+ }
+ wg.Add(2)
+ go bat(false, true, "ping!")
+ go bat(true, false, "pong!")
+ Atomically(VoidOperation(func(tx *Tx) {
+ tx.Assert(hits.Get(tx) >= n)
+ doneVar.Set(tx, true)
+ }))
+ wg.Wait()
+}
+
+func TestPingPong(t *testing.T) {
+ testPingPong(t, 42, func(s string) { t.Log(s) })
+}
+
+func TestSleepingBeauty(t *testing.T) {
+ /*
+ require.Panics(t, func() {
+ Atomically(func(tx *Tx) any {
+ tx.Assert(false)
+ return nil
+ })
+ })
+ */
+}
+
+//func TestRetryStack(t *testing.T) {
+// v := NewVar[int](nil)
+// go func() {
+// i := 0
+// for {
+// AtomicSet(v, i)
+// i++
+// }
+// }()
+// Atomically(func(tx *Tx) any {
+// debug.PrintStack()
+// ret := func() {
+// defer Atomically(nil)
+// }
+// v.Get(tx)
+// tx.Assert(false)
+// return ret
+// })
+//}
+
+func TestContextEquality(t *testing.T) {
+ ctx := context.Background()
+ g.TAssertEqual(ctx, context.Background())
+ childCtx, cancel := context.WithCancel(ctx)
+ g.TAssertEqual(childCtx != ctx, true)
+ g.TAssertEqual(childCtx != ctx, true)
+ g.TAssertEqual(context.Background(), ctx)
+ cancel()
+ g.TAssertEqual(context.Background(), ctx)
+ g.TAssertEqual(ctx != childCtx, true)
+}
+
func MainTest() {