summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/cracha.go95
1 files changed, 93 insertions, 2 deletions
diff --git a/src/cracha.go b/src/cracha.go
index 7a20449..5547d6e 100644
--- a/src/cracha.go
+++ b/src/cracha.go
@@ -1069,6 +1069,97 @@ func unregisterConsumers(queue q.IQueue, consumers []consumerT, prefix string) {
}
}
+type resultT[T any] struct{
+ value T
+ err error
+}
+
+type taggedT[T any] struct{
+ id int
+ value T
+}
+
+func startRunner[A any, B any](
+ in <-chan taggedT[A],
+ out chan<- taggedT[B],
+ fn func(A) B,
+ done func(),
+) {
+ for input := range in {
+ out <- taggedT[B]{
+ id: input.id,
+ value: fn(input.value),
+ }
+ }
+ done()
+}
+
+func makePoolRunner[A any, B any](count int, fn func(A) B) (func(A) B, func()) {
+ var wg sync.WaitGroup
+ wg.Add(count)
+
+ in := make(chan taggedT[A])
+ out := make(chan taggedT[B])
+
+ for _ = range count {
+ go startRunner(in, out, fn, wg.Done)
+ }
+
+ var mutex sync.Mutex
+ m := map[int]chan B{}
+ id := 0
+ go func() {
+ for output := range out {
+ mutex.Lock()
+ defer mutex.Unlock()
+ m[output.id] <- output.value
+ close(m[output.id])
+ delete(m, output.id)
+ }
+ }()
+
+ poolRunFn := func(input A) B {
+ c := make(chan B)
+ {
+ mutex.Lock()
+ defer mutex.Unlock()
+ m[id] = c
+ id++
+ }
+
+ in <- taggedT[A]{
+ id: id,
+ value: input,
+ }
+ return <- c
+ }
+
+ close := func() {
+ close(in)
+ wg.Wait()
+ close(out)
+ }
+
+ return poolRunFn, close
+}
+
+func asResult[A any, B any](fn func(A) (B, error)) func(A) resultT[B] {
+ return func(input A) resultT[B] {
+ output, err := fn(input)
+ return resultT[B]{
+ value: output,
+ err: err,
+ }
+ }
+}
+
+func unwrapResult[A any, B any](fn func(A) resultT[B]) func(A) (B, error) {
+ return func(input A) (B, error) {
+ result := fn(input)
+ return result.value, result.err
+ }
+}
+
func NewWithPrefix(databasePath string, prefix string) (IAuth, error) {
queue, err := q.New(databasePath)
if err != nil {
@@ -1086,7 +1177,7 @@ func NewWithPrefix(databasePath string, prefix string) (IAuth, error) {
}
numCPU := runtime.NumCPU()
- hasher, closeHasher := g.PoolRunner(numCPU, g.Resultify(scrypt.Hash))
+ hasher, closeHasher := makePoolRunner(numCPU, asResult(scrypt.Hash))
closeFn := func() {
unregisterConsumers(queue, consumers, prefix)
@@ -1097,7 +1188,7 @@ func NewWithPrefix(databasePath string, prefix string) (IAuth, error) {
queue: queue,
db: db,
queries: queries,
- hasher: g.Unresultify(hasher),
+ hasher: unwrapResult(hasher),
close: closeFn,
}