diff options
-rw-r--r-- | src/cracha.go | 95 | ||||
-rw-r--r-- | tests/cracha.go | 20 |
2 files changed, 113 insertions, 2 deletions
diff --git a/src/cracha.go b/src/cracha.go index 7a20449..5547d6e 100644 --- a/src/cracha.go +++ b/src/cracha.go @@ -1069,6 +1069,97 @@ func unregisterConsumers(queue q.IQueue, consumers []consumerT, prefix string) { } } +type resultT[T any] struct{ + value T + err error +} + +type taggedT[T any] struct{ + id int + value T +} + +func startRunner[A any, B any]( + in <-chan taggedT[A], + out chan<- taggedT[B], + fn func(A) B, + done func(), +) { + for input := range in { + out <- taggedT[B]{ + id: input.id, + value: fn(input.value), + } + } + done() +} + +func makePoolRunner[A any, B any](count int, fn func(A) B) (func(A) B, func()) { + var wg sync.WaitGroup + wg.Add(count) + + in := make(chan taggedT[A]) + out := make(chan taggedT[B]) + + for _ = range count { + go startRunner(in, out, fn, wg.Done) + } + + var mutex sync.Mutex + m := map[int]chan B{} + id := 0 + go func() { + for output := range out { + mutex.Lock() + defer mutex.Unlock() + m[output.id] <- output.value + close(m[output.id]) + delete(m, output.id) + } + }() + + poolRunFn := func(input A) B { + c := make(chan B) + { + mutex.Lock() + defer mutex.Unlock() + m[id] = c + id++ + } + + in <- taggedT[A]{ + id: id, + value: input, + } + return <- c + } + + close := func() { + close(in) + wg.Wait() + close(out) + } + + return poolRunFn, close +} + +func asResult[A any, B any](fn func(A) (B, error)) func(A) resultT[B] { + return func(input A) resultT[B] { + output, err := fn(input) + return resultT[B]{ + value: output, + err: err, + } + } +} + +func unwrapResult[A any, B any](fn func(A) resultT[B]) func(A) (B, error) { + return func(input A) (B, error) { + result := fn(input) + return result.value, result.err + } +} + func NewWithPrefix(databasePath string, prefix string) (IAuth, error) { queue, err := q.New(databasePath) if err != nil { @@ -1086,7 +1177,7 @@ func NewWithPrefix(databasePath string, prefix string) (IAuth, error) { } numCPU := runtime.NumCPU() - hasher, closeHasher := g.PoolRunner(numCPU, g.Resultify(scrypt.Hash)) + hasher, closeHasher := makePoolRunner(numCPU, asResult(scrypt.Hash)) closeFn := func() { unregisterConsumers(queue, consumers, prefix) @@ -1097,7 +1188,7 @@ func NewWithPrefix(databasePath string, prefix string) (IAuth, error) { queue: queue, db: db, queries: queries, - hasher: g.Unresultify(hasher), + hasher: unwrapResult(hasher), close: closeFn, } diff --git a/tests/cracha.go b/tests/cracha.go index 746242b..905733c 100644 --- a/tests/cracha.go +++ b/tests/cracha.go @@ -740,6 +740,22 @@ func test_unregisterConsumers() { // FIXME } +func test_startRunner() { + // FIXME +} + +func test_makePoolRunner() { + // FIXME +} + +func test_asResult() { + // FIXME +} + +func test_unwrapResult() { + // FIXME +} + func test_NewWithPrefix() { // FIXME } @@ -889,6 +905,10 @@ func MainTest() { test_forgotPasswordRequestHandler() test_registerConsumers() test_unregisterConsumers() + test_startRunner() + test_makePoolRunner() + test_asResult() + test_unwrapResult() test_NewWithPrefix() test_New() test_newUserPayload() |