package stm import ( "context" "fmt" "math" "runtime" "sync" "sync/atomic" "testing" "time" g "gobang" ) func TestValue(t *testing.T) { v := New("hello") g.TAssertEqual("hello", v.Load()) v.Store("world") g.TAssertEqual("world", v.Load()) } func TestValueZeroValue(t *testing.T) { var v Value[string] // assert.Panics(t, func() { v.Load() }) v.Store("world") g.TAssertEqual("world", v.Load()) } func TestValueSwapZeroValue(t *testing.T) { // var v Value[string] // assert.Panics(t, func() { v.Swap("hello") }) } func TestInt32(t *testing.T) { v := NewInt32(0) g.TAssertEqual(0, v.Load()) g.TAssertEqual(true, v.CompareAndSwap(0, 10)) g.TAssertEqual(false, v.CompareAndSwap(0, 10)) } func BenchmarkInt64Add(b *testing.B) { v := NewInt64(0) for i := 0; i < b.N; i++ { v.Add(1) } } func BenchmarkIntInterfaceAdd(b *testing.B) { var v Int[int64] = NewInt64(0) for i := 0; i < b.N; i++ { v.Add(1) } } func BenchmarkStdlibInt64Add(b *testing.B) { var n int64 for i := 0; i < b.N; i++ { atomic.AddInt64(&n, 1) } } func BenchmarkInterfaceStore(b *testing.B) { var v Interface[string] = New("hello") for i := 0; i < b.N; i++ { v.Store(fmt.Sprint(i)) } } func BenchmarkValueStore(b *testing.B) { v := New("hello") for i := 0; i < b.N; i++ { v.Store(fmt.Sprint(i)) } } func BenchmarkStdlibValueStore(b *testing.B) { v := atomic.Value{} for i := 0; i < b.N; i++ { v.Store(fmt.Sprint(i)) } } func BenchmarkDeref(b *testing.B) { x := NewVar(0) for i := 0; i < b.N; i++ { Deref(x) } } func BenchmarkAtomicSet(b *testing.B) { x := NewVar(0) for i := 0; i < b.N; i++ { AtomicSet(x, 0) } } func BenchmarkIncrementSTM(b *testing.B) { for i := 0; i < b.N; i++ { // spawn 1000 goroutines that each increment x by 1 x := NewVar(0) for i := 0; i < 1000; i++ { go Atomically(VoidOperation(func(tx *Tx) { cur := x.Get(tx) x.Set(tx, cur+1) })) } // wait for x to reach 1000 Atomically(VoidOperation(func(tx *Tx) { tx.Assert(x.Get(tx) == 1000) })) } } func BenchmarkIncrementMutex(b *testing.B) { for i := 0; i < b.N; i++ { var mu sync.Mutex x := 0 for i := 0; i < 1000; i++ { go func() { mu.Lock() x++ mu.Unlock() }() } for { mu.Lock() read := x mu.Unlock() if read == 1000 { break } } } } func BenchmarkIncrementChannel(b *testing.B) { for i := 0; i < b.N; i++ { c := make(chan int, 1) c <- 0 for i := 0; i < 1000; i++ { go func() { c <- 1 + <-c }() } for { read := <-c if read == 1000 { break } c <- read } } } func BenchmarkReadVarSTM(b *testing.B) { for i := 0; i < b.N; i++ { var wg sync.WaitGroup wg.Add(1000) x := NewVar(0) for i := 0; i < 1000; i++ { go func() { Deref(x) wg.Done() }() } wg.Wait() } } func BenchmarkReadVarMutex(b *testing.B) { for i := 0; i < b.N; i++ { var mu sync.Mutex var wg sync.WaitGroup wg.Add(1000) x := 0 for i := 0; i < 1000; i++ { go func() { mu.Lock() _ = x mu.Unlock() wg.Done() }() } wg.Wait() } } func BenchmarkReadVarChannel(b *testing.B) { for i := 0; i < b.N; i++ { var wg sync.WaitGroup wg.Add(1000) c := make(chan int) close(c) for i := 0; i < 1000; i++ { go func() { <-c wg.Done() }() } wg.Wait() } } func parallelPingPongs(b *testing.B, n int) { var wg sync.WaitGroup wg.Add(n) for i := 0; i < n; i++ { go func() { defer wg.Done() testPingPong(b, b.N, func(string) {}) }() } wg.Wait() } func BenchmarkPingPong4(b *testing.B) { b.ReportAllocs() parallelPingPongs(b, 4) } func BenchmarkPingPong(b *testing.B) { b.ReportAllocs() parallelPingPongs(b, 1) } func Example() { // create a shared variable n := NewVar(3) // read a variable var v int Atomically(VoidOperation(func(tx *Tx) { v = n.Get(tx) })) // or: v = Deref(n) _ = v // write to a variable Atomically(VoidOperation(func(tx *Tx) { n.Set(tx, 12) })) // or: AtomicSet(n, 12) // update a variable Atomically(VoidOperation(func(tx *Tx) { cur := n.Get(tx) n.Set(tx, cur-1) })) // block until a condition is met Atomically(VoidOperation(func(tx *Tx) { cur := n.Get(tx) if cur != 0 { tx.Retry() } n.Set(tx, 10) })) // or: Atomically(VoidOperation(func(tx *Tx) { cur := n.Get(tx) tx.Assert(cur == 0) n.Set(tx, 10) })) // select among multiple (potentially blocking) transactions Atomically(Select( // this function blocks forever, so it will be skipped VoidOperation(func(tx *Tx) { tx.Retry() }), // this function will always succeed without blocking VoidOperation(func(tx *Tx) { n.Set(tx, 10) }), // this function will never run, because the previous // function succeeded VoidOperation(func(tx *Tx) { n.Set(tx, 11) }), )) // since Select is a normal transaction, if the entire select retries // (blocks), it will be retried as a whole: x := 0 Atomically(Select( // this function will run twice, and succeed the second time VoidOperation(func(tx *Tx) { tx.Assert(x == 1) }), // this function will run once VoidOperation(func(tx *Tx) { x = 1 tx.Retry() }), )) // But wait! Transactions are only retried when one of the Vars they read is // updated. Since x isn't a stm Var, this code will actually block forever -- // but you get the idea. } const maxTokens = 25 func BenchmarkThunderingHerdCondVar(b *testing.B) { for i := 0; i < b.N; i++ { var mu sync.Mutex consumer := sync.NewCond(&mu) generator := sync.NewCond(&mu) done := false tokens := 0 var pending sync.WaitGroup for i := 0; i < 1000; i++ { pending.Add(1) go func() { mu.Lock() for { if tokens > 0 { tokens-- generator.Signal() break } consumer.Wait() } mu.Unlock() pending.Done() }() } go func() { mu.Lock() for !done { if tokens < maxTokens { tokens++ consumer.Signal() } else { generator.Wait() } } mu.Unlock() }() pending.Wait() mu.Lock() done = true generator.Signal() mu.Unlock() } } func BenchmarkThunderingHerd(b *testing.B) { for i := 0; i < b.N; i++ { done := NewBuiltinEqVar(false) tokens := NewBuiltinEqVar(0) pending := NewBuiltinEqVar(0) for i := 0; i < 1000; i++ { Atomically(VoidOperation(func(tx *Tx) { pending.Set(tx, pending.Get(tx)+1) })) go func() { Atomically(VoidOperation(func(tx *Tx) { t := tokens.Get(tx) if t > 0 { tokens.Set(tx, t-1) pending.Set(tx, pending.Get(tx)-1) } else { tx.Retry() } })) }() } go func() { for Atomically(func(tx *Tx) bool { if done.Get(tx) { return false } tx.Assert(tokens.Get(tx) < maxTokens) tokens.Set(tx, tokens.Get(tx)+1) return true }) { } }() Atomically(VoidOperation(func(tx *Tx) { tx.Assert(pending.Get(tx) == 0) })) AtomicSet(done, true) } } func BenchmarkInvertedThunderingHerd(b *testing.B) { for i := 0; i < b.N; i++ { done := NewBuiltinEqVar(false) tokens := NewBuiltinEqVar(0) pending := NewVar(NewSet[*Var[bool]]()) for i := 0; i < 1000; i++ { ready := NewVar(false) Atomically(VoidOperation(func(tx *Tx) { pending.Set(tx, pending.Get(tx).Add(ready)) })) go func() { Atomically(VoidOperation(func(tx *Tx) { tx.Assert(ready.Get(tx)) set := pending.Get(tx) if !set.Contains(ready) { panic("couldn't find ourselves in pending") } pending.Set(tx, set.Delete(ready)) })) //b.Log("waiter finished") }() } go func() { for Atomically(func(tx *Tx) bool { if done.Get(tx) { return false } tx.Assert(tokens.Get(tx) < maxTokens) tokens.Set(tx, tokens.Get(tx)+1) return true }) { } }() go func() { for Atomically(func(tx *Tx) bool { tx.Assert(tokens.Get(tx) > 0) tokens.Set(tx, tokens.Get(tx)-1) pending.Get(tx).Range(func(ready *Var[bool]) bool { if !ready.Get(tx) { ready.Set(tx, true) return false } return true }) return !done.Get(tx) }) { } }() Atomically(VoidOperation(func(tx *Tx) { tx.Assert(pending.Get(tx).(Lenner).Len() == 0) })) AtomicSet(done, true) } } func TestLimit(t *testing.T) { if Limit(10) == Inf { t.Errorf("Limit(10) == Inf should be false") } } func closeEnough(a, b Limit) bool { return (math.Abs(float64(a)/float64(b)) - 1.0) < 1e-9 } func TestEvery(t *testing.T) { cases := []struct { interval time.Duration lim Limit }{ {0, Inf}, {-1, Inf}, {1 * time.Nanosecond, Limit(1e9)}, {1 * time.Microsecond, Limit(1e6)}, {1 * time.Millisecond, Limit(1e3)}, {10 * time.Millisecond, Limit(100)}, {100 * time.Millisecond, Limit(10)}, {1 * time.Second, Limit(1)}, {2 * time.Second, Limit(0.5)}, {time.Duration(2.5 * float64(time.Second)), Limit(0.4)}, {4 * time.Second, Limit(0.25)}, {10 * time.Second, Limit(0.1)}, {time.Duration(math.MaxInt64), Limit(1e9 / float64(math.MaxInt64))}, } for _, tc := range cases { lim := Every(tc.interval) if !closeEnough(lim, tc.lim) { t.Errorf("Every(%v) = %v want %v", tc.interval, lim, tc.lim) } } } const ( d = 100 * time.Millisecond ) var ( t0 = time.Now() t1 = t0.Add(time.Duration(1) * d) t2 = t0.Add(time.Duration(2) * d) t3 = t0.Add(time.Duration(3) * d) t4 = t0.Add(time.Duration(4) * d) t5 = t0.Add(time.Duration(5) * d) t9 = t0.Add(time.Duration(9) * d) ) type allow struct { t time.Time n int ok bool } // //func run(t *testing.T, lim *Limiter, allows []allow) { // for i, allow := range allows { // ok := lim.AllowN(allow.t, allow.n) // if ok != allow.ok { // t.Errorf("step %d: lim.AllowN(%v, %v) = %v want %v", // i, allow.t, allow.n, ok, allow.ok) // } // } //} // //func TestLimiterBurst1(t *testing.T) { // run(t, NewLimiter(10, 1), []allow{ // {t0, 1, true}, // {t0, 1, false}, // {t0, 1, false}, // {t1, 1, true}, // {t1, 1, false}, // {t1, 1, false}, // {t2, 2, false}, // burst size is 1, so n=2 always fails // {t2, 1, true}, // {t2, 1, false}, // }) //} // //func TestLimiterBurst3(t *testing.T) { // run(t, NewLimiter(10, 3), []allow{ // {t0, 2, true}, // {t0, 2, false}, // {t0, 1, true}, // {t0, 1, false}, // {t1, 4, false}, // {t2, 1, true}, // {t3, 1, true}, // {t4, 1, true}, // {t4, 1, true}, // {t4, 1, false}, // {t4, 1, false}, // {t9, 3, true}, // {t9, 0, true}, // }) //} // //func TestLimiterJumpBackwards(t *testing.T) { // run(t, NewLimiter(10, 3), []allow{ // {t1, 1, true}, // start at t1 // {t0, 1, true}, // jump back to t0, two tokens remain // {t0, 1, true}, // {t0, 1, false}, // {t0, 1, false}, // {t1, 1, true}, // got a token // {t1, 1, false}, // {t1, 1, false}, // {t2, 1, true}, // got another token // {t2, 1, false}, // {t2, 1, false}, // }) //} // Ensure that tokensFromDuration doesn't produce // rounding errors by truncating nanoseconds. // See golang.org/issues/34861. func TestLimiter_noTruncationErrors(t *testing.T) { if !NewLimiter(0.7692307692307693, 1).Allow() { t.Fatal("expected true") } } func TestSimultaneousRequests(t *testing.T) { const ( limit = 1 burst = 5 numRequests = 15 ) var ( wg sync.WaitGroup numOK = uint32(0) ) // Very slow replenishing bucket. lim := NewLimiter(limit, burst) // Tries to take a token, atomically updates the counter and decreases the wait // group counter. f := func() { defer wg.Done() if ok := lim.Allow(); ok { atomic.AddUint32(&numOK, 1) } } wg.Add(numRequests) for i := 0; i < numRequests; i++ { go f() } wg.Wait() if numOK != burst { t.Errorf("numOK = %d, want %d", numOK, burst) } } func TestLongRunningQPS(t *testing.T) { if testing.Short() { t.Skip("skipping in short mode") } if runtime.GOOS == "openbsd" { t.Skip("low resolution time.Sleep invalidates test (golang.org/issue/14183)") return } // The test runs for a few seconds executing many requests and then checks // that overall number of requests is reasonable. const ( limit = 100 burst = 100 ) var numOK = int32(0) lim := NewLimiter(limit, burst) var wg sync.WaitGroup f := func() { if ok := lim.Allow(); ok { atomic.AddInt32(&numOK, 1) } wg.Done() } start := time.Now() end := start.Add(5 * time.Second) for time.Now().Before(end) { wg.Add(1) go f() // This will still offer ~500 requests per second, but won't consume // outrageous amount of CPU. time.Sleep(2 * time.Millisecond) } wg.Wait() elapsed := time.Since(start) ideal := burst + (limit * float64(elapsed) / float64(time.Second)) // We should never get more requests than allowed. if want := int32(ideal + 1); numOK > want { t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal) } // We should get very close to the number of requests allowed. if want := int32(0.999 * ideal); numOK < want { t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal) } } type request struct { t time.Time n int act time.Time ok bool } // dFromDuration converts a duration to a multiple of the global constant d func dFromDuration(dur time.Duration) int { // Adding a millisecond to be swallowed by the integer division // because we don't care about small inaccuracies return int((dur + time.Millisecond) / d) } // dSince returns multiples of d since t0 func dSince(t time.Time) int { return dFromDuration(t.Sub(t0)) } // //func runReserve(t *testing.T, lim *Limiter, req request) *Reservation { // return runReserveMax(t, lim, req, InfDuration) //} // //func runReserveMax(t *testing.T, lim *Limiter, req request, maxReserve time.Duration) *Reservation { // r := lim.reserveN(req.t, req.n, maxReserve) // if r.ok && (dSince(r.timeToAct) != dSince(req.act)) || r.ok != req.ok { // t.Errorf("lim.reserveN(t%d, %v, %v) = (t%d, %v) want (t%d, %v)", // dSince(req.t), req.n, maxReserve, dSince(r.timeToAct), r.ok, dSince(req.act), req.ok) // } // return &r //} // //func TestSimpleReserve(t *testing.T) { // lim := NewLimiter(10, 2) // // runReserve(t, lim, request{t0, 2, t0, true}) // runReserve(t, lim, request{t0, 2, t2, true}) // runReserve(t, lim, request{t3, 2, t4, true}) //} // //func TestMix(t *testing.T) { // lim := NewLimiter(10, 2) // // runReserve(t, lim, request{t0, 3, t1, false}) // should return false because n > Burst // runReserve(t, lim, request{t0, 2, t0, true}) // run(t, lim, []allow{{t1, 2, false}}) // not enought tokens - don't allow // runReserve(t, lim, request{t1, 2, t2, true}) // run(t, lim, []allow{{t1, 1, false}}) // negative tokens - don't allow // run(t, lim, []allow{{t3, 1, true}}) //} // //func TestCancelInvalid(t *testing.T) { // lim := NewLimiter(10, 2) // // runReserve(t, lim, request{t0, 2, t0, true}) // r := runReserve(t, lim, request{t0, 3, t3, false}) // r.CancelAt(t0) // should have no effect // runReserve(t, lim, request{t0, 2, t2, true}) // did not get extra tokens //} // //func TestCancelLast(t *testing.T) { // lim := NewLimiter(10, 2) // // runReserve(t, lim, request{t0, 2, t0, true}) // r := runReserve(t, lim, request{t0, 2, t2, true}) // r.CancelAt(t1) // got 2 tokens back // runReserve(t, lim, request{t1, 2, t2, true}) //} // //func TestCancelTooLate(t *testing.T) { // lim := NewLimiter(10, 2) // // runReserve(t, lim, request{t0, 2, t0, true}) // r := runReserve(t, lim, request{t0, 2, t2, true}) // r.CancelAt(t3) // too late to cancel - should have no effect // runReserve(t, lim, request{t3, 2, t4, true}) //} // //func TestCancel0Tokens(t *testing.T) { // lim := NewLimiter(10, 2) // // runReserve(t, lim, request{t0, 2, t0, true}) // r := runReserve(t, lim, request{t0, 1, t1, true}) // runReserve(t, lim, request{t0, 1, t2, true}) // r.CancelAt(t0) // got 0 tokens back // runReserve(t, lim, request{t0, 1, t3, true}) //} // //func TestCancel1Token(t *testing.T) { // lim := NewLimiter(10, 2) // // runReserve(t, lim, request{t0, 2, t0, true}) // r := runReserve(t, lim, request{t0, 2, t2, true}) // runReserve(t, lim, request{t0, 1, t3, true}) // r.CancelAt(t2) // got 1 token back // runReserve(t, lim, request{t2, 2, t4, true}) //} // //func TestCancelMulti(t *testing.T) { // lim := NewLimiter(10, 4) // // runReserve(t, lim, request{t0, 4, t0, true}) // rA := runReserve(t, lim, request{t0, 3, t3, true}) // runReserve(t, lim, request{t0, 1, t4, true}) // rC := runReserve(t, lim, request{t0, 1, t5, true}) // rC.CancelAt(t1) // get 1 token back // rA.CancelAt(t1) // get 2 tokens back, as if C was never reserved // runReserve(t, lim, request{t1, 3, t5, true}) //} // //func TestReserveJumpBack(t *testing.T) { // lim := NewLimiter(10, 2) // // runReserve(t, lim, request{t1, 2, t1, true}) // start at t1 // runReserve(t, lim, request{t0, 1, t1, true}) // should violate Limit,Burst // runReserve(t, lim, request{t2, 2, t3, true}) //} //func TestReserveJumpBackCancel(t *testing.T) { // lim := NewLimiter(10, 2) // // runReserve(t, lim, request{t1, 2, t1, true}) // start at t1 // r := runReserve(t, lim, request{t1, 2, t3, true}) // runReserve(t, lim, request{t1, 1, t4, true}) // r.CancelAt(t0) // cancel at t0, get 1 token back // runReserve(t, lim, request{t1, 2, t4, true}) // should violate Limit,Burst //} // //func TestReserveSetLimit(t *testing.T) { // lim := NewLimiter(5, 2) // // runReserve(t, lim, request{t0, 2, t0, true}) // runReserve(t, lim, request{t0, 2, t4, true}) // lim.SetLimitAt(t2, 10) // runReserve(t, lim, request{t2, 1, t4, true}) // violates Limit and Burst //} // //func TestReserveSetBurst(t *testing.T) { // lim := NewLimiter(5, 2) // // runReserve(t, lim, request{t0, 2, t0, true}) // runReserve(t, lim, request{t0, 2, t4, true}) // lim.SetBurstAt(t3, 4) // runReserve(t, lim, request{t0, 4, t9, true}) // violates Limit and Burst //} // //func TestReserveSetLimitCancel(t *testing.T) { // lim := NewLimiter(5, 2) // // runReserve(t, lim, request{t0, 2, t0, true}) // r := runReserve(t, lim, request{t0, 2, t4, true}) // lim.SetLimitAt(t2, 10) // r.CancelAt(t2) // 2 tokens back // runReserve(t, lim, request{t2, 2, t3, true}) //} // //func TestReserveMax(t *testing.T) { // lim := NewLimiter(10, 2) // maxT := d // // runReserveMax(t, lim, request{t0, 2, t0, true}, maxT) // runReserveMax(t, lim, request{t0, 1, t1, true}, maxT) // reserve for close future // runReserveMax(t, lim, request{t0, 1, t2, false}, maxT) // time to act too far in the future //} type wait struct { name string ctx context.Context n int delay int // in multiples of d nilErr bool } func runWait(t *testing.T, lim *Limiter, w wait) { t.Helper() start := time.Now() err := lim.WaitN(w.ctx, w.n) delay := time.Since(start) if (w.nilErr && err != nil) || (!w.nilErr && err == nil) || w.delay != dFromDuration(delay) { errString := "" if !w.nilErr { errString = "" } t.Errorf("lim.WaitN(%v, lim, %v) = %v with delay %v ; want %v with delay %v", w.name, w.n, err, delay, errString, d*time.Duration(w.delay)) } } func TestWaitSimple(t *testing.T) { lim := NewLimiter(10, 3) ctx, cancel := context.WithCancel(context.Background()) cancel() runWait(t, lim, wait{"already-cancelled", ctx, 1, 0, false}) runWait(t, lim, wait{"exceed-burst-error", context.Background(), 4, 0, false}) runWait(t, lim, wait{"act-now", context.Background(), 2, 0, true}) runWait(t, lim, wait{"act-later", context.Background(), 3, 2, true}) } func TestWaitCancel(t *testing.T) { lim := NewLimiter(10, 3) ctx, cancel := context.WithCancel(context.Background()) runWait(t, lim, wait{"act-now", ctx, 2, 0, true}) // after this lim.tokens = 1 go func() { time.Sleep(d) cancel() }() runWait(t, lim, wait{"will-cancel", ctx, 3, 1, false}) // should get 3 tokens back, and have lim.tokens = 2 //t.Logf("tokens:%v last:%v lastEvent:%v", lim.tokens, lim.last, lim.lastEvent) runWait(t, lim, wait{"act-now-after-cancel", context.Background(), 2, 0, true}) } func TestWaitTimeout(t *testing.T) { lim := NewLimiter(10, 3) ctx, cancel := context.WithTimeout(context.Background(), d) defer cancel() runWait(t, lim, wait{"act-now", ctx, 2, 0, true}) runWait(t, lim, wait{"w-timeout-err", ctx, 3, 0, false}) } func TestWaitInf(t *testing.T) { lim := NewLimiter(Inf, 0) runWait(t, lim, wait{"exceed-burst-no-error", context.Background(), 3, 0, true}) } func BenchmarkAllowN(b *testing.B) { lim := NewLimiter(Every(1*time.Second), 1) b.ReportAllocs() b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { lim.AllowN(1) } }) } func BenchmarkWaitNNoDelay(b *testing.B) { lim := NewLimiter(Limit(b.N), b.N) ctx := context.Background() b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { lim.WaitN(ctx, 1) } } func TestDecrement(t *testing.T) { x := NewVar(1000) for i := 0; i < 500; i++ { go Atomically(VoidOperation(func(tx *Tx) { cur := x.Get(tx) x.Set(tx, cur-1) })) } done := make(chan struct{}) go func() { Atomically(VoidOperation(func(tx *Tx) { tx.Assert(x.Get(tx) == 500) })) close(done) }() select { case <-done: case <-time.After(10 * time.Second): t.Fatal("decrement did not complete in time") } } // read-only transaction aren't exempt from calling tx.inputsChanged func TestReadVerify(t *testing.T) { read := make(chan struct{}) x, y := NewVar(1), NewVar(2) // spawn a transaction that writes to x go func() { <-read AtomicSet(x, 3) read <- struct{}{} // other tx should retry, so we need to read/send again read <- <-read }() // spawn a transaction that reads x, then y. The other tx will modify x in // between the reads, causing this tx to retry. var x2, y2 int Atomically(VoidOperation(func(tx *Tx) { x2 = x.Get(tx) read <- struct{}{} <-read // wait for other tx to complete y2 = y.Get(tx) })) if x2 == 1 && y2 == 2 { t.Fatal("read was not verified") } } func TestRetry(t *testing.T) { x := NewVar(10) // spawn 10 transactions, one every 10 milliseconds. This will decrement x // to 0 over the course of 100 milliseconds. go func() { for i := 0; i < 10; i++ { time.Sleep(10 * time.Millisecond) Atomically(VoidOperation(func(tx *Tx) { cur := x.Get(tx) x.Set(tx, cur-1) })) } }() // Each time we read x before the above loop has finished, we need to // retry. This should result in no more than 1 retry per transaction. retry := 0 Atomically(VoidOperation(func(tx *Tx) { cur := x.Get(tx) if cur != 0 { retry++ tx.Retry() } })) if retry > 10 { t.Fatal("should have retried at most 10 times, got", retry) } } func TestVerify(t *testing.T) { // tx.inputsChanged should check more than pointer equality type foo struct { i int } x := NewVar(&foo{3}) read := make(chan struct{}) // spawn a transaction that modifies x go func() { Atomically(VoidOperation(func(tx *Tx) { <-read rx := x.Get(tx) rx.i = 7 x.Set(tx, rx) })) read <- struct{}{} // other tx should retry, so we need to read/send again read <- <-read }() // spawn a transaction that reads x, then y. The other tx will modify x in // between the reads, causing this tx to retry. var i int Atomically(VoidOperation(func(tx *Tx) { f := x.Get(tx) i = f.i read <- struct{}{} <-read // wait for other tx to complete })) if i == 3 { t.Fatal("inputsChanged did not retry despite modified Var", i) } } func TestSelect(t *testing.T) { // empty Select should panic // require.Panics(t, func() { Atomically(Select[struct{}]()) }) // with one arg, Select adds no effect x := NewVar(2) Atomically(Select(VoidOperation(func(tx *Tx) { tx.Assert(x.Get(tx) == 2) }))) picked := Atomically(Select( // always blocks; should never be selected func(tx *Tx) int { tx.Retry() panic("unreachable") }, // always succeeds; should always be selected func(tx *Tx) int { return 2 }, // always succeeds; should never be selected func(tx *Tx) int { return 3 }, )) g.TAssertEqual(2, picked) } func TestCompose(t *testing.T) { nums := make([]int, 100) fns := make([]Operation[struct{}], 100) for i := range fns { fns[i] = func(x int) Operation[struct{}] { return VoidOperation(func(*Tx) { nums[x] = x }) }(i) // capture loop var } Atomically(Compose(fns...)) for i := range nums { if nums[i] != i { t.Error("Compose failed:", nums[i], i) } } } func TestPanic(t *testing.T) { // normal panics should escape Atomically /* assert.PanicsWithValue(t, "foo", func() { Atomically(func(*Tx) any { panic("foo") }) }) */ } func TestReadWritten(t *testing.T) { // reading a variable written in the same transaction should return the // previously written value x := NewVar(3) Atomically(VoidOperation(func(tx *Tx) { x.Set(tx, 5) tx.Assert(x.Get(tx) == 5) })) } func TestAtomicSetRetry(t *testing.T) { // AtomicSet should cause waiting transactions to retry x := NewVar(3) done := make(chan struct{}) go func() { Atomically(VoidOperation(func(tx *Tx) { tx.Assert(x.Get(tx) == 5) })) done <- struct{}{} }() time.Sleep(10 * time.Millisecond) AtomicSet(x, 5) select { case <-done: case <-time.After(time.Second): t.Fatal("AtomicSet did not wake up a waiting transaction") } } func testPingPong(t testing.TB, n int, afterHit func(string)) { ball := NewBuiltinEqVar(false) doneVar := NewVar(false) hits := NewVar(0) ready := NewVar(true) // The ball is ready for hitting. var wg sync.WaitGroup bat := func(from, to bool, noise string) { defer wg.Done() for !Atomically(func(tx *Tx) any { if doneVar.Get(tx) { return true } tx.Assert(ready.Get(tx)) if ball.Get(tx) == from { ball.Set(tx, to) hits.Set(tx, hits.Get(tx)+1) ready.Set(tx, false) return false } return tx.Retry() }).(bool) { afterHit(noise) AtomicSet(ready, true) } } wg.Add(2) go bat(false, true, "ping!") go bat(true, false, "pong!") Atomically(VoidOperation(func(tx *Tx) { tx.Assert(hits.Get(tx) >= n) doneVar.Set(tx, true) })) wg.Wait() } func TestPingPong(t *testing.T) { testPingPong(t, 42, func(s string) { t.Log(s) }) } func TestSleepingBeauty(t *testing.T) { /* require.Panics(t, func() { Atomically(func(tx *Tx) any { tx.Assert(false) return nil }) }) */ } //func TestRetryStack(t *testing.T) { // v := NewVar[int](nil) // go func() { // i := 0 // for { // AtomicSet(v, i) // i++ // } // }() // Atomically(func(tx *Tx) any { // debug.PrintStack() // ret := func() { // defer Atomically(nil) // } // v.Get(tx) // tx.Assert(false) // return ret // }) //} func TestContextEquality(t *testing.T) { ctx := context.Background() g.TAssertEqual(ctx, context.Background()) childCtx, cancel := context.WithCancel(ctx) g.TAssertEqual(childCtx != ctx, true) g.TAssertEqual(childCtx != ctx, true) g.TAssertEqual(context.Background(), ctx) cancel() g.TAssertEqual(context.Background(), ctx) g.TAssertEqual(ctx != childCtx, true) } func MainTest() { }