diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cracha.go | 95 |
1 files changed, 93 insertions, 2 deletions
diff --git a/src/cracha.go b/src/cracha.go index 7a20449..5547d6e 100644 --- a/src/cracha.go +++ b/src/cracha.go @@ -1069,6 +1069,97 @@ func unregisterConsumers(queue q.IQueue, consumers []consumerT, prefix string) { } } +type resultT[T any] struct{ + value T + err error +} + +type taggedT[T any] struct{ + id int + value T +} + +func startRunner[A any, B any]( + in <-chan taggedT[A], + out chan<- taggedT[B], + fn func(A) B, + done func(), +) { + for input := range in { + out <- taggedT[B]{ + id: input.id, + value: fn(input.value), + } + } + done() +} + +func makePoolRunner[A any, B any](count int, fn func(A) B) (func(A) B, func()) { + var wg sync.WaitGroup + wg.Add(count) + + in := make(chan taggedT[A]) + out := make(chan taggedT[B]) + + for _ = range count { + go startRunner(in, out, fn, wg.Done) + } + + var mutex sync.Mutex + m := map[int]chan B{} + id := 0 + go func() { + for output := range out { + mutex.Lock() + defer mutex.Unlock() + m[output.id] <- output.value + close(m[output.id]) + delete(m, output.id) + } + }() + + poolRunFn := func(input A) B { + c := make(chan B) + { + mutex.Lock() + defer mutex.Unlock() + m[id] = c + id++ + } + + in <- taggedT[A]{ + id: id, + value: input, + } + return <- c + } + + close := func() { + close(in) + wg.Wait() + close(out) + } + + return poolRunFn, close +} + +func asResult[A any, B any](fn func(A) (B, error)) func(A) resultT[B] { + return func(input A) resultT[B] { + output, err := fn(input) + return resultT[B]{ + value: output, + err: err, + } + } +} + +func unwrapResult[A any, B any](fn func(A) resultT[B]) func(A) (B, error) { + return func(input A) (B, error) { + result := fn(input) + return result.value, result.err + } +} + func NewWithPrefix(databasePath string, prefix string) (IAuth, error) { queue, err := q.New(databasePath) if err != nil { @@ -1086,7 +1177,7 @@ func NewWithPrefix(databasePath string, prefix string) (IAuth, error) { } numCPU := runtime.NumCPU() - hasher, closeHasher := g.PoolRunner(numCPU, g.Resultify(scrypt.Hash)) + hasher, closeHasher := makePoolRunner(numCPU, asResult(scrypt.Hash)) closeFn := func() { unregisterConsumers(queue, consumers, prefix) @@ -1097,7 +1188,7 @@ func NewWithPrefix(databasePath string, prefix string) (IAuth, error) { queue: queue, db: db, queries: queries, - hasher: g.Unresultify(hasher), + hasher: unwrapResult(hasher), close: closeFn, } |