diff options
author | EuAndreh <eu@euandre.org> | 2025-01-22 12:31:30 -0300 |
---|---|---|
committer | EuAndreh <eu@euandre.org> | 2025-01-22 12:31:30 -0300 |
commit | 59d879ef4e654ce53c2450e000ffa435f06c2f0e (patch) | |
tree | 05ae996bf799b1e51f891a5586b3b72fa9bdfe3f /tests/stm.go | |
parent | Setup Makefile build skeleton (diff) | |
download | stm-59d879ef4e654ce53c2450e000ffa435f06c2f0e.tar.gz stm-59d879ef4e654ce53c2450e000ffa435f06c2f0e.tar.xz |
Unify code into default repo format
Diffstat (limited to 'tests/stm.go')
-rw-r--r-- | tests/stm.go | 1177 |
1 files changed, 1177 insertions, 0 deletions
diff --git a/tests/stm.go b/tests/stm.go index a30ac1a..816cdec 100644 --- a/tests/stm.go +++ b/tests/stm.go @@ -1,8 +1,1185 @@ package stm import ( + "context" + "fmt" + "math" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" + + g "gobang" +) + + + +func TestValue(t *testing.T) { + v := New("hello") + g.TAssertEqual("hello", v.Load()) + v.Store("world") + g.TAssertEqual("world", v.Load()) +} + +func TestValueZeroValue(t *testing.T) { + var v Value[string] + // assert.Panics(t, func() { v.Load() }) + v.Store("world") + g.TAssertEqual("world", v.Load()) +} + +func TestValueSwapZeroValue(t *testing.T) { + // var v Value[string] + // assert.Panics(t, func() { v.Swap("hello") }) +} + +func TestInt32(t *testing.T) { + v := NewInt32(0) + g.TAssertEqual(0, v.Load()) + g.TAssertEqual(true, v.CompareAndSwap(0, 10)) + g.TAssertEqual(false, v.CompareAndSwap(0, 10)) +} + +func BenchmarkInt64Add(b *testing.B) { + v := NewInt64(0) + for i := 0; i < b.N; i++ { + v.Add(1) + } +} + +func BenchmarkIntInterfaceAdd(b *testing.B) { + var v Int[int64] = NewInt64(0) + for i := 0; i < b.N; i++ { + v.Add(1) + } +} + +func BenchmarkStdlibInt64Add(b *testing.B) { + var n int64 + for i := 0; i < b.N; i++ { + atomic.AddInt64(&n, 1) + } +} + +func BenchmarkInterfaceStore(b *testing.B) { + var v Interface[string] = New("hello") + for i := 0; i < b.N; i++ { + v.Store(fmt.Sprint(i)) + } +} + +func BenchmarkValueStore(b *testing.B) { + v := New("hello") + for i := 0; i < b.N; i++ { + v.Store(fmt.Sprint(i)) + } +} + +func BenchmarkStdlibValueStore(b *testing.B) { + v := atomic.Value{} + for i := 0; i < b.N; i++ { + v.Store(fmt.Sprint(i)) + } +} + +func BenchmarkAtomicGet(b *testing.B) { + x := NewVar(0) + for i := 0; i < b.N; i++ { + AtomicGet(x) + } +} + +func BenchmarkAtomicSet(b *testing.B) { + x := NewVar(0) + for i := 0; i < b.N; i++ { + AtomicSet(x, 0) + } +} + +func BenchmarkIncrementSTM(b *testing.B) { + for i := 0; i < b.N; i++ { + // spawn 1000 goroutines that each increment x by 1 + x := NewVar(0) + for i := 0; i < 1000; i++ { + go Atomically(VoidOperation(func(tx *Tx) { + cur := x.Get(tx) + x.Set(tx, cur+1) + })) + } + // wait for x to reach 1000 + Atomically(VoidOperation(func(tx *Tx) { + tx.Assert(x.Get(tx) == 1000) + })) + } +} + +func BenchmarkIncrementMutex(b *testing.B) { + for i := 0; i < b.N; i++ { + var mu sync.Mutex + x := 0 + for i := 0; i < 1000; i++ { + go func() { + mu.Lock() + x++ + mu.Unlock() + }() + } + for { + mu.Lock() + read := x + mu.Unlock() + if read == 1000 { + break + } + } + } +} + +func BenchmarkIncrementChannel(b *testing.B) { + for i := 0; i < b.N; i++ { + c := make(chan int, 1) + c <- 0 + for i := 0; i < 1000; i++ { + go func() { + c <- 1 + <-c + }() + } + for { + read := <-c + if read == 1000 { + break + } + c <- read + } + } +} + +func BenchmarkReadVarSTM(b *testing.B) { + for i := 0; i < b.N; i++ { + var wg sync.WaitGroup + wg.Add(1000) + x := NewVar(0) + for i := 0; i < 1000; i++ { + go func() { + AtomicGet(x) + wg.Done() + }() + } + wg.Wait() + } +} + +func BenchmarkReadVarMutex(b *testing.B) { + for i := 0; i < b.N; i++ { + var mu sync.Mutex + var wg sync.WaitGroup + wg.Add(1000) + x := 0 + for i := 0; i < 1000; i++ { + go func() { + mu.Lock() + _ = x + mu.Unlock() + wg.Done() + }() + } + wg.Wait() + } +} + +func BenchmarkReadVarChannel(b *testing.B) { + for i := 0; i < b.N; i++ { + var wg sync.WaitGroup + wg.Add(1000) + c := make(chan int) + close(c) + for i := 0; i < 1000; i++ { + go func() { + <-c + wg.Done() + }() + } + wg.Wait() + } +} + +func parallelPingPongs(b *testing.B, n int) { + var wg sync.WaitGroup + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + testPingPong(b, b.N, func(string) {}) + }() + } + wg.Wait() +} + +func BenchmarkPingPong4(b *testing.B) { + b.ReportAllocs() + parallelPingPongs(b, 4) +} + +func BenchmarkPingPong(b *testing.B) { + b.ReportAllocs() + parallelPingPongs(b, 1) +} + +func Example() { + // create a shared variable + n := NewVar(3) + + // read a variable + var v int + Atomically(VoidOperation(func(tx *Tx) { + v = n.Get(tx) + })) + // or: + v = AtomicGet(n) + _ = v + + // write to a variable + Atomically(VoidOperation(func(tx *Tx) { + n.Set(tx, 12) + })) + // or: + AtomicSet(n, 12) + + // update a variable + Atomically(VoidOperation(func(tx *Tx) { + cur := n.Get(tx) + n.Set(tx, cur-1) + })) + + // block until a condition is met + Atomically(VoidOperation(func(tx *Tx) { + cur := n.Get(tx) + if cur != 0 { + tx.Retry() + } + n.Set(tx, 10) + })) + // or: + Atomically(VoidOperation(func(tx *Tx) { + cur := n.Get(tx) + tx.Assert(cur == 0) + n.Set(tx, 10) + })) + + // select among multiple (potentially blocking) transactions + Atomically(Select( + // this function blocks forever, so it will be skipped + VoidOperation(func(tx *Tx) { tx.Retry() }), + + // this function will always succeed without blocking + VoidOperation(func(tx *Tx) { n.Set(tx, 10) }), + + // this function will never run, because the previous + // function succeeded + VoidOperation(func(tx *Tx) { n.Set(tx, 11) }), + )) + + // since Select is a normal transaction, if the entire select retries + // (blocks), it will be retried as a whole: + x := 0 + Atomically(Select( + // this function will run twice, and succeed the second time + VoidOperation(func(tx *Tx) { tx.Assert(x == 1) }), + + // this function will run once + VoidOperation(func(tx *Tx) { + x = 1 + tx.Retry() + }), + )) + // But wait! Transactions are only retried when one of the Vars they read is + // updated. Since x isn't a stm Var, this code will actually block forever -- + // but you get the idea. +} + +const maxTokens = 25 + +func BenchmarkThunderingHerdCondVar(b *testing.B) { + for i := 0; i < b.N; i++ { + var mu sync.Mutex + consumer := sync.NewCond(&mu) + generator := sync.NewCond(&mu) + done := false + tokens := 0 + var pending sync.WaitGroup + for i := 0; i < 1000; i++ { + pending.Add(1) + go func() { + mu.Lock() + for { + if tokens > 0 { + tokens-- + generator.Signal() + break + } + consumer.Wait() + } + mu.Unlock() + pending.Done() + }() + } + go func() { + mu.Lock() + for !done { + if tokens < maxTokens { + tokens++ + consumer.Signal() + } else { + generator.Wait() + } + } + mu.Unlock() + }() + pending.Wait() + mu.Lock() + done = true + generator.Signal() + mu.Unlock() + } + +} + +func BenchmarkThunderingHerd(b *testing.B) { + for i := 0; i < b.N; i++ { + done := NewBuiltinEqVar(false) + tokens := NewBuiltinEqVar(0) + pending := NewBuiltinEqVar(0) + for i := 0; i < 1000; i++ { + Atomically(VoidOperation(func(tx *Tx) { + pending.Set(tx, pending.Get(tx)+1) + })) + go func() { + Atomically(VoidOperation(func(tx *Tx) { + t := tokens.Get(tx) + if t > 0 { + tokens.Set(tx, t-1) + pending.Set(tx, pending.Get(tx)-1) + } else { + tx.Retry() + } + })) + }() + } + go func() { + for Atomically(func(tx *Tx) bool { + if done.Get(tx) { + return false + } + tx.Assert(tokens.Get(tx) < maxTokens) + tokens.Set(tx, tokens.Get(tx)+1) + return true + }) { + } + }() + Atomically(VoidOperation(func(tx *Tx) { + tx.Assert(pending.Get(tx) == 0) + })) + AtomicSet(done, true) + } +} + +func BenchmarkInvertedThunderingHerd(b *testing.B) { + for i := 0; i < b.N; i++ { + done := NewBuiltinEqVar(false) + tokens := NewBuiltinEqVar(0) + pending := NewVar(NewSet[*Var[bool]]()) + for i := 0; i < 1000; i++ { + ready := NewVar(false) + Atomically(VoidOperation(func(tx *Tx) { + pending.Set(tx, pending.Get(tx).Add(ready)) + })) + go func() { + Atomically(VoidOperation(func(tx *Tx) { + tx.Assert(ready.Get(tx)) + set := pending.Get(tx) + if !set.Contains(ready) { + panic("couldn't find ourselves in pending") + } + pending.Set(tx, set.Delete(ready)) + })) + //b.Log("waiter finished") + }() + } + go func() { + for Atomically(func(tx *Tx) bool { + if done.Get(tx) { + return false + } + tx.Assert(tokens.Get(tx) < maxTokens) + tokens.Set(tx, tokens.Get(tx)+1) + return true + }) { + } + }() + go func() { + for Atomically(func(tx *Tx) bool { + tx.Assert(tokens.Get(tx) > 0) + tokens.Set(tx, tokens.Get(tx)-1) + pending.Get(tx).Range(func(ready *Var[bool]) bool { + if !ready.Get(tx) { + ready.Set(tx, true) + return false + } + return true + }) + return !done.Get(tx) + }) { + } + }() + Atomically(VoidOperation(func(tx *Tx) { + tx.Assert(pending.Get(tx).(Lenner).Len() == 0) + })) + AtomicSet(done, true) + } +} + +func TestLimit(t *testing.T) { + if Limit(10) == Inf { + t.Errorf("Limit(10) == Inf should be false") + } +} + +func closeEnough(a, b Limit) bool { + return (math.Abs(float64(a)/float64(b)) - 1.0) < 1e-9 +} + +func TestEvery(t *testing.T) { + cases := []struct { + interval time.Duration + lim Limit + }{ + {0, Inf}, + {-1, Inf}, + {1 * time.Nanosecond, Limit(1e9)}, + {1 * time.Microsecond, Limit(1e6)}, + {1 * time.Millisecond, Limit(1e3)}, + {10 * time.Millisecond, Limit(100)}, + {100 * time.Millisecond, Limit(10)}, + {1 * time.Second, Limit(1)}, + {2 * time.Second, Limit(0.5)}, + {time.Duration(2.5 * float64(time.Second)), Limit(0.4)}, + {4 * time.Second, Limit(0.25)}, + {10 * time.Second, Limit(0.1)}, + {time.Duration(math.MaxInt64), Limit(1e9 / float64(math.MaxInt64))}, + } + for _, tc := range cases { + lim := Every(tc.interval) + if !closeEnough(lim, tc.lim) { + t.Errorf("Every(%v) = %v want %v", tc.interval, lim, tc.lim) + } + } +} + +const ( + d = 100 * time.Millisecond +) + +var ( + t0 = time.Now() + t1 = t0.Add(time.Duration(1) * d) + t2 = t0.Add(time.Duration(2) * d) + t3 = t0.Add(time.Duration(3) * d) + t4 = t0.Add(time.Duration(4) * d) + t5 = t0.Add(time.Duration(5) * d) + t9 = t0.Add(time.Duration(9) * d) ) +type allow struct { + t time.Time + n int + ok bool +} + +// +//func run(t *testing.T, lim *Limiter, allows []allow) { +// for i, allow := range allows { +// ok := lim.AllowN(allow.t, allow.n) +// if ok != allow.ok { +// t.Errorf("step %d: lim.AllowN(%v, %v) = %v want %v", +// i, allow.t, allow.n, ok, allow.ok) +// } +// } +//} +// +//func TestLimiterBurst1(t *testing.T) { +// run(t, NewLimiter(10, 1), []allow{ +// {t0, 1, true}, +// {t0, 1, false}, +// {t0, 1, false}, +// {t1, 1, true}, +// {t1, 1, false}, +// {t1, 1, false}, +// {t2, 2, false}, // burst size is 1, so n=2 always fails +// {t2, 1, true}, +// {t2, 1, false}, +// }) +//} +// +//func TestLimiterBurst3(t *testing.T) { +// run(t, NewLimiter(10, 3), []allow{ +// {t0, 2, true}, +// {t0, 2, false}, +// {t0, 1, true}, +// {t0, 1, false}, +// {t1, 4, false}, +// {t2, 1, true}, +// {t3, 1, true}, +// {t4, 1, true}, +// {t4, 1, true}, +// {t4, 1, false}, +// {t4, 1, false}, +// {t9, 3, true}, +// {t9, 0, true}, +// }) +//} +// +//func TestLimiterJumpBackwards(t *testing.T) { +// run(t, NewLimiter(10, 3), []allow{ +// {t1, 1, true}, // start at t1 +// {t0, 1, true}, // jump back to t0, two tokens remain +// {t0, 1, true}, +// {t0, 1, false}, +// {t0, 1, false}, +// {t1, 1, true}, // got a token +// {t1, 1, false}, +// {t1, 1, false}, +// {t2, 1, true}, // got another token +// {t2, 1, false}, +// {t2, 1, false}, +// }) +//} + +// Ensure that tokensFromDuration doesn't produce +// rounding errors by truncating nanoseconds. +// See golang.org/issues/34861. +func TestLimiter_noTruncationErrors(t *testing.T) { + if !NewLimiter(0.7692307692307693, 1).Allow() { + t.Fatal("expected true") + } +} + +func TestSimultaneousRequests(t *testing.T) { + const ( + limit = 1 + burst = 5 + numRequests = 15 + ) + var ( + wg sync.WaitGroup + numOK = uint32(0) + ) + + // Very slow replenishing bucket. + lim := NewLimiter(limit, burst) + + // Tries to take a token, atomically updates the counter and decreases the wait + // group counter. + f := func() { + defer wg.Done() + if ok := lim.Allow(); ok { + atomic.AddUint32(&numOK, 1) + } + } + + wg.Add(numRequests) + for i := 0; i < numRequests; i++ { + go f() + } + wg.Wait() + if numOK != burst { + t.Errorf("numOK = %d, want %d", numOK, burst) + } +} + +func TestLongRunningQPS(t *testing.T) { + if testing.Short() { + t.Skip("skipping in short mode") + } + if runtime.GOOS == "openbsd" { + t.Skip("low resolution time.Sleep invalidates test (golang.org/issue/14183)") + return + } + + // The test runs for a few seconds executing many requests and then checks + // that overall number of requests is reasonable. + const ( + limit = 100 + burst = 100 + ) + var numOK = int32(0) + + lim := NewLimiter(limit, burst) + + var wg sync.WaitGroup + f := func() { + if ok := lim.Allow(); ok { + atomic.AddInt32(&numOK, 1) + } + wg.Done() + } + + start := time.Now() + end := start.Add(5 * time.Second) + for time.Now().Before(end) { + wg.Add(1) + go f() + + // This will still offer ~500 requests per second, but won't consume + // outrageous amount of CPU. + time.Sleep(2 * time.Millisecond) + } + wg.Wait() + elapsed := time.Since(start) + ideal := burst + (limit * float64(elapsed) / float64(time.Second)) + + // We should never get more requests than allowed. + if want := int32(ideal + 1); numOK > want { + t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal) + } + // We should get very close to the number of requests allowed. + if want := int32(0.999 * ideal); numOK < want { + t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal) + } +} + +type request struct { + t time.Time + n int + act time.Time + ok bool +} + +// dFromDuration converts a duration to a multiple of the global constant d +func dFromDuration(dur time.Duration) int { + // Adding a millisecond to be swallowed by the integer division + // because we don't care about small inaccuracies + return int((dur + time.Millisecond) / d) +} + +// dSince returns multiples of d since t0 +func dSince(t time.Time) int { + return dFromDuration(t.Sub(t0)) +} + +// +//func runReserve(t *testing.T, lim *Limiter, req request) *Reservation { +// return runReserveMax(t, lim, req, InfDuration) +//} +// +//func runReserveMax(t *testing.T, lim *Limiter, req request, maxReserve time.Duration) *Reservation { +// r := lim.reserveN(req.t, req.n, maxReserve) +// if r.ok && (dSince(r.timeToAct) != dSince(req.act)) || r.ok != req.ok { +// t.Errorf("lim.reserveN(t%d, %v, %v) = (t%d, %v) want (t%d, %v)", +// dSince(req.t), req.n, maxReserve, dSince(r.timeToAct), r.ok, dSince(req.act), req.ok) +// } +// return &r +//} +// +//func TestSimpleReserve(t *testing.T) { +// lim := NewLimiter(10, 2) +// +// runReserve(t, lim, request{t0, 2, t0, true}) +// runReserve(t, lim, request{t0, 2, t2, true}) +// runReserve(t, lim, request{t3, 2, t4, true}) +//} +// +//func TestMix(t *testing.T) { +// lim := NewLimiter(10, 2) +// +// runReserve(t, lim, request{t0, 3, t1, false}) // should return false because n > Burst +// runReserve(t, lim, request{t0, 2, t0, true}) +// run(t, lim, []allow{{t1, 2, false}}) // not enought tokens - don't allow +// runReserve(t, lim, request{t1, 2, t2, true}) +// run(t, lim, []allow{{t1, 1, false}}) // negative tokens - don't allow +// run(t, lim, []allow{{t3, 1, true}}) +//} +// +//func TestCancelInvalid(t *testing.T) { +// lim := NewLimiter(10, 2) +// +// runReserve(t, lim, request{t0, 2, t0, true}) +// r := runReserve(t, lim, request{t0, 3, t3, false}) +// r.CancelAt(t0) // should have no effect +// runReserve(t, lim, request{t0, 2, t2, true}) // did not get extra tokens +//} +// +//func TestCancelLast(t *testing.T) { +// lim := NewLimiter(10, 2) +// +// runReserve(t, lim, request{t0, 2, t0, true}) +// r := runReserve(t, lim, request{t0, 2, t2, true}) +// r.CancelAt(t1) // got 2 tokens back +// runReserve(t, lim, request{t1, 2, t2, true}) +//} +// +//func TestCancelTooLate(t *testing.T) { +// lim := NewLimiter(10, 2) +// +// runReserve(t, lim, request{t0, 2, t0, true}) +// r := runReserve(t, lim, request{t0, 2, t2, true}) +// r.CancelAt(t3) // too late to cancel - should have no effect +// runReserve(t, lim, request{t3, 2, t4, true}) +//} +// +//func TestCancel0Tokens(t *testing.T) { +// lim := NewLimiter(10, 2) +// +// runReserve(t, lim, request{t0, 2, t0, true}) +// r := runReserve(t, lim, request{t0, 1, t1, true}) +// runReserve(t, lim, request{t0, 1, t2, true}) +// r.CancelAt(t0) // got 0 tokens back +// runReserve(t, lim, request{t0, 1, t3, true}) +//} +// +//func TestCancel1Token(t *testing.T) { +// lim := NewLimiter(10, 2) +// +// runReserve(t, lim, request{t0, 2, t0, true}) +// r := runReserve(t, lim, request{t0, 2, t2, true}) +// runReserve(t, lim, request{t0, 1, t3, true}) +// r.CancelAt(t2) // got 1 token back +// runReserve(t, lim, request{t2, 2, t4, true}) +//} +// +//func TestCancelMulti(t *testing.T) { +// lim := NewLimiter(10, 4) +// +// runReserve(t, lim, request{t0, 4, t0, true}) +// rA := runReserve(t, lim, request{t0, 3, t3, true}) +// runReserve(t, lim, request{t0, 1, t4, true}) +// rC := runReserve(t, lim, request{t0, 1, t5, true}) +// rC.CancelAt(t1) // get 1 token back +// rA.CancelAt(t1) // get 2 tokens back, as if C was never reserved +// runReserve(t, lim, request{t1, 3, t5, true}) +//} +// +//func TestReserveJumpBack(t *testing.T) { +// lim := NewLimiter(10, 2) +// +// runReserve(t, lim, request{t1, 2, t1, true}) // start at t1 +// runReserve(t, lim, request{t0, 1, t1, true}) // should violate Limit,Burst +// runReserve(t, lim, request{t2, 2, t3, true}) +//} + +//func TestReserveJumpBackCancel(t *testing.T) { +// lim := NewLimiter(10, 2) +// +// runReserve(t, lim, request{t1, 2, t1, true}) // start at t1 +// r := runReserve(t, lim, request{t1, 2, t3, true}) +// runReserve(t, lim, request{t1, 1, t4, true}) +// r.CancelAt(t0) // cancel at t0, get 1 token back +// runReserve(t, lim, request{t1, 2, t4, true}) // should violate Limit,Burst +//} +// +//func TestReserveSetLimit(t *testing.T) { +// lim := NewLimiter(5, 2) +// +// runReserve(t, lim, request{t0, 2, t0, true}) +// runReserve(t, lim, request{t0, 2, t4, true}) +// lim.SetLimitAt(t2, 10) +// runReserve(t, lim, request{t2, 1, t4, true}) // violates Limit and Burst +//} +// +//func TestReserveSetBurst(t *testing.T) { +// lim := NewLimiter(5, 2) +// +// runReserve(t, lim, request{t0, 2, t0, true}) +// runReserve(t, lim, request{t0, 2, t4, true}) +// lim.SetBurstAt(t3, 4) +// runReserve(t, lim, request{t0, 4, t9, true}) // violates Limit and Burst +//} +// +//func TestReserveSetLimitCancel(t *testing.T) { +// lim := NewLimiter(5, 2) +// +// runReserve(t, lim, request{t0, 2, t0, true}) +// r := runReserve(t, lim, request{t0, 2, t4, true}) +// lim.SetLimitAt(t2, 10) +// r.CancelAt(t2) // 2 tokens back +// runReserve(t, lim, request{t2, 2, t3, true}) +//} +// +//func TestReserveMax(t *testing.T) { +// lim := NewLimiter(10, 2) +// maxT := d +// +// runReserveMax(t, lim, request{t0, 2, t0, true}, maxT) +// runReserveMax(t, lim, request{t0, 1, t1, true}, maxT) // reserve for close future +// runReserveMax(t, lim, request{t0, 1, t2, false}, maxT) // time to act too far in the future +//} + +type wait struct { + name string + ctx context.Context + n int + delay int // in multiples of d + nilErr bool +} + +func runWait(t *testing.T, lim *Limiter, w wait) { + t.Helper() + start := time.Now() + err := lim.WaitN(w.ctx, w.n) + delay := time.Since(start) + if (w.nilErr && err != nil) || (!w.nilErr && err == nil) || w.delay != dFromDuration(delay) { + errString := "<nil>" + if !w.nilErr { + errString = "<non-nil error>" + } + t.Errorf("lim.WaitN(%v, lim, %v) = %v with delay %v ; want %v with delay %v", + w.name, w.n, err, delay, errString, d*time.Duration(w.delay)) + } +} + +func TestWaitSimple(t *testing.T) { + lim := NewLimiter(10, 3) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + runWait(t, lim, wait{"already-cancelled", ctx, 1, 0, false}) + + runWait(t, lim, wait{"exceed-burst-error", context.Background(), 4, 0, false}) + + runWait(t, lim, wait{"act-now", context.Background(), 2, 0, true}) + runWait(t, lim, wait{"act-later", context.Background(), 3, 2, true}) +} + +func TestWaitCancel(t *testing.T) { + lim := NewLimiter(10, 3) + + ctx, cancel := context.WithCancel(context.Background()) + runWait(t, lim, wait{"act-now", ctx, 2, 0, true}) // after this lim.tokens = 1 + go func() { + time.Sleep(d) + cancel() + }() + runWait(t, lim, wait{"will-cancel", ctx, 3, 1, false}) + // should get 3 tokens back, and have lim.tokens = 2 + //t.Logf("tokens:%v last:%v lastEvent:%v", lim.tokens, lim.last, lim.lastEvent) + runWait(t, lim, wait{"act-now-after-cancel", context.Background(), 2, 0, true}) +} + +func TestWaitTimeout(t *testing.T) { + lim := NewLimiter(10, 3) + + ctx, cancel := context.WithTimeout(context.Background(), d) + defer cancel() + runWait(t, lim, wait{"act-now", ctx, 2, 0, true}) + runWait(t, lim, wait{"w-timeout-err", ctx, 3, 0, false}) +} + +func TestWaitInf(t *testing.T) { + lim := NewLimiter(Inf, 0) + + runWait(t, lim, wait{"exceed-burst-no-error", context.Background(), 3, 0, true}) +} + +func BenchmarkAllowN(b *testing.B) { + lim := NewLimiter(Every(1*time.Second), 1) + b.ReportAllocs() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + lim.AllowN(1) + } + }) +} + +func BenchmarkWaitNNoDelay(b *testing.B) { + lim := NewLimiter(Limit(b.N), b.N) + ctx := context.Background() + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + lim.WaitN(ctx, 1) + } +} + +func TestDecrement(t *testing.T) { + x := NewVar(1000) + for i := 0; i < 500; i++ { + go Atomically(VoidOperation(func(tx *Tx) { + cur := x.Get(tx) + x.Set(tx, cur-1) + })) + } + done := make(chan struct{}) + go func() { + Atomically(VoidOperation(func(tx *Tx) { + tx.Assert(x.Get(tx) == 500) + })) + close(done) + }() + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatal("decrement did not complete in time") + } +} + +// read-only transaction aren't exempt from calling tx.inputsChanged +func TestReadVerify(t *testing.T) { + read := make(chan struct{}) + x, y := NewVar(1), NewVar(2) + + // spawn a transaction that writes to x + go func() { + <-read + AtomicSet(x, 3) + read <- struct{}{} + // other tx should retry, so we need to read/send again + read <- <-read + }() + + // spawn a transaction that reads x, then y. The other tx will modify x in + // between the reads, causing this tx to retry. + var x2, y2 int + Atomically(VoidOperation(func(tx *Tx) { + x2 = x.Get(tx) + read <- struct{}{} + <-read // wait for other tx to complete + y2 = y.Get(tx) + })) + if x2 == 1 && y2 == 2 { + t.Fatal("read was not verified") + } +} + +func TestRetry(t *testing.T) { + x := NewVar(10) + // spawn 10 transactions, one every 10 milliseconds. This will decrement x + // to 0 over the course of 100 milliseconds. + go func() { + for i := 0; i < 10; i++ { + time.Sleep(10 * time.Millisecond) + Atomically(VoidOperation(func(tx *Tx) { + cur := x.Get(tx) + x.Set(tx, cur-1) + })) + } + }() + // Each time we read x before the above loop has finished, we need to + // retry. This should result in no more than 1 retry per transaction. + retry := 0 + Atomically(VoidOperation(func(tx *Tx) { + cur := x.Get(tx) + if cur != 0 { + retry++ + tx.Retry() + } + })) + if retry > 10 { + t.Fatal("should have retried at most 10 times, got", retry) + } +} + +func TestVerify(t *testing.T) { + // tx.inputsChanged should check more than pointer equality + type foo struct { + i int + } + x := NewVar(&foo{3}) + read := make(chan struct{}) + + // spawn a transaction that modifies x + go func() { + Atomically(VoidOperation(func(tx *Tx) { + <-read + rx := x.Get(tx) + rx.i = 7 + x.Set(tx, rx) + })) + read <- struct{}{} + // other tx should retry, so we need to read/send again + read <- <-read + }() + + // spawn a transaction that reads x, then y. The other tx will modify x in + // between the reads, causing this tx to retry. + var i int + Atomically(VoidOperation(func(tx *Tx) { + f := x.Get(tx) + i = f.i + read <- struct{}{} + <-read // wait for other tx to complete + })) + if i == 3 { + t.Fatal("inputsChanged did not retry despite modified Var", i) + } +} + +func TestSelect(t *testing.T) { + // empty Select should panic + // require.Panics(t, func() { Atomically(Select[struct{}]()) }) + + // with one arg, Select adds no effect + x := NewVar(2) + Atomically(Select(VoidOperation(func(tx *Tx) { + tx.Assert(x.Get(tx) == 2) + }))) + + picked := Atomically(Select( + // always blocks; should never be selected + func(tx *Tx) int { + tx.Retry() + panic("unreachable") + }, + // always succeeds; should always be selected + func(tx *Tx) int { + return 2 + }, + // always succeeds; should never be selected + func(tx *Tx) int { + return 3 + }, + )) + g.TAssertEqual(2, picked) +} + +func TestCompose(t *testing.T) { + nums := make([]int, 100) + fns := make([]Operation[struct{}], 100) + for i := range fns { + fns[i] = func(x int) Operation[struct{}] { + return VoidOperation(func(*Tx) { nums[x] = x }) + }(i) // capture loop var + } + Atomically(Compose(fns...)) + for i := range nums { + if nums[i] != i { + t.Error("Compose failed:", nums[i], i) + } + } +} + +func TestPanic(t *testing.T) { + // normal panics should escape Atomically + /* + assert.PanicsWithValue(t, "foo", func() { + Atomically(func(*Tx) any { + panic("foo") + }) + }) + */ +} + +func TestReadWritten(t *testing.T) { + // reading a variable written in the same transaction should return the + // previously written value + x := NewVar(3) + Atomically(VoidOperation(func(tx *Tx) { + x.Set(tx, 5) + tx.Assert(x.Get(tx) == 5) + })) +} + +func TestAtomicSetRetry(t *testing.T) { + // AtomicSet should cause waiting transactions to retry + x := NewVar(3) + done := make(chan struct{}) + go func() { + Atomically(VoidOperation(func(tx *Tx) { + tx.Assert(x.Get(tx) == 5) + })) + done <- struct{}{} + }() + time.Sleep(10 * time.Millisecond) + AtomicSet(x, 5) + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("AtomicSet did not wake up a waiting transaction") + } +} + +func testPingPong(t testing.TB, n int, afterHit func(string)) { + ball := NewBuiltinEqVar(false) + doneVar := NewVar(false) + hits := NewVar(0) + ready := NewVar(true) // The ball is ready for hitting. + var wg sync.WaitGroup + bat := func(from, to bool, noise string) { + defer wg.Done() + for !Atomically(func(tx *Tx) any { + if doneVar.Get(tx) { + return true + } + tx.Assert(ready.Get(tx)) + if ball.Get(tx) == from { + ball.Set(tx, to) + hits.Set(tx, hits.Get(tx)+1) + ready.Set(tx, false) + return false + } + return tx.Retry() + }).(bool) { + afterHit(noise) + AtomicSet(ready, true) + } + } + wg.Add(2) + go bat(false, true, "ping!") + go bat(true, false, "pong!") + Atomically(VoidOperation(func(tx *Tx) { + tx.Assert(hits.Get(tx) >= n) + doneVar.Set(tx, true) + })) + wg.Wait() +} + +func TestPingPong(t *testing.T) { + testPingPong(t, 42, func(s string) { t.Log(s) }) +} + +func TestSleepingBeauty(t *testing.T) { + /* + require.Panics(t, func() { + Atomically(func(tx *Tx) any { + tx.Assert(false) + return nil + }) + }) + */ +} + +//func TestRetryStack(t *testing.T) { +// v := NewVar[int](nil) +// go func() { +// i := 0 +// for { +// AtomicSet(v, i) +// i++ +// } +// }() +// Atomically(func(tx *Tx) any { +// debug.PrintStack() +// ret := func() { +// defer Atomically(nil) +// } +// v.Get(tx) +// tx.Assert(false) +// return ret +// }) +//} + +func TestContextEquality(t *testing.T) { + ctx := context.Background() + g.TAssertEqual(ctx, context.Background()) + childCtx, cancel := context.WithCancel(ctx) + g.TAssertEqual(childCtx != ctx, true) + g.TAssertEqual(childCtx != ctx, true) + g.TAssertEqual(context.Background(), ctx) + cancel() + g.TAssertEqual(context.Background(), ctx) + g.TAssertEqual(ctx != childCtx, true) +} + func MainTest() { |