author    EuAndreh <eu@euandre.org>  2024-09-17 08:01:05 -0300
committer EuAndreh <eu@euandre.org>  2024-10-20 07:39:33 -0300
commit    ab1795aeb8f00b61c331ac77fdc1011ec14c5253 (patch)
tree      507b72b45f23f8a1bf1a1684a842fef51f1139a8
parent    Init Go project skeleton with golite init (diff)
download  fiinha-ab1795aeb8f00b61c331ac77fdc1011ec14c5253.tar.gz
          fiinha-ab1795aeb8f00b61c331ac77fdc1011ec14c5253.tar.xz
Initial version: first implementation
-rw-r--r--  .gitignore | 11
-rw-r--r--  Makefile | 87
-rw-r--r--  deps.mk | 264
-rwxr-xr-x  mkdeps.sh | 25
-rw-r--r--  src/liteq.go | 117
-rw-r--r--  src/main.go | 4
-rw-r--r--  src/q.go | 2495
l---------  tests/benchmarks/deadletters/main.go | 1
-rw-r--r--  tests/benchmarks/deadletters/q.go | 24
l---------  tests/benchmarks/lookup/main.go | 1
-rw-r--r--  tests/benchmarks/lookup/q.go | 24
l---------  tests/benchmarks/multiple-consumers/main.go | 1
-rw-r--r--  tests/benchmarks/multiple-consumers/q.go | 24
l---------  tests/benchmarks/multiple-produces/main.go | 1
-rw-r--r--  tests/benchmarks/multiple-produces/q.go | 24
l---------  tests/benchmarks/reaper/main.go | 1
-rw-r--r--  tests/benchmarks/reaper/q.go | 24
l---------  tests/benchmarks/replay/main.go | 1
-rw-r--r--  tests/benchmarks/replay/q.go | 24
l---------  tests/benchmarks/single-consumer/main.go | 1
-rw-r--r--  tests/benchmarks/single-consumer/q.go | 24
l---------  tests/benchmarks/single-producer/main.go | 1
-rw-r--r--  tests/benchmarks/single-producer/q.go | 24
l---------  tests/benchmarks/subscribe/main.go | 1
-rw-r--r--  tests/benchmarks/subscribe/q.go | 24
l---------  tests/benchmarks/unsubscribe/main.go | 1
-rw-r--r--  tests/benchmarks/unsubscribe/q.go | 24
l---------  tests/benchmarks/waiter/main.go | 1
-rw-r--r--  tests/benchmarks/waiter/q.go | 24
l---------  tests/functional/consume-one-produce-many/main.go | 1
-rw-r--r--  tests/functional/consume-one-produce-many/q.go | 5
l---------  tests/functional/consumer-with-deadletter/main.go | 1
-rw-r--r--  tests/functional/consumer-with-deadletter/q.go | 97
l---------  tests/functional/custom-prefix/main.go | 1
-rw-r--r--  tests/functional/custom-prefix/q.go | 5
l---------  tests/functional/distinct-consumers-separate-instances/main.go | 1
-rw-r--r--  tests/functional/distinct-consumers-separate-instances/q.go | 5
l---------  tests/functional/flow-id/main.go | 1
-rw-r--r--  tests/functional/flow-id/q.go | 5
l---------  tests/functional/idempotency/main.go | 1
-rw-r--r--  tests/functional/idempotency/q.go | 5
l---------  tests/functional/new-instance-takeover/main.go | 1
-rw-r--r--  tests/functional/new-instance-takeover/q.go | 127
l---------  tests/functional/wait-after-publish/main.go | 1
-rw-r--r--  tests/functional/wait-after-publish/q.go | 64
l---------  tests/functional/waiter/main.go | 1
-rw-r--r--  tests/functional/waiter/q.go | 5
l---------  tests/fuzz/api-check/main.go | 1
-rw-r--r--  tests/fuzz/api-check/q.go | 35
l---------  tests/fuzz/cli-check/main.go | 1
-rw-r--r--  tests/fuzz/cli-check/q.go | 35
l---------  tests/fuzz/equal-produced-consumed-order-check/main.go | 1
-rw-r--r--  tests/fuzz/equal-produced-consumed-order-check/q.go | 35
l---------  tests/fuzz/exactly-once-check/main.go | 1
-rw-r--r--  tests/fuzz/exactly-once-check/q.go | 35
l---------  tests/fuzz/queries-check/main.go | 1
-rw-r--r--  tests/fuzz/queries-check/q.go | 35
l---------  tests/fuzz/total-order-check/main.go | 1
-rw-r--r--  tests/fuzz/total-order-check/q.go | 35
-rw-r--r--  tests/liteq.go | 35
-rw-r--r--  tests/q.go | 5776
-rw-r--r--  tests/queries.sql | 333
62 files changed, 9781 insertions, 184 deletions
diff --git a/.gitignore b/.gitignore
index c096254..d506124 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,7 +1,16 @@
/src/version.go
/*.bin
-/*.db
+/*.db*
/src/*.a
/src/*.bin
/tests/*.a
/tests/*.bin
+/tests/functional/*/*.a
+/tests/functional/*/*.bin
+/tests/functional/*/*.go.db*
+/tests/fuzz/*/*.a
+/tests/fuzz/*/*.bin
+/tests/benchmarks/*/*.a
+/tests/benchmarks/*/*.bin
+/tests/benchmarks/*/*.txt
+/tests/fuzz/corpus/
diff --git a/Makefile b/Makefile
index 2145e48..3301537 100644
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,7 @@
.POSIX:
DATE = 1970-01-01
VERSION = 0.1.0
-NAME = liteq
+NAME = q
NAME_UC = $(NAME)
LANGUAGES = en
## Installation prefix. Defaults to "/usr".
@@ -17,7 +17,7 @@ MANDIR = $(SHAREDIR)/man
EXEC = ./
## Where to store the installation. Empty by default.
DESTDIR =
-LDLIBS = -lsqlite3
+LDLIBS = --static -lsqlite3 -lm
GOCFLAGS = -I $(GOLIBDIR)
GOLDFLAGS = -L $(GOLIBDIR)
@@ -26,17 +26,26 @@ GOLDFLAGS = -L $(GOLIBDIR)
.SUFFIXES:
.SUFFIXES: .go .a .bin .bin-check
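+# Inference rules: compile each .go file into a package archive (library
+# archives also pull in src/$(NAME).go and src/version.go), then link
+# each main.a into a statically linked main.bin.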
+.go.a:
+ go tool compile $(GOCFLAGS) -I $(@D) -o $@ -p $(*F) \
+ `find $< $$(if [ $(*F) != main ]; then \
+ echo src/$(NAME).go src/version.go; fi) | uniq`
+
+.a.bin:
+ go tool link $(GOLDFLAGS) -L $(@D) -o $@ --extldflags '$(LDLIBS)' $<
+
all:
include deps.mk
-objects = \
- src/$(NAME).a \
- src/main.a \
- tests/$(NAME).a \
- tests/main.a \
+libs.a = $(libs.go:.go=.a)
+mains.a = $(mains.go:.go=.a)
+mains.bin = $(mains.go:.go=.bin)
+functional-tests/lib.a = $(functional-tests/lib.go:.go=.a)
+fuzz-targets/lib.a = $(fuzz-targets/lib.go:.go=.a)
+benchmarks/lib.a = $(benchmarks/lib.go:.go=.a)
sources = \
src/$(NAME).go \
@@ -46,13 +55,16 @@ sources = \
derived-assets = \
src/version.go \
- $(objects) \
- src/main.bin \
- tests/main.bin \
+ $(libs.a) \
+ $(mains.a) \
+ $(mains.bin) \
$(NAME).bin \
side-assets = \
- liteq.db \
+ $(NAME).db* \
+ tests/functional/*/*.go.db* \
+ tests/fuzz/corpus/ \
+ tests/benchmarks/*/main.txt \
@@ -61,40 +73,35 @@ side-assets = \
all: $(derived-assets)
-$(objects): Makefile
+$(libs.a): Makefile deps.mk
+$(libs.a): src/$(NAME).go src/version.go
-src/$(NAME).a: src/$(NAME).go src/version.go
- go tool compile $(GOCFLAGS) -o $@ -p $(*F) -I $(@D) $*.go src/version.go
-src/main.a: src/main.go src/$(NAME).a
-tests/main.a: tests/main.go tests/$(NAME).a
-src/main.a tests/main.a:
- go tool compile $(GOCFLAGS) -o $@ -p $(*F) -I $(@D) $*.go
+$(fuzz-targets/lib.a):
+ go tool compile $(GOCFLAGS) -o $@ -p $(NAME) -d=libfuzzer \
+ $*.go src/$(NAME).go src/version.go
-tests/$(NAME).a: tests/$(NAME).go src/$(NAME).go src/version.go
- go tool compile $(GOCFLAGS) -o $@ -p $(*F) $*.go src/$(*F).go src/version.go
-
-src/main.bin: src/main.a
-tests/main.bin: tests/main.a
-src/main.bin tests/main.bin:
- go tool link $(GOLDFLAGS) -o $@ -L $(@D) --extldflags '$(LDLIBS)' $*.a
+src/version.go: Makefile
+ echo 'package $(NAME); const Version = "$(VERSION)"' > $@
$(NAME).bin: src/main.bin
ln -fs $? $@
-src/version.go: Makefile
- echo 'package $(NAME); var version = "$(VERSION)"' > $@
+.PRECIOUS: tests/queries.sql
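+# Golden-file check: dump the SQL queries the test binary knows about
+# and diff them against the committed tests/queries.sql.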
+tests/queries.sql: tests/main.bin ALWAYS
+ env TESTING_DUMP_SQL_QUERIES=1 $(EXEC)tests/main.bin | diff -U10 $@ -
tests.bin-check = \
- tests/main.bin-check \
+ tests/main.bin-check \
+ $(functional-tests/main.go:.go=.bin-check) \
-tests/main.bin-check: tests/main.bin
$(tests.bin-check):
$(EXEC)$*.bin
check-unit: $(tests.bin-check)
+check-unit: tests/queries.sql
integration-tests = \
@@ -107,6 +114,7 @@ $(integration-tests): ALWAYS
sh $@
check-integration: $(integration-tests)
+check-integration: fuzz
## Run all tests. Each test suite is isolated, so that a parallel
@@ -116,6 +124,27 @@ check: check-unit check-integration
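+# Run each fuzz target for FUZZSEC seconds, reusing the inputs cached
+# under tests/fuzz/corpus/.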
+FUZZSEC=1
+fuzz-targets/main.bin-check = $(fuzz-targets/main.go:.go=.bin-check)
+$(fuzz-targets/main.bin-check):
+ $(EXEC)$*.bin --test.fuzztime=$(FUZZSEC)s \
+ --test.fuzz='.*' --test.fuzzcachedir=tests/fuzz/corpus
+
+fuzz: $(fuzz-targets/main.bin-check)
+
+
+
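+# Each benchmark logs its command line and its `time -p` measurements
+# to a main.txt file next to the binary.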
+benchmarks/main.bin-check = $(benchmarks/main.go:.go=.bin-check)
+$(benchmarks/main.bin-check):
+ rm -f $*.txt
+ printf '%s\n' '$(EXEC)$*.bin' >> $*.txt
+ LANG=POSIX.UTF-8 time -p $(EXEC)$*.bin 2>> $*.txt
+ printf '%s\n' '$*.txt'
+
+bench: $(benchmarks/main.bin-check)
+
+
+
## Remove *all* derived artifacts produced during the build.
## A dedicated test asserts that this is always true.
clean:
diff --git a/deps.mk b/deps.mk
index e69de29..3a3310f 100644
--- a/deps.mk
+++ b/deps.mk
@@ -0,0 +1,264 @@
+libs.go = \
+ src/q.go \
+ tests/benchmarks/deadletters/q.go \
+ tests/benchmarks/lookup/q.go \
+ tests/benchmarks/multiple-consumers/q.go \
+ tests/benchmarks/multiple-produces/q.go \
+ tests/benchmarks/reaper/q.go \
+ tests/benchmarks/replay/q.go \
+ tests/benchmarks/single-consumer/q.go \
+ tests/benchmarks/single-producer/q.go \
+ tests/benchmarks/subscribe/q.go \
+ tests/benchmarks/unsubscribe/q.go \
+ tests/benchmarks/waiter/q.go \
+ tests/functional/consume-one-produce-many/q.go \
+ tests/functional/consumer-with-deadletter/q.go \
+ tests/functional/custom-prefix/q.go \
+ tests/functional/distinct-consumers-separate-instances/q.go \
+ tests/functional/flow-id/q.go \
+ tests/functional/idempotency/q.go \
+ tests/functional/new-instance-takeover/q.go \
+ tests/functional/wait-after-publish/q.go \
+ tests/functional/waiter/q.go \
+ tests/fuzz/api-check/q.go \
+ tests/fuzz/cli-check/q.go \
+ tests/fuzz/equal-produced-consumed-order-check/q.go \
+ tests/fuzz/exactly-once-check/q.go \
+ tests/fuzz/queries-check/q.go \
+ tests/fuzz/total-order-check/q.go \
+ tests/q.go \
+
+mains.go = \
+ src/main.go \
+ tests/benchmarks/deadletters/main.go \
+ tests/benchmarks/lookup/main.go \
+ tests/benchmarks/multiple-consumers/main.go \
+ tests/benchmarks/multiple-produces/main.go \
+ tests/benchmarks/reaper/main.go \
+ tests/benchmarks/replay/main.go \
+ tests/benchmarks/single-consumer/main.go \
+ tests/benchmarks/single-producer/main.go \
+ tests/benchmarks/subscribe/main.go \
+ tests/benchmarks/unsubscribe/main.go \
+ tests/benchmarks/waiter/main.go \
+ tests/functional/consume-one-produce-many/main.go \
+ tests/functional/consumer-with-deadletter/main.go \
+ tests/functional/custom-prefix/main.go \
+ tests/functional/distinct-consumers-separate-instances/main.go \
+ tests/functional/flow-id/main.go \
+ tests/functional/idempotency/main.go \
+ tests/functional/new-instance-takeover/main.go \
+ tests/functional/wait-after-publish/main.go \
+ tests/functional/waiter/main.go \
+ tests/fuzz/api-check/main.go \
+ tests/fuzz/cli-check/main.go \
+ tests/fuzz/equal-produced-consumed-order-check/main.go \
+ tests/fuzz/exactly-once-check/main.go \
+ tests/fuzz/queries-check/main.go \
+ tests/fuzz/total-order-check/main.go \
+ tests/main.go \
+
+functional-tests/libs.go = \
+ tests/functional/consume-one-produce-many/q.go \
+ tests/functional/consumer-with-deadletter/q.go \
+ tests/functional/custom-prefix/q.go \
+ tests/functional/distinct-consumers-separate-instances/q.go \
+ tests/functional/flow-id/q.go \
+ tests/functional/idempotency/q.go \
+ tests/functional/new-instance-takeover/q.go \
+ tests/functional/wait-after-publish/q.go \
+ tests/functional/waiter/q.go \
+
+functional-tests/main.go = \
+ tests/functional/consume-one-produce-many/main.go \
+ tests/functional/consumer-with-deadletter/main.go \
+ tests/functional/custom-prefix/main.go \
+ tests/functional/distinct-consumers-separate-instances/main.go \
+ tests/functional/flow-id/main.go \
+ tests/functional/idempotency/main.go \
+ tests/functional/new-instance-takeover/main.go \
+ tests/functional/wait-after-publish/main.go \
+ tests/functional/waiter/main.go \
+
+fuzz-targets/lib.go = \
+ tests/fuzz/api-check/q.go \
+ tests/fuzz/cli-check/q.go \
+ tests/fuzz/equal-produced-consumed-order-check/q.go \
+ tests/fuzz/exactly-once-check/q.go \
+ tests/fuzz/queries-check/q.go \
+ tests/fuzz/total-order-check/q.go \
+
+fuzz-targets/main.go = \
+ tests/fuzz/api-check/main.go \
+ tests/fuzz/cli-check/main.go \
+ tests/fuzz/equal-produced-consumed-order-check/main.go \
+ tests/fuzz/exactly-once-check/main.go \
+ tests/fuzz/queries-check/main.go \
+ tests/fuzz/total-order-check/main.go \
+
+benchmarks/lib.go = \
+ tests/benchmarks/deadletters/q.go \
+ tests/benchmarks/lookup/q.go \
+ tests/benchmarks/multiple-consumers/q.go \
+ tests/benchmarks/multiple-produces/q.go \
+ tests/benchmarks/reaper/q.go \
+ tests/benchmarks/replay/q.go \
+ tests/benchmarks/single-consumer/q.go \
+ tests/benchmarks/single-producer/q.go \
+ tests/benchmarks/subscribe/q.go \
+ tests/benchmarks/unsubscribe/q.go \
+ tests/benchmarks/waiter/q.go \
+
+benchmarks/main.go = \
+ tests/benchmarks/deadletters/main.go \
+ tests/benchmarks/lookup/main.go \
+ tests/benchmarks/multiple-consumers/main.go \
+ tests/benchmarks/multiple-produces/main.go \
+ tests/benchmarks/reaper/main.go \
+ tests/benchmarks/replay/main.go \
+ tests/benchmarks/single-consumer/main.go \
+ tests/benchmarks/single-producer/main.go \
+ tests/benchmarks/subscribe/main.go \
+ tests/benchmarks/unsubscribe/main.go \
+ tests/benchmarks/waiter/main.go \
+
+src/main.a: src/main.go
+src/q.a: src/q.go
+tests/benchmarks/deadletters/main.a: tests/benchmarks/deadletters/main.go
+tests/benchmarks/deadletters/q.a: tests/benchmarks/deadletters/q.go
+tests/benchmarks/lookup/main.a: tests/benchmarks/lookup/main.go
+tests/benchmarks/lookup/q.a: tests/benchmarks/lookup/q.go
+tests/benchmarks/multiple-consumers/main.a: tests/benchmarks/multiple-consumers/main.go
+tests/benchmarks/multiple-consumers/q.a: tests/benchmarks/multiple-consumers/q.go
+tests/benchmarks/multiple-produces/main.a: tests/benchmarks/multiple-produces/main.go
+tests/benchmarks/multiple-produces/q.a: tests/benchmarks/multiple-produces/q.go
+tests/benchmarks/reaper/main.a: tests/benchmarks/reaper/main.go
+tests/benchmarks/reaper/q.a: tests/benchmarks/reaper/q.go
+tests/benchmarks/replay/main.a: tests/benchmarks/replay/main.go
+tests/benchmarks/replay/q.a: tests/benchmarks/replay/q.go
+tests/benchmarks/single-consumer/main.a: tests/benchmarks/single-consumer/main.go
+tests/benchmarks/single-consumer/q.a: tests/benchmarks/single-consumer/q.go
+tests/benchmarks/single-producer/main.a: tests/benchmarks/single-producer/main.go
+tests/benchmarks/single-producer/q.a: tests/benchmarks/single-producer/q.go
+tests/benchmarks/subscribe/main.a: tests/benchmarks/subscribe/main.go
+tests/benchmarks/subscribe/q.a: tests/benchmarks/subscribe/q.go
+tests/benchmarks/unsubscribe/main.a: tests/benchmarks/unsubscribe/main.go
+tests/benchmarks/unsubscribe/q.a: tests/benchmarks/unsubscribe/q.go
+tests/benchmarks/waiter/main.a: tests/benchmarks/waiter/main.go
+tests/benchmarks/waiter/q.a: tests/benchmarks/waiter/q.go
+tests/functional/consume-one-produce-many/main.a: tests/functional/consume-one-produce-many/main.go
+tests/functional/consume-one-produce-many/q.a: tests/functional/consume-one-produce-many/q.go
+tests/functional/consumer-with-deadletter/main.a: tests/functional/consumer-with-deadletter/main.go
+tests/functional/consumer-with-deadletter/q.a: tests/functional/consumer-with-deadletter/q.go
+tests/functional/custom-prefix/main.a: tests/functional/custom-prefix/main.go
+tests/functional/custom-prefix/q.a: tests/functional/custom-prefix/q.go
+tests/functional/distinct-consumers-separate-instances/main.a: tests/functional/distinct-consumers-separate-instances/main.go
+tests/functional/distinct-consumers-separate-instances/q.a: tests/functional/distinct-consumers-separate-instances/q.go
+tests/functional/flow-id/main.a: tests/functional/flow-id/main.go
+tests/functional/flow-id/q.a: tests/functional/flow-id/q.go
+tests/functional/idempotency/main.a: tests/functional/idempotency/main.go
+tests/functional/idempotency/q.a: tests/functional/idempotency/q.go
+tests/functional/new-instance-takeover/main.a: tests/functional/new-instance-takeover/main.go
+tests/functional/new-instance-takeover/q.a: tests/functional/new-instance-takeover/q.go
+tests/functional/wait-after-publish/main.a: tests/functional/wait-after-publish/main.go
+tests/functional/wait-after-publish/q.a: tests/functional/wait-after-publish/q.go
+tests/functional/waiter/main.a: tests/functional/waiter/main.go
+tests/functional/waiter/q.a: tests/functional/waiter/q.go
+tests/fuzz/api-check/main.a: tests/fuzz/api-check/main.go
+tests/fuzz/api-check/q.a: tests/fuzz/api-check/q.go
+tests/fuzz/cli-check/main.a: tests/fuzz/cli-check/main.go
+tests/fuzz/cli-check/q.a: tests/fuzz/cli-check/q.go
+tests/fuzz/equal-produced-consumed-order-check/main.a: tests/fuzz/equal-produced-consumed-order-check/main.go
+tests/fuzz/equal-produced-consumed-order-check/q.a: tests/fuzz/equal-produced-consumed-order-check/q.go
+tests/fuzz/exactly-once-check/main.a: tests/fuzz/exactly-once-check/main.go
+tests/fuzz/exactly-once-check/q.a: tests/fuzz/exactly-once-check/q.go
+tests/fuzz/queries-check/main.a: tests/fuzz/queries-check/main.go
+tests/fuzz/queries-check/q.a: tests/fuzz/queries-check/q.go
+tests/fuzz/total-order-check/main.a: tests/fuzz/total-order-check/main.go
+tests/fuzz/total-order-check/q.a: tests/fuzz/total-order-check/q.go
+tests/main.a: tests/main.go
+tests/q.a: tests/q.go
+src/main.bin: src/main.a
+tests/benchmarks/deadletters/main.bin: tests/benchmarks/deadletters/main.a
+tests/benchmarks/lookup/main.bin: tests/benchmarks/lookup/main.a
+tests/benchmarks/multiple-consumers/main.bin: tests/benchmarks/multiple-consumers/main.a
+tests/benchmarks/multiple-produces/main.bin: tests/benchmarks/multiple-produces/main.a
+tests/benchmarks/reaper/main.bin: tests/benchmarks/reaper/main.a
+tests/benchmarks/replay/main.bin: tests/benchmarks/replay/main.a
+tests/benchmarks/single-consumer/main.bin: tests/benchmarks/single-consumer/main.a
+tests/benchmarks/single-producer/main.bin: tests/benchmarks/single-producer/main.a
+tests/benchmarks/subscribe/main.bin: tests/benchmarks/subscribe/main.a
+tests/benchmarks/unsubscribe/main.bin: tests/benchmarks/unsubscribe/main.a
+tests/benchmarks/waiter/main.bin: tests/benchmarks/waiter/main.a
+tests/functional/consume-one-produce-many/main.bin: tests/functional/consume-one-produce-many/main.a
+tests/functional/consumer-with-deadletter/main.bin: tests/functional/consumer-with-deadletter/main.a
+tests/functional/custom-prefix/main.bin: tests/functional/custom-prefix/main.a
+tests/functional/distinct-consumers-separate-instances/main.bin: tests/functional/distinct-consumers-separate-instances/main.a
+tests/functional/flow-id/main.bin: tests/functional/flow-id/main.a
+tests/functional/idempotency/main.bin: tests/functional/idempotency/main.a
+tests/functional/new-instance-takeover/main.bin: tests/functional/new-instance-takeover/main.a
+tests/functional/wait-after-publish/main.bin: tests/functional/wait-after-publish/main.a
+tests/functional/waiter/main.bin: tests/functional/waiter/main.a
+tests/fuzz/api-check/main.bin: tests/fuzz/api-check/main.a
+tests/fuzz/cli-check/main.bin: tests/fuzz/cli-check/main.a
+tests/fuzz/equal-produced-consumed-order-check/main.bin: tests/fuzz/equal-produced-consumed-order-check/main.a
+tests/fuzz/exactly-once-check/main.bin: tests/fuzz/exactly-once-check/main.a
+tests/fuzz/queries-check/main.bin: tests/fuzz/queries-check/main.a
+tests/fuzz/total-order-check/main.bin: tests/fuzz/total-order-check/main.a
+tests/main.bin: tests/main.a
+src/main.bin-check: src/main.bin
+tests/benchmarks/deadletters/main.bin-check: tests/benchmarks/deadletters/main.bin
+tests/benchmarks/lookup/main.bin-check: tests/benchmarks/lookup/main.bin
+tests/benchmarks/multiple-consumers/main.bin-check: tests/benchmarks/multiple-consumers/main.bin
+tests/benchmarks/multiple-produces/main.bin-check: tests/benchmarks/multiple-produces/main.bin
+tests/benchmarks/reaper/main.bin-check: tests/benchmarks/reaper/main.bin
+tests/benchmarks/replay/main.bin-check: tests/benchmarks/replay/main.bin
+tests/benchmarks/single-consumer/main.bin-check: tests/benchmarks/single-consumer/main.bin
+tests/benchmarks/single-producer/main.bin-check: tests/benchmarks/single-producer/main.bin
+tests/benchmarks/subscribe/main.bin-check: tests/benchmarks/subscribe/main.bin
+tests/benchmarks/unsubscribe/main.bin-check: tests/benchmarks/unsubscribe/main.bin
+tests/benchmarks/waiter/main.bin-check: tests/benchmarks/waiter/main.bin
+tests/functional/consume-one-produce-many/main.bin-check: tests/functional/consume-one-produce-many/main.bin
+tests/functional/consumer-with-deadletter/main.bin-check: tests/functional/consumer-with-deadletter/main.bin
+tests/functional/custom-prefix/main.bin-check: tests/functional/custom-prefix/main.bin
+tests/functional/distinct-consumers-separate-instances/main.bin-check: tests/functional/distinct-consumers-separate-instances/main.bin
+tests/functional/flow-id/main.bin-check: tests/functional/flow-id/main.bin
+tests/functional/idempotency/main.bin-check: tests/functional/idempotency/main.bin
+tests/functional/new-instance-takeover/main.bin-check: tests/functional/new-instance-takeover/main.bin
+tests/functional/wait-after-publish/main.bin-check: tests/functional/wait-after-publish/main.bin
+tests/functional/waiter/main.bin-check: tests/functional/waiter/main.bin
+tests/fuzz/api-check/main.bin-check: tests/fuzz/api-check/main.bin
+tests/fuzz/cli-check/main.bin-check: tests/fuzz/cli-check/main.bin
+tests/fuzz/equal-produced-consumed-order-check/main.bin-check: tests/fuzz/equal-produced-consumed-order-check/main.bin
+tests/fuzz/exactly-once-check/main.bin-check: tests/fuzz/exactly-once-check/main.bin
+tests/fuzz/queries-check/main.bin-check: tests/fuzz/queries-check/main.bin
+tests/fuzz/total-order-check/main.bin-check: tests/fuzz/total-order-check/main.bin
+tests/main.bin-check: tests/main.bin
+src/main.a: src/$(NAME).a
+tests/benchmarks/deadletters/main.a: tests/benchmarks/deadletters/$(NAME).a
+tests/benchmarks/lookup/main.a: tests/benchmarks/lookup/$(NAME).a
+tests/benchmarks/multiple-consumers/main.a: tests/benchmarks/multiple-consumers/$(NAME).a
+tests/benchmarks/multiple-produces/main.a: tests/benchmarks/multiple-produces/$(NAME).a
+tests/benchmarks/reaper/main.a: tests/benchmarks/reaper/$(NAME).a
+tests/benchmarks/replay/main.a: tests/benchmarks/replay/$(NAME).a
+tests/benchmarks/single-consumer/main.a: tests/benchmarks/single-consumer/$(NAME).a
+tests/benchmarks/single-producer/main.a: tests/benchmarks/single-producer/$(NAME).a
+tests/benchmarks/subscribe/main.a: tests/benchmarks/subscribe/$(NAME).a
+tests/benchmarks/unsubscribe/main.a: tests/benchmarks/unsubscribe/$(NAME).a
+tests/benchmarks/waiter/main.a: tests/benchmarks/waiter/$(NAME).a
+tests/functional/consume-one-produce-many/main.a: tests/functional/consume-one-produce-many/$(NAME).a
+tests/functional/consumer-with-deadletter/main.a: tests/functional/consumer-with-deadletter/$(NAME).a
+tests/functional/custom-prefix/main.a: tests/functional/custom-prefix/$(NAME).a
+tests/functional/distinct-consumers-separate-instances/main.a: tests/functional/distinct-consumers-separate-instances/$(NAME).a
+tests/functional/flow-id/main.a: tests/functional/flow-id/$(NAME).a
+tests/functional/idempotency/main.a: tests/functional/idempotency/$(NAME).a
+tests/functional/new-instance-takeover/main.a: tests/functional/new-instance-takeover/$(NAME).a
+tests/functional/wait-after-publish/main.a: tests/functional/wait-after-publish/$(NAME).a
+tests/functional/waiter/main.a: tests/functional/waiter/$(NAME).a
+tests/fuzz/api-check/main.a: tests/fuzz/api-check/$(NAME).a
+tests/fuzz/cli-check/main.a: tests/fuzz/cli-check/$(NAME).a
+tests/fuzz/equal-produced-consumed-order-check/main.a: tests/fuzz/equal-produced-consumed-order-check/$(NAME).a
+tests/fuzz/exactly-once-check/main.a: tests/fuzz/exactly-once-check/$(NAME).a
+tests/fuzz/queries-check/main.a: tests/fuzz/queries-check/$(NAME).a
+tests/fuzz/total-order-check/main.a: tests/fuzz/total-order-check/$(NAME).a
+tests/main.a: tests/$(NAME).a
diff --git a/mkdeps.sh b/mkdeps.sh
index e5606ff..b1c61b3 100755
--- a/mkdeps.sh
+++ b/mkdeps.sh
@@ -2,3 +2,28 @@
set -eu
export LANG=POSIX.UTF-8
+
+
+libs() {
+ find src tests -name '*.go' | grep -v '/main\.go$' |
+ grep -v '/version\.go$'
+}
+
+mains() {
+ find src tests -name '*.go' | grep '/main\.go$'
+}
+
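+# Emit each file list as a variable assignment for deps.mk ("varlist"
+# is assumed to be provided by the environment that runs this script;
+# it is not defined here).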
+libs | varlist 'libs.go'
+mains | varlist 'mains.go'
+
+find tests/functional/*/*.go -not -name main.go | varlist 'functional-tests/libs.go'
+find tests/functional/*/main.go | varlist 'functional-tests/main.go'
+find tests/fuzz/*/*.go -not -name main.go | varlist 'fuzz-targets/lib.go'
+find tests/fuzz/*/main.go | varlist 'fuzz-targets/main.go'
+find tests/benchmarks/*/*.go -not -name main.go | varlist 'benchmarks/lib.go'
+find tests/benchmarks/*/main.go | varlist 'benchmarks/main.go'
+
+{ libs; mains; } | sort | sed 's/^\(.*\)\.go$/\1.a:\t\1.go/'
+mains | sort | sed 's/^\(.*\)\.go$/\1.bin:\t\1.a/'
+mains | sort | sed 's/^\(.*\)\.go$/\1.bin-check:\t\1.bin/'
+mains | sort | sed 's|^\(.*\)/main\.go$|\1/main.a:\t\1/$(NAME).a|'
diff --git a/src/liteq.go b/src/liteq.go
deleted file mode 100644
index 2eeff34..0000000
--- a/src/liteq.go
+++ /dev/null
@@ -1,117 +0,0 @@
-package liteq
-
-import (
- "database/sql"
- "flag"
- "io/ioutil"
- "log/slog"
- "os"
- "sort"
-
- g "gobang"
- "golite"
-)
-
-
-
-func InitMigrations(db *sql.DB) {
- _, err := db.Exec(`
- CREATE TABLE IF NOT EXISTS migrations (
- filename TEXT PRIMARY KEY
- );
- `)
- g.FatalIf(err)
-}
-
-const MIGRATIONS_DIR = "src/sql/migrations/"
-func PendingMigrations(db *sql.DB) []string {
- files, err := ioutil.ReadDir(MIGRATIONS_DIR)
- g.FatalIf(err)
-
- set := make(map[string]bool)
- for _, file := range files {
- set[file.Name()] = true
- }
-
- rows, err := db.Query(`SELECT filename FROM migrations;`)
- g.FatalIf(err)
- defer rows.Close()
-
- for rows.Next() {
- var filename string
- err := rows.Scan(&filename)
- g.FatalIf(err)
- delete(set, filename)
- }
- g.FatalIf(rows.Err())
-
- difference := make([]string, 0)
- for filename := range set {
- difference = append(difference, filename)
- }
-
- sort.Sort(sort.StringSlice(difference))
- return difference
-}
-
-func RunMigrations(db *sql.DB) {
- InitMigrations(db)
-
- stmt, err := db.Prepare(`INSERT INTO migrations (filename) VALUES (?);`)
- g.FatalIf(err)
- defer stmt.Close()
-
- for _, filename := range PendingMigrations(db) {
- g.Info("Running migration file", "exec-migration-file",
- "filename", filename,
- )
-
- tx, err := db.Begin()
- g.FatalIf(err)
-
- sql, err := os.ReadFile(MIGRATIONS_DIR + filename)
- g.FatalIf(err)
-
- _, err = tx.Exec(string(sql))
- g.FatalIf(err)
-
- _, err = tx.Stmt(stmt).Exec(filename)
- g.FatalIf(err)
-
- err = tx.Commit()
- g.FatalIf(err)
- }
-}
-
-func initDB(databasePath string) *sql.DB {
- db, err := sql.Open("sqlite3", databasePath)
- g.FatalIf(err)
- RunMigrations(db)
- return db
-}
-
-func run(db *sql.DB) {
-}
-
-
-
-var (
- databasePath = flag.String(
- "f",
- "q.db",
- "The path to the database file",
- )
-)
-
-
-func Main() {
- g.Init(slog.Group(
- "versions",
- "gobang", g.Version,
- "golite", golite.Version,
- "this", version,
- ))
- flag.Parse()
- db := initDB(*databasePath)
- run(db)
-}
diff --git a/src/main.go b/src/main.go
index 8d9a05e..51faffa 100644
--- a/src/main.go
+++ b/src/main.go
@@ -1,7 +1,7 @@
package main
-import "liteq"
+import "q"
func main() {
- liteq.Main()
+ q.Main()
}
diff --git a/src/q.go b/src/q.go
new file mode 100644
index 0000000..6eeefe6
--- /dev/null
+++ b/src/q.go
@@ -0,0 +1,2495 @@
+package q
+import (
+ "context"
+ "database/sql"
+ "flag"
+ "fmt"
+ "io"
+ "log/slog"
+ "os"
+ "sync"
+ "time"
+
+ _ "acudego"
+ "guuid"
+ g "gobang"
+)
+
+
+
+const (
+ defaultPrefix = "q"
+ reaperSkipCount = 1000
+ notOwnerErrorFmt = "%v owns %#v as %#v, not us (%v)"
+ noLongerOwnerErrorFmt = "we (%v) no longer own %#v as %#v, but %v does"
+ rollbackErrorFmt = "rollback error: %w; while executing: %w"
+)
+
+
+
+type dbI interface{
+ findOne(q string, args []any, bindings []any) error
+ exec(q string)
+}
+
+type queryT struct{
+ write string
+ read string
+ owner string
+}
+
+type queriesT struct{
+ take func(string, string) error
+ publish func(UnsentMessage, guuid.UUID) (messageT, error)
+ find func(string, guuid.UUID) (messageT, error)
+ next func(string, string) (messageT, error)
+ pending func(string, string, func(messageT) error) error
+ commit func(string, guuid.UUID) error
+ toDead func(string, guuid.UUID, guuid.UUID) error
+ replay func(guuid.UUID, guuid.UUID) (messageT, error)
+ oneDead func(string, string) (deadletterT, error)
+ allDead func(string, string, func(deadletterT, messageT) error) error
+ size func(string) (int, error)
+ count func(string, string) (int, error)
+ hasData func(string, string) (bool, error)
+ close func() error
+}
+
+type messageT struct{
+ id int64
+ timestamp time.Time
+ uuid guuid.UUID
+ topic string
+ flowID guuid.UUID
+ payload []byte
+}
+
+type UnsentMessage struct{
+ Topic string
+ FlowID guuid.UUID
+ Payload []byte
+}
+
+type Message struct{
+ ID guuid.UUID
+ Timestamp time.Time
+ Topic string
+ FlowID guuid.UUID
+ Payload []byte
+}
+
+type deadletterT struct{
+ uuid guuid.UUID
+ timestamp time.Time
+ consumer string
+ messageID guuid.UUID
+}
+
+type pingerT[T any] struct{
+ tryPing func(T)
+ onPing func(func(T))
+ closed func() bool
+ close func()
+}
+
+type consumerDataT struct{
+ topic string
+ name string
+}
+
+type waiterDataT struct{
+ topic string
+ flowID guuid.UUID
+ name string
+
+}
+
+type consumerT struct{
+ data consumerDataT
+ callback func(Message) error
+ pinger pingerT[struct{}]
+ close *func()
+}
+
+type waiterT struct{
+ data waiterDataT
+ pinger pingerT[[]byte]
+ closed *func() bool
+ close *func()
+}
+
+type topicSubscriptionT struct{
+ consumers map[string]consumerT
+ waiters map[guuid.UUID]map[string]waiterT
+}
+
+type subscriptionsSetM map[string]topicSubscriptionT
+
+type subscriptionsT struct {
+ read func(func(subscriptionsSetM) error) error
+ write func(func(subscriptionsSetM) error) error
+}
+
+type queueT struct{
+ queries queriesT
+ subscriptions subscriptionsT
+ pinger pingerT[struct{}]
+}
+
+type argsT struct{
+ databasePath string
+ prefix string
+ command string
+ allArgs []string
+ args []string
+ topic string
+ consumer string
+}
+
+type commandT struct{
+ name string
+ getopt func(argsT, io.Writer) (argsT, bool)
+ exec func(argsT, queriesT, io.Reader, io.Writer) (int, error)
+}
+
+type IQueue interface{
+ Publish(UnsentMessage) (Message, error)
+ Subscribe( string, string, func(Message) error) error
+ Unsubscribe(string, string)
+ WaitFor(string, guuid.UUID, string) Waiter
+ Close() error
+}
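+// A minimal usage sketch (queue, handleMessage, flowID and payload are
+// placeholders, not defined in this file):
+//
+//	var queue IQueue = newQueue() // hypothetical constructor
+//	err := queue.Subscribe("orders", "mailer", handleMessage)
+//	msg, err := queue.Publish(UnsentMessage{
+//		Topic:   "orders",
+//		FlowID:  flowID,
+//		Payload: payload,
+//	})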
+
+
+
+func closeNoop() error {
+ return nil
+}
+
+func tryRollback(db *sql.DB, ctx context.Context, err error) error {
+ _, rollbackErr := db.ExecContext(ctx, "ROLLBACK;")
+ if rollbackErr != nil {
+ return fmt.Errorf(
+ rollbackErrorFmt,
+ rollbackErr,
+ err,
+ )
+ }
+
+ return err
+}
+
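+// inTx runs fn inside an explicit BEGIN IMMEDIATE / COMMIT pair; if fn
+// or the COMMIT fails it attempts a ROLLBACK, combining both errors
+// when the rollback itself also fails.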
+func inTx(db *sql.DB, fn func(context.Context) error) error {
+ ctx := context.Background()
+
+ _, err := db.ExecContext(ctx, "BEGIN IMMEDIATE;")
+ if err != nil {
+ return err
+ }
+
+ err = fn(ctx)
+ if err != nil {
+ return tryRollback(db, ctx, err)
+ }
+
+ _, err = db.ExecContext(ctx, "COMMIT;")
+ if err != nil {
+ return tryRollback(db, ctx, err)
+ }
+
+ return nil
+}
+
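+// In the SQL templates throughout this file, every %s placeholder is
+// filled with the same table-name prefix, hence the long repetitive
+// fmt.Sprintf argument lists.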
+func createTablesSQL(prefix string) queryT {
+ const tmpl_write = `
+ CREATE TABLE IF NOT EXISTS "%s_payloads" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (%s),
+ topic TEXT NOT NULL,
+ payload BLOB NOT NULL
+ ) STRICT;
+ CREATE INDEX IF NOT EXISTS "%s_payloads_topic"
+ ON "%s_payloads"(topic);
+
+ CREATE TABLE IF NOT EXISTS "%s_messages" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (%s),
+ uuid BLOB NOT NULL UNIQUE,
+ flow_id BLOB NOT NULL,
+ payload_id INTEGER NOT NULL
+ REFERENCES "%s_payloads"(id)
+ ) STRICT;
+ CREATE INDEX IF NOT EXISTS "%s_messages_flow_id"
+ ON "%s_messages"(flow_id);
+
+ CREATE TABLE IF NOT EXISTS "%s_offsets" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (%s),
+ consumer TEXT NOT NULL,
+ message_id INTEGER NOT NULL
+ REFERENCES "%s_messages"(id),
+ UNIQUE (consumer, message_id)
+ ) STRICT;
+ CREATE INDEX IF NOT EXISTS "%s_offsets_consumer"
+ ON "%s_offsets"(consumer);
+
+ CREATE TABLE IF NOT EXISTS "%s_deadletters" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ uuid BLOB NOT NULL UNIQUE,
+ consumer TEXT NOT NULL,
+ message_id INTEGER NOT NULL
+ REFERENCES "%s_messages"(id),
+ UNIQUE (consumer, message_id)
+ ) STRICT;
+ CREATE INDEX IF NOT EXISTS "%s_deadletters_consumer"
+ ON "%s_deadletters"(consumer);
+
+ CREATE TABLE IF NOT EXISTS "%s_replays" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ deadletter_id INTEGER NOT NULL UNIQUE
+ REFERENCES "%s_deadletters"(id) ,
+ message_id INTEGER NOT NULL UNIQUE
+ REFERENCES "%s_messages"(id)
+ ) STRICT;
+
+ CREATE TABLE IF NOT EXISTS "%s_owners" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ topic TEXT NOT NULL,
+ consumer TEXT NOT NULL,
+ owner_id INTEGER NOT NULL,
+ UNIQUE (topic, consumer)
+ ) STRICT;
+ `
+ return queryT{
+ write: fmt.Sprintf(
+ tmpl_write,
+ prefix,
+ g.SQLiteNow,
+ prefix,
+ prefix,
+ prefix,
+ g.SQLiteNow,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ g.SQLiteNow,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ ),
+ }
+}
+
+func createTables(db *sql.DB, prefix string) error {
+ q := createTablesSQL(prefix)
+
+ return inTx(db, func(ctx context.Context) error {
+ _, err := db.ExecContext(ctx, q.write)
+ return err
+ })
+}
+
+func takeSQL(prefix string) queryT {
+ const tmpl_write = `
+ INSERT INTO "%s_owners" (topic, consumer, owner_id)
+ VALUES (?, ?, ?)
+ ON CONFLICT (topic, consumer) DO
+ UPDATE SET owner_id=excluded.owner_id;
+ `
+ return queryT{
+ write: fmt.Sprintf(tmpl_write, prefix),
+ }
+}
+
+func takeStmt(
+ db *sql.DB,
+ prefix string,
+ instanceID int,
+) (func(string, string) error, func() error, error) {
+ q := takeSQL(prefix)
+
+ fn := func(topic string, consumer string) error {
+ return inTx(db, func(ctx context.Context) error {
+ _, err := db.ExecContext(
+ ctx,
+ q.write,
+ topic,
+ consumer,
+ instanceID,
+ )
+ return err
+ })
+ }
+
+ return fn, closeNoop, nil
+}
+
+func publishSQL(prefix string) queryT {
+ const tmpl_write = `
+ INSERT INTO "%s_payloads" (topic, payload)
+ VALUES (?, ?);
+
+ INSERT INTO "%s_messages" (uuid, flow_id, payload_id)
+ VALUES (?, ?, last_insert_rowid());
+ `
+ const tmpl_read = `
+ SELECT id, timestamp FROM "%s_messages"
+ WHERE uuid = ?;
+ `
+ return queryT{
+ write: fmt.Sprintf(tmpl_write, prefix, prefix),
+ read: fmt.Sprintf(tmpl_read, prefix),
+ }
+}
+
+func publishStmt(
+ db *sql.DB,
+ prefix string,
+ _ int,
+) (func(UnsentMessage, guuid.UUID) (messageT, error), func() error, error) {
+ q := publishSQL(prefix)
+
+ readStmt, err := db.Prepare(q.read)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(
+ unsentMessage UnsentMessage,
+ messageID guuid.UUID,
+ ) (messageT, error) {
+ message := messageT{
+ uuid: messageID,
+ topic: unsentMessage.Topic,
+ flowID: unsentMessage.FlowID,
+ payload: unsentMessage.Payload,
+ }
+
+ message_id_bytes := messageID[:]
+ flow_id_bytes := unsentMessage.FlowID[:]
+ _, err := db.Exec(
+ q.write,
+ unsentMessage.Topic,
+ unsentMessage.Payload,
+ message_id_bytes,
+ flow_id_bytes,
+ )
+ if err != nil {
+ return messageT{}, err
+ }
+
+ var timestr string
+ err = readStmt.QueryRow(message_id_bytes).Scan(
+ &message.id,
+ &timestr,
+ )
+ if err != nil {
+ return messageT{}, err
+ }
+
+ message.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
+ if err != nil {
+ return messageT{}, err
+ }
+
+ return message, nil
+ }
+
+ return fn, readStmt.Close, nil
+}
+
+func findSQL(prefix string) queryT {
+ const tmpl_read = `
+ SELECT
+ "%s_messages".id,
+ "%s_messages".timestamp,
+ "%s_messages".uuid,
+ "%s_payloads".payload
+ FROM "%s_messages"
+ JOIN "%s_payloads" ON
+ "%s_payloads".id = "%s_messages".payload_id
+ WHERE
+ "%s_payloads".topic = ? AND
+ "%s_messages".flow_id = ?
+ ORDER BY "%s_messages".id DESC
+ LIMIT 1;
+ `
+ return queryT{
+ read: fmt.Sprintf(
+ tmpl_read,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ ),
+ }
+}
+
+func findStmt(
+ db *sql.DB,
+ prefix string,
+ _ int,
+) (func(string, guuid.UUID) (messageT, error), func() error, error) {
+ q := findSQL(prefix)
+
+ readStmt, err := db.Prepare(q.read)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(topic string, flowID guuid.UUID) (messageT, error) {
+ message := messageT{
+ topic: topic,
+ flowID: flowID,
+ }
+
+ var (
+ timestr string
+ message_id_bytes []byte
+ )
+ flow_id_bytes := flowID[:]
+ err = readStmt.QueryRow(topic, flow_id_bytes).Scan(
+ &message.id,
+ &timestr,
+ &message_id_bytes,
+ &message.payload,
+ )
+ if err != nil {
+ return messageT{}, err
+ }
+ message.uuid = guuid.UUID(message_id_bytes)
+
+ message.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
+ if err != nil {
+ return messageT{}, err
+ }
+
+ return message, nil
+ }
+
+ return fn, readStmt.Close, nil
+}
+
+func nextSQL(prefix string) queryT {
+ const tmpl_read = `
+ SELECT
+ "%s_messages".id,
+ "%s_messages".timestamp,
+ "%s_messages".uuid,
+ "%s_messages".flow_id,
+ "%s_payloads".payload
+ FROM "%s_messages"
+ JOIN "%s_payloads" ON
+ "%s_payloads".id = "%s_messages".payload_id
+ WHERE
+ "%s_payloads".topic = ? AND
+ "%s_messages".id NOT IN (
+ SELECT message_id FROM "%s_offsets"
+ WHERE consumer = ?
+ )
+ ORDER BY "%s_messages".id ASC
+ LIMIT 1;
+ `
+ const tmpl_owner = `
+ SELECT owner_id FROM "%s_owners"
+ WHERE
+ topic = ? AND
+ consumer = ?;
+ `
+ return queryT{
+ read: fmt.Sprintf(
+ tmpl_read,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ ),
+ owner: fmt.Sprintf(tmpl_owner, prefix),
+ }
+}
+
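+// nextStmt returns the next message the consumer hasn't committed yet,
+// first checking, inside the same transaction, that this instance still
+// owns the (topic, consumer) pair.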
+func nextStmt(
+ db *sql.DB,
+ prefix string,
+ instanceID int,
+) (func(string, string) (messageT, error), func() error, error) {
+ q := nextSQL(prefix)
+
+ fn := func(topic string, consumer string) (messageT, error) {
+ message := messageT{
+ topic: topic,
+ }
+
+ var (
+ err error
+ ownerID int
+ timestr string
+ message_id_bytes []byte
+ flow_id_bytes []byte
+ )
+ tx, err := db.Begin()
+ if err != nil {
+ return messageT{}, err
+ }
+ defer tx.Rollback()
+
+ err = tx.QueryRow(q.owner, topic, consumer).Scan(&ownerID)
+ if err != nil {
+ return messageT{}, err
+ }
+
+ if ownerID != instanceID {
+ err := fmt.Errorf(
+ notOwnerErrorFmt,
+ ownerID,
+ topic,
+ consumer,
+ instanceID,
+ )
+ return messageT{}, err
+ }
+
+ err = tx.QueryRow(q.read, topic, consumer).Scan(
+ &message.id,
+ &timestr,
+ &message_id_bytes,
+ &flow_id_bytes,
+ &message.payload,
+ )
+ if err != nil {
+ return messageT{}, err
+ }
+ message.uuid = guuid.UUID(message_id_bytes)
+ message.flowID = guuid.UUID(flow_id_bytes)
+
+ message.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
+ if err != nil {
+ return messageT{}, err
+ }
+
+ return message, nil
+ }
+
+ return fn, closeNoop, nil
+}
+
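+// messageEach scans each row into a messageT and feeds it to callback,
+// making sure rows gets closed and any close error is folded into the
+// returned error.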
+func messageEach(rows *sql.Rows, callback func(messageT) error) error {
+ if rows == nil {
+ return nil
+ }
+
+ for rows.Next() {
+ var (
+ message messageT
+ timestr string
+ message_id_bytes []byte
+ flow_id_bytes []byte
+ )
+ err := rows.Scan(
+ &message.id,
+ &timestr,
+ &message_id_bytes,
+ &flow_id_bytes,
+ &message.topic,
+ &message.payload,
+ )
+ if err != nil {
+ return g.WrapErrors(rows.Close(), err)
+ }
+ message.uuid = guuid.UUID(message_id_bytes)
+ message.flowID = guuid.UUID(flow_id_bytes)
+
+ message.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
+ if err != nil {
+ return g.WrapErrors(rows.Close(), err)
+ }
+
+ err = callback(message)
+ if err != nil {
+ return g.WrapErrors(rows.Close(), err)
+ }
+ }
+
+ return g.WrapErrors(rows.Err(), rows.Close())
+}
+
+func pendingSQL(prefix string) queryT {
+ const tmpl_read = `
+ SELECT
+ "%s_messages".id,
+ "%s_messages".timestamp,
+ "%s_messages".uuid,
+ "%s_messages".flow_id,
+ "%s_payloads".topic,
+ "%s_payloads".payload
+ FROM "%s_messages"
+ JOIN "%s_payloads" ON
+ "%s_payloads".id = "%s_messages".payload_id
+ WHERE
+ "%s_payloads".topic = ? AND
+ "%s_messages".id NOT IN (
+ SELECT message_id FROM "%s_offsets"
+ WHERE consumer = ?
+ )
+ ORDER BY "%s_messages".id ASC;
+ `
+ const tmpl_owner = `
+ SELECT owner_id FROM "%s_owners"
+ WHERE
+ topic = ? AND
+ consumer = ?;
+ `
+ return queryT{
+ read: fmt.Sprintf(
+ tmpl_read,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ ),
+ owner: fmt.Sprintf(tmpl_owner, prefix),
+ }
+}
+
+func pendingStmt(
+ db *sql.DB,
+ prefix string,
+ instanceID int,
+) (func(string, string) (*sql.Rows, error), func() error, error) {
+ q := pendingSQL(prefix)
+
+ readStmt, err := db.Prepare(q.read)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ ownerStmt, err := db.Prepare(q.owner)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(topic string, consumer string) (*sql.Rows, error) {
+ var ownerID int
+ err := ownerStmt.QueryRow(topic, consumer).Scan(&ownerID)
+ if err != nil {
+ return nil, err
+ }
+
+ // best effort check, the final one is done during
+ // commit within a transaction
+ if ownerID != instanceID {
+ return nil, nil
+ }
+
+ return readStmt.Query(topic, consumer)
+ }
+
+ closeFn := func() error {
+ return g.SomeFnError(readStmt.Close, ownerStmt.Close)
+ }
+
+ return fn, closeFn, nil
+}
+
+func commitSQL(prefix string) queryT {
+ const tmpl_write = `
+ INSERT INTO "%s_offsets" (consumer, message_id)
+ VALUES (?, (SELECT id FROM "%s_messages" WHERE uuid = ?));
+ `
+ const tmpl_read = `
+ SELECT "%s_payloads".topic from "%s_payloads"
+ JOIN "%s_messages" ON
+ "%s_payloads".id = "%s_messages".payload_id
+ WHERE "%s_messages".uuid = ?;
+ `
+ const tmpl_owner = `
+ SELECT owner_id FROM "%s_owners"
+ WHERE
+ topic = ? AND
+ consumer = ?;
+ `
+ return queryT{
+ write: fmt.Sprintf(tmpl_write, prefix, prefix),
+ read: fmt.Sprintf(
+ tmpl_read,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ ),
+ owner: fmt.Sprintf(tmpl_owner, prefix),
+ }
+}
+
+func commitStmt(
+ db *sql.DB,
+ prefix string,
+ instanceID int,
+) (func(string, guuid.UUID) error, func() error, error) {
+ q := commitSQL(prefix)
+
+ fn := func(consumer string, messageID guuid.UUID) error {
+ message_id_bytes := messageID[:]
+ return inTx(db, func(ctx context.Context) error {
+ var topic string
+ err := db.QueryRowContext(
+ ctx,
+ q.read,
+ message_id_bytes,
+ ).Scan(&topic)
+ if err != nil {
+ return err
+ }
+
+ var ownerID int
+ err = db.QueryRowContext(
+ ctx,
+ q.owner,
+ topic,
+ consumer,
+ ).Scan(&ownerID)
+ if err != nil {
+ return err
+ }
+
+ if ownerID != instanceID {
+ return fmt.Errorf(
+ noLongerOwnerErrorFmt,
+ instanceID,
+ topic,
+ consumer,
+ ownerID,
+ )
+ }
+
+ _, err = db.ExecContext(ctx, q.write, consumer, message_id_bytes)
+ return err
+ })
+ }
+
+ return fn, closeNoop, nil
+}
+
+func toDeadSQL(prefix string) queryT {
+ const tmpl_write = `
+ INSERT INTO "%s_offsets" ( consumer, message_id)
+ VALUES ( ?, (SELECT id FROM "%s_messages" WHERE uuid = ?));
+
+ INSERT INTO "%s_deadletters" (uuid, consumer, message_id)
+ VALUES (?, ?, (SELECT id FROM "%s_messages" WHERE uuid = ?));
+ `
+ const tmpl_read = `
+ SELECT "%s_payloads".topic FROM "%s_payloads"
+ JOIN "%s_messages" ON
+ "%s_payloads".id = "%s_messages".payload_id
+ WHERE "%s_messages".uuid = ?;
+ `
+ const tmpl_owner = `
+ SELECT owner_id FROM "%s_owners"
+ WHERE
+ topic = ? AND
+ consumer = ?;
+ `
+ return queryT{
+ write: fmt.Sprintf(tmpl_write, prefix, prefix, prefix, prefix),
+ read: fmt.Sprintf(
+ tmpl_read,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ ),
+ owner: fmt.Sprintf(tmpl_owner, prefix),
+ }
+}
+
+func toDeadStmt(
+ db *sql.DB,
+ prefix string,
+ instanceID int,
+) (
+ func(string, guuid.UUID, guuid.UUID) error,
+ func() error,
+ error,
+) {
+ q := toDeadSQL(prefix)
+
+ fn := func(
+ consumer string,
+ messageID guuid.UUID,
+ deadletterID guuid.UUID,
+ ) error {
+ message_id_bytes := messageID[:]
+ deadletter_id_bytes := deadletterID[:]
+ return inTx(db, func(ctx context.Context) error {
+ var topic string
+ err := db.QueryRowContext(
+ ctx,
+ q.read,
+ message_id_bytes,
+ ).Scan(&topic)
+ if err != nil {
+ return err
+ }
+
+ var ownerID int
+ err = db.QueryRowContext(
+ ctx,
+ q.owner,
+ topic,
+ consumer,
+ ).Scan(&ownerID)
+ if err != nil {
+ return err
+ }
+
+ if ownerID != instanceID {
+ return fmt.Errorf(
+ noLongerOwnerErrorFmt,
+ instanceID,
+ topic,
+ consumer,
+ ownerID,
+ )
+ }
+
+ _, err = db.ExecContext(
+ ctx,
+ q.write,
+ consumer,
+ message_id_bytes,
+ deadletter_id_bytes,
+ consumer,
+ message_id_bytes,
+ )
+ return err
+ })
+ }
+
+ return fn, closeNoop, nil
+}
+
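+// The replay write re-inserts a deadlettered message's flow_id and
+// payload as a brand-new message, then links the deadletter to that new
+// message in the replays table.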
+func replaySQL(prefix string) queryT {
+ const tmpl_write = `
+ INSERT INTO "%s_messages" (uuid, flow_id, payload_id)
+ SELECT
+ ?,
+ "%s_messages".flow_id,
+ "%s_messages".payload_id
+ FROM "%s_messages"
+ JOIN "%s_deadletters" ON
+ "%s_messages".id = "%s_deadletters".message_id
+ WHERE "%s_deadletters".uuid = ?;
+
+ INSERT INTO "%s_replays" (deadletter_id, message_id)
+ VALUES (
+ (SELECT id FROM "%s_deadletters" WHERE uuid = ?),
+ last_insert_rowid()
+ );
+ `
+ const tmpl_read = `
+ SELECT
+ "%s_messages".id,
+ "%s_messages".timestamp,
+ "%s_messages".flow_id,
+ "%s_payloads".topic,
+ "%s_payloads".payload
+ FROM "%s_messages"
+ JOIN "%s_payloads" ON
+ "%s_payloads".id = "%s_messages".payload_id
+ WHERE "%s_messages".uuid = ?;
+ `
+ return queryT{
+ write: fmt.Sprintf(
+ tmpl_write,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ ),
+ read: fmt.Sprintf(
+ tmpl_read,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ ),
+ }
+}
+
+func replayStmt(
+ db *sql.DB,
+ prefix string,
+ _ int,
+) (func(guuid.UUID, guuid.UUID) (messageT, error), func() error, error) {
+ q := replaySQL(prefix)
+
+ readStmt, err := db.Prepare(q.read)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(
+ deadletterID guuid.UUID,
+ messageID guuid.UUID,
+ ) (messageT, error) {
+ deadletter_id_bytes := deadletterID[:]
+ message_id_bytes := messageID[:]
+ err := inTx(db, func(ctx context.Context) error {
+ _, err := db.ExecContext(
+ ctx,
+ q.write,
+ message_id_bytes,
+ deadletter_id_bytes,
+ deadletter_id_bytes,
+ )
+ return err
+ })
+ if err != nil {
+ return messageT{}, err
+ }
+
+ message := messageT{
+ uuid: messageID,
+ }
+
+ var (
+ timestr string
+ flow_id_bytes []byte
+ )
+ err = readStmt.QueryRow(message_id_bytes).Scan(
+ &message.id,
+ &timestr,
+ &flow_id_bytes,
+ &message.topic,
+ &message.payload,
+ )
+ if err != nil {
+ return messageT{}, err
+ }
+ message.flowID = guuid.UUID(flow_id_bytes)
+
+ message.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
+ if err != nil {
+ return messageT{}, err
+ }
+
+ return message, nil
+ }
+
+ return fn, readStmt.Close, nil
+}
+
+func oneDeadSQL(prefix string) queryT {
+ const tmpl_read = `
+ SELECT
+ "%s_deadletters".uuid,
+ "%s_offsets".timestamp,
+ "%s_messages".uuid
+ FROM "%s_deadletters"
+ JOIN "%s_offsets" ON
+ "%s_deadletters".message_id = "%s_offsets".message_id
+ JOIN "%s_messages" ON
+ "%s_deadletters".message_id = "%s_messages".id
+ JOIN "%s_payloads" ON
+ "%s_messages".payload_id = "%s_payloads".id
+ WHERE
+ "%s_payloads".topic = ? AND
+ "%s_deadletters".consumer = ? AND
+ "%s_offsets".consumer = ? AND
+ "%s_deadletters".id NOT IN (
+ SELECT deadletter_id FROM "%s_replays"
+ )
+ ORDER BY "%s_deadletters".id ASC
+ LIMIT 1;
+ `
+ return queryT{
+ read: fmt.Sprintf(
+ tmpl_read,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ ),
+ }
+}
+
+func oneDeadStmt(
+ db *sql.DB,
+ prefix string,
+ _ int,
+) (func(string, string) (deadletterT, error), func() error, error) {
+ q := oneDeadSQL(prefix)
+
+ readStmt, err := db.Prepare(q.read)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(topic string, consumer string) (deadletterT, error) {
+ deadletter := deadletterT{
+ consumer: consumer,
+ }
+
+ var (
+ deadletter_id_bytes []byte
+ timestr string
+ message_id_bytes []byte
+ )
+ err := readStmt.QueryRow(topic, consumer, consumer).Scan(
+ &deadletter_id_bytes,
+ &timestr,
+ &message_id_bytes,
+ )
+ if err != nil {
+ return deadletterT{}, err
+ }
+ deadletter.uuid = guuid.UUID(deadletter_id_bytes)
+ deadletter.messageID = guuid.UUID(message_id_bytes)
+
+ deadletter.timestamp, err = time.Parse(
+ time.RFC3339Nano,
+ timestr,
+ )
+ if err != nil {
+ return deadletterT{}, err
+ }
+
+ return deadletter, nil
+ }
+
+ return fn, readStmt.Close, nil
+}
+
+func deadletterEach(
+ rows *sql.Rows,
+ callback func(deadletterT, messageT) error,
+) error {
+ for rows.Next() {
+ var (
+ deadletter deadletterT
+ deadletter_id_bytes []byte
+ deadletterTimestr string
+ message messageT
+ messageTimestr string
+ message_id_bytes []byte
+ flow_id_bytes []byte
+ )
+ err := rows.Scan(
+ &deadletter_id_bytes,
+ &message.id,
+ &deadletterTimestr,
+ &deadletter.consumer,
+ &messageTimestr,
+ &message_id_bytes,
+ &flow_id_bytes,
+ &message.topic,
+ &message.payload,
+ )
+ if err != nil {
+ return g.WrapErrors(rows.Close(), err)
+ }
+
+ deadletter.uuid = guuid.UUID(deadletter_id_bytes)
+ deadletter.messageID = guuid.UUID(message_id_bytes)
+ message.uuid = guuid.UUID(message_id_bytes)
+ message.flowID = guuid.UUID(flow_id_bytes)
+
+ message.timestamp, err = time.Parse(
+ time.RFC3339Nano,
+ messageTimestr,
+ )
+ if err != nil {
+ return g.WrapErrors(rows.Close(), err)
+ }
+
+ deadletter.timestamp, err = time.Parse(
+ time.RFC3339Nano,
+ deadletterTimestr,
+ )
+ if err != nil {
+ return g.WrapErrors(rows.Close(), err)
+ }
+
+ err = callback(deadletter, message)
+ if err != nil {
+ return g.WrapErrors(rows.Close(), err)
+ }
+ }
+
+ return g.WrapErrors(rows.Err(), rows.Close())
+}
+
+func allDeadSQL(prefix string) queryT {
+ const tmpl_read = `
+ SELECT
+ "%s_deadletters".uuid,
+ "%s_deadletters".message_id,
+ "%s_offsets".timestamp,
+ "%s_offsets".consumer,
+ "%s_messages".timestamp,
+ "%s_messages".uuid,
+ "%s_messages".flow_id,
+ "%s_payloads".topic,
+ "%s_payloads".payload
+ FROM "%s_deadletters"
+ JOIN "%s_offsets" ON
+ "%s_deadletters".message_id = "%s_offsets".message_id
+ JOIN "%s_messages" ON
+ "%s_deadletters".message_id = "%s_messages".id
+ JOIN "%s_payloads" ON
+ "%s_messages".payload_id = "%s_payloads".id
+ WHERE
+ "%s_payloads".topic = ? AND
+ "%s_deadletters".consumer = ? AND
+ "%s_offsets".consumer = ? AND
+ "%s_deadletters".id NOT IN (
+ SELECT deadletter_id FROM "%s_replays"
+ )
+ ORDER BY "%s_deadletters".id ASC;
+ `
+ return queryT{
+ read: fmt.Sprintf(
+ tmpl_read,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ ),
+ }
+}
+
+func allDeadStmt(
+ db *sql.DB,
+ prefix string,
+ _ int,
+) (func(string, string) (*sql.Rows, error), func() error, error) {
+ q := allDeadSQL(prefix)
+
+ readStmt, err := db.Prepare(q.read)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(topic string, consumer string) (*sql.Rows, error) {
+ return readStmt.Query(topic, consumer, consumer)
+ }
+
+ return fn, readStmt.Close, nil
+}
+
+func sizeSQL(prefix string) queryT {
+ const tmpl_read = `
+ SELECT
+ COUNT(1) as size
+ FROM "%s_messages"
+ JOIN "%s_payloads" ON
+ "%s_messages".payload_id = "%s_payloads".id
+ WHERE "%s_payloads".topic = ?;
+ `
+ return queryT{
+ read: fmt.Sprintf(
+ tmpl_read,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ ),
+ }
+}
+
+
+func sizeStmt(
+ db *sql.DB,
+ prefix string,
+ _ int,
+) (func(string) (int, error), func() error, error) {
+ q := sizeSQL(prefix)
+
+ readStmt, err := db.Prepare(q.read)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(topic string) (int, error) {
+ var size int
+ err := readStmt.QueryRow(topic).Scan(&size)
+ if err != nil {
+ return -1, err
+ }
+
+ return size, nil
+ }
+
+ return fn, readStmt.Close, nil
+}
+
+func countSQL(prefix string) queryT {
+ const tmpl_read = `
+ SELECT
+ COUNT(1) as count
+ FROM "%s_messages"
+ JOIN "%s_offsets" ON
+ "%s_messages".id = "%s_offsets".message_id
+ JOIN "%s_payloads" ON
+ "%s_messages".payload_id = "%s_payloads".id
+ WHERE
+ "%s_payloads".topic = ? AND
+ "%s_offsets".consumer = ?;
+ `
+ return queryT{
+ read: fmt.Sprintf(
+ tmpl_read,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ ),
+ }
+}
+
+func countStmt(
+ db *sql.DB,
+ prefix string,
+ _ int,
+) (func(string, string) (int, error), func() error, error) {
+ q := countSQL(prefix)
+
+ readStmt, err := db.Prepare(q.read)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(topic string, consumer string) (int, error) {
+ var count int
+ err := readStmt.QueryRow(topic, consumer).Scan(&count)
+ if err != nil {
+ return -1, err
+ }
+
+ return count, nil
+ }
+
+ return fn, readStmt.Close, nil
+}
+
+func hasDataSQL(prefix string) queryT {
+ const tmpl_read = `
+ SELECT 1 as data
+ FROM "%s_messages"
+ JOIN "%s_payloads" ON
+ "%s_payloads".id = "%s_messages".payload_id
+ WHERE
+ "%s_payloads".topic = ? AND
+ "%s_messages".id NOT IN (
+ SELECT message_id FROM "%s_offsets"
+ WHERE consumer = ?
+ )
+ LIMIT 1;
+ `
+ return queryT{
+ read: fmt.Sprintf(
+ tmpl_read,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ ),
+ }
+}
+
+func hasDataStmt(
+ db *sql.DB,
+ prefix string,
+ _ int,
+) (func(string, string) (bool, error), func() error, error) {
+ q := hasDataSQL(prefix)
+
+ readStmt, err := db.Prepare(q.read)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(topic string, consumer string) (bool, error) {
+ var _x int
+ err := readStmt.QueryRow(topic, consumer).Scan(&_x)
+ if err == sql.ErrNoRows {
+ return false, nil
+ }
+
+ if err != nil {
+ return false, err
+ }
+
+ return true, nil
+ }
+
+ return fn, readStmt.Close, nil
+}
+
+func initDB(
+ db *sql.DB,
+ prefix string,
+ notifyFn func(messageT),
+ instanceID int,
+) (queriesT, error) {
+ createTablesErr := createTables(db, prefix)
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ find, findClose, findErr := findStmt(db, prefix, instanceID)
+ next, nextClose, nextErr := nextStmt(db, prefix, instanceID)
+ pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID)
+ commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
+ oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID)
+ allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID)
+ size, sizeClose, sizeErr := sizeStmt(db, prefix, instanceID)
+ count, countClose, countErr := countStmt(db, prefix, instanceID)
+ hasData, hasDataClose, hasDataErr := hasDataStmt(db, prefix, instanceID)
+
+ err := g.SomeError(
+ createTablesErr,
+ takeErr,
+ publishErr,
+ findErr,
+ nextErr,
+ pendingErr,
+ commitErr,
+ toDeadErr,
+ replayErr,
+ oneDeadErr,
+ allDeadErr,
+ sizeErr,
+ countErr,
+ hasDataErr,
+ )
+ if err != nil {
+ return queriesT{}, err
+ }
+
+ close := func() error {
+ return g.SomeFnError(
+ takeClose,
+ publishClose,
+ findClose,
+ nextClose,
+ pendingClose,
+ commitClose,
+ toDeadClose,
+ replayClose,
+ oneDeadClose,
+ allDeadClose,
+ sizeClose,
+ countClose,
+ hasDataClose,
+ )
+ }
+
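+	// Queries hold the read lock so they can run concurrently;
+	// close takes the write lock, so no prepared statement is
+	// closed while still in use.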
+ var connMutex sync.RWMutex
+ return queriesT{
+ take: func(a string, b string) error {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return take(a, b)
+ },
+ publish: func(a UnsentMessage, b guuid.UUID) (messageT, error) {
+ var (
+ err error
+ message messageT
+ )
+ {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ message, err = publish(a, b)
+ }
+ if err != nil {
+ return messageT{}, err
+ }
+
+ go notifyFn(message)
+ return message, nil
+ },
+ find: func(a string, b guuid.UUID) (messageT, error) {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return find(a, b)
+ },
+ next: func(a string, b string) (messageT, error) {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return next(a, b)
+ },
+ pending: func(
+ a string,
+ b string,
+ callback func(messageT) error,
+ ) error {
+ var (
+ err error
+ rows *sql.Rows
+ )
+ {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ rows, err = pending(a, b)
+ }
+ if err != nil {
+ return err
+ }
+
+ return messageEach(rows, callback)
+ },
+ commit: func(a string, b guuid.UUID) error {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return commit(a, b)
+ },
+ toDead: func(
+ a string,
+ b guuid.UUID,
+ c guuid.UUID,
+ ) error {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return toDead(a, b, c)
+ },
+ replay: func(a guuid.UUID, b guuid.UUID) (messageT, error) {
+ var (
+ err error
+ message messageT
+ )
+ {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ message, err = replay(a, b)
+ }
+ if err != nil {
+ return messageT{}, err
+ }
+
+ go notifyFn(message)
+ return message, nil
+ },
+ oneDead: func(a string, b string) (deadletterT, error) {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return oneDead(a, b)
+ },
+ allDead: func(
+ a string,
+ b string,
+ callback func(deadletterT, messageT) error,
+ ) error {
+ var (
+ err error
+ rows *sql.Rows
+ )
+ {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ rows, err = allDead(a, b)
+ }
+ if err != nil {
+ return err
+ }
+
+ return deadletterEach(rows, callback)
+ },
+ size: func(a string) (int, error) {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return size(a)
+ },
+ count: func(a string, b string) (int, error) {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return count(a, b)
+ },
+ hasData: func(a string, b string) (bool, error) {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return hasData(a, b)
+ },
+ close: func() error {
+ connMutex.Lock()
+ defer connMutex.Unlock()
+ return close()
+ },
+ }, nil
+}
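+
+// NOTE (editor's aside): every query wrapper returned above takes
+// connMutex.RLock(), while close() takes the exclusive Lock(). Concurrent
+// queries can therefore proceed in parallel, but no prepared statement can
+// be closed while a query is still running against it.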
+
+
+func newPinger[T any]() pingerT[T] {
+ channel := make(chan T, 1)
+ closed := false
+ var rwmutex sync.RWMutex
+ return pingerT[T]{
+ tryPing: func(x T) {
+ rwmutex.RLock()
+ defer rwmutex.RUnlock()
+ if closed {
+ return
+ }
+ select {
+ case channel <- x:
+ default:
+ }
+ },
+ onPing: func(cb func(T)) {
+ for x := range channel {
+ cb(x)
+ }
+ },
+ closed: func() bool {
+ rwmutex.RLock()
+ defer rwmutex.RUnlock()
+ return closed
+ },
+ close: func() {
+ rwmutex.Lock()
+ defer rwmutex.Unlock()
+ if closed {
+ return
+ }
+ close(channel)
+ closed = true
+ },
+ }
+}
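+
+// Usage sketch (editor's illustration): the 1-slot buffer plus the
+// non-blocking select in tryPing() means producers never block, at the cost
+// of coalescing bursts of pings into a single pending one.
+//
+//	pinger := newPinger[struct{}]()
+//	go pinger.onPing(func(struct{}) { /* drain all pending work */ })
+//	pinger.tryPing(struct{}{}) // buffered or consumed
+//	pinger.tryPing(struct{}{}) // dropped if the previous ping is still pending
+//	pinger.close()             // onPing's range loop then terminates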
+
+func makeSubscriptionsFuncs() subscriptionsT {
+ var rwmutex sync.RWMutex
+ subscriptions := subscriptionsSetM{}
+ return subscriptionsT{
+ read: func(callback func(subscriptionsSetM) error) error {
+ rwmutex.RLock()
+ defer rwmutex.RUnlock()
+ return callback(subscriptions)
+ },
+ write: func(callback func(subscriptionsSetM) error) error {
+ rwmutex.Lock()
+ defer rwmutex.Unlock()
+ return callback(subscriptions)
+ },
+ }
+}
+
+// Try notifying the consumer that they have data to work with. If their
+// notification channel is already full, we simply drop the notification: on
+// each ping the consumer looks for all pending messages and processes them
+// all, so dropping the event here doesn't mean failing to notify the
+// consumer, but simply acknowledging that the existing notifications are
+// enough for them to work with, without letting any message slip through.
+func makeNotifyFn(
+ readFn func(func(subscriptionsSetM) error) error,
+ pinger pingerT[struct{}],
+) func(messageT) {
+ return func(message messageT) {
+ readFn(func(set subscriptionsSetM) error {
+ topicSub, ok := set[message.topic]
+ if !ok {
+ return nil
+ }
+
+ for _, consumer := range topicSub.consumers {
+ consumer.pinger.tryPing(struct{}{})
+ }
+ waiters := topicSub.waiters[message.flowID]
+ for _, waiter := range waiters {
+ waiter.pinger.tryPing(message.payload)
+ }
+ return nil
+ })
+ pinger.tryPing(struct{}{})
+ }
+}
+
+func collectClosedWaiters(
+ set subscriptionsSetM,
+) map[string]map[guuid.UUID][]string {
+ waiters := map[string]map[guuid.UUID][]string{}
+ for topic, topicSub := range set {
+ waiters[topic] = map[guuid.UUID][]string{}
+ for flowID, waitersByName := range topicSub.waiters {
+ names := []string{}
+ for name, waiter := range waitersByName {
+ if (*waiter.closed)() {
+ names = append(names, name)
+ }
+ }
+ waiters[topic][flowID] = names
+ }
+ }
+
+ return waiters
+}
+
+func trimEmptyLeaves(closedWaiters map[string]map[guuid.UUID][]string) {
+ for topic, waiters := range closedWaiters {
+ for flowID, names := range waiters {
+ if len(names) == 0 {
+ delete(closedWaiters[topic], flowID)
+ }
+ }
+ if len(waiters) == 0 {
+ delete(closedWaiters, topic)
+ }
+ }
+}
+
+func deleteIfEmpty(set subscriptionsSetM, topic string) {
+ topicSub, ok := set[topic]
+ if !ok {
+ return
+ }
+
+ emptyConsumers := len(topicSub.consumers) == 0
+ emptyWaiters := len(topicSub.waiters) == 0
+ if emptyConsumers && emptyWaiters {
+ delete(set, topic)
+ }
+}
+
+func deleteEmptyTopics(set subscriptionsSetM) {
+	for topic := range set {
+ deleteIfEmpty(set, topic)
+ }
+}
+
+func removeClosedWaiters(
+ set subscriptionsSetM,
+ closedWaiters map[string]map[guuid.UUID][]string,
+) {
+ for topic, waiters := range closedWaiters {
+ _, ok := set[topic]
+ if !ok {
+ continue
+ }
+
+ for flowID, names := range waiters {
+ if set[topic].waiters[flowID] == nil {
+ continue
+ }
+ for _, name := range names {
+ delete(set[topic].waiters[flowID], name)
+ }
+ if len(set[topic].waiters[flowID]) == 0 {
+ delete(set[topic].waiters, flowID)
+ }
+ }
+ }
+
+ deleteEmptyTopics(set)
+}
+
+func reapClosedWaiters(
+ readFn func(func(subscriptionsSetM) error) error,
+ writeFn func(func(subscriptionsSetM) error) error,
+) {
+ var closedWaiters map[string]map[guuid.UUID][]string
+ readFn(func(set subscriptionsSetM) error {
+ closedWaiters = collectClosedWaiters(set)
+ return nil
+ })
+
+ trimEmptyLeaves(closedWaiters)
+ if len(closedWaiters) == 0 {
+ return
+ }
+
+ writeFn(func(set subscriptionsSetM) error {
+ removeClosedWaiters(set, closedWaiters)
+ return nil
+ })
+}
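+
+// The reaping above is deliberately two-phase (editor's aside): closed
+// waiters are collected under the cheap read lock, and the write lock is
+// only taken when there is actually something to remove.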
+
+func everyNthCall[T any](n int, fn func(T)) func(T) {
+ i := 0
+ return func(x T) {
+ i++
+ if i == n {
+ i = 0
+ fn(x)
+ }
+ }
+}
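+
+// For example (editor's illustration): everyNthCall(3, fn) returns a function
+// that invokes fn only on its 3rd, 6th, 9th... call; runReaper() below uses
+// it to throttle reaping to one pass every reaperSkipCount pings.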
+
+func runReaper(
+ onPing func(func(struct{})),
+ readFn func(func(subscriptionsSetM) error) error,
+ writeFn func(func(subscriptionsSetM) error) error,
+) {
+ onPing(everyNthCall(reaperSkipCount, func(struct{}) {
+ reapClosedWaiters(readFn, writeFn)
+ }))
+}
+
+func NewWithPrefix(db *sql.DB, prefix string) (IQueue, error) {
+ err := g.ValidateSQLTablePrefix(prefix)
+ if err != nil {
+ return queueT{}, err
+ }
+
+ subscriptions := makeSubscriptionsFuncs()
+ pinger := newPinger[struct{}]()
+ notifyFn := makeNotifyFn(subscriptions.read, pinger)
+ queries, err := initDB(db, prefix, notifyFn, os.Getpid())
+ if err != nil {
+ return queueT{}, err
+ }
+
+ go runReaper(pinger.onPing, subscriptions.read, subscriptions.write)
+
+ return queueT{
+ queries: queries,
+ subscriptions: subscriptions,
+ pinger: pinger,
+ }, nil
+}
+
+func New(db *sql.DB) (IQueue, error) {
+ return NewWithPrefix(db, defaultPrefix)
+}
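+
+// Typical embedded usage (editor's sketch; assumes the "acude" SQLite driver
+// has been registered via its import):
+//
+//	db, err := sql.Open("acude", "q.db")
+//	if err != nil {
+//		// handle error
+//	}
+//	defer db.Close()
+//
+//	queue, err := New(db)
+//	if err != nil {
+//		// handle error
+//	}
+//	defer queue.Close()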
+
+func asPublicMessage(message messageT) Message {
+ return Message{
+ ID: message.uuid,
+ Timestamp: message.timestamp,
+ Topic: message.topic,
+ FlowID: message.flowID,
+ Payload: message.payload,
+ }
+}
+
+func (queue queueT) Publish(unsent UnsentMessage) (Message, error) {
+ message, err := queue.queries.publish(unsent, guuid.New())
+ return asPublicMessage(message), err
+}
+
+func registerConsumerFn(consumer consumerT) func(subscriptionsSetM) error {
+ topicSub := topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ }
+
+ return func(set subscriptionsSetM) error {
+ topic := consumer.data.topic
+ _, ok := set[topic]
+ if !ok {
+ set[topic] = topicSub
+ }
+ set[topic].consumers[consumer.data.name] = consumer
+
+ return nil
+ }
+}
+
+func registerWaiterFn(waiter waiterT) func(subscriptionsSetM) error {
+ topicSub := topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ }
+ waiters := map[string]waiterT{}
+
+ return func(set subscriptionsSetM) error {
+ var (
+ topic = waiter.data.topic
+ flowID = waiter.data.flowID
+ )
+ _, ok := set[topic]
+ if !ok {
+ set[topic] = topicSub
+ }
+ if set[topic].waiters[flowID] == nil {
+ set[topic].waiters[flowID] = waiters
+ }
+ set[topic].waiters[flowID][waiter.data.name] = waiter
+ return nil
+ }
+}
+
+func makeConsumeOneFn(
+ data consumerDataT,
+ callback func(Message) error,
+ successFn func(string, guuid.UUID) error,
+ errorFn func(string, guuid.UUID, guuid.UUID) error,
+) func(messageT) error {
+ return func(message messageT) error {
+ err := callback(asPublicMessage(message))
+ if err != nil {
+ g.Info(
+ "consumer failed", "q-consumer",
+ "topic", data.topic,
+ "consumer", data.name,
+ "error", err,
+ slog.Group(
+ "message",
+ "id", message.id,
+ "flow-id", message.flowID.String(),
+ ),
+ )
+
+ return errorFn(data.name, message.uuid, guuid.New())
+ }
+
+ return successFn(data.name, message.uuid)
+ }
+}
+
+func makeConsumeAllFn(
+ data consumerDataT,
+ consumeOneFn func(messageT) error,
+ eachFn func(string, string, func(messageT) error) error,
+) func(struct{}) {
+ return func(struct{}) {
+ err := eachFn(data.topic, data.name, consumeOneFn)
+ if err != nil {
+ g.Warning(
+ "eachFn failed", "q-consume-all",
+ "topic", data.topic,
+ "consumer", data.name,
+ "error", err,
+ "circuit-breaker-enabled?", false,
+ )
+ }
+ }
+}
+
+func makeWaitFn(channel chan []byte, closeFn func()) func([]byte) {
+ closed := false
+ var mutex sync.Mutex
+ return func(payload []byte) {
+ mutex.Lock()
+ defer mutex.Unlock()
+ if closed {
+ return
+ }
+
+ closeFn()
+ channel <- payload
+ close(channel)
+ closed = true
+ }
+}
+
+func runConsumer(onPing func(func(struct{})), consumeAllFn func(struct{})) {
+ consumeAllFn(struct{}{})
+ onPing(consumeAllFn)
+}
+
+func tryFinding(
+ findFn func(string, guuid.UUID) (messageT, error),
+ topic string,
+ flowID guuid.UUID,
+ waitFn func([]byte),
+) {
+ message, err := findFn(topic, flowID)
+ if err != nil {
+ return
+ }
+
+ waitFn(message.payload)
+}
+
+func (queue queueT) Subscribe(
+ topic string,
+ name string,
+ callback func(Message) error,
+) error {
+ data := consumerDataT{
+ topic: topic,
+ name: name,
+ }
+ pinger := newPinger[struct{}]()
+ consumer := consumerT{
+ data: data,
+ callback: callback,
+ pinger: pinger,
+ close: &pinger.close,
+ }
+ consumeOneFn := makeConsumeOneFn(
+ consumer.data,
+ consumer.callback,
+ queue.queries.commit,
+ queue.queries.toDead,
+ )
+ consumeAllFn := makeConsumeAllFn(
+ consumer.data,
+ consumeOneFn,
+ queue.queries.pending,
+ )
+
+ err := queue.queries.take(topic, name)
+ if err != nil {
+ return err
+ }
+
+ queue.subscriptions.write(registerConsumerFn(consumer))
+ go runConsumer(pinger.onPing, consumeAllFn)
+ return nil
+}
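+
+// Example (editor's sketch): a durable consumer; returning nil from the
+// callback commits the message, while a non-nil error sends it to the
+// deadletter table (see makeConsumeOneFn above).
+//
+//	err := queue.Subscribe("orders", "mailer", func(m Message) error {
+//		fmt.Printf("got %s: %s\n", m.ID, m.Payload)
+//		return nil
+//	})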
+
+type Waiter struct {
+ Channel <-chan []byte
+ Close func()
+}
+
+func (queue queueT) WaitFor(
+ topic string,
+ flowID guuid.UUID,
+ name string,
+) Waiter {
+ data := waiterDataT{
+ topic: topic,
+ flowID: flowID,
+ name: name,
+ }
+ pinger := newPinger[[]byte]()
+ waiter := waiterT{
+ data: data,
+ pinger: pinger,
+ closed: &pinger.closed,
+ close: &pinger.close,
+ }
+ channel := make(chan []byte, 1)
+ waitFn := makeWaitFn(channel, (*waiter.close))
+ closeFn := func() {
+ queue.subscriptions.read(func(set subscriptionsSetM) error {
+ (*set[topic].waiters[flowID][name].close)()
+ return nil
+ })
+ }
+
+ queue.subscriptions.write(registerWaiterFn(waiter))
+ tryFinding(queue.queries.find, topic, flowID, waitFn)
+ go pinger.onPing(waitFn)
+ return Waiter{channel, closeFn}
+}
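+
+// Example (editor's sketch): block until a message with a known flowID
+// arrives on a topic, regardless of whether it was published before or after
+// the call. The channel is buffered and closed after the first delivery, so
+// each Waiter observes at most one payload.
+//
+//	waiter := queue.WaitFor("orders", flowID, "http-handler")
+//	payload := <-waiter.Channel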
+
+func unsubscribeIfExistsFn(
+ topic string,
+ name string,
+) func(subscriptionsSetM) error {
+ return func(set subscriptionsSetM) error {
+ topicSub, ok := set[topic]
+ if !ok {
+ return nil
+ }
+
+ consumer, ok := topicSub.consumers[name]
+ if !ok {
+ return nil
+ }
+
+ (*consumer.close)()
+ delete(set[topic].consumers, name)
+ deleteIfEmpty(set, topic)
+ return nil
+ }
+}
+
+func (queue queueT) Unsubscribe(topic string, name string) {
+ queue.subscriptions.write(unsubscribeIfExistsFn(topic, name))
+}
+
+func cleanSubscriptions(set subscriptionsSetM) error {
+ for _, topicSub := range set {
+ for _, consumer := range topicSub.consumers {
+ (*consumer.close)()
+ }
+ for _, waiters := range topicSub.waiters {
+ for _, waiter := range waiters {
+ (*waiter.close)()
+ }
+ }
+ }
+ return nil
+}
+
+func (queue queueT) Close() error {
+ queue.pinger.close()
+ return g.WrapErrors(
+ queue.subscriptions.write(cleanSubscriptions),
+ queue.queries.close(),
+ )
+}
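+
+// Shutdown order matters (editor's aside): the instance pinger is closed
+// first so the reaper goroutine stops, then every consumer and waiter is
+// closed, and only then are the prepared statements released.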
+
+
+func topicGetopt(args argsT, w io.Writer) (argsT, bool) {
+ if len(args.args) == 0 {
+ fmt.Fprintf(w, "Missing TOPIC.\n")
+ return args, false
+ }
+
+ args.topic = args.args[0]
+ return args, true
+}
+
+func topicConsumerGetopt(args argsT, w io.Writer) (argsT, bool) {
+ fs := flag.NewFlagSet("", flag.ContinueOnError)
+ fs.Usage = func() {}
+ fs.SetOutput(w)
+
+ consumer := fs.String(
+ "C",
+ "default-consumer",
+ "The name of the consumer to be used",
+ )
+
+ if fs.Parse(args.args) != nil {
+ return args, false
+ }
+
+ subArgs := fs.Args()
+ if len(subArgs) == 0 {
+ fmt.Fprintf(w, "Missing TOPIC.\n")
+ return args, false
+ }
+
+ args.consumer = *consumer
+ args.topic = subArgs[0]
+ return args, true
+}
+
+func inExec(
+ args argsT,
+ queries queriesT,
+ r io.Reader,
+ w io.Writer,
+) (int, error) {
+ payload, err := io.ReadAll(r)
+ if err != nil {
+ return 1, err
+ }
+
+ unsent := UnsentMessage{
+ Topic: args.topic,
+ FlowID: guuid.New(),
+ Payload: payload,
+ }
+ message, err := queries.publish(unsent, guuid.New())
+ if err != nil {
+ return 1, err
+ }
+
+ fmt.Fprintf(w, "%s\n", message.uuid.String())
+
+ return 0, nil
+}
+
+func outExec(
+ args argsT,
+ queries queriesT,
+ _ io.Reader,
+ w io.Writer,
+) (int, error) {
+ err := queries.take(args.topic, args.consumer)
+ if err != nil {
+ return 1, err
+ }
+
+ message, err := queries.next(args.topic, args.consumer)
+
+ if err == sql.ErrNoRows {
+ return 3, nil
+ }
+
+ if err != nil {
+ return 1, err
+ }
+
+ fmt.Fprintln(w, string(message.payload))
+
+ return 0, nil
+}
+
+func commitExec(
+ args argsT,
+ queries queriesT,
+ r io.Reader,
+ w io.Writer,
+) (int, error) {
+ err := queries.take(args.topic, args.consumer)
+ if err != nil {
+ return 1, err
+ }
+
+ message, err := queries.next(args.topic, args.consumer)
+ if err != nil {
+ return 1, err
+ }
+
+ err = queries.commit(args.consumer, message.uuid)
+ if err != nil {
+ return 1, err
+ }
+
+ return 0, nil
+}
+
+func deadExec(
+ args argsT,
+ queries queriesT,
+ r io.Reader,
+ w io.Writer,
+) (int, error) {
+ err := queries.take(args.topic, args.consumer)
+ if err != nil {
+ return 1, err
+ }
+
+ message, err := queries.next(args.topic, args.consumer)
+ if err != nil {
+ return 1, err
+ }
+
+ err = queries.toDead(args.consumer, message.uuid, guuid.New())
+ if err != nil {
+ return 1, err
+ }
+
+ return 0, nil
+}
+
+func listDeadExec(
+ args argsT,
+ queries queriesT,
+ r io.Reader,
+ w io.Writer,
+) (int, error) {
+ eachFn := func(deadletter deadletterT, _ messageT) error {
+ fmt.Fprintf(
+ w,
+ "%s\t%s\t%s\n",
+ deadletter.uuid.String(),
+ deadletter.timestamp.Format(time.RFC3339),
+ deadletter.consumer,
+ )
+ return nil
+ }
+
+ err := queries.allDead(args.topic, args.consumer, eachFn)
+ if err != nil {
+ return 1, err
+ }
+
+ return 0, nil
+}
+
+func replayExec(
+ args argsT,
+ queries queriesT,
+ r io.Reader,
+ w io.Writer,
+) (int, error) {
+ deadletter, err := queries.oneDead(args.topic, args.consumer)
+ if err != nil {
+ return 1, err
+ }
+
+ _, err = queries.replay(deadletter.uuid, guuid.New())
+ if err != nil {
+ return 1, err
+ }
+
+ return 0, nil
+}
+
+func sizeExec(
+ args argsT,
+ queries queriesT,
+ r io.Reader,
+ w io.Writer,
+) (int, error) {
+ size, err := queries.size(args.topic)
+ if err != nil {
+ return 1, err
+ }
+
+ fmt.Fprintln(w, size)
+
+ return 0, nil
+}
+
+func countExec(
+ args argsT,
+ queries queriesT,
+ r io.Reader,
+ w io.Writer,
+) (int, error) {
+ count, err := queries.count(args.topic, args.consumer)
+ if err != nil {
+ return 1, err
+ }
+
+ fmt.Fprintln(w, count)
+
+ return 0, nil
+}
+
+func hasDataExec(
+ args argsT,
+ queries queriesT,
+ r io.Reader,
+ w io.Writer,
+) (int, error) {
+ hasData, err := queries.hasData(args.topic, args.consumer)
+ if err != nil {
+ return 1, err
+ }
+
+	if hasData {
+		return 0, nil
+	}
+	return 1, nil
+}
+
+func usage(argv0 string, w io.Writer) {
+ fmt.Fprintf(
+ w,
+ "Usage: %s [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n",
+ argv0,
+ )
+}
+
+func getopt(
+ allArgs []string,
+ commandsMap map[string]commandT,
+ w io.Writer,
+) (argsT, commandT, int) {
+ argv0 := allArgs[0]
+ argv := allArgs[1:]
+ fs := flag.NewFlagSet("", flag.ContinueOnError)
+ fs.Usage = func() {}
+ fs.SetOutput(w)
+ databasePath := fs.String(
+ "f",
+ "q.db",
+ "The path to the file where the queue is kept",
+ )
+ prefix := fs.String(
+ "p",
+ defaultPrefix,
+ "The q prefix of the table names",
+ )
+ if fs.Parse(argv) != nil {
+ usage(argv0, w)
+ return argsT{}, commandT{}, 2
+ }
+
+ subArgs := fs.Args()
+ if len(subArgs) == 0 {
+ fmt.Fprintf(w, "Missing COMMAND.\n")
+ usage(argv0, w)
+ return argsT{}, commandT{}, 2
+ }
+
+ args := argsT{
+ databasePath: *databasePath,
+ prefix: *prefix,
+ command: subArgs[0],
+ allArgs: allArgs,
+ args: subArgs[1:],
+ }
+
+ command := commandsMap[args.command]
+ if command.name == "" {
+ fmt.Fprintf(w, "Bad COMMAND: \"%s\".\n", args.command)
+ usage(allArgs[0], w)
+ return argsT{}, commandT{}, 2
+ }
+
+ args, ok := command.getopt(args, w)
+ if !ok {
+ usage(allArgs[0], w)
+ return argsT{}, commandT{}, 2
+ }
+
+ return args, command, 0
+}
+
+func runCommand(
+ args argsT,
+ command commandT,
+ stdin io.Reader,
+ stdout io.Writer,
+ stderr io.Writer,
+) int {
+ db, err := sql.Open("acude", args.databasePath)
+ if err != nil {
+ fmt.Fprintln(stderr, err)
+ return 1
+ }
+ defer db.Close()
+
+ iqueue, err := NewWithPrefix(db, args.prefix)
+ if err != nil {
+ fmt.Fprintln(stderr, err)
+ return 1
+ }
+ defer iqueue.Close()
+
+ rc, err := command.exec(args, iqueue.(queueT).queries, stdin, stdout)
+ if err != nil {
+ fmt.Fprintln(stderr, err)
+ }
+
+ return rc
+}
+
+var commands = map[string]commandT {
+ "in": commandT{
+ name: "in",
+ getopt: topicGetopt,
+ exec: inExec,
+ },
+ "out": commandT{
+ name: "out",
+ getopt: topicConsumerGetopt,
+ exec: outExec,
+ },
+ "commit": commandT{
+ name: "commit",
+ getopt: topicConsumerGetopt,
+ exec: commitExec,
+ },
+ "dead": commandT{
+ name: "dead",
+ getopt: topicConsumerGetopt,
+ exec: deadExec,
+ },
+ "ls-dead": commandT{
+ name: "ls-dead",
+ getopt: topicConsumerGetopt,
+ exec: listDeadExec,
+ },
+ "replay": commandT{
+ name: "replay",
+ getopt: topicConsumerGetopt,
+ exec: replayExec,
+ },
+ "size": commandT{
+ name: "size",
+ getopt: topicGetopt,
+ exec: sizeExec,
+ },
+ "count": commandT{
+ name: "count",
+ getopt: topicConsumerGetopt,
+ exec: countExec,
+ },
+ "has-data": commandT{
+ name: "has-data",
+ getopt: topicConsumerGetopt,
+ exec: hasDataExec,
+ },
+}
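+
+// CLI usage sketch (editor's illustration of the commands above; assumes the
+// binary is installed as "q"):
+//
+//	$ echo 'hello' | q in my-topic     # publish stdin, print message UUID
+//	$ q out -C worker my-topic         # print next payload; exit 3 if empty
+//	$ q commit -C worker my-topic      # commit the next pending message
+//	$ q ls-dead -C worker my-topic     # list deadletters as TSV
+//	$ q has-data -C worker my-topic    # exit 0 iff there is pending data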
+
+
+
+func Main() {
+ g.Init()
+ args, command, rc := getopt(os.Args, commands, os.Stderr)
+ if rc != 0 {
+ os.Exit(2)
+ }
+ os.Exit(runCommand(args, command, os.Stdin, os.Stdout, os.Stderr))
+}
diff --git a/tests/benchmarks/deadletters/main.go b/tests/benchmarks/deadletters/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/benchmarks/deadletters/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/deadletters/q.go b/tests/benchmarks/deadletters/q.go
new file mode 100644
index 0000000..0c203bb
--- /dev/null
+++ b/tests/benchmarks/deadletters/q.go
@@ -0,0 +1,24 @@
+package q
+
+import (
+ "flag"
+ "time"
+)
+
+
+
+var nFlag = flag.Int(
+ "n",
+ 1_000,
+ "The number of iterations to execute",
+)
+
+func MainTest() {
+ // FIXME
+ flag.Parse()
+ n := *nFlag
+
+ for i := 0; i < n; i++ {
+ time.Sleep(time.Millisecond * 1)
+ }
+}
diff --git a/tests/benchmarks/lookup/main.go b/tests/benchmarks/lookup/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/benchmarks/lookup/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/lookup/q.go b/tests/benchmarks/lookup/q.go
new file mode 100644
index 0000000..0c203bb
--- /dev/null
+++ b/tests/benchmarks/lookup/q.go
@@ -0,0 +1,24 @@
+package q
+
+import (
+ "flag"
+ "time"
+)
+
+
+
+var nFlag = flag.Int(
+ "n",
+ 1_000,
+ "The number of iterations to execute",
+)
+
+func MainTest() {
+ // FIXME
+ flag.Parse()
+ n := *nFlag
+
+ for i := 0; i < n; i++ {
+ time.Sleep(time.Millisecond * 1)
+ }
+}
diff --git a/tests/benchmarks/multiple-consumers/main.go b/tests/benchmarks/multiple-consumers/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/benchmarks/multiple-consumers/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/multiple-consumers/q.go b/tests/benchmarks/multiple-consumers/q.go
new file mode 100644
index 0000000..0c203bb
--- /dev/null
+++ b/tests/benchmarks/multiple-consumers/q.go
@@ -0,0 +1,24 @@
+package q
+
+import (
+ "flag"
+ "time"
+)
+
+
+
+var nFlag = flag.Int(
+ "n",
+ 1_000,
+ "The number of iterations to execute",
+)
+
+func MainTest() {
+ // FIXME
+ flag.Parse()
+ n := *nFlag
+
+ for i := 0; i < n; i++ {
+ time.Sleep(time.Millisecond * 1)
+ }
+}
diff --git a/tests/benchmarks/multiple-produces/main.go b/tests/benchmarks/multiple-produces/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/benchmarks/multiple-produces/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/multiple-produces/q.go b/tests/benchmarks/multiple-produces/q.go
new file mode 100644
index 0000000..0c203bb
--- /dev/null
+++ b/tests/benchmarks/multiple-produces/q.go
@@ -0,0 +1,24 @@
+package q
+
+import (
+ "flag"
+ "time"
+)
+
+
+
+var nFlag = flag.Int(
+ "n",
+ 1_000,
+ "The number of iterations to execute",
+)
+
+func MainTest() {
+ // FIXME
+ flag.Parse()
+ n := *nFlag
+
+ for i := 0; i < n; i++ {
+ time.Sleep(time.Millisecond * 1)
+ }
+}
diff --git a/tests/benchmarks/reaper/main.go b/tests/benchmarks/reaper/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/benchmarks/reaper/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/reaper/q.go b/tests/benchmarks/reaper/q.go
new file mode 100644
index 0000000..0c203bb
--- /dev/null
+++ b/tests/benchmarks/reaper/q.go
@@ -0,0 +1,24 @@
+package q
+
+import (
+ "flag"
+ "time"
+)
+
+
+
+var nFlag = flag.Int(
+ "n",
+ 1_000,
+ "The number of iterations to execute",
+)
+
+func MainTest() {
+ // FIXME
+ flag.Parse()
+ n := *nFlag
+
+ for i := 0; i < n; i++ {
+ time.Sleep(time.Millisecond * 1)
+ }
+}
diff --git a/tests/benchmarks/replay/main.go b/tests/benchmarks/replay/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/benchmarks/replay/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/replay/q.go b/tests/benchmarks/replay/q.go
new file mode 100644
index 0000000..0c203bb
--- /dev/null
+++ b/tests/benchmarks/replay/q.go
@@ -0,0 +1,24 @@
+package q
+
+import (
+ "flag"
+ "time"
+)
+
+
+
+var nFlag = flag.Int(
+ "n",
+ 1_000,
+ "The number of iterations to execute",
+)
+
+func MainTest() {
+ // FIXME
+ flag.Parse()
+ n := *nFlag
+
+ for i := 0; i < n; i++ {
+ time.Sleep(time.Millisecond * 1)
+ }
+}
diff --git a/tests/benchmarks/single-consumer/main.go b/tests/benchmarks/single-consumer/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/benchmarks/single-consumer/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/single-consumer/q.go b/tests/benchmarks/single-consumer/q.go
new file mode 100644
index 0000000..0c203bb
--- /dev/null
+++ b/tests/benchmarks/single-consumer/q.go
@@ -0,0 +1,24 @@
+package q
+
+import (
+ "flag"
+ "time"
+)
+
+
+
+var nFlag = flag.Int(
+ "n",
+ 1_000,
+ "The number of iterations to execute",
+)
+
+func MainTest() {
+ // FIXME
+ flag.Parse()
+ n := *nFlag
+
+ for i := 0; i < n; i++ {
+ time.Sleep(time.Millisecond * 1)
+ }
+}
diff --git a/tests/benchmarks/single-producer/main.go b/tests/benchmarks/single-producer/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/benchmarks/single-producer/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/single-producer/q.go b/tests/benchmarks/single-producer/q.go
new file mode 100644
index 0000000..0c203bb
--- /dev/null
+++ b/tests/benchmarks/single-producer/q.go
@@ -0,0 +1,24 @@
+package q
+
+import (
+ "flag"
+ "time"
+)
+
+
+
+var nFlag = flag.Int(
+ "n",
+ 1_000,
+ "The number of iterations to execute",
+)
+
+func MainTest() {
+ // FIXME
+ flag.Parse()
+ n := *nFlag
+
+ for i := 0; i < n; i++ {
+ time.Sleep(time.Millisecond * 1)
+ }
+}
diff --git a/tests/benchmarks/subscribe/main.go b/tests/benchmarks/subscribe/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/benchmarks/subscribe/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/subscribe/q.go b/tests/benchmarks/subscribe/q.go
new file mode 100644
index 0000000..0c203bb
--- /dev/null
+++ b/tests/benchmarks/subscribe/q.go
@@ -0,0 +1,24 @@
+package q
+
+import (
+ "flag"
+ "time"
+)
+
+
+
+var nFlag = flag.Int(
+ "n",
+ 1_000,
+ "The number of iterations to execute",
+)
+
+func MainTest() {
+ // FIXME
+ flag.Parse()
+ n := *nFlag
+
+ for i := 0; i < n; i++ {
+ time.Sleep(time.Millisecond * 1)
+ }
+}
diff --git a/tests/benchmarks/unsubscribe/main.go b/tests/benchmarks/unsubscribe/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/benchmarks/unsubscribe/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/unsubscribe/q.go b/tests/benchmarks/unsubscribe/q.go
new file mode 100644
index 0000000..0c203bb
--- /dev/null
+++ b/tests/benchmarks/unsubscribe/q.go
@@ -0,0 +1,24 @@
+package q
+
+import (
+ "flag"
+ "time"
+)
+
+
+
+var nFlag = flag.Int(
+ "n",
+ 1_000,
+ "The number of iterations to execute",
+)
+
+func MainTest() {
+ // FIXME
+ flag.Parse()
+ n := *nFlag
+
+ for i := 0; i < n; i++ {
+ time.Sleep(time.Millisecond * 1)
+ }
+}
diff --git a/tests/benchmarks/waiter/main.go b/tests/benchmarks/waiter/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/benchmarks/waiter/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/waiter/q.go b/tests/benchmarks/waiter/q.go
new file mode 100644
index 0000000..0c203bb
--- /dev/null
+++ b/tests/benchmarks/waiter/q.go
@@ -0,0 +1,24 @@
+package q
+
+import (
+ "flag"
+ "time"
+)
+
+
+
+var nFlag = flag.Int(
+ "n",
+ 1_000,
+ "The number of iterations to execute",
+)
+
+func MainTest() {
+ // FIXME
+ flag.Parse()
+ n := *nFlag
+
+ for i := 0; i < n; i++ {
+ time.Sleep(time.Millisecond * 1)
+ }
+}
diff --git a/tests/functional/consume-one-produce-many/main.go b/tests/functional/consume-one-produce-many/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/functional/consume-one-produce-many/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/functional/consume-one-produce-many/q.go b/tests/functional/consume-one-produce-many/q.go
new file mode 100644
index 0000000..48d66d3
--- /dev/null
+++ b/tests/functional/consume-one-produce-many/q.go
@@ -0,0 +1,5 @@
+package q
+
+func MainTest() {
+ // FIXME
+}
diff --git a/tests/functional/consumer-with-deadletter/main.go b/tests/functional/consumer-with-deadletter/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/functional/consumer-with-deadletter/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/functional/consumer-with-deadletter/q.go b/tests/functional/consumer-with-deadletter/q.go
new file mode 100644
index 0000000..e1462d7
--- /dev/null
+++ b/tests/functional/consumer-with-deadletter/q.go
@@ -0,0 +1,97 @@
+package q
+
+import (
+ "database/sql"
+ "errors"
+ "os"
+ "runtime"
+
+ _ "acudego"
+ g "gobang"
+ "guuid"
+)
+
+
+
+const (
+ topicX = "new-event-x"
+ topicY = "new-event-y"
+)
+
+var forbidden3Err = errors.New("we don't like 3")
+
+
+
+func processNewEventXToY(message Message) (UnsentMessage, error) {
+ payload := string(message.Payload)
+ if payload == "event 3" {
+ return UnsentMessage{}, forbidden3Err
+ }
+
+ newPayload := []byte("processed " + payload)
+ unsent := UnsentMessage{
+ Topic: topicY,
+ FlowID: message.FlowID,
+ Payload: newPayload,
+ }
+ return unsent, nil
+}
+
+
+
+func MainTest() {
+ g.SetLevel(g.LevelNone)
+
+ _, file, _, ok := runtime.Caller(0)
+ g.TAssertEqualS(ok, true, "can't get filename")
+
+ databasePath := file + ".db"
+ os.Remove(databasePath)
+ os.Remove(databasePath + "-shm")
+ os.Remove(databasePath + "-wal")
+
+ db, err := sql.Open("acude", databasePath)
+ g.TErrorIf(err)
+ defer db.Close()
+
+ queue, err := New(db)
+ g.TErrorIf(err)
+ defer queue.Close()
+
+
+ pub := func(payload []byte, flowID guuid.UUID) {
+ unsent := UnsentMessage{
+ Topic: topicX,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ _, err := queue.Publish(unsent)
+ g.TErrorIf(err)
+ }
+
+
+ g.Testing("we can WaitFor() a message after a deadletter", func() {
+ flowID := guuid.New()
+
+ handlerFn := func(message Message) error {
+ messageY, err := processNewEventXToY(message)
+ if err != nil {
+ return err
+ }
+
+ _, err = queue.Publish(messageY)
+ return err
+ }
+ queue.Subscribe(topicX, "main-worker", handlerFn)
+ defer queue.Unsubscribe(topicX, "main-worker")
+
+ pub([]byte("event 1"), guuid.New())
+ pub([]byte("event 2"), guuid.New())
+ pub([]byte("event 3"), guuid.New())
+ pub([]byte("event 4"), guuid.New())
+ pub([]byte("event 5"), flowID)
+
+ given := <- queue.WaitFor(topicY, flowID, "waiter").Channel
+ g.TAssertEqual(given, []byte("processed event 5"))
+ })
+}
diff --git a/tests/functional/custom-prefix/main.go b/tests/functional/custom-prefix/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/functional/custom-prefix/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/functional/custom-prefix/q.go b/tests/functional/custom-prefix/q.go
new file mode 100644
index 0000000..48d66d3
--- /dev/null
+++ b/tests/functional/custom-prefix/q.go
@@ -0,0 +1,5 @@
+package q
+
+func MainTest() {
+ // FIXME
+}
diff --git a/tests/functional/distinct-consumers-separate-instances/main.go b/tests/functional/distinct-consumers-separate-instances/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/functional/distinct-consumers-separate-instances/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/functional/distinct-consumers-separate-instances/q.go b/tests/functional/distinct-consumers-separate-instances/q.go
new file mode 100644
index 0000000..48d66d3
--- /dev/null
+++ b/tests/functional/distinct-consumers-separate-instances/q.go
@@ -0,0 +1,5 @@
+package q
+
+func MainTest() {
+ // FIXME
+}
diff --git a/tests/functional/flow-id/main.go b/tests/functional/flow-id/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/functional/flow-id/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/functional/flow-id/q.go b/tests/functional/flow-id/q.go
new file mode 100644
index 0000000..48d66d3
--- /dev/null
+++ b/tests/functional/flow-id/q.go
@@ -0,0 +1,5 @@
+package q
+
+func MainTest() {
+ // FIXME
+}
diff --git a/tests/functional/idempotency/main.go b/tests/functional/idempotency/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/functional/idempotency/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/functional/idempotency/q.go b/tests/functional/idempotency/q.go
new file mode 100644
index 0000000..48d66d3
--- /dev/null
+++ b/tests/functional/idempotency/q.go
@@ -0,0 +1,5 @@
+package q
+
+func MainTest() {
+ // FIXME
+}
diff --git a/tests/functional/new-instance-takeover/main.go b/tests/functional/new-instance-takeover/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/functional/new-instance-takeover/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/functional/new-instance-takeover/q.go b/tests/functional/new-instance-takeover/q.go
new file mode 100644
index 0000000..a678415
--- /dev/null
+++ b/tests/functional/new-instance-takeover/q.go
@@ -0,0 +1,127 @@
+package q
+
+import (
+"fmt"
+ "database/sql"
+ "runtime"
+ "os"
+
+ g "gobang"
+ "guuid"
+)
+
+
+
+const topic = "topic"
+
+
+
+func pub(queue IQueue, topic string, flowID guuid.UUID) {
+ unsent := UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: []byte{},
+ }
+ _, err := queue.Publish(unsent)
+ g.TErrorIf(err)
+}
+
+func handlerFn(publish func(guuid.UUID)) func(Message) error {
+ return func(message Message) error {
+ publish(message.FlowID)
+ return nil
+ }
+}
+
+func startInstance(
+ databasePath string,
+ instanceID int,
+ name string,
+) (*sql.DB, IQueue, error) {
+ db, err := sql.Open("acude", databasePath)
+ g.TErrorIf(err)
+
+ iqueue, err := New(db)
+ g.TErrorIf(err)
+ queue := iqueue.(queueT)
+
+ notifyFn := makeNotifyFn(queue.subscriptions.read, queue.pinger)
+ queries, err := initDB(db, defaultPrefix, notifyFn, instanceID)
+ g.TErrorIf(err)
+
+ err = queue.queries.close()
+ g.TErrorIf(err)
+
+ queue.queries = queries
+
+ pub_ := func(topic string) func(guuid.UUID) {
+ return func(flowID guuid.UUID) {
+ pub(queue, topic, flowID)
+ }
+ }
+
+ individual := "individual-" + name
+ shared := "shared"
+
+ queue.Subscribe(topic, individual, handlerFn(pub_(individual)))
+ queue.Subscribe(topic, shared, handlerFn(pub_(shared + "-" + name)))
+
+ return db, queue, nil
+}
+
+
+
+func MainTest() {
+ // https://sqlite.org/forum/forumpost/2507664507
+ g.Init()
+
+ _, file, _, ok := runtime.Caller(0)
+ g.TAssertEqualS(ok, true, "can't get filename")
+
+	dbpath := file + ".db"
+ os.Remove(dbpath)
+ os.Remove(dbpath + "-shm")
+ os.Remove(dbpath + "-wal")
+
+ instanceID1 := os.Getpid()
+ instanceID2 := instanceID1 + 1
+
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+
+ g.Testing("new instances take ownership of topic+name combo", func() {
+ if false {
+ fmt.Fprintf(os.Stderr, "(PID %d + 1) ", instanceID1)
+ }
+
+ db, q1, err := startInstance(dbpath, instanceID1, "first")
+ g.TErrorIf(err)
+ defer db.Close()
+ defer q1.Close()
+
+ pub(q1, topic, guuid.New())
+ pub(q1, topic, guuid.New())
+ pub(q1, topic, flowID1)
+
+ <- q1.WaitFor("individual-first", flowID1, "w").Channel
+ <- q1.WaitFor( "shared-first", flowID1, "w").Channel
+ // println("waited 1")
+
+ db, q2, err := startInstance(dbpath, instanceID2, "second")
+ g.TErrorIf(err)
+ defer db.Close()
+ defer q2.Close()
+
+ <- q2.WaitFor("individual-second", flowID1, "w").Channel
+
+ pub(q2, topic, guuid.New())
+ pub(q2, topic, guuid.New())
+ pub(q2, topic, flowID2)
+
+ // FIXME: notify multiple instances so we can add this:
+ // <- q2.WaitFor("individual-first", flowID2, "w").Channel
+ <- q2.WaitFor("individual-second", flowID2, "w").Channel
+ <- q2.WaitFor( "shared-second", flowID2, "w").Channel
+ })
+}
diff --git a/tests/functional/wait-after-publish/main.go b/tests/functional/wait-after-publish/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/functional/wait-after-publish/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/functional/wait-after-publish/q.go b/tests/functional/wait-after-publish/q.go
new file mode 100644
index 0000000..701258a
--- /dev/null
+++ b/tests/functional/wait-after-publish/q.go
@@ -0,0 +1,64 @@
+package q
+
+import (
+ "database/sql"
+ "os"
+ "runtime"
+
+ g "gobang"
+ "guuid"
+)
+
+
+
+const topic = "topic"
+
+
+
+func MainTest() {
+ _, file, _, ok := runtime.Caller(0)
+ g.TAssertEqualS(ok, true, "can't get filename")
+
+ databasePath := file + ".db"
+ os.Remove(databasePath)
+ os.Remove(databasePath + "-shm")
+ os.Remove(databasePath + "-wal")
+
+ db, err := sql.Open("acude", databasePath)
+ g.TErrorIf(err)
+ defer db.Close()
+
+ queue, err := New(db)
+ g.TErrorIf(err)
+ defer queue.Close()
+
+ pub := func(flowID guuid.UUID, payload []byte) {
+ unsent := UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ _, err := queue.Publish(unsent)
+ g.TErrorIf(err)
+ }
+
+
+ g.Testing("we can WaitFor() a message before its publishing", func() {
+ flowID := guuid.New()
+ waiter := queue.WaitFor(topic, flowID, "waiter").Channel
+
+ pub(flowID, []byte("payload before"))
+
+ given := <- waiter
+ g.TAssertEqual(given, []byte("payload before"))
+ })
+
+ g.Testing("we can also do it after its publishing", func() {
+ flowID := guuid.New()
+
+ pub(flowID, []byte("payload after"))
+
+ given := <- queue.WaitFor(topic, flowID, "waiter").Channel
+ g.TAssertEqual(given, []byte("payload after"))
+ })
+}
diff --git a/tests/functional/waiter/main.go b/tests/functional/waiter/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/functional/waiter/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/functional/waiter/q.go b/tests/functional/waiter/q.go
new file mode 100644
index 0000000..48d66d3
--- /dev/null
+++ b/tests/functional/waiter/q.go
@@ -0,0 +1,5 @@
+package q
+
+func MainTest() {
+ // FIXME
+}
diff --git a/tests/fuzz/api-check/main.go b/tests/fuzz/api-check/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/fuzz/api-check/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/fuzz/api-check/q.go b/tests/fuzz/api-check/q.go
new file mode 100644
index 0000000..f8b2ab4
--- /dev/null
+++ b/tests/fuzz/api-check/q.go
@@ -0,0 +1,35 @@
+package q
+
+import (
+ "os"
+ "testing"
+ "testing/internal/testdeps"
+)
+
+
+
+func api(f *testing.F) {
+ f.Fuzz(func(t *testing.T, n int) {
+ // FIXME
+ if n > 1 {
+ if n < 2 {
+ t.Errorf("Failed n: %v\n", n)
+ }
+ }
+ })
+}
+
+
+
+func MainTest() {
+ fuzzTargets := []testing.InternalFuzzTarget{
+ { "api", api },
+ }
+
+ deps := testdeps.TestDeps{}
+ tests := []testing.InternalTest {}
+ benchmarks := []testing.InternalBenchmark{}
+ examples := []testing.InternalExample {}
+ m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples)
+ os.Exit(m.Run())
+}
diff --git a/tests/fuzz/cli-check/main.go b/tests/fuzz/cli-check/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/fuzz/cli-check/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/fuzz/cli-check/q.go b/tests/fuzz/cli-check/q.go
new file mode 100644
index 0000000..5c872e5
--- /dev/null
+++ b/tests/fuzz/cli-check/q.go
@@ -0,0 +1,35 @@
+package q
+
+import (
+ "os"
+ "testing"
+ "testing/internal/testdeps"
+)
+
+
+
+func queries(f *testing.F) {
+ f.Fuzz(func(t *testing.T, n int) {
+ if n > 154 {
+ if n < 155 {
+ t.Errorf("Failed n: %v\n", n)
+ }
+ }
+ })
+}
+
+
+
+func MainTest() {
+ // FIXME
+ fuzzTargets := []testing.InternalFuzzTarget{
+ { "queries", queries },
+ }
+
+ deps := testdeps.TestDeps{}
+ tests := []testing.InternalTest {}
+ benchmarks := []testing.InternalBenchmark{}
+ examples := []testing.InternalExample {}
+ m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples)
+ os.Exit(m.Run())
+}
diff --git a/tests/fuzz/equal-produced-consumed-order-check/main.go b/tests/fuzz/equal-produced-consumed-order-check/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/fuzz/equal-produced-consumed-order-check/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/fuzz/equal-produced-consumed-order-check/q.go b/tests/fuzz/equal-produced-consumed-order-check/q.go
new file mode 100644
index 0000000..8b95aef
--- /dev/null
+++ b/tests/fuzz/equal-produced-consumed-order-check/q.go
@@ -0,0 +1,35 @@
+package q
+
+import (
+ "os"
+ "testing"
+ "testing/internal/testdeps"
+)
+
+
+
+func queries(f *testing.F) {
+ f.Fuzz(func(t *testing.T, n int) {
+ if n > 154 {
+ if n < 155 {
+ t.Errorf("Failed n: %v\n", n)
+ }
+ }
+ })
+}
+
+
+
+func MainTest() {
+ // FIXME: produced order is identical to consumed order
+ fuzzTargets := []testing.InternalFuzzTarget{
+ { "queries", queries },
+ }
+
+ deps := testdeps.TestDeps{}
+ tests := []testing.InternalTest {}
+ benchmarks := []testing.InternalBenchmark{}
+ examples := []testing.InternalExample {}
+ m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples)
+ os.Exit(m.Run())
+}
diff --git a/tests/fuzz/exactly-once-check/main.go b/tests/fuzz/exactly-once-check/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/fuzz/exactly-once-check/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/fuzz/exactly-once-check/q.go b/tests/fuzz/exactly-once-check/q.go
new file mode 100644
index 0000000..d483ad8
--- /dev/null
+++ b/tests/fuzz/exactly-once-check/q.go
@@ -0,0 +1,35 @@
+package q
+
+import (
+ "os"
+ "testing"
+ "testing/internal/testdeps"
+)
+
+
+
+func queries(f *testing.F) {
+ f.Fuzz(func(t *testing.T, n int) {
+ if n > 154 {
+ if n < 155 {
+ t.Errorf("Failed n: %v\n", n)
+ }
+ }
+ })
+}
+
+
+
+func MainTest() {
+ // FIXME: a message is consumed exactly once
+ fuzzTargets := []testing.InternalFuzzTarget{
+ { "queries", queries },
+ }
+
+ deps := testdeps.TestDeps{}
+ tests := []testing.InternalTest {}
+ benchmarks := []testing.InternalBenchmark{}
+ examples := []testing.InternalExample {}
+ m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples)
+ os.Exit(m.Run())
+}
diff --git a/tests/fuzz/queries-check/main.go b/tests/fuzz/queries-check/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/fuzz/queries-check/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/fuzz/queries-check/q.go b/tests/fuzz/queries-check/q.go
new file mode 100644
index 0000000..5c872e5
--- /dev/null
+++ b/tests/fuzz/queries-check/q.go
@@ -0,0 +1,35 @@
+package q
+
+import (
+ "os"
+ "testing"
+ "testing/internal/testdeps"
+)
+
+
+
+func queries(f *testing.F) {
+ f.Fuzz(func(t *testing.T, n int) {
+ if n > 154 {
+ if n < 155 {
+ t.Errorf("Failed n: %v\n", n)
+ }
+ }
+ })
+}
+
+
+
+func MainTest() {
+ // FIXME
+ fuzzTargets := []testing.InternalFuzzTarget{
+ { "queries", queries },
+ }
+
+ deps := testdeps.TestDeps{}
+ tests := []testing.InternalTest {}
+ benchmarks := []testing.InternalBenchmark{}
+ examples := []testing.InternalExample {}
+ m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples)
+ os.Exit(m.Run())
+}
diff --git a/tests/fuzz/total-order-check/main.go b/tests/fuzz/total-order-check/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/fuzz/total-order-check/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/fuzz/total-order-check/q.go b/tests/fuzz/total-order-check/q.go
new file mode 100644
index 0000000..67af544
--- /dev/null
+++ b/tests/fuzz/total-order-check/q.go
@@ -0,0 +1,35 @@
+package q
+
+import (
+ "os"
+ "testing"
+ "testing/internal/testdeps"
+)
+
+
+
+func queries(f *testing.F) {
+ f.Fuzz(func(t *testing.T, n int) {
+ if n > 154 {
+ if n < 155 {
+ t.Errorf("Failed n: %v\n", n)
+ }
+ }
+ })
+}
+
+
+
+func MainTest() {
+ // FIXME: a consumer gets the messages in total order
+ fuzzTargets := []testing.InternalFuzzTarget{
+ { "queries", queries },
+ }
+
+ deps := testdeps.TestDeps{}
+ tests := []testing.InternalTest {}
+ benchmarks := []testing.InternalBenchmark{}
+ examples := []testing.InternalExample {}
+ m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples)
+ os.Exit(m.Run())
+}
diff --git a/tests/liteq.go b/tests/liteq.go
deleted file mode 100644
index 86d1bef..0000000
--- a/tests/liteq.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package q
-
-import (
- "os"
- "testing"
- "testing/internal/testdeps"
-
- g "gobang"
-)
-
-
-
-func TestX(t *testing.T) {
- g.AssertEqual(1, 1)
-}
-
-
-
-func MainTest() {
- tests := []testing.InternalTest {
- { "TestX", TestX },
- }
-
- benchmarks := []testing.InternalBenchmark {}
- fuzzTargets := []testing.InternalFuzzTarget {}
- examples := []testing.InternalExample {}
- m := testing.MainStart(
- testdeps.TestDeps {},
- tests,
- benchmarks,
- fuzzTargets,
- examples,
- )
- os.Exit(m.Run())
-}
diff --git a/tests/q.go b/tests/q.go
new file mode 100644
index 0000000..6b9e422
--- /dev/null
+++ b/tests/q.go
@@ -0,0 +1,5776 @@
+package q
+
+import (
+ "bytes"
+ "database/sql"
+ "errors"
+ "fmt"
+ "io"
+ "log/slog"
+ "os"
+ "reflect"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+
+ "acudego"
+ "guuid"
+ g "gobang"
+)
+
+
+
+func test_defaultPrefix() {
+ g.TestStart("defaultPrefix")
+
+ g.Testing("the defaultPrefix is valid", func() {
+ g.TErrorIf(g.ValidateSQLTablePrefix(defaultPrefix))
+ })
+}
+
+func test_inTx() {
+ /*
+ // FIXME
+ g.TestStart("inTx()")
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ defer db.Close()
+
+
+ g.Testing("when fn() errors, we propagate it", func() {
+ myError := errors.New("to be propagated")
+ err := inTx(db, func(tx *sql.Tx) error {
+ return myError
+ })
+ g.TAssertEqual(err, myError)
+ })
+
+ g.Testing("on nil error we get nil", func() {
+ err := inTx(db, func(tx *sql.Tx) error {
+ return nil
+ })
+ g.TErrorIf(err)
+ })
+ */
+}
+
+func test_createTables() {
+ g.TestStart("createTables()")
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ defer db.Close()
+
+
+ g.Testing("tables exist afterwards", func() {
+ tmpl := `
+ SELECT id FROM "%s_messages" LIMIT 1;
+ `
+ q := fmt.Sprintf(tmpl, defaultPrefix)
+
+ _, err := db.Exec(q)
+ g.TErrorNil(err)
+
+ err = createTables(db, defaultPrefix)
+ g.TErrorIf(err)
+
+ _, err = db.Exec(q)
+ g.TErrorIf(err)
+ })
+
+ g.Testing("we can do it multiple times", func() {
+ g.TErrorIf(g.SomeError(
+ createTables(db, defaultPrefix),
+ createTables(db, defaultPrefix),
+ createTables(db, defaultPrefix),
+ ))
+ })
+}
+
+func test_takeStmt() {
+ g.TestStart("takeStmt()")
+
+ const (
+ topic = "take() topic"
+ consumer = "take() consumer"
+ prefix = defaultPrefix
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ g.TErrorIf(takeErr)
+ defer g.SomeFnError(
+ takeClose,
+ db.Close,
+ )
+
+ const tmpl = `
+ SELECT owner_id from "%s_owners"
+ WHERE
+ topic = ? AND
+ consumer = ?;
+ `
+ sqlOwner := fmt.Sprintf(tmpl, prefix)
+
+
+ g.Testing("when there is no owner, we become it", func() {
+ var ownerID int
+ err := db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
+ g.TAssertEqual(err, sql.ErrNoRows)
+
+ err = take(topic, consumer)
+ g.TErrorIf(err)
+
+ err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
+ g.TErrorIf(err)
+ g.TAssertEqual(ownerID, instanceID)
+ })
+
+ g.Testing("if there is already an owner, we overtake it", func() {
+ otherID := instanceID + 1
+
+ take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ g.TErrorIf(takeErr)
+ defer takeClose()
+
+ var ownerID int
+ err := db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
+ g.TErrorIf(err)
+ g.TAssertEqual(ownerID, instanceID)
+
+ err = take(topic, consumer)
+ g.TErrorIf(err)
+
+ err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
+ g.TErrorIf(err)
+ g.TAssertEqual(ownerID, otherID)
+ })
+ g.Testing("no error if closed more than once", func() {
+ g.TErrorIf(g.SomeError(
+ takeClose(),
+ takeClose(),
+ takeClose(),
+ ))
+ })
+}
+
+func test_publishStmt() {
+ g.TestStart("publishStmt()")
+
+ const (
+ topic = "publish() topic"
+ payloadStr = "publish() payload"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ publishErr,
+ ))
+ defer g.SomeFnError(
+ publishClose,
+ db.Close,
+ )
+
+
+ g.Testing("we can publish a message", func() {
+ messageID := guuid.New()
+ message, err := publish(unsent, messageID)
+ g.TErrorIf(err)
+
+ g.TAssertEqual(message.id, int64(1))
+ g.TAssertEqual(message.uuid, messageID)
+ g.TAssertEqual(message.topic, topic)
+ g.TAssertEqual(message.flowID, flowID)
+ g.TAssertEqual(message.payload, payload)
+ })
+
+ g.Testing("we can publish the same message repeatedly", func() {
+ messageID1 := guuid.New()
+ messageID2 := guuid.New()
+ message1, err1 := publish(unsent, messageID1)
+ message2, err2 := publish(unsent, messageID2)
+ g.TErrorIf(g.SomeError(err1, err2))
+
+ g.TAssertEqual(message1.id, message2.id - 1)
+ g.TAssertEqual(message1.topic, message2.topic)
+ g.TAssertEqual(message1.flowID, message2.flowID)
+ g.TAssertEqual(message1.payload, message2.payload)
+
+ g.TAssertEqual(message1.uuid, messageID1)
+ g.TAssertEqual(message2.uuid, messageID2)
+ })
+
+ g.Testing("publishing a message with the same UUID errors", func() {
+ messageID := guuid.New()
+ message1, err1 := publish(unsent, messageID)
+ _, err2 := publish(unsent, messageID)
+ g.TErrorIf(err1)
+
+ g.TAssertEqual(message1.uuid, messageID)
+ g.TAssertEqual(message1.topic, topic)
+ g.TAssertEqual(message1.flowID, flowID)
+ g.TAssertEqual(message1.payload, payload)
+
+ g.TAssertEqual(
+ err2.(acudego.Error).ExtendedCode,
+ acudego.ErrConstraintUnique,
+ )
+ })
+
+ g.Testing("no actual closing occurs", func() {
+ g.TErrorIf(g.SomeError(
+ publishClose(),
+ publishClose(),
+ publishClose(),
+ ))
+ })
+}
+
+func test_findStmt() {
+ g.TestStart("findStmt()")
+
+ const (
+ topic = "find() topic"
+ payloadStr = "find() payload"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ find, findClose, findErr := findStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ publishErr,
+ findErr,
+ ))
+ defer g.SomeFnError(
+ publishClose,
+ findClose,
+ db.Close,
+ )
+
+ pub := func(flowID guuid.UUID) guuid.UUID {
+ unsentWithFlowID := unsent
+ unsentWithFlowID.FlowID = flowID
+ messageID := guuid.New()
+ _, err := publish(unsentWithFlowID, messageID)
+ g.TErrorIf(err)
+ return messageID
+ }
+
+
+ g.Testing("we can find a message by topic and flowID", func() {
+ flowID := guuid.New()
+ messageID := pub(flowID)
+ message, err := find(topic, flowID)
+ g.TErrorIf(err)
+
+ g.TAssertEqual(message.uuid, messageID)
+ g.TAssertEqual(message.topic, topic)
+ g.TAssertEqual(message.flowID, flowID)
+ g.TAssertEqual(message.payload, payload)
+ })
+
+ g.Testing("a non-existent message gives us an error", func() {
+ message, err := find(topic, guuid.New())
+ g.TAssertEqual(message, messageT{})
+ g.TAssertEqual(err, sql.ErrNoRows)
+ })
+
+ g.Testing("findig twice yields the exact same message", func() {
+ flowID := guuid.New()
+ messageID := pub(flowID)
+ message1, err1 := find(topic, flowID)
+ message2, err2 := find(topic, flowID)
+ g.TErrorIf(g.SomeError(err1, err2))
+
+ g.TAssertEqual(message1.uuid, messageID)
+ g.TAssertEqual(message1, message2)
+ })
+
+ g.Testing("returns the latest entry if multiple are available", func() {
+ flowID := guuid.New()
+
+ _ , err0 := find(topic, flowID)
+ pub(flowID)
+ message1, err1 := find(topic, flowID)
+ pub(flowID)
+ message2, err2 := find(topic, flowID)
+
+ g.TAssertEqual(err0, sql.ErrNoRows)
+ g.TErrorIf(g.SomeError(err1, err2))
+ g.TAssertEqual(message1.uuid == message2.uuid, false)
+ g.TAssertEqual(message1.id < message2.id, true)
+ })
+
+ g.Testing("no error if closed more than once", func() {
+ g.TErrorIf(g.SomeError(
+ findClose(),
+ findClose(),
+ findClose(),
+ ))
+ })
+}
+
+func test_nextStmt() {
+ g.TestStart("nextStmt()")
+
+ const (
+ topic = "next() topic"
+ payloadStr = "next() payload"
+ consumer = "next() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ next, nextClose, nextErr := nextStmt(db, prefix, instanceID)
+ commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ nextErr,
+ commitErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ nextClose,
+ commitClose,
+ db.Close,
+ )
+
+ pub := func(topic string) messageT {
+ g.TErrorIf(take(topic, consumer))
+
+ unsentWithTopic := unsent
+ unsentWithTopic.Topic = topic
+ message, err := publish(unsentWithTopic, guuid.New())
+ g.TErrorIf(err)
+ return message
+ }
+
+
+ g.Testing("we get an error on empty topic", func() {
+ _, err := next(topic, consumer)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ })
+
+ g.Testing("we don't get messages from other topics", func() {
+ pub("other topic")
+ _, err := next(topic, consumer)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ })
+
+ g.Testing("we can get the next message", func() {
+ expectedMessage := pub(topic)
+ pub(topic)
+ pub(topic)
+ message, err := next(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(message, expectedMessage)
+ })
+
+ g.Testing("we keep getting the next until we commit", func() {
+ message1, err1 := next(topic, consumer)
+ message2, err2 := next(topic, consumer)
+ g.TErrorIf(commit(consumer, message1.uuid))
+ message3, err3 := next(topic, consumer)
+ g.TErrorIf(g.SomeError(err1, err2, err3))
+
+ g.TAssertEqual(message1, message2)
+ g.TAssertEqual(message2.uuid != message3.uuid, true)
+ })
+
+ g.Testing("each consumer has its own next message", func() {
+ g.TErrorIf(take(topic, "other consumer"))
+ message1, err1 := next(topic, consumer)
+ message2, err2 := next(topic, "other consumer")
+ g.TErrorIf(g.SomeError(err1, err2))
+ g.TAssertEqual(message1.uuid != message2.uuid, true)
+ })
+
+ g.Testing("error when we're not the owner", func() {
+ otherID := instanceID + 1
+
+ take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ g.TErrorIf(takeErr)
+ defer takeClose()
+
+ _, err := next(topic, consumer)
+ g.TErrorIf(err)
+
+ err = take(topic, consumer)
+ g.TErrorIf(err)
+
+ _, err = next(topic, consumer)
+ g.TAssertEqual(err, fmt.Errorf(
+ notOwnerErrorFmt,
+ otherID,
+ topic,
+ consumer,
+ instanceID,
+ ))
+ })
+
+ g.Testing("we can close more than once", func() {
+ g.TErrorIf(g.SomeError(
+ nextClose(),
+ nextClose(),
+ nextClose(),
+ ))
+ })
+}
+
+func test_messageEach() {
+ g.TestStart("messageEach()")
+
+ const (
+ topic = "messageEach() topic"
+ payloadStr = "messageEach() payload"
+ consumer = "messageEach() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ pendingErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ pendingClose,
+ db.Close,
+ )
+
+ pub := func() guuid.UUID {
+ message, err := publish(unsent, guuid.New())
+ g.TErrorIf(err)
+ return message.uuid
+ }
+ g.TErrorIf(take(topic, consumer))
+
+
+ g.Testing("not called on empty set", func() {
+ rows, err := pending(topic, consumer)
+ g.TErrorIf(err)
+
+ messageEach(rows, func(messageT) error {
+ g.Unreachable()
+ return nil
+ })
+ })
+
+ g.Testing("the callback is called once for each entry", func() {
+ messageIDs := []guuid.UUID{
+ pub(),
+ pub(),
+ pub(),
+ }
+
+ rows, err := pending(topic, consumer)
+ g.TErrorIf(err)
+
+ var collectedIDs []guuid.UUID
+ err = messageEach(rows, func(message messageT) error {
+ collectedIDs = append(collectedIDs, message.uuid)
+ return nil
+ })
+ g.TErrorIf(err)
+
+ g.TAssertEqual(collectedIDs, messageIDs)
+ })
+
+ g.Testing("we halt if the timestamp is ill-formatted", func() {
+ messageID := pub()
+ message_id_bytes := messageID[:]
+ pub()
+ pub()
+ pub()
+
+ const tmplUpdate = `
+ UPDATE "%s_messages"
+ SET timestamp = '01/01/1970'
+ WHERE uuid = ?;
+ `
+ sqlUpdate := fmt.Sprintf(tmplUpdate, prefix)
+ _, err := db.Exec(sqlUpdate, message_id_bytes)
+ g.TErrorIf(err)
+
+ rows, err := pending(topic, consumer)
+ g.TErrorIf(err)
+
+ n := 0
+ err = messageEach(rows, func(messageT) error {
+ n++
+ return nil
+ })
+
+ g.TAssertEqual(
+ err,
+ &time.ParseError{
+ Layout: time.RFC3339Nano,
+ Value: "01/01/1970",
+ LayoutElem: "2006",
+ ValueElem: "01/01/1970",
+ Message: "",
+ },
+ )
+ g.TAssertEqual(n, 3)
+
+ const tmplDelete = `
+ DELETE FROM "%s_messages"
+ WHERE uuid = ?;
+ `
+ sqlDelete := fmt.Sprintf(tmplDelete, prefix)
+ _, err = db.Exec(sqlDelete, message_id_bytes)
+ g.TErrorIf(err)
+ })
+
+ g.Testing("we halt if the callback returns an error", func() {
+ myErr := errors.New("callback error early return")
+
+ rows1, err1 := pending(topic, consumer)
+ g.TErrorIf(err1)
+
+ n1 := 0
+ err1 = messageEach(rows1, func(messageT) error {
+ n1++
+ if n1 == 4 {
+ return myErr
+ }
+ return nil
+ })
+
+ rows2, err2 := pending(topic, consumer)
+ g.TErrorIf(err2)
+
+ n2 := 0
+ err2 = messageEach(rows2, func(messageT) error {
+ n2++
+ return nil
+ })
+
+ g.TAssertEqual(err1, myErr)
+ g.TErrorIf(err2)
+ g.TAssertEqual(n1, 4)
+ g.TAssertEqual(n2, 6)
+ })
+
+ g.Testing("noop when given nil for *sql.Rows", func() {
+ err := messageEach(nil, func(messageT) error {
+ g.Unreachable()
+ return nil
+ })
+ g.TErrorIf(err)
+ })
+}
+
+func test_pendingStmt() {
+ g.TestStart("pendingStmt()")
+
+ const (
+ topic = "pending() topic"
+ payloadStr = "pending() payload"
+ consumer = "pending() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID)
+ commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ pendingErr,
+ commitErr,
+ toDeadErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ pendingClose,
+ commitClose,
+ toDeadClose,
+ db.Close,
+ )
+
+ pub := func(topic string) messageT {
+ g.TErrorIf(take(topic, consumer))
+ unsentWithTopic := unsent
+ unsentWithTopic.Topic = topic
+ message, err := publish(unsentWithTopic, guuid.New())
+ g.TErrorIf(err)
+ return message
+ }
+ g.TErrorIf(take(topic, consumer))
+
+ collectPending := func(topic string, consumer string) []messageT {
+ rows, err := pending(topic, consumer)
+ g.TErrorIf(err)
+
+ var messages []messageT
+ err = messageEach(rows, func(message messageT) error {
+ messages = append(messages, message)
+ return nil
+ })
+ g.TErrorIf(err)
+ return messages
+ }
+
+
+ g.Testing("an empty database has 0 pending items", func() {
+ g.TAssertEqual(len(collectPending(topic, consumer)), 0)
+ })
+
+ g.Testing("after publishing we get all messages", func() {
+ expected := []messageT{
+ pub(topic),
+ pub(topic),
+ }
+
+ g.TAssertEqualI(collectPending(topic, consumer), expected)
+ })
+
+ g.Testing("we get the same messages when calling again", func() {
+ messages1 := collectPending(topic, consumer)
+ messages2 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages1), 2)
+ g.TAssertEqualI(messages1, messages2)
+ })
+
+ g.Testing("we don't get messages from other topics", func() {
+ pub("other topic")
+
+ g.TAssertEqual(len(collectPending(topic, consumer)), 2)
+ g.TAssertEqual(len(collectPending("other topic", consumer)), 1)
+ })
+
+ g.Testing("after others commit, pending still returns them", func() {
+ g.TErrorIf(take(topic, "other consumer"))
+ messages1 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages1), 2)
+ g.TErrorIf(
+ commit("other consumer", messages1[0].uuid),
+ )
+
+ messages2 := collectPending(topic, consumer)
+ g.TAssertEqualI(messages1, messages2)
+ })
+
+ g.Testing("committing other topic doesn't change current", func() {
+ messages1 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages1), 2)
+
+ message := pub("other topic")
+
+ g.TErrorIf(commit(consumer, message.uuid))
+
+ messages2 := collectPending(topic, consumer)
+ g.TAssertEqualI(messages1, messages2)
+ })
+
+	g.Testing("after committing, pending doesn't return them again", func() {
+ messages1 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages1), 2)
+
+ g.TErrorIf(commit(consumer, messages1[0].uuid))
+
+ messages2 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages2), 1)
+ g.TAssertEqual(messages2[0], messages1[1])
+
+ g.TErrorIf(commit(consumer, messages1[1].uuid))
+
+ messages3 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages3), 0)
+ })
+
+ g.Testing("on deadletter, pending also doesn't return them", func() {
+ messages0 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages0), 0)
+
+ message1 := pub(topic)
+ message2 := pub(topic)
+
+ messages1 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages1), 2)
+
+ err = toDead(consumer, message1.uuid, guuid.New())
+ g.TErrorIf(err)
+
+ messages2 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages2), 1)
+ g.TAssertEqual(messages2[0], message2)
+
+ err = toDead(consumer, message2.uuid, guuid.New())
+ g.TErrorIf(err)
+
+ messages3 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages3), 0)
+ })
+
+ g.Testing("if commits are unordered, pending is still sorted", func() {
+ message1 := pub(topic)
+ message2 := pub(topic)
+ message3 := pub(topic)
+
+ g.TAssertEqual(collectPending(topic, consumer), []messageT{
+ message1,
+ message2,
+ message3,
+ })
+
+ g.TErrorIf(commit(consumer, message2.uuid))
+ g.TAssertEqual(collectPending(topic, consumer), []messageT{
+ message1,
+ message3,
+ })
+
+ g.TErrorIf(commit(consumer, message1.uuid))
+ g.TAssertEqual(collectPending(topic, consumer), []messageT{
+ message3,
+ })
+
+ g.TErrorIf(commit(consumer, message3.uuid))
+ g.TAssertEqual(len(collectPending(topic, consumer)), 0)
+ })
+
+ g.Testing("when we're not the owners we get nothing", func() {
+ otherID := instanceID + 1
+
+ take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ g.TErrorIf(takeErr)
+ defer takeClose()
+
+ message1 := pub(topic)
+ message2 := pub(topic)
+ message3 := pub(topic)
+ message4 := pub(topic)
+ message5 := pub(topic)
+
+ expected := []messageT{
+ message1,
+ message2,
+ message3,
+ message4,
+ message5,
+ }
+
+ g.TAssertEqual(collectPending(topic, consumer), expected)
+
+ err := take(topic, consumer)
+ g.TErrorIf(err)
+
+ rows, err := pending(topic, consumer)
+ g.TErrorIf(err)
+
+ err = messageEach(rows, func(messageT) error {
+ g.Unreachable()
+ return nil
+ })
+ g.TErrorIf(err)
+ })
+
+ g.Testing("no actual closing occurs", func() {
+ g.TErrorIf(g.SomeError(
+ pendingClose(),
+ pendingClose(),
+ pendingClose(),
+ ))
+ })
+}
+
+func test_commitStmt() {
+ g.TestStart("commitStmt()")
+
+ const (
+ topic = "commit() topic"
+ payloadStr = "commit() payload"
+ consumer = "commit() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ commitErr,
+ toDeadErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ commitClose,
+ toDeadClose,
+ db.Close,
+ )
+
+ pub := func(topic string) guuid.UUID {
+ g.TErrorIf(take(topic, consumer))
+
+ unsentWithTopic := unsent
+ unsentWithTopic.Topic = topic
+ message, err := publish(unsentWithTopic, guuid.New())
+ g.TErrorIf(err)
+ return message.uuid
+ }
+
+ cmt := func(consumer string, messageID guuid.UUID) error {
+ g.TErrorIf(take(topic, consumer))
+
+ return commit(consumer, messageID)
+ }
+
+
+ g.Testing("we can't commit twice", func() {
+ messageID := pub(topic)
+
+ err1 := cmt(consumer, messageID)
+ err2 := cmt(consumer, messageID)
+ g.TErrorIf(err1)
+ g.TAssertEqual(
+ err2.(acudego.Error).ExtendedCode,
+ acudego.ErrConstraintUnique,
+ )
+ })
+
+ g.Testing("we can't commit non-existent messages", func() {
+ err := cmt(consumer, guuid.New())
+ g.TAssertEqual(err, sql.ErrNoRows)
+ })
+
+ g.Testing("multiple consumers may commit a message", func() {
+ messageID := pub(topic)
+
+ g.TErrorIf(g.SomeError(
+ cmt(consumer, messageID),
+ cmt("other consumer", messageID),
+ cmt("yet another consumer", messageID),
+ ))
+ })
+
+ g.Testing("a consumer can commit to multiple topics", func() {
+ messageID1 := pub(topic)
+ messageID2 := pub("other topic")
+ messageID3 := pub("yet another topic")
+
+ g.TErrorIf(g.SomeError(
+ cmt(consumer, messageID1),
+ cmt(consumer, messageID2),
+ cmt(consumer, messageID3),
+ ))
+ })
+
+ g.Testing("a consumer can consume many messages from a topic", func() {
+ messageID1 := pub(topic)
+ messageID2 := pub(topic)
+ messageID3 := pub(topic)
+
+ g.TErrorIf(g.SomeError(
+ cmt(consumer, messageID1),
+ cmt(consumer, messageID2),
+ cmt(consumer, messageID3),
+ ))
+ })
+
+ g.Testing("we can't commit a dead message", func() {
+ messageID := pub(topic)
+
+ err1 := toDead(consumer, messageID, guuid.New())
+ err2 := cmt(consumer, messageID)
+ g.TErrorIf(err1)
+ g.TAssertEqual(
+ err2.(acudego.Error).ExtendedCode,
+ acudego.ErrConstraintUnique,
+ )
+ })
+
+ g.Testing("error if we don't own the topic/consumer", func() {
+ otherID := instanceID + 1
+ take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ g.TErrorIf(takeErr)
+ defer takeClose()
+
+ messageID := pub(topic)
+
+ err := take(topic, consumer)
+ g.TErrorIf(err)
+
+ err = commit(consumer, messageID)
+ g.TAssertEqual(err, fmt.Errorf(
+ noLongerOwnerErrorFmt,
+ instanceID,
+ topic,
+ consumer,
+ otherID,
+ ))
+ })
+
+ g.Testing("no actual closing occurs", func() {
+ g.TErrorIf(g.SomeError(
+ commitClose(),
+ commitClose(),
+ commitClose(),
+ ))
+ })
+}
+
+func test_toDeadStmt() {
+ g.TestStart("toDeadStmt()")
+
+ const (
+ topic = "toDead() topic"
+ payloadStr = "toDead() payload"
+ consumer = "toDead() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ commitErr,
+ toDeadErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ commitClose,
+ toDeadClose,
+ db.Close,
+ )
+
+ pub := func(topic string) guuid.UUID {
+ g.TErrorIf(take(topic, consumer))
+
+ unsentWithTopic := unsent
+ unsentWithTopic.Topic = topic
+ message, err := publish(unsentWithTopic, guuid.New())
+ g.TErrorIf(err)
+ return message.uuid
+ }
+
+ asDead := func(
+ consumer string,
+ messageID guuid.UUID,
+ deadletterID guuid.UUID,
+ ) error {
+ g.TErrorIf(take(topic, consumer))
+ return toDead(consumer, messageID, deadletterID)
+ }
+
+
+ g.Testing("we can't mark as dead twice", func() {
+ messageID := pub(topic)
+
+ err1 := asDead(consumer, messageID, guuid.New())
+ err2 := asDead(consumer, messageID, guuid.New())
+ g.TErrorIf(err1)
+ g.TAssertEqual(
+ err2.(acudego.Error).ExtendedCode,
+ acudego.ErrConstraintUnique,
+ )
+ })
+
+ g.Testing("we can't reuse a deadletter id", func() {
+ messageID1 := pub(topic)
+ messageID2 := pub(topic)
+ deadletterID := guuid.New()
+
+ err1 := asDead(consumer, messageID1, deadletterID)
+ err2 := asDead(consumer, messageID2, deadletterID)
+ g.TErrorIf(err1)
+ g.TAssertEqual(
+ err2.(acudego.Error).ExtendedCode,
+ acudego.ErrConstraintUnique,
+ )
+	})
+
+ g.Testing("we can't mark as dead non-existent messages", func() {
+ err := asDead(consumer, guuid.New(), guuid.New())
+ g.TAssertEqual(err, sql.ErrNoRows)
+ })
+
+ g.Testing("multiple consumers may mark a message as dead", func() {
+ messageID := pub(topic)
+
+ g.TErrorIf(g.SomeError(
+ asDead(consumer, messageID, guuid.New()),
+ asDead("another consumer", messageID, guuid.New()),
+ asDead("yet another consumer", messageID, guuid.New()),
+ ))
+ })
+
+ g.Testing("a consumer can mark as dead in multiple topics", func() {
+ messageID1 := pub(topic)
+ messageID2 := pub("other topic")
+ messageID3 := pub("yet other topic")
+
+ g.TErrorIf(g.SomeError(
+ asDead(consumer, messageID1, guuid.New()),
+ asDead(consumer, messageID2, guuid.New()),
+ asDead(consumer, messageID3, guuid.New()),
+ ))
+ })
+
+ g.Testing("a consumer can produce many deadletters in a topic", func() {
+ messageID1 := pub(topic)
+ messageID2 := pub(topic)
+ messageID3 := pub(topic)
+
+ g.TErrorIf(g.SomeError(
+ asDead(consumer, messageID1, guuid.New()),
+ asDead(consumer, messageID2, guuid.New()),
+ asDead(consumer, messageID3, guuid.New()),
+ ))
+ })
+
+ g.Testing("a consumer can intercalate commits and deadletters", func() {
+ messageID1 := pub(topic)
+ messageID2 := pub(topic)
+ messageID3 := pub(topic)
+ messageID4 := pub(topic)
+ messageID5 := pub(topic)
+
+ g.TErrorIf(g.SomeError(
+ asDead(consumer, messageID1, guuid.New()),
+ commit(consumer, messageID2),
+ commit(consumer, messageID3),
+ asDead(consumer, messageID4, guuid.New()),
+ commit(consumer, messageID5),
+ ))
+ })
+
+ g.Testing("we can't mark a committed message as dead", func() {
+ messageID := pub(topic)
+
+ err1 := commit(consumer, messageID)
+ err2 := asDead(consumer, messageID, guuid.New())
+ g.TErrorIf(err1)
+ g.TAssertEqual(
+ err2.(acudego.Error).ExtendedCode,
+ acudego.ErrConstraintUnique,
+ )
+ })
+
+ g.Testing("error if we don't own the message's consumer/topic", func() {
+ otherID := instanceID + 1
+ messageID1 := pub(topic)
+ messageID2 := pub(topic)
+
+ take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ g.TErrorIf(takeErr)
+ defer takeClose()
+
+ err := toDead(consumer, messageID1, guuid.New())
+ g.TErrorIf(err)
+
+ err = take(topic, consumer)
+ g.TErrorIf(err)
+
+ err = toDead(consumer, messageID2, guuid.New())
+ g.TAssertEqual(err, fmt.Errorf(
+ noLongerOwnerErrorFmt,
+ instanceID,
+ topic,
+ consumer,
+ otherID,
+ ))
+ })
+
+ g.Testing("no actual closing occurs", func() {
+ g.TErrorIf(g.SomeError(
+ toDeadClose(),
+ toDeadClose(),
+ toDeadClose(),
+ ))
+ })
+}
+
+func test_replayStmt() {
+ g.TestStart("replayStmt()")
+
+ const (
+ topic = "replay() topic"
+ payloadStr = "replay() payload"
+ consumer = "replay() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ toDeadErr,
+ replayErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ toDeadClose,
+ replayClose,
+ db.Close,
+ )
+
+ pub := func() messageT {
+ message, err := publish(unsent, guuid.New())
+ g.TErrorIf(err)
+ return message
+ }
+ g.TErrorIf(take(topic, consumer))
+
+
+ g.Testing("we can replay a message", func() {
+ message := pub()
+ deadletterID := guuid.New()
+ replayedID := guuid.New()
+
+ err1 := toDead(consumer, message.uuid, deadletterID)
+ replayed, err2 := replay(deadletterID, replayedID)
+ g.TErrorIf(g.SomeError(err1, err2))
+
+ g.TAssertEqual(replayed.uuid, replayedID)
+ g.TAssertEqual(replayed.id == message.id, false)
+ g.TAssertEqual(replayed.uuid == message.uuid, false)
+ })
+
+ g.Testing("a replayed message keeps its payload", func() {
+ message := pub()
+ deadletterID := guuid.New()
+ err := toDead(consumer, message.uuid, deadletterID)
+ g.TErrorIf(err)
+
+ replayed, err := replay(deadletterID, guuid.New())
+ g.TErrorIf(err)
+ g.TAssertEqual(message.flowID, replayed.flowID)
+ g.TAssertEqual(message.payload, replayed.payload)
+ })
+
+ g.Testing("we can't replay a dead message twice", func() {
+ message := pub()
+ deadletterID := guuid.New()
+
+ err := toDead(consumer, message.uuid, deadletterID)
+ g.TErrorIf(err)
+
+ _, err1 := replay(deadletterID, guuid.New())
+ _, err2 := replay(deadletterID, guuid.New())
+ g.TErrorIf(err1)
+ g.TAssertEqual(
+ err2.(acudego.Error).ExtendedCode,
+ acudego.ErrConstraintUnique,
+ )
+ })
+
+	g.Testing("we can't replay non-existent messages", func() {
+ _, err := replay(guuid.New(), guuid.New())
+ g.TAssertEqual(
+ err.(acudego.Error).ExtendedCode,
+ acudego.ErrConstraintNotNull,
+ )
+ })
+
+ g.Testing("messages can die and then be replayed many times", func() {
+ message := pub()
+ deadletterID1 := guuid.New()
+ deadletterID2 := guuid.New()
+
+ err := toDead(consumer, message.uuid, deadletterID1)
+ g.TErrorIf(err)
+
+ replayed1, err := replay(deadletterID1, guuid.New())
+ g.TErrorIf(err)
+
+ err = toDead(consumer, replayed1.uuid, deadletterID2)
+ g.TErrorIf(err)
+
+ replayed2, err := replay(deadletterID2, guuid.New())
+ g.TErrorIf(err)
+
+ g.TAssertEqual(message.flowID, replayed1.flowID)
+ g.TAssertEqual(replayed1.flowID, replayed2.flowID)
+ })
+
+ g.Testing("no actual closing occurs", func() {
+ g.TErrorIf(g.SomeError(
+ replayClose(),
+ replayClose(),
+ replayClose(),
+ ))
+ })
+}
+
+func test_oneDeadStmt() {
+ g.TestStart("oneDeadStmt()")
+
+ const (
+ topic = "oneDead() topic"
+ payloadStr = "oneDead() payload"
+ consumer = "oneDead() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
+ oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ toDeadErr,
+ replayErr,
+ oneDeadErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ toDeadClose,
+ replayClose,
+ oneDeadClose,
+ db.Close,
+ )
+
+ pub := func(topic string) guuid.UUID {
+ g.TErrorIf(take(topic, consumer))
+
+ unsentWithTopic := unsent
+ unsentWithTopic.Topic = topic
+ message, err := publish(unsentWithTopic, guuid.New())
+ g.TErrorIf(err)
+ return message.uuid
+ }
+
+
+ g.Testing("error on missing deadletters", func() {
+ _, err := oneDead(topic, consumer)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ })
+
+ g.Testing("deadletters on other topics don't show for us", func() {
+ err := toDead(consumer, pub("other topic"), guuid.New())
+ g.TErrorIf(err)
+
+ _, err = oneDead(topic, consumer)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ })
+
+	g.Testing("deadletters for other consumers don't show for us", func() {
+ g.TErrorIf(take(topic, "other consumer"))
+ err := toDead("other consumer", pub(topic), guuid.New())
+ g.TErrorIf(err)
+
+ _, err = oneDead(topic, consumer)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ })
+
+	g.Testing("after being replayed, deadletters aren't returned", func() {
+ messageID1 := guuid.New()
+ messageID2 := guuid.New()
+ messageID3 := guuid.New()
+
+ err1 := toDead(consumer, pub(topic), messageID1)
+ err2 := toDead(consumer, pub(topic), messageID2)
+ err3 := toDead(consumer, pub(topic), messageID3)
+ g.TErrorIf(g.SomeError(err1, err2, err3))
+
+ deadletter, err := oneDead(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(deadletter.uuid, messageID1)
+
+ _, err = replay(messageID2, guuid.New())
+ g.TErrorIf(err)
+
+ deadletter, err = oneDead(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(deadletter.uuid, messageID1)
+
+ _, err = replay(messageID1, guuid.New())
+ g.TErrorIf(err)
+
+ deadletter, err = oneDead(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(deadletter.uuid, messageID3)
+
+ _, err = replay(messageID3, guuid.New())
+ g.TErrorIf(err)
+
+ _, err = oneDead(topic, consumer)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ })
+}
+
+func test_deadletterEach() {
+	g.TestStart("deadletterEach()")
+
+ const (
+ topic = "deadletterEach() topic"
+ payloadStr = "deadletterEach() payload"
+ consumer = "deadletterEach() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ toDeadErr,
+ allDeadErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ toDeadClose,
+ allDeadClose,
+ db.Close,
+ )
+
+ pub := func() guuid.UUID {
+ message, err := publish(unsent, guuid.New())
+ g.TErrorIf(err)
+ return message.uuid
+ }
+
+ dead := func(messageID guuid.UUID) guuid.UUID {
+ deadletterID := guuid.New()
+ err := toDead(consumer, messageID, deadletterID)
+ g.TErrorIf(err)
+
+ return deadletterID
+ }
+ g.TErrorIf(take(topic, consumer))
+
+
+ g.Testing("not called on empty set", func() {
+ rows, err := allDead(topic, consumer)
+ g.TErrorIf(err)
+
+		n := 0
+		err = deadletterEach(rows, func(deadletterT, messageT) error {
+			n++
+			return nil
+		})
+		g.TErrorIf(err)
+		g.TAssertEqual(n, 0)
+ })
+
+ g.Testing("the callback is called once for each entry", func() {
+ expected := []guuid.UUID{
+ dead(pub()),
+ dead(pub()),
+ dead(pub()),
+ }
+
+ rows, err := allDead(topic, consumer)
+ g.TErrorIf(err)
+
+ var deadletterIDs []guuid.UUID
+		err = deadletterEach(rows, func(
+			deadletter deadletterT,
+			_ messageT,
+		) error {
+			deadletterIDs = append(deadletterIDs, deadletter.uuid)
+			return nil
+		})
+		g.TErrorIf(err)
+
+ g.TAssertEqual(deadletterIDs, expected)
+ })
+
+ g.Testing("we halt if the timestamp is ill-formatted", func() {
+ messageID := pub()
+ message_id_bytes := messageID[:]
+ dead(messageID)
+ dead(pub())
+ dead(pub())
+ dead(pub())
+ dead(pub())
+
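+		// Corrupt the offset row of the first dead message via
+		// raw SQL so that parsing its timestamp fails.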
+ const tmplUpdate = `
+ UPDATE "%s_offsets"
+ SET timestamp = '01-01-1970'
+ WHERE message_id IN (
+ SELECT id FROM "%s_messages" WHERE uuid = ?
+ );
+ `
+ sqlUpdate := fmt.Sprintf(tmplUpdate, prefix, prefix)
+ _, err := db.Exec(sqlUpdate, message_id_bytes)
+ g.TErrorIf(err)
+
+ rows, err := allDead(topic, consumer)
+ g.TErrorIf(err)
+
+ n := 0
+ err = deadletterEach(rows, func(deadletterT, messageT) error {
+ n++
+ return nil
+ })
+
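+		// The 3 deadletters from the previous test come first;
+		// iteration halts on the corrupted 4th one.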
+ g.TAssertEqual(
+ err,
+ &time.ParseError{
+ Layout: time.RFC3339Nano,
+ Value: "01-01-1970",
+ LayoutElem: "2006",
+ ValueElem: "01-01-1970",
+ Message: "",
+ },
+ )
+ g.TAssertEqual(n, 3)
+
+ const tmplDelete = `
+ DELETE FROM "%s_offsets"
+ WHERE message_id IN (
+ SELECT id FROM "%s_messages" WHERE uuid = ?
+ );
+ `
+ sqlDelete := fmt.Sprintf(tmplDelete, prefix, prefix)
+ _, err = db.Exec(sqlDelete, message_id_bytes)
+ g.TErrorIf(err)
+ })
+
+ g.Testing("we halt if the callback returns an error", func() {
+ myErr := errors.New("early return error")
+
+ rows1, err1 := allDead(topic, consumer)
+ g.TErrorIf(err1)
+
+ n1 := 0
+ err1 = deadletterEach(rows1, func(deadletterT, messageT) error {
+ n1++
+ if n1 == 1 {
+ return myErr
+ }
+ return nil
+ })
+
+ rows2, err2 := allDead(topic, consumer)
+ g.TErrorIf(err2)
+
+ n2 := 0
+		err2 = deadletterEach(rows2, func(deadletterT, messageT) error {
+ n2++
+ return nil
+ })
+
+ g.TAssertEqual(err1, myErr)
+ g.TErrorIf(err2)
+ g.TAssertEqual(n1, 1)
+ g.TAssertEqual(n2, 7)
+ })
+}
+
+func test_allDeadStmt() {
+ g.TestStart("allDeadStmt()")
+
+ const (
+ topic = "allDead() topic"
+ payloadStr = "allDead() payload"
+ consumer = "allDead() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
+ allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ toDeadErr,
+ replayErr,
+ allDeadErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ toDeadClose,
+ replayClose,
+ allDeadClose,
+ db.Close,
+ )
+
+ pub := func(topic string) guuid.UUID {
+ g.TErrorIf(take(topic, consumer))
+
+ unsentWithTopic := unsent
+ unsentWithTopic.Topic = topic
+ message, err := publish(unsentWithTopic, guuid.New())
+ g.TErrorIf(err)
+ return message.uuid
+ }
+
+ collectAll := func(
+ topic string,
+ consumer string,
+ ) ([]deadletterT, []messageT) {
+ var (
+ deadletters []deadletterT
+ messages []messageT
+ )
+ eachFn := func(
+ deadletter deadletterT,
+ message messageT,
+ ) error {
+ deadletters = append(deadletters, deadletter)
+ messages = append(messages, message)
+ return nil
+ }
+
+ rows, err := allDead(topic, consumer)
+ g.TErrorIf(err)
+
+ err = deadletterEach(rows, eachFn)
+ g.TErrorIf(err)
+
+ return deadletters, messages
+ }
+
+
+ g.Testing("no entry on empty deadletters", func() {
+		deadletters, _ := collectAll(topic, consumer)
+		g.TAssertEqual(len(deadletters), 0)
+ })
+
+ g.Testing("deadletters on other topics don't show up", func() {
+ err := toDead(consumer, pub("other topic"), guuid.New())
+ g.TErrorIf(err)
+
+ deadletters, _ := collectAll(topic, consumer)
+ g.TAssertEqual(len(deadletters), 0)
+ })
+
+ g.Testing("deadletters of other consumers don't show up", func() {
+ g.TErrorIf(take(topic, "other consumer"))
+ err := toDead("other consumer", pub(topic), guuid.New())
+ g.TErrorIf(err)
+
+		deadletters, _ := collectAll(topic, consumer)
+		g.TAssertEqual(len(deadletters), 0)
+ })
+
+ g.Testing("deadletters are given in order", func() {
+ deadletterIDs := []guuid.UUID{
+ guuid.New(),
+ guuid.New(),
+ guuid.New(),
+ }
+ messageIDs := []guuid.UUID{
+ pub(topic),
+ pub(topic),
+ pub(topic),
+ }
+
+ err1 := toDead(consumer, messageIDs[0], deadletterIDs[0])
+ err2 := toDead(consumer, messageIDs[1], deadletterIDs[1])
+ err3 := toDead(consumer, messageIDs[2], deadletterIDs[2])
+ g.TErrorIf(g.SomeError(err1, err2, err3))
+
+ deadletters, messages := collectAll(topic, consumer)
+ g.TAssertEqual(deadletters[0].uuid, deadletterIDs[0])
+ g.TAssertEqual(deadletters[1].uuid, deadletterIDs[1])
+ g.TAssertEqual(deadletters[2].uuid, deadletterIDs[2])
+ g.TAssertEqual( messages[0].uuid, messageIDs[0])
+ g.TAssertEqual( messages[1].uuid, messageIDs[1])
+ g.TAssertEqual( messages[2].uuid, messageIDs[2])
+ g.TAssertEqual(len(deadletters), 3)
+ g.TAssertEqual(len(messages), 3)
+ })
+
+ g.Testing("after being replayed, they stop appearing", func() {
+ deadletters, _ := collectAll(topic, consumer)
+ g.TAssertEqual(len(deadletters), 3)
+
+ _, err := replay(deadletters[0].uuid, guuid.New())
+ g.TErrorIf(err)
+ collecteds, _ := collectAll(topic, consumer)
+ g.TAssertEqual(len(collecteds), 2)
+
+ _, err = replay(deadletters[1].uuid, guuid.New())
+ g.TErrorIf(err)
+ collecteds, _ = collectAll(topic, consumer)
+ g.TAssertEqual(len(collecteds), 1)
+
+ _, err = replay(deadletters[2].uuid, guuid.New())
+ g.TErrorIf(err)
+ collecteds, _ = collectAll(topic, consumer)
+ g.TAssertEqual(len(collecteds), 0)
+ })
+}
+
+func test_sizeStmt() {
+ g.TestStart("sizeStmt()")
+
+ const (
+ topic = "size() topic"
+ payloadStr = "size() payload"
+ consumer = "size() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
+ oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID)
+ size, sizeClose, sizeErr := sizeStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ toDeadErr,
+ replayErr,
+ oneDeadErr,
+ sizeErr,
+ ))
+	defer g.SomeFnError(
+		takeClose,
+		publishClose,
+		toDeadClose,
+		replayClose,
+		oneDeadClose,
+		sizeClose,
+		db.Close,
+	)
+
+ pub := func(topic string) guuid.UUID {
+ g.TErrorIf(take(topic, consumer))
+
+ unsentWithTopic := unsent
+ unsentWithTopic.Topic = topic
+ message, err := publish(unsentWithTopic, guuid.New())
+ g.TErrorIf(err)
+ return message.uuid
+ }
+
+
+ g.Testing("0 on empty topic", func() {
+ n, err := size(topic)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 0)
+ })
+
+ g.Testing("other topics don't fall into our count", func() {
+ pub("other topic")
+
+ n, err := size(topic)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 0)
+ })
+
+ g.Testing("otherwise we just get the sum", func() {
+ pub(topic)
+ pub(topic)
+ pub(topic)
+ pub(topic)
+ pub(topic)
+
+ n, err := size(topic)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 5)
+ })
+
+ g.Testing("deadletters aren't taken into account", func() {
+ sixthMessageID := pub(topic)
+ err := toDead(consumer, sixthMessageID, guuid.New())
+ g.TErrorIf(err)
+
+ n, err := size(topic)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 6)
+ })
+
+	g.Testing("after replay, a deadletter increases the size", func() {
+ deadletter, err := oneDead(topic, consumer)
+ g.TErrorIf(err)
+
+ _, err = replay(deadletter.uuid, guuid.New())
+ g.TErrorIf(err)
+
+ n, err := size(topic)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 7)
+ })
+}
+
+func test_countStmt() {
+ g.TestStart("countStmt()")
+
+ const (
+ topic = "count() topic"
+ payloadStr = "count() payload"
+ consumer = "count() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ next, nextClose, nextErr := nextStmt(db, prefix, instanceID)
+ commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ count, countClose, countErr := countStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ nextErr,
+ commitErr,
+ toDeadErr,
+ countErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ nextClose,
+ commitClose,
+ toDeadClose,
+ countClose,
+ db.Close,
+ )
+
+ pub := func(topic string) guuid.UUID {
+ g.TErrorIf(take(topic, consumer))
+
+ unsentWithTopic := unsent
+ unsentWithTopic.Topic = topic
+ message, err := publish(unsentWithTopic, guuid.New())
+ g.TErrorIf(err)
+ return message.uuid
+ }
+
+
+ g.Testing("0 on empty topic", func() {
+ n, err := count(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 0)
+ })
+
+ g.Testing("other topics don't add to our count", func() {
+ err := commit(consumer, pub("other topic"))
+ g.TErrorIf(err)
+
+ n, err := count(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 0)
+ })
+
+ g.Testing("other consumers don't influence our count", func() {
+ g.TErrorIf(take(topic, "other consumer"))
+ err := commit("other consumer", pub(topic))
+ g.TErrorIf(err)
+
+ n, err := count(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 0)
+ })
+
+ g.Testing("unconsumed messages don't count", func() {
+ pub(topic)
+ pub(topic)
+ pub(topic)
+
+ n, err := count(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 0)
+ })
+
+ g.Testing("consumed messages do count", func() {
+ message, err := next(topic, consumer)
+ g.TErrorIf(err)
+ err = commit(consumer, message.uuid)
+ g.TErrorIf(err)
+
+ message, err = next(topic, consumer)
+ g.TErrorIf(err)
+ err = commit(consumer, message.uuid)
+ g.TErrorIf(err)
+
+ message, err = next(topic, consumer)
+ g.TErrorIf(err)
+ err = commit(consumer, message.uuid)
+ g.TErrorIf(err)
+
+ n, err := count(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 3)
+ })
+
+ g.Testing("deadletters count as consumed", func() {
+ message, err := next(topic, consumer)
+ g.TErrorIf(err)
+
+ err = toDead(consumer, message.uuid, guuid.New())
+ g.TErrorIf(err)
+
+ n, err := count(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 4)
+ })
+}
+
+func test_hasDataStmt() {
+ g.TestStart("hasDataStmt()")
+
+ const (
+ topic = "hasData() topic"
+ payloadStr = "hasData() payload"
+ consumer = "hasData() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ next, nextClose, nextErr := nextStmt(db, prefix, instanceID)
+ commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ hasData, hasDataClose, hasDataErr := hasDataStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ nextErr,
+ commitErr,
+ toDeadErr,
+ hasDataErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ nextClose,
+ commitClose,
+ toDeadClose,
+ hasDataClose,
+ db.Close,
+ )
+
+ pub := func(topic string) guuid.UUID {
+ g.TErrorIf(take(topic, consumer))
+
+ unsentWithTopic := unsent
+ unsentWithTopic.Topic = topic
+ message, err := publish(unsentWithTopic, guuid.New())
+ g.TErrorIf(err)
+ return message.uuid
+ }
+
+
+ g.Testing("false on empty topic", func() {
+ has, err := hasData(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(has, false)
+ })
+
+ g.Testing("other topics don't change the response", func() {
+ pub("other topic")
+
+ has, err := hasData(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(has, false)
+ })
+
+ g.Testing("published messages flip the flag", func() {
+ pub(topic)
+ has, err := hasData(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(has, true)
+ })
+
+ g.Testing("other consumers don't influence us", func() {
+ g.TErrorIf(take(topic, "other consumer"))
+ message, err := next(topic, "other consumer")
+ g.TErrorIf(err)
+
+ err = commit("other consumer", message.uuid)
+ g.TErrorIf(err)
+
+ has, err := hasData(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(has, true)
+ })
+
+ g.Testing("consuming messages unflips the result", func() {
+ message, err := next(topic, consumer)
+ g.TErrorIf(err)
+
+ err = commit(consumer, message.uuid)
+ g.TErrorIf(err)
+
+ has, err := hasData(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(has, false)
+ })
+
+ g.Testing("same for deadletters", func() {
+ has0, err := hasData(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(has0, false)
+
+ messageID1 := pub(topic)
+ messageID2 := pub(topic)
+
+ has1, err := hasData(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(has1, true)
+
+ err = toDead(consumer, messageID1, guuid.New())
+ g.TErrorIf(err)
+ err = commit(consumer, messageID2)
+ g.TErrorIf(err)
+
+ has2, err := hasData(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(has2, false)
+ })
+}
+
+func test_initDB() {
+ g.TestStart("initDB()")
+
+ const (
+ topic = "initDB() topic"
+ payloadStr = "initDB() payload"
+ consumer = "initDB() consumer"
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ defer db.Close()
+
+ var messages []messageT
+ notifyFn := func(message messageT) {
+ messages = append(messages, message)
+ }
+
+ instanceID := os.Getpid()
+ queries, err := initDB(db, defaultPrefix, notifyFn, instanceID)
+ g.TErrorIf(err)
+ defer queries.close()
+
+ g.TErrorIf(queries.take(topic, consumer))
+
+
+ g.Testing("we can perform all the wrapped operations", func() {
+ messageID := guuid.New()
+ newMessageID := guuid.New()
+ deadletterID := guuid.New()
+
+ messageV1, err := queries.publish(unsent, messageID)
+ g.TErrorIf(err)
+
+ messageV2, err := queries.next(topic, consumer)
+ g.TErrorIf(err)
+
+ var messagesV3 []messageT
+ pendingFn := func(message messageT) error {
+ messagesV3 = append(messagesV3, message)
+ return nil
+ }
+ err = queries.pending(topic, consumer, pendingFn)
+ g.TErrorIf(err)
+ g.TAssertEqual(len(messagesV3), 1)
+ messageV3 := messagesV3[0]
+
+ err = queries.toDead(consumer, messageID, deadletterID)
+ g.TErrorIf(err)
+
+ deadletterV1, err := queries.oneDead(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(deadletterV1.uuid, deadletterID)
+
+ var (
+ deadlettersV2 []deadletterT
+ messagesV4 []messageT
+ )
+ deadletterFn := func(
+ deadletter deadletterT,
+ message messageT,
+ ) error {
+ deadlettersV2 = append(deadlettersV2, deadletter)
+ messagesV4 = append(messagesV4, message)
+ return nil
+ }
+ err = queries.allDead(topic, consumer, deadletterFn)
+ g.TErrorIf(err)
+ g.TAssertEqual(len(deadlettersV2), 1)
+ g.TAssertEqual(deadlettersV2[0].uuid, deadletterID)
+ g.TAssertEqual(len(messagesV4), 1)
+ messageV4 := messagesV4[0]
+
+ g.TAssertEqual(messageV1, messageV2)
+ g.TAssertEqual(messageV1, messageV3)
+ g.TAssertEqual(messageV1, messageV4)
+
+ newMessageV1, err := queries.replay(deadletterID, newMessageID)
+ g.TErrorIf(err)
+
+ err = queries.commit(consumer, newMessageID)
+ g.TErrorIf(err)
+
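+		// A replayed message is a new row: same topic, flowID
+		// and payload, but fresh id, uuid and timestamp.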
+ newMessageV0 := messageV1
+ newMessageV0.id = newMessageV1.id
+ newMessageV0.uuid = newMessageID
+ newMessageV0.timestamp = newMessageV1.timestamp
+ g.TAssertEqual(newMessageV1, newMessageV0)
+
+ size, err := queries.size(topic)
+ g.TErrorIf(err)
+ g.TAssertEqual(size, 2)
+
+ count, err := queries.count(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(count, 2)
+
+ hasData, err := queries.hasData(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(hasData, false)
+ })
+}
+
+func test_queriesTclose() {
+ g.TestStart("queriesT.close()")
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ defer db.Close()
+
+ instanceID := os.Getpid()
+ queries, err := initDB(db, defaultPrefix, func(messageT) {}, instanceID)
+ g.TErrorIf(err)
+
+
+ g.Testing("after closing, we can't run queries", func() {
+ unsent := UnsentMessage{ Payload: []byte{}, }
+ _, err := queries.publish(unsent, guuid.New())
+ g.TErrorIf(err)
+ g.TErrorIf(db.Close())
+
+ err = queries.close()
+ g.TErrorIf(err)
+
+ _, err = queries.publish(unsent, guuid.New())
+ g.TAssertEqual(
+ err.Error(),
+ "sql: database is closed",
+ )
+ })
+
+	g.Testing("closing more than once does not error", func() {
+ g.TErrorIf(g.SomeError(
+ queries.close(),
+ queries.close(),
+ ))
+ })
+}
+
+
+func test_newPinger() {
+ g.TestStart("newPinger()")
+
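+	// The tests below assume newPinger() buffers a single ping:
+	// tryPing() drops values while the buffer is full, and close()
+	// still delivers an already-buffered value.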
+ g.Testing("onPing() on a closed pinger is a noop", func() {
+ pinger := newPinger[string]()
+ pinger.close()
+ pinger.onPing(func(s string) {
+ panic(s)
+ })
+ pinger.tryPing("ignored")
+ })
+
+ g.Testing("onPing() on a closed pinger with data gets that", func() {
+ pinger := newPinger[string]()
+ pinger.tryPing("received")
+ pinger.tryPing("ignored")
+ g.TAssertEqual(pinger.closed(), false)
+
+ pinger.close()
+ g.TAssertEqual(pinger.closed(), true)
+
+ c := make(chan string)
+ count := 0
+ go pinger.onPing(func(s string) {
+ if count > 0 {
+ panic(s)
+ }
+ count++
+
+ c <- s
+ })
+
+ given := <- c
+ g.TAssertEqual(given, "received")
+ })
+
+	g.Testing("when onPing is late, we lose messages", func() {
+ pinger := newPinger[string]()
+ pinger.tryPing("first seen")
+ pinger.tryPing("second dropped")
+ pinger.tryPing("third dropped")
+
+ c := make(chan string)
+ go pinger.onPing(func(s string) {
+ c <- s
+ })
+ s := <- c
+
+ close(c)
+ pinger.close()
+ g.TAssertEqual(s, "first seen")
+ })
+
+	g.Testing("if onPing is on time, it may not lose messages", func() {
+ pinger := newPinger[string]()
+ pinger.tryPing("first seen")
+
+ c := make(chan string)
+ go pinger.onPing(func(s string) {
+ c <- s
+ })
+
+ s1 := <- c
+ pinger.tryPing("second seen")
+ s2 := <- c
+
+ close(c)
+ pinger.close()
+ g.TAssertEqual(s1, "first seen")
+ g.TAssertEqual(s2, "second seen")
+ })
+
+	g.Testing("if onPing takes too long, it still loses messages", func() {
+ pinger := newPinger[string]()
+ pinger.tryPing("first seen")
+
+ c1 := make(chan string)
+ c2 := make(chan struct{})
+ go pinger.onPing(func(s string) {
+ c1 <- s
+ c2 <- struct{}{}
+ })
+
+ s1 := <- c1
+ pinger.tryPing("second seen")
+ pinger.tryPing("third dropped")
+ <- c2
+ s2 := <- c1
+ <- c2
+
+ close(c2)
+ close(c1)
+ pinger.close()
+ g.TAssertEqual(s1, "first seen")
+ g.TAssertEqual(s2, "second seen")
+ })
+}
+
+func test_makeSubscriptionsFunc() {
+	g.TestStart("makeSubscriptionsFuncs()")
+
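+	// The expected behaviour mirrors a sync.RWMutex around the
+	// subscriptions set: concurrent readers, exclusive writers.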
+ g.Testing("we can have multiple readers", func() {
+ subscriptions := makeSubscriptionsFuncs()
+
+ var (
+ readStarted sync.WaitGroup
+ readFinished sync.WaitGroup
+ )
+ c := make(chan struct{})
+ readFn := func(subscriptionsSetM) error {
+ readStarted.Done()
+ <- c
+ return nil
+ }
+ addReader := func() {
+ readStarted.Add(1)
+ readFinished.Add(1)
+ go func() {
+ subscriptions.read(readFn)
+ readFinished.Done()
+ }()
+ }
+
+ addReader()
+ addReader()
+ addReader()
+ readStarted.Wait()
+ c <- struct{}{}
+ c <- struct{}{}
+ c <- struct{}{}
+ readFinished.Wait()
+ })
+
+ g.Testing("writers are exclusive", func() {
+ subscriptions := makeSubscriptionsFuncs()
+
+ var (
+ readStarted sync.WaitGroup
+ readFinished sync.WaitGroup
+ writeWillStart sync.WaitGroup
+ writeFinished sync.WaitGroup
+ )
+ c := make(chan string)
+ readFn := func(subscriptionsSetM) error {
+ readStarted.Done()
+ c <- "reader"
+ return nil
+ }
+ addReader := func() {
+ readStarted.Add(1)
+ readFinished.Add(1)
+ go func() {
+ subscriptions.read(readFn)
+ readFinished.Done()
+ }()
+ }
+
+ writeFn := func(subscriptionsSetM) error {
+ c <- "writer"
+ return nil
+ }
+ addWriter := func() {
+ writeWillStart.Add(1)
+ writeFinished.Add(1)
+ go func() {
+ writeWillStart.Done()
+ subscriptions.write(writeFn)
+ writeFinished.Done()
+ }()
+ }
+
+ addReader()
+ addReader()
+ addReader()
+ readStarted.Wait()
+ addWriter()
+ writeWillStart.Wait()
+
+ g.TAssertEqual(<-c, "reader")
+ g.TAssertEqual(<-c, "reader")
+ g.TAssertEqual(<-c, "reader")
+
+ readFinished.Wait()
+ g.TAssertEqual(<-c, "writer")
+ writeFinished.Wait()
+ })
+}
+
+func test_makeNotifyFn() {
+ g.TestStart("makeNotifyFn()")
+
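+	// makeNotifyFn() always pings the top-level pinger; consumer
+	// and waiter pingers only get pinged when the message matches
+	// their topic (and, for waiters, flowID).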
+	g.Testing("when the topic has no subscribers only top pinger gets pinged", func() {
+ pinger1 := newPinger[struct{}]()
+ pinger2 := newPinger[[]byte]()
+ defer pinger1.close()
+ defer pinger2.close()
+
+ go pinger1.onPing(func(struct{}) {
+ panic("consumer pinger")
+ })
+ go pinger2.onPing(func(payload []byte) {
+ panic("waiter pinger")
+ })
+
+ flowID := guuid.New()
+
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer-1": consumerT{
+ pinger: pinger1,
+ },
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{
+ "waiter-1": waiterT{
+ pinger: pinger2,
+ },
+ },
+ },
+ },
+ }
+ subsFn := func(fn func(subscriptionsSetM) error) error {
+ return fn(set)
+ }
+ topPinger := newPinger[struct{}]()
+ defer topPinger.close()
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go topPinger.onPing(func(struct{}) {
+ wg.Done()
+ })
+
+ notifyFn := makeNotifyFn(subsFn, topPinger)
+
+ message := messageT{
+ uuid: guuid.New(),
+ topic: "nobody is subscribed to this one",
+			payload: []byte("nobody will get this payload"),
+ }
+ notifyFn(message)
+ wg.Wait()
+ })
+
+ g.Testing("otherwise all interested subscribers get pinged", func() {
+ const topic = "the topic name"
+
+ pinger1 := newPinger[struct{}]()
+ pinger2 := newPinger[[]byte]()
+ defer pinger1.close()
+ defer pinger2.close()
+
+ var wg sync.WaitGroup
+ go pinger1.onPing(func(struct{}) {
+ wg.Done()
+ })
+ go pinger2.onPing(func([]byte) {
+ wg.Done()
+ })
+ wg.Add(2)
+
+ flowID := guuid.New()
+
+ set := subscriptionsSetM{
+ topic: topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer-1": consumerT{
+ pinger: pinger1,
+ },
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{
+ "waiter-1": waiterT{
+ pinger: pinger2,
+ },
+ },
+ },
+ },
+ }
+
+ subsFn := func(fn func(subscriptionsSetM) error) error {
+ return fn(set)
+ }
+
+ topPinger := newPinger[struct{}]()
+ defer topPinger.close()
+ go topPinger.onPing(func(struct{}) {
+ wg.Done()
+ })
+ wg.Add(1)
+
+ notifyFn := makeNotifyFn(subsFn, topPinger)
+
+ message := messageT{
+ uuid: guuid.New(),
+ topic: topic,
+ flowID: flowID,
+ payload: []byte("ignored in this test"),
+ }
+ notifyFn(message)
+ wg.Wait()
+ })
+}
+
+func test_collectClosedWaiters() {
+	g.TestStart("collectClosedWaiters()")
+
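+	// collectClosedWaiters() indexes, per topic and flowID, the
+	// names of the waiters whose closed() function reports true.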
+ g.Testing("collects all the reports to be closed", func() {
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+ flowID3 := guuid.New()
+ flowID4 := guuid.New()
+ flowID5 := guuid.New()
+
+ mkwaiter := func(closed bool) waiterT {
+ fn := func() bool {
+ return closed
+ }
+ return waiterT{
+ closed: &fn,
+ }
+ }
+
+ set := subscriptionsSetM{
+ "topic-1": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{
+ "waiter-1": mkwaiter(false),
+ "waiter-2": mkwaiter(true),
+ "waiter-3": mkwaiter(true),
+ },
+ flowID2: map[string]waiterT{
+ "waiter-4": mkwaiter(true),
+ "waiter-5": mkwaiter(false),
+ },
+ },
+ },
+ "topic-2": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID3: map[string]waiterT{
+ "waiter-1": mkwaiter(false),
+ "waiter-2": mkwaiter(false),
+ },
+ flowID4: map[string]waiterT{
+ "waiter-3": mkwaiter(true),
+ "waiter-4": mkwaiter(true),
+ "waiter-5": mkwaiter(true),
+ "waiter-6": mkwaiter(true),
+ },
+ },
+ },
+ "topic-3": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID5: map[string]waiterT{},
+ },
+ },
+ "topic-4": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ expected := map[string]map[guuid.UUID][]string{
+ "topic-1": map[guuid.UUID][]string{
+ flowID1: []string{
+ "waiter-2",
+ "waiter-3",
+ },
+ flowID2: []string{
+ "waiter-4",
+ },
+ },
+ "topic-2": map[guuid.UUID][]string{
+ flowID3: []string{},
+ flowID4: []string{
+ "waiter-3",
+ "waiter-4",
+ "waiter-5",
+ "waiter-6",
+ },
+ },
+ "topic-3": map[guuid.UUID][]string{
+ flowID5: []string{},
+ },
+ "topic-4": map[guuid.UUID][]string{},
+ }
+
+ given := collectClosedWaiters(set)
+
+ // sort names for equality
+ for _, waitersIndex := range given {
+ for _, names := range waitersIndex {
+ sort.Strings(names)
+ }
+ }
+ g.TAssertEqual(given, expected)
+ })
+}
+
+func test_trimEmptyLeaves() {
+ g.TestStart("trimEmptyLeaves()")
+
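+	// trimEmptyLeaves() mutates the index in place: flowIDs with no
+	// waiter names are dropped, then topics left with no flowIDs.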
+ g.Testing("noop on an empty index", func() {
+ input := map[string]map[guuid.UUID][]string{}
+ expected := map[string]map[guuid.UUID][]string{}
+
+ trimEmptyLeaves(input)
+ g.TAssertEqual(input, expected)
+ })
+
+ g.Testing("simplifies tree when it can", func() {
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+ flowID3 := guuid.New()
+ flowID4 := guuid.New()
+
+ input := map[string]map[guuid.UUID][]string{
+ "topic-1": map[guuid.UUID][]string{
+ flowID1: []string{
+ "waiter-1",
+ },
+ flowID2: []string{},
+ },
+ "topic-2": map[guuid.UUID][]string{
+ flowID3: []string{},
+ flowID4: []string{},
+ },
+ "topic-3": map[guuid.UUID][]string{},
+ }
+
+ expected := map[string]map[guuid.UUID][]string{
+ "topic-1": map[guuid.UUID][]string{
+ flowID1: []string{
+ "waiter-1",
+ },
+ },
+ }
+
+ trimEmptyLeaves(input)
+ g.TAssertEqual(input, expected)
+ })
+
+	g.Testing("fully prunes the tree if possible", func() {
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+ flowID3 := guuid.New()
+ flowID4 := guuid.New()
+ flowID5 := guuid.New()
+
+ input := map[string]map[guuid.UUID][]string{
+ "topic-1": map[guuid.UUID][]string{},
+ "topic-2": map[guuid.UUID][]string{},
+ "topic-3": map[guuid.UUID][]string{},
+ "topic-4": map[guuid.UUID][]string{
+ flowID1: []string{},
+ },
+ "topic-5": map[guuid.UUID][]string{
+ flowID2: []string{},
+ flowID3: []string{},
+ flowID4: []string{},
+ flowID5: []string{},
+ },
+ }
+
+ expected := map[string]map[guuid.UUID][]string{}
+
+ trimEmptyLeaves(input)
+ g.TAssertEqual(input, expected)
+ })
+}
+
+func test_deleteIfEmpty() {
+ g.TestStart("deleteIfEmpty()")
+
+ g.Testing("noop if there are consumers", func() {
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ },
+ }
+
+ expected1 := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ },
+ }
+ expected2 := subscriptionsSetM{}
+
+ deleteIfEmpty(set, "topic")
+ g.TAssertEqual(set, expected1)
+
+ delete(set["topic"].consumers, "consumer")
+ deleteIfEmpty(set, "topic")
+ g.TAssertEqual(set, expected2)
+ })
+
+ g.Testing("noop if there are waiters", func() {
+ flowID := guuid.New()
+
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: nil,
+ },
+ },
+ }
+
+ expected1 := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: nil,
+ },
+ },
+ }
+ expected2 := subscriptionsSetM{}
+
+ deleteIfEmpty(set, "topic")
+ g.TAssertEqual(set, expected1)
+
+ delete(set["topic"].waiters, flowID)
+ deleteIfEmpty(set, "topic")
+ g.TAssertEqual(set, expected2)
+ })
+
+ g.Testing("otherwise deletes when empty", func() {
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ expected := subscriptionsSetM{}
+
+ deleteIfEmpty(set, "topic")
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("unrelated topics are left untouched", func() {
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ expected := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ deleteIfEmpty(set, "another-topic")
+ g.TAssertEqual(set, expected)
+ })
+}
+
+func test_deleteEmptyTopics() {
+ g.TestStart("deleteEmptyTopics()")
+
+ g.Testing("cleans up all empty topics from the set", func() {
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+
+ set := subscriptionsSetM{
+ "empty": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ "has-consumers": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ "has-waiters": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: nil,
+ },
+ },
+ "has-both": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID2: nil,
+ },
+ },
+ "has-neither": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ expected := subscriptionsSetM{
+ "has-consumers": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ "has-waiters": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: nil,
+ },
+ },
+ "has-both": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID2: nil,
+ },
+ },
+ }
+
+ deleteEmptyTopics(set)
+ g.TAssertEqual(set, expected)
+ })
+}
+
+func test_removeClosedWaiter() {
+	g.TestStart("removeClosedWaiters()")
+
+ g.Testing("removes from set all that we request", func() {
+ flowID0 := guuid.New()
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+ flowID3 := guuid.New()
+
+ set := subscriptionsSetM{
+ "topic-1": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{
+ "waiter-1": waiterT{},
+ "waiter-2": waiterT{},
+ },
+ flowID2: map[string]waiterT{
+ "waiter-3": waiterT{},
+ "waiter-4": waiterT{},
+ "waiter-5": waiterT{},
+ },
+ },
+ },
+ "topic-2": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID3: map[string]waiterT{
+ "waiter-6": waiterT{},
+ "waiter-7": waiterT{},
+ "waiter-8": waiterT{},
+ },
+ },
+ },
+ "topic-3": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ input := map[string]map[guuid.UUID][]string{
+ "topic-0": map[guuid.UUID][]string{
+ flowID0: []string{
+ "waiter-0",
+ },
+ },
+ "topic-1": map[guuid.UUID][]string{
+ flowID1: []string{
+ "waiter-2",
+ },
+ flowID2: []string{
+ "waiter-3",
+ "waiter-4",
+ "waiter-5",
+ },
+ },
+ "topic-2": map[guuid.UUID][]string{
+ flowID3: []string{
+ "waiter-6",
+ "waiter-7",
+ "waiter-8",
+ },
+ },
+ }
+
+ expected := subscriptionsSetM{
+ "topic-1": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{
+ "waiter-1": waiterT{},
+ },
+ },
+ },
+ }
+
+ removeClosedWaiters(set, input)
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("empty flowIDs from input GET LEAKED", func() {
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+
+ input := map[string]map[guuid.UUID][]string{
+ "topic-2": map[guuid.UUID][]string{
+ flowID2: []string{
+ "waiter",
+ },
+ },
+ }
+
+ set := subscriptionsSetM{
+ "topic-1": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{},
+ },
+ },
+ "topic-2": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID2: map[string]waiterT{
+ "waiter": waiterT{},
+ },
+ },
+ },
+ }
+
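+		// topic-1's empty flowID1 map is never referenced by
+		// the input index, so it survives: this is the leak the
+		// test name warns about.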
+ expected := subscriptionsSetM{
+ "topic-1": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{},
+ },
+ },
+ }
+
+ removeClosedWaiters(set, input)
+ g.TAssertEqual(set, expected)
+ })
+}
+
+func test_reapClosedWaiters() {
+ g.TestStart("reapClosedWaiters()")
+
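+	// reapClosedWaiters() scans under the read lock and only takes
+	// the write lock when it finds closed waiters to remove.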
+ g.Testing("trimEmptyLeaves() prevents removal of empty flows", func() {
+ var (
+ set subscriptionsSetM
+ readCount = 0
+ writeCount = 0
+ )
+ readFn := func(fn func(subscriptionsSetM) error) error {
+ readCount++
+ return fn(set)
+ }
+ writeFn := func(fn func(subscriptionsSetM) error) error {
+ writeCount++
+ return fn(set)
+ }
+
+ openFn := func() bool {
+ return false
+ }
+ closedFn := func() bool {
+ return true
+ }
+ open := waiterT{ closed: &openFn }
+ closed := waiterT{ closed: &closedFn }
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+
+ set = subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{
+ "waiter-1": open,
+ "waiter-2": open,
+ "waiter-3": open,
+ },
+ flowID2: map[string]waiterT{
+ "waiter-4": open,
+ },
+ },
+ },
+ }
+
+ expected1 := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{
+ "waiter-1": open,
+ "waiter-2": open,
+ "waiter-3": open,
+ },
+ flowID2: map[string]waiterT{
+ "waiter-4": open,
+ },
+ },
+ },
+ }
+
+ expected2 := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{
+ "waiter-2": open,
+ "waiter-3": open,
+ },
+ flowID2: map[string]waiterT{
+ "waiter-4": open,
+ },
+ },
+ },
+ }
+
+ expected3 := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{
+ "waiter-2": open,
+ "waiter-3": open,
+ },
+ },
+ },
+ }
+
+ reapClosedWaiters(readFn, writeFn)
+ g.TAssertEqual(readCount, 1)
+ g.TAssertEqual(writeCount, 0)
+ g.TAssertEqual(set, expected1)
+
+ set["topic"].waiters[flowID1]["waiter-1"] = closed
+ reapClosedWaiters(readFn, writeFn)
+ g.TAssertEqual(readCount, 2)
+ g.TAssertEqual(writeCount, 1)
+ g.TAssertEqual(set, expected2)
+
+ set["topic"].waiters[flowID2]["waiter-4"] = closed
+ reapClosedWaiters(readFn, writeFn)
+ g.TAssertEqual(readCount, 3)
+ g.TAssertEqual(writeCount, 2)
+ g.TAssertEqual(set, expected3)
+ })
+}
+
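+// Semantics exercised below: everyNthCall() wraps fn so that only every
+// nth invocation is forwarded; calls in between are dropped.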
+func test_everyNthCall() {
+ g.TestStart("everyNthCall()")
+
+ g.Testing("0 (incorrect) would make fn never be called", func() {
+ never := everyNthCall[int](0, func(int) {
+ panic("unreachable")
+ })
+ for i := 0; i < 100; i++ {
+ never(i)
+ }
+ })
+
+ g.Testing("the first call is delayed to be the nth", func() {
+ count := 0
+ values := []string{}
+ fn := everyNthCall[string](3, func(s string) {
+ count++
+ values = append(values, s)
+ })
+
+ fn("1 ignored")
+ g.TAssertEqual(count, 0)
+ fn("2 ignored")
+ g.TAssertEqual(count, 0)
+ fn("3 not ignored")
+ g.TAssertEqual(count, 1)
+
+ fn("4 ignored")
+ fn("5 ignored")
+ g.TAssertEqual(count, 1)
+
+ fn("6 not ignored")
+ fn("7 ignored")
+ fn("8 ignored")
+ g.TAssertEqual(count, 2)
+
+ fn("9 not ignored")
+ g.TAssertEqual(count, 3)
+
+ expected := []string{
+ "3 not ignored",
+ "6 not ignored",
+ "9 not ignored",
+ }
+ g.TAssertEqual(values, expected)
+ })
+}
+
+func test_runReaper() {
+ g.TestStart("runReaper()")
+
+	g.Testing("debounces reapClosedWaiters() to every `reaperSkipCount` pings", func() {
+ set := subscriptionsSetM{}
+
+ var (
+ readCount = 0
+ writeCount = 0
+ )
+ readFn := func(fn func(subscriptionsSetM) error) error {
+ readCount++
+ return fn(set)
+ }
+ writeFn := func(fn func(subscriptionsSetM) error) error {
+ writeCount++
+ return fn(set)
+ }
+
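+		// onPing stands in for the pinger and fires fn iterCount
+		// times per runReaper() invocation.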
+ var iterCount int
+ onPing := func(fn func(struct{})) {
+ for i := 0; i < iterCount; i++ {
+ fn(struct{}{})
+ }
+ }
+
+ iterCount = reaperSkipCount - 1
+ runReaper(onPing, readFn, writeFn)
+ g.TAssertEqual(readCount, 0)
+ g.TAssertEqual(writeCount, 0)
+
+ iterCount = reaperSkipCount
+ runReaper(onPing, readFn, writeFn)
+ g.TAssertEqual(readCount, 1)
+ g.TAssertEqual(writeCount, 0)
+ })
+}
+
+func test_NewWithPrefix() {
+ g.TestStart("NewWithPrefix()")
+
+ g.Testing("we get an error with a bad prefix", func() {
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ defer db.Close()
+
+ _, err = NewWithPrefix(db, "a bad prefix")
+ g.TAssertEqual(err, g.ErrBadSQLTablePrefix)
+ })
+
+ g.Testing("otherwise we have a queueT and no errors", func() {
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ defer db.Close()
+
+ queue, err := NewWithPrefix(db, "good")
+ g.TErrorIf(err)
+ queue.Close()
+
+ g.TAssertEqual(queue.(queueT).pinger.closed(), true)
+ })
+}
+
+func test_New() {
+ g.TestStart("New()")
+
+ g.Testing("smoke test that we get a queueT", func() {
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ defer db.Close()
+
+ queue, err := New(db)
+ g.TErrorIf(err)
+ queue.Close()
+
+ g.TAssertEqual(queue.(queueT).pinger.closed(), true)
+ })
+}
+
+func test_asPublicMessage() {
+ g.TestStart("asPublicMessage()")
+
+ g.Testing("it picks the correct fields 🤷", func() {
+ input := messageT{
+ uuid: guuid.New(),
+ timestamp: time.Now(),
+ topic: "topic",
+ flowID: guuid.New(),
+ payload: []byte("payload"),
+ }
+
+ expected := Message{
+ ID: input.uuid,
+ Timestamp: input.timestamp,
+ Topic: input.topic,
+ FlowID: input.flowID,
+ Payload: input.payload,
+ }
+
+ given := asPublicMessage(input)
+ g.TAssertEqual(given, expected)
+ })
+}
+
+func test_queueT_Publish() {
+ g.TestStart("queueT.Publish()")
+
+ const (
+ topic = "queueT.Publish() topic"
+ payloadStr = "queueT.Publish() payload"
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ defer db.Close()
+
+ queue, err := New(db)
+ g.TErrorIf(err)
+ defer queue.Close()
+
+
+ g.Testing("it just publishes and returns", func() {
+ message, err := queue.Publish(unsent)
+ g.TErrorIf(err)
+
+ g.TAssertEqual(message.Timestamp == time.Time{}, false)
+ g.TAssertEqual(message.Topic, topic)
+ g.TAssertEqual(message.FlowID, flowID)
+ g.TAssertEqual(message.Payload, payload)
+ })
+
+ queue.Close()
+ g.TAssertEqual(queue.(queueT).pinger.closed(), true)
+}
+
+func test_registerConsumerFn() {
+ g.TestStart("registerConsumerFn()")
+
+ g.Testing("adds a new topicSubscriptionT{} if needed", func() {
+ consumer := consumerT{
+ data: consumerDataT{
+ topic: "topic",
+ name: "consumer",
+ },
+ }
+
+ set := subscriptionsSetM{}
+
+ expected := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumer,
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ registerConsumerFn(consumer)(set)
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("otherwise it just uses what exists", func() {
+ flowID := guuid.New()
+
+ consumer := consumerT{
+ data: consumerDataT{
+ topic: "topic",
+ name: "consumer",
+ },
+ }
+
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "other-consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{},
+ },
+ },
+ }
+
+ expected := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumer,
+ "other-consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{},
+ },
+ },
+ }
+
+ registerConsumerFn(consumer)(set)
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("overwrites existing consumer if desired", func() {
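+		// Distinct close pointers make the two consumers
+		// distinguishable under reflect.DeepEqual.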
+ close1 := func() {}
+ consumer1 := consumerT{
+ data: consumerDataT{
+ topic: "topic",
+ name: "consumer",
+ },
+ close: &close1,
+ }
+
+ close2 := func() {}
+ consumer2 := consumerT{
+ data: consumerDataT{
+ topic: "topic",
+ name: "consumer",
+ },
+ close: &close2,
+ }
+
+ set := subscriptionsSetM{}
+
+ expected1 := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumer1,
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ expected2 := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumer2,
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ registerConsumerFn(consumer1)(set)
+ g.TAssertEqual(set, expected1)
+ g.TAssertEqual(reflect.DeepEqual(set, expected2), false)
+
+ registerConsumerFn(consumer2)(set)
+ g.TAssertEqual(set, expected2)
+ g.TAssertEqual(reflect.DeepEqual(set, expected1), false)
+ })
+}
+
+func test_registerWaiterFn() {
+ g.TestStart("registerWaiterFn()")
+
+ g.Testing("adds a new topicSubscriptionT{} if needed", func() {
+ flowID := guuid.New()
+
+ waiter := waiterT{
+ data: waiterDataT{
+ topic: "topic",
+ flowID: flowID,
+ name: "waiter",
+ },
+ }
+
+ set := subscriptionsSetM{}
+
+ expected := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{
+ "waiter": waiter,
+ },
+ },
+ },
+ }
+
+ registerWaiterFn(waiter)(set)
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("adds a new waiters map if needed", func() {
+ flowID := guuid.New()
+
+ waiter := waiterT{
+ data: waiterDataT{
+ topic: "topic",
+ flowID: flowID,
+ name: "waiter",
+ },
+ }
+
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ expected := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{
+ "waiter": waiter,
+ },
+ },
+ },
+ }
+
+ registerWaiterFn(waiter)(set)
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("otherwise it just uses what exists", func() {
+ flowID := guuid.New()
+
+ waiter := waiterT{
+ data: waiterDataT{
+ topic: "topic",
+ flowID: flowID,
+ name: "waiter",
+ },
+ }
+
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{
+ "other-waiter": waiterT{},
+ },
+ },
+ },
+ }
+
+ expected := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{
+ "waiter": waiter,
+ "other-waiter": waiterT{},
+ },
+ },
+ },
+ }
+
+ registerWaiterFn(waiter)(set)
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("overwrites existing waiter if desired", func() {
+ flowID := guuid.New()
+
+ close1 := func() {}
+ waiter1 := waiterT{
+ data: waiterDataT{
+ topic: "topic",
+ flowID: flowID,
+ name: "waiter",
+ },
+ close: &close1,
+ }
+
+ close2 := func() {}
+ waiter2 := waiterT{
+ data: waiterDataT{
+ topic: "topic",
+ flowID: flowID,
+ name: "waiter",
+ },
+ close: &close2,
+ }
+
+ set := subscriptionsSetM{}
+
+ expected1 := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{
+ "waiter": waiter1,
+ },
+ },
+ },
+ }
+
+ expected2 := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{
+ "waiter": waiter2,
+ },
+ },
+ },
+ }
+
+ registerWaiterFn(waiter1)(set)
+ g.TAssertEqual(set, expected1)
+ g.TAssertEqual(reflect.DeepEqual(set, expected2), false)
+
+ registerWaiterFn(waiter2)(set)
+ g.TAssertEqual(set, expected2)
+ g.TAssertEqual(reflect.DeepEqual(set, expected1), false)
+ })
+}
+
+func test_makeConsumeOneFn() {
+ g.TestStart("makeConsumeOneFn()")
+
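+	// Swap the default logger for a buffer so the tests below can
+	// assert on the log output; it is restored at the end.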
+ savedLogger := slog.Default()
+
+ s := new(strings.Builder)
+ g.SetLoggerOutput(s)
+
+ var (
+ successCount int
+ errorCount int
+ callbackErr error
+ successFnErr error
+ errorFnErr error
+ messages []Message
+ successNames []string
+ successIDs []guuid.UUID
+ errorNames []string
+ errorIDs []guuid.UUID
+ deadletterIDs []guuid.UUID
+ )
+
+ data := consumerDataT{
+ topic: "topic",
+ name: "name",
+ }
+
+ callback := func(message Message) error {
+ messages = append(messages, message)
+ return callbackErr
+ }
+
+ successFn := func(name string, messageID guuid.UUID) error {
+ successCount++
+ successNames = append(successNames, name)
+ successIDs = append(successIDs, messageID)
+ return successFnErr
+ }
+
+ errorFn := func(
+ name string,
+ messageID guuid.UUID,
+ deadletterID guuid.UUID,
+ ) error {
+ errorCount++
+ errorNames = append(errorNames, name)
+ errorIDs = append(errorIDs, messageID)
+ deadletterIDs = append(deadletterIDs, deadletterID)
+
+ return errorFnErr
+ }
+
+ consumeOneFn := makeConsumeOneFn(
+ data,
+ callback,
+ successFn,
+ errorFn,
+ )
+
+ message1 := messageT{ uuid: guuid.New() }
+ message2 := messageT{ uuid: guuid.New() }
+ message3 := messageT{ uuid: guuid.New() }
+ message4 := messageT{ uuid: guuid.New() }
+
+
+ g.Testing("error from successFn() is propagated", func() {
+ err := consumeOneFn(message1)
+ g.TErrorIf(err)
+ g.TAssertEqual(successCount, 1)
+ g.TAssertEqual(errorCount, 0)
+
+ successFnErr = errors.New("successFn() error")
+ err = consumeOneFn(message2)
+ g.TAssertEqual(err, successFnErr)
+ g.TAssertEqual(successCount, 2)
+ g.TAssertEqual(errorCount, 0)
+
+ g.TAssertEqual(s.String(), "")
+ })
+
+ g.Testing("error from callback() is logged and dropped", func() {
+ *s = strings.Builder{}
+
+ callbackErr = errors.New("callback() error")
+ err := consumeOneFn(message3)
+ g.TErrorIf(err)
+ g.TAssertEqual(successCount, 2)
+ g.TAssertEqual(errorCount, 1)
+ g.TAssertEqual(s.String() == "", false)
+ })
+
+ g.Testing("error from errorFn() is propagated", func() {
+ *s = strings.Builder{}
+
+ errorFnErr = errors.New("errorFn() error")
+ err := consumeOneFn(message4)
+ g.TAssertEqual(err, errorFnErr)
+ g.TAssertEqual(successCount, 2)
+ g.TAssertEqual(errorCount, 2)
+ g.TAssertEqual(s.String() == "", false)
+ })
+
+ g.Testing("calls happened with the expected arguments", func() {
+ expectedMessages := []Message{
+ asPublicMessage(message1),
+ asPublicMessage(message2),
+ asPublicMessage(message3),
+ asPublicMessage(message4),
+ }
+
+ g.TAssertEqual(messages, expectedMessages)
+ g.TAssertEqual(successNames, []string{ "name", "name" })
+ g.TAssertEqual(errorNames, []string{ "name", "name" })
+ g.TAssertEqual(successIDs, []guuid.UUID{
+ message1.uuid,
+ message2.uuid,
+ })
+ g.TAssertEqual(errorIDs, []guuid.UUID{
+ message3.uuid,
+ message4.uuid,
+ })
+ g.TAssertEqual(deadletterIDs[0] == message3.uuid, false)
+ g.TAssertEqual(deadletterIDs[1] == message4.uuid, false)
+ })
+
+ slog.SetDefault(savedLogger)
+}
+
+func test_makeConsumeAllFn() {
+ g.TestStart("makeConsumeAllFn()")
+
+ savedLogger := slog.Default()
+
+ s := new(strings.Builder)
+ g.SetLoggerOutput(s)
+
+ data := consumerDataT{}
+
+ consumeOneFn := func(messageT) error {
+ return nil
+ }
+
+ var eachFnErr error
+ eachFn := func(string, string, func(messageT) error) error {
+ return eachFnErr
+ }
+
+ consumeAllFn := makeConsumeAllFn(data, consumeOneFn, eachFn)
+
+
+ g.Testing("silent when eachFn() returns no error", func() {
+ consumeAllFn(struct{}{})
+ g.TAssertEqual(s.String(), "")
+ })
+
+ g.Testing("logs warning otherwise", func() {
+ eachFnErr = errors.New("eachFn() error")
+ consumeAllFn(struct{}{})
+ g.TAssertEqual(s.String() == "", false)
+ })
+
+ slog.SetDefault(savedLogger)
+}
+
+func test_makeWaitFn() {
+ g.TestStart("makeWaitFn()")
+
+	g.Testing("all it does is send the data and invoke closeFn", func() {
+ callCount := 0
+ closeFn := func() {
+ callCount++
+ }
+
+ c := make(chan []byte, 1)
+ waitFn := makeWaitFn(c, closeFn)
+
+ payload := []byte("payload")
+ waitFn(payload)
+ given := <- c
+
+ g.TAssertEqual(given, payload)
+ g.TAssertEqual(callCount, 1)
+ })
+
+	g.Testing("it is safe to call twice in case of a race condition", func() {
+ callCount := 0
+ closeFn := func() {
+ callCount++
+ }
+
+ c := make(chan []byte, 1)
+ waitFn := makeWaitFn(c, closeFn)
+
+ payload := []byte("something")
+ waitFn(payload)
+ waitFn(payload)
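+		// Only the first send lands in the 1-slot buffer; the
+		// channel ends up closed, so the second receive yields
+		// the nil zero value instead of blocking.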
+ given1 := <- c
+ given2 := <- c
+
+ g.TAssertEqual(given1, payload)
+ g.TAssertEqual(given2, []byte(nil))
+ })
+}
+
+func test_runConsumer() {
+ g.TestStart("runConsumer()")
+
+	g.Testing("calls consumeAllFn() at least once", func() {
+ onPing := func(fn func(struct{})) {}
+
+ count := 0
+ consumeAllFn := func(struct{}) {
+ count++
+ }
+
+ runConsumer(onPing, consumeAllFn)
+ g.TAssertEqual(count, 1)
+ })
+
+	g.Testing("calls consumeAllFn() once per ping, plus one initial call", func() {
+ onPing := func(fn func(struct{})) {
+ fn(struct{}{})
+ fn(struct{}{})
+ fn(struct{}{})
+ }
+
+ count := 0
+ consumeAllFn := func(struct{}) {
+ count++
+ }
+
+ runConsumer(onPing, consumeAllFn)
+ g.TAssertEqual(count, 4)
+ })
+}
+
+func test_tryFinding() {
+ g.TestStart("tryFinding()")
+
+ g.Testing("noop in case of failure", func() {
+ myErr := errors.New("find() error")
+ findFn := func(string, guuid.UUID) (messageT, error) {
+ return messageT{}, myErr
+ }
+
+ count := 0
+ waitFn := func([]byte) {
+ count++
+ }
+
+
+ tryFinding(findFn, "topic", guuid.New(), waitFn)
+ g.TAssertEqual(count, 0)
+ })
+
+ g.Testing("calls waitFn in case of success", func() {
+ payload := []byte("find() payload")
+
+ findFn := func(string, guuid.UUID) (messageT, error) {
+ return messageT{ payload: payload }, nil
+ }
+
+ payloads := [][]byte{}
+ waitFn := func(payload []byte) {
+ payloads = append(payloads, payload)
+ }
+
+
+ tryFinding(findFn, "topic", guuid.New(), waitFn)
+ g.TAssertEqual(payloads, [][]byte{ payload })
+ })
+}
+
+func test_queueT_Subscribe() {
+ g.TestStart("queueT.Subscribe()")
+
+ set := subscriptionsSetM{}
+ consumed := []guuid.UUID{}
+ messages := []messageT{
+ messageT{ uuid: guuid.New() },
+ messageT{ uuid: guuid.New() },
+ }
+
+ var takeErr error
+ queue := queueT{
+ queries: queriesT{
+ take: func(string, string) error {
+ return takeErr
+ },
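+			// pending replays every stubbed message to the
+			// callback and records each consumed ID.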
+ pending: func(
+ topic string,
+ consumer string,
+ fn func(messageT) error,
+ ) error {
+ for _, message := range messages {
+ consumed = append(
+ consumed,
+ message.uuid,
+ )
+ _ = fn(message)
+ }
+ return nil
+ },
+ commit: func(
+ consumer string,
+ messageID guuid.UUID,
+ ) error {
+ return nil
+ },
+ toDead: func(string, guuid.UUID, guuid.UUID) error {
+ g.Unreachable()
+ return nil
+ },
+ },
+ subscriptions: subscriptionsT{
+ write: func(fn func(subscriptionsSetM) error) error {
+ return fn(set)
+ },
+ },
+ }
+
+
+ g.Testing("registers our callback in the set and calls it", func() {
+ var wg sync.WaitGroup
+ wg.Add(2)
+
+ queue.Subscribe("topic", "consumer-1", func(Message) error {
+ wg.Done()
+ return nil
+ })
+ defer queue.Unsubscribe("topic", "consumer-1")
+
+ wg.Wait()
+ g.TAssertEqual(consumed, []guuid.UUID{
+ messages[0].uuid,
+ messages[1].uuid,
+ })
+ })
+
+ g.Testing("our callback also gets called when pinged", func() {
+ consumed = []guuid.UUID{}
+
+ var wg sync.WaitGroup
+ wg.Add(4)
+
+ queue.Subscribe("topic", "consumer-2", func(m Message) error {
+ wg.Done()
+ return nil
+ })
+ defer queue.Unsubscribe("topic", "consumer-2")
+
+ consumer := set["topic"].consumers["consumer-2"]
+ consumer.pinger.tryPing(struct{}{})
+
+ wg.Wait()
+
+ g.TAssertEqual(consumed, []guuid.UUID{
+ messages[0].uuid,
+ messages[1].uuid,
+ messages[0].uuid,
+ messages[1].uuid,
+ })
+ })
+
+ g.Testing("we try to own the topic", func() {
+ takeErr = errors.New("queueT.Subscribe() 1")
+
+ err := queue.Subscribe("topic", "name", func(Message) error {
+ g.Unreachable()
+ return nil
+ })
+
+ g.TAssertEqual(err, takeErr)
+ takeErr = nil
+ })
+
+ g.Testing("if we can't own the topic, we don't get registered", func() {
+ takeErr = errors.New("queueT.Subscribe() 2")
+
+ err := queue.Subscribe("topic", "name", func(Message) error {
+ g.Unreachable()
+ return nil
+ })
+ g.TErrorNil(err)
+
+ expected := subscriptionsSetM{}
+ g.TAssertEqual(set, expected)
+
+ takeErr = nil
+ })
+}
+
+func test_queueT_WaitFor() {
+ g.TestStart("queueT.WaitFor()")
+
+ findErr := errors.New("find() error")
+ message := messageT{
+ payload: []byte("queueT.WaitFor() payload"),
+ }
+
+ set := subscriptionsSetM{}
+
+ queue := queueT{
+ queries: queriesT{
+ find: func(
+ topic string,
+ flowID guuid.UUID,
+ ) (messageT, error) {
+ return message, findErr
+ },
+ },
+ subscriptions: subscriptionsT{
+ write: func(fn func(subscriptionsSetM) error) error {
+ return fn(set)
+ },
+ read: func(fn func(subscriptionsSetM) error) error {
+ return fn(set)
+ },
+ },
+ }
+
+
+ g.Testing("registers the waiter in the set", func() {
+ flowID := guuid.New()
+
+ defer queue.WaitFor("topic", flowID, "waiter-1").Close()
+
+ expected := waiterDataT{
+ topic: "topic",
+ flowID: flowID,
+ name: "waiter-1",
+ }
+
+ g.TAssertEqual(
+ set["topic"].waiters[flowID]["waiter-1"].data,
+ expected,
+ )
+ })
+
+ g.Testing("the channel gets a message when waiter is pinged", func() {
+ flowID := guuid.New()
+ payload := []byte("sent payload")
+
+ w := queue.WaitFor("topic", flowID, "waiter-2")
+ defer w.Close()
+
+ waiter := set["topic"].waiters[flowID]["waiter-2"]
+ waiter.pinger.tryPing(payload)
+
+ given := <- w.Channel
+
+ g.TAssertEqual(given, payload)
+ g.TAssertEqual((*waiter.closed)(), true)
+ })
+
+ g.Testing("we can also WaitFor() after publishing the message", func() {
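+		// With findErr cleared, find() now succeeds, so the waiter
+		// resolves from storage and the later ping is ignored.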
+ findErr = nil
+ flowID := guuid.New()
+
+ w := queue.WaitFor("topic", flowID, "waiter-3")
+ defer w.Close()
+
+ given := <- w.Channel
+
+ waiter := set["topic"].waiters[flowID]["waiter-3"]
+ waiter.pinger.tryPing([]byte("ignored"))
+
+ g.TAssertEqual(given, message.payload)
+ g.TAssertEqual((*waiter.closed)(), true)
+ })
+
+	g.Testing("if the data already exists we get it immediately", func() {
+ flowID := guuid.New()
+
+ w := queue.WaitFor("topic", flowID, "waiter-4")
+ defer w.Close()
+
+ waiter := set["topic"].waiters[flowID]["waiter-4"]
+ g.TAssertEqual((*waiter.closed)(), true)
+
+ given := <- w.Channel
+ g.TAssertEqual(given, message.payload)
+ })
+}
+
+func test_unsubscribeIfExistsFn() {
+ g.TestStart("unsubscribeIfExistsFn()")
+
+ g.Testing("noop on empty set", func() {
+ set := subscriptionsSetM{}
+ expected := subscriptionsSetM{}
+ unsubscribeIfExistsFn("topic", "consumer")(set)
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("noop on missing topic", func() {
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{},
+ }
+
+ expected := subscriptionsSetM{
+ "topic": topicSubscriptionT{},
+ }
+
+ unsubscribeIfExistsFn("other-topic", "consumer")(set)
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("noop on missing consumer", func() {
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ },
+ }
+
+ expected := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ },
+ }
+
+ unsubscribeIfExistsFn("topic", "other-consumer")(set)
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("closes consumer and removes it from set", func() {
+ flowID := guuid.New()
+
+ count := 0
+ close := func() {
+ count++
+ }
+
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{
+ close: &close,
+ },
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{},
+ },
+ },
+ }
+
+ expected := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{},
+ },
+ },
+ }
+
+ unsubscribeIfExistsFn("topic", "consumer")(set)
+ g.TAssertEqual(set, expected)
+ g.TAssertEqual(count, 1)
+ })
+
+ g.Testing("empty topics are also removed", func() {
+ count := 0
+ close := func() {
+ count++
+ }
+
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{
+ close: &close,
+ },
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ expected := subscriptionsSetM{}
+
+ unsubscribeIfExistsFn("topic", "consumer")(set)
+ g.TAssertEqual(set, expected)
+ g.TAssertEqual(count, 1)
+ })
+}
+
+func test_queueT_Unsubscribe() {
+ g.TestStart("queueT.Unsubscribe()")
+
+	g.Testing("calls unsubscribeIfExistsFn() via writeFn()", func() {
+ closed := false
+ close := func() {
+ closed = true
+ }
+
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{
+ close: &close,
+ },
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ expected := subscriptionsSetM{}
+
+ queue := queueT{
+ subscriptions: subscriptionsT{
+ write: func(
+ fn func(subscriptionsSetM) error,
+ ) error {
+ return fn(set)
+ },
+ },
+ }
+
+ queue.Unsubscribe("topic", "consumer")
+ g.TAssertEqual(set, expected)
+ g.TAssertEqual(closed, true)
+ })
+}
+
+func test_cleanSubscriptions() {
+ g.TestStart("cleanSubscriptions()")
+
+ g.Testing("all consumers and waiters get close()'d", func() {
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+ flowID3 := guuid.New()
+
+ type pairT struct{
+ closed func() bool
+ fn func()
+ }
+
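+		// mkclose pairs a close callback with a probe that reports
+		// whether the callback has run.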
+ mkclose := func() pairT {
+ closed := false
+ return pairT{
+ closed: func() bool {
+ return closed
+ },
+ fn: func() {
+ closed = true
+ },
+ }
+ }
+
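+		// Sanity-check the helper itself before relying on it.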
+ c := mkclose()
+ g.TAssertEqual(c.closed(), false)
+ c.fn()
+		g.TAssertEqual(c.closed(), true)
+
+ close1 := mkclose()
+ close2 := mkclose()
+ close3 := mkclose()
+ close4 := mkclose()
+ close5 := mkclose()
+ close6 := mkclose()
+ close7 := mkclose()
+ close8 := mkclose()
+
+ set := subscriptionsSetM{
+ "topic-1": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer-1": consumerT{
+ close: &close1.fn,
+ },
+ "consumer-2": consumerT{
+ close: &close2.fn,
+ },
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{
+ "waiter-1": waiterT{
+ close: &close3.fn,
+ },
+ },
+ flowID2: map[string]waiterT{
+ "waiter-2": waiterT{
+ close: &close4.fn,
+ },
+ "waiter-3": waiterT{
+ close: &close5.fn,
+ },
+ },
+ },
+ },
+ "topic-2": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer-3": consumerT{
+ close: &close6.fn,
+ },
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID3: map[string]waiterT{
+ "waiter-4": waiterT{
+ close: &close7.fn,
+ },
+ },
+ },
+ },
+ "topic-3": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ cleanSubscriptions(set)
+
+ given := []bool{
+ close1.closed(),
+ close2.closed(),
+ close3.closed(),
+ close4.closed(),
+ close5.closed(),
+ close6.closed(),
+ close7.closed(),
+ close8.closed(),
+ }
+
+ expected := []bool{
+ true,
+ true,
+ true,
+ true,
+ true,
+ true,
+ true,
+ false,
+ }
+
+ g.TAssertEqualI(given, expected)
+ })
+}
+
+func test_queueT_Close() {
+ g.TestStart("queueT.Close()")
+
+ g.Testing("clean pinger, subscriptions and queries", func() {
+ var (
+ pingerCount = 0
+ subscriptionsCount = 0
+ queriesCount = 0
+
+ subscriptionsErr = errors.New("subscriptionsT{} error")
+ queriesErr = errors.New("queriesT{} error")
+ )
+ queue := queueT{
+ queries: queriesT{
+				close: func() error {
+ queriesCount++
+ return queriesErr
+ },
+ },
+ subscriptions: subscriptionsT{
+ write: func(
+ func(subscriptionsSetM) error,
+ ) error {
+ subscriptionsCount++
+ return subscriptionsErr
+ },
+ },
+ pinger: pingerT[struct{}]{
+ close: func() {
+ pingerCount++
+ },
+ },
+ }
+
+ err := queue.Close()
+ g.TAssertEqual(err, g.WrapErrors(subscriptionsErr, queriesErr))
+ g.TAssertEqual(pingerCount, 1)
+ g.TAssertEqual(subscriptionsCount, 1)
+ g.TAssertEqual(queriesCount, 1)
+ })
+}
+
+
+func test_topicGetopt() {
+ g.TestStart("topicGetopt()")
+
+ g.Testing("checks for required positional argument", func() {
+ var w strings.Builder
+ argsIn := argsT{
+ args: []string{},
+ }
+
+ argsOut, ok := topicGetopt(argsIn, &w)
+ g.TAssertEqual(w.String(), "Missing TOPIC.\n")
+ g.TAssertEqual(ok, false)
+ g.TAssertEqual(argsOut, argsIn)
+ })
+
+ g.Testing("success otherwise", func() {
+ var w strings.Builder
+ argsIn := argsT{
+ args: []string{"a topic"},
+ }
+
+ argsOut, ok := topicGetopt(argsIn, &w)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(ok, true)
+ argsIn.topic = "a topic"
+ g.TAssertEqual(argsOut, argsIn)
+ })
+}
+
+func test_topicConsumerGetopt() {
+ g.TestStart("topicConsumerGetopt()")
+
+ g.Testing("checks for TOPIC argument", func() {
+ var w strings.Builder
+ argsIn := argsT{
+ args: []string{},
+ }
+
+ argsOut, ok := topicConsumerGetopt(argsIn, &w)
+ g.TAssertEqual(w.String(), "Missing TOPIC.\n")
+ g.TAssertEqual(ok, false)
+ g.TAssertEqual(argsOut, argsIn)
+ })
+
+ g.Testing("we get an error on unsupported flag", func() {
+ var w strings.Builder
+ argsIn := argsT{
+ args: []string{"-Z"},
+ }
+
+ const message = "flag provided but not defined: -Z\n"
+ argsOut, ok := topicConsumerGetopt(argsIn, &w)
+ g.TAssertEqual(w.String(), message)
+ g.TAssertEqual(ok, false)
+ g.TAssertEqual(argsOut, argsIn)
+ })
+
+ g.Testing("we also get an error on incorrect usage of flags", func() {
+ var w strings.Builder
+ argsIn := argsT{
+ args: []string{"-C"},
+ }
+
+ const message = "flag needs an argument: -C\n"
+ argsOut, ok := topicConsumerGetopt(argsIn, &w)
+ g.TAssertEqual(w.String(), message)
+ g.TAssertEqual(ok, false)
+ g.TAssertEqual(argsOut, argsIn)
+ })
+
+ g.Testing("we can customize the CONSUMER", func() {
+ var w strings.Builder
+ argsIn := argsT{
+ args: []string{"-C", "custom consumer", "this topic"},
+ }
+
+ argsOut, ok := topicConsumerGetopt(argsIn, &w)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(ok, true)
+ argsIn.topic = "this topic"
+ argsIn.consumer = "custom consumer"
+ g.TAssertEqual(argsOut, argsIn)
+ })
+
+ g.Testing("otherwise we get the default one", func() {
+ var w strings.Builder
+ argsIn := argsT{
+ args: []string{"T"},
+ }
+
+ argsOut, ok := topicConsumerGetopt(argsIn, &w)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(ok, true)
+ argsIn.topic = "T"
+ argsIn.consumer = "default-consumer"
+ g.TAssertEqual(argsOut, argsIn)
+ })
+}
+
+func test_inExec() {
+ g.TestStart("inExec()")
+
+ const (
+ topic = "inExec topic"
+ payloadStr = "inExec payload"
+ )
+ var (
+ payload = []byte(payloadStr)
+ args = argsT{
+ topic: topic,
+ }
+ )
+
+ var (
+ publishErr error
+ messages []messageT
+ id int64 = 0
+ )
+ queries := queriesT{
+ publish: func(
+ unsent UnsentMessage,
+ messageID guuid.UUID,
+ ) (messageT, error) {
+ if publishErr != nil {
+ return messageT{}, publishErr
+ }
+
+ id++
+ now := time.Now()
+ message := messageT{
+ id: id,
+ timestamp: now,
+ uuid: messageID,
+ topic: unsent.Topic,
+ flowID: unsent.FlowID,
+ payload: unsent.Payload,
+ }
+ messages = append(messages, message)
+ return message, nil
+ },
+ }
+
+
+	g.Testing("writes the messageID to output when successful", func() {
+ r := bytes.NewReader(payload)
+ var w strings.Builder
+ rc, err := inExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(messages[0].topic, topic)
+ g.TAssertEqual(messages[0].payload, payload)
+ g.TAssertEqual(rc, 0)
+ g.TAssertEqual(w.String(), messages[0].uuid.String() + "\n")
+ })
+
+ g.Testing("if reading fails, we return the error", func() {
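+		// Reading from a nil *os.File fails with os.ErrInvalid,
+		// i.e. "invalid argument".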
+ var r *os.File
+ var w strings.Builder
+ rc, err := inExec(args, queries, r, &w)
+ g.TAssertEqual(
+ err.Error(),
+ "invalid argument",
+ )
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ })
+
+ g.Testing("if publishing fails, we return the error", func() {
+ publishErr = errors.New("publish() error")
+ r := strings.NewReader("read but not published")
+ var w strings.Builder
+ rc, err := inExec(args, queries, r, &w)
+ g.TAssertEqual(err, publishErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ })
+}
+
+func test_outExec() {
+ g.TestStart("outExec()")
+
+ const (
+ topic = "outExec topic"
+ consumer = "outExec consumer"
+ )
+ var (
+ r = strings.NewReader("")
+ args = argsT{
+ topic: topic,
+ consumer: consumer,
+ }
+ )
+
+ var (
+ takeErr error
+ nextErr error
+ messages []messageT
+ id int64 = 0
+ )
+ queries := queriesT{
+ take: func(string, string) error {
+ return takeErr
+ },
+ next: func(string, string) (messageT, error) {
+ if nextErr != nil {
+ return messageT{}, nextErr
+ }
+
+ if len(messages) == 0 {
+ return messageT{}, sql.ErrNoRows
+ }
+
+ return messages[0], nil
+ },
+ }
+ pub := func(payload []byte) {
+ id++
+ now := time.Now()
+ message := messageT{
+ id: id,
+ timestamp: now,
+ uuid: guuid.New(),
+ topic: topic,
+ flowID: guuid.New(),
+ payload: payload,
+ }
+ messages = append(messages, message)
+ }
+
+
+ g.Testing("exit code 1 when we can't take", func() {
+ takeErr = errors.New("outExec() take error")
+
+ var w strings.Builder
+
+ rc, err := outExec(args, queries, r, &w)
+ g.TAssertEqual(err, takeErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+
+ takeErr = nil
+ })
+
+ g.Testing("exit code 3 when no message is available", func() {
+ var w strings.Builder
+
+ rc, err := outExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 3)
+ })
+
+ g.Testing("we get the same message until we commit", func() {
+ var (
+ w1 strings.Builder
+ w2 strings.Builder
+ w3 strings.Builder
+ )
+ args := argsT{
+ topic: topic,
+ consumer: consumer,
+ }
+
+ pub([]byte("first payload"))
+ pub([]byte("second payload"))
+
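+		// next() always returns the head of messages; dropping the
+		// head below simulates a commit.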
+ rc1, err1 := outExec(args, queries, r, &w1)
+ rc2, err2 := outExec(args, queries, r, &w2)
+ messages = messages[1:]
+ rc3, err3 := outExec(args, queries, r, &w3)
+
+ g.TErrorIf(g.SomeError(err1, err2, err3))
+ g.TAssertEqual(w1.String(), "first payload\n")
+ g.TAssertEqual(w2.String(), "first payload\n")
+ g.TAssertEqual(w3.String(), "second payload\n")
+ g.TAssertEqual(rc1, 0)
+ g.TAssertEqual(rc2, 0)
+ g.TAssertEqual(rc3, 0)
+ })
+
+ g.Testing("we propagate the error when the query fails", func() {
+ nextErr = errors.New("next() error")
+ var w strings.Builder
+ rc, err := outExec(args, queries, r, &w)
+ g.TAssertEqual(err, nextErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ })
+}
+
+func test_commitExec() {
+ g.TestStart("commitExec()")
+
+ const (
+ topic = "commitExec topic"
+ consumer = "commitExec consumer"
+ )
+ var (
+ r = strings.NewReader("")
+ args = argsT{
+ topic: topic,
+ consumer: consumer,
+ }
+ )
+
+ var (
+ takeErr error
+ nextErr error
+ commitErr error
+ messages []messageT
+ id int64 = 0
+ )
+ queries := queriesT{
+ take: func(string, string) error {
+ return takeErr
+ },
+ next: func(string, string) (messageT, error) {
+ if nextErr != nil {
+ return messageT{}, nextErr
+ }
+
+ if len(messages) == 0 {
+ return messageT{}, sql.ErrNoRows
+ }
+
+ return messages[0], nil
+ },
+ commit: func(string, guuid.UUID) error {
+ if commitErr != nil {
+ return commitErr
+ }
+
+ messages = messages[1:]
+ return nil
+ },
+ }
+ pub := func(payload []byte) {
+ id++
+ now := time.Now()
+ message := messageT{
+ id: id,
+ timestamp: now,
+ uuid: guuid.New(),
+ topic: topic,
+ flowID: guuid.New(),
+ payload: payload,
+ }
+ messages = append(messages, message)
+ }
+
+
+ g.Testing("error when we can't take", func() {
+ takeErr = errors.New("commitExec() take error")
+
+ var w strings.Builder
+
+ rc, err := commitExec(args, queries, r, &w)
+ g.TAssertEqual(err, takeErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+
+ takeErr = nil
+ })
+
+ g.Testing("error when there is nothing to commit", func() {
+ var w strings.Builder
+
+ rc, err := commitExec(args, queries, r, &w)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ })
+
+ g.Testing("messages get committed in order", func() {
+ var w strings.Builder
+
+ pub([]byte("first payload"))
+ pub([]byte("second payload"))
+ pub([]byte("third payload"))
+
+ message1 := messages[0]
+ g.TAssertEqual(message1.payload, []byte("first payload"))
+
+ rc, err := commitExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(rc, 0)
+
+ message2 := messages[0]
+ g.TAssertEqual(message2.payload, []byte("second payload"))
+
+ rc, err = commitExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(rc, 0)
+
+ message3 := messages[0]
+ g.TAssertEqual(message3.payload, []byte("third payload"))
+
+ rc, err = commitExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(rc, 0)
+
+ g.TAssertEqual(len(messages), 0)
+ })
+
+	g.Testing("when the next() query fails, we propagate its error", func() {
+ nextErr = errors.New("next() error")
+ var w strings.Builder
+ rc, err := commitExec(args, queries, r, &w)
+ g.TAssertEqual(err, nextErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ nextErr = nil
+ })
+
+ g.Testing("we also propagate the error on commit() failure", func() {
+ commitErr = errors.New("commit() error")
+ pub([]byte{})
+ var w strings.Builder
+ rc, err := commitExec(args, queries, r, &w)
+ g.TAssertEqual(err, commitErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ })
+}
+
+func test_deadExec() {
+ g.TestStart("deadExec()")
+
+ const (
+ topic = "deadExec topic"
+ consumer = "deadExec consumer"
+ )
+ var (
+ r = strings.NewReader("")
+ args = argsT{
+ topic: topic,
+ consumer: consumer,
+ }
+ )
+
+ var (
+ takeErr error
+ nextErr error
+ toDeadErr error
+ messages []messageT
+ id int64 = 0
+ )
+ queries := queriesT{
+ take: func(string, string) error {
+ return takeErr
+ },
+ next: func(string, string) (messageT, error) {
+ if nextErr != nil {
+ return messageT{}, nextErr
+ }
+
+ if len(messages) == 0 {
+ return messageT{}, sql.ErrNoRows
+ }
+
+ return messages[0], nil
+ },
+ toDead: func(
+ _ string,
+ _ guuid.UUID,
+ deadletterID guuid.UUID,
+ ) error {
+ if toDeadErr != nil {
+ return toDeadErr
+ }
+
+ messages = messages[1:]
+ return nil
+ },
+ }
+ pub := func(payload []byte) {
+ id++
+ now := time.Now()
+ message := messageT{
+ id: id,
+ timestamp: now,
+ uuid: guuid.New(),
+ topic: topic,
+ flowID: guuid.New(),
+ payload: payload,
+ }
+ messages = append(messages, message)
+ }
+
+
+ g.Testing("error when we can't take", func() {
+ takeErr = errors.New("deadExec() take error")
+
+ var w strings.Builder
+
+ rc, err := deadExec(args, queries, r, &w)
+ g.TAssertEqual(err, takeErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+
+ takeErr = nil
+ })
+
+ g.Testing("error when there is nothing to mark as dead", func() {
+ var w strings.Builder
+
+ rc, err := deadExec(args, queries, r, &w)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ })
+
+	g.Testing("the next pending message becomes a deadletter", func() {
+ var w strings.Builder
+
+ pub([]byte("first payload"))
+ pub([]byte("second payload"))
+
+ message1 := messages[0]
+ g.TAssertEqual(message1.payload, []byte("first payload"))
+
+ rc, err := deadExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(rc, 0)
+
+ message2 := messages[0]
+ g.TAssertEqual(message2.payload, []byte("second payload"))
+
+ rc, err = deadExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(rc, 0)
+
+ g.TAssertEqual(len(messages), 0)
+ })
+
+ g.Testing("next() error is propagated", func() {
+ nextErr = errors.New("next() error")
+ var w strings.Builder
+ rc, err := deadExec(args, queries, r, &w)
+ g.TAssertEqual(err, nextErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ nextErr = nil
+ })
+
+ g.Testing("toDead() error is propagated", func() {
+ toDeadErr = errors.New("toDead() error")
+ pub([]byte{})
+ var w strings.Builder
+ rc, err := deadExec(args, queries, r, &w)
+ g.TAssertEqual(err, toDeadErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ })
+}
+
+func test_listDeadExec() {
+ g.TestStart("listDeadExec()")
+
+ const (
+ topic = "listDeadExec topic"
+ consumer = "listDeadExec consumer"
+ )
+ var (
+ r = strings.NewReader("")
+ args = argsT{
+ topic: topic,
+ consumer: consumer,
+ }
+ )
+
+ var (
+ messages []messageT
+ deadletters []deadletterT
+ id int64 = 0
+ errorIndex = -1
+ allDeadErr = errors.New("allDead() error")
+ )
+ queries := queriesT{
+ allDead: func(
+ _ string,
+ _ string,
+ callback func(deadletterT, messageT) error,
+ ) error {
+ for i, deadletter := range deadletters {
+ if i == errorIndex {
+ return allDeadErr
+ }
+
+ callback(deadletter, messageT{})
+ }
+
+ return nil
+ },
+ }
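+	// pub/commit/dead/replay emulate the queue lifecycle on the
+	// slices backing the allDead() stub.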
+ pub := func() {
+ payload := []byte("ignored payload for this test")
+ id++
+ now := time.Now()
+ message := messageT{
+ id: id,
+ timestamp: now,
+ uuid: guuid.New(),
+ topic: topic,
+ flowID: guuid.New(),
+ payload: payload,
+ }
+ messages = append(messages, message)
+ }
+ commit := func() {
+ messages = messages[1:]
+ }
+ dead := func() {
+ message := messages[0]
+ now := time.Now()
+ deadletter := deadletterT{
+ uuid: guuid.New(),
+ timestamp: now,
+ consumer: consumer,
+ messageID: message.uuid,
+ }
+
+ messages = messages[1:]
+ deadletters = append(deadletters, deadletter)
+ }
+ replay := func() {
+ pub()
+ deadletters = deadletters[1:]
+ }
+
+
+ g.Testing("nothing is shown if topic is empty", func() {
+ var w strings.Builder
+
+ rc, err := listDeadExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 0)
+ })
+
+ g.Testing("deadletters are printed in order", func() {
+ var w strings.Builder
+
+ pub()
+ pub()
+ pub()
+
+ rc, err := listDeadExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 0)
+
+ dead()
+ commit()
+ dead()
+
+ expected := fmt.Sprintf(
+ "%s\t%s\t%s\n%s\t%s\t%s\n",
+ deadletters[0].uuid.String(),
+ deadletters[0].timestamp.Format(time.RFC3339),
+ deadletters[0].consumer,
+ deadletters[1].uuid.String(),
+ deadletters[1].timestamp.Format(time.RFC3339),
+ deadletters[1].consumer,
+ )
+
+ rc, err = listDeadExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(w.String(), expected)
+ g.TAssertEqual(rc, 0)
+ })
+
+ g.Testing("deadletters disappear after being replayed", func() {
+ var (
+ w1 strings.Builder
+ w2 strings.Builder
+ )
+
+ replay()
+
+ deadletter := deadletters[0]
+ expected := fmt.Sprintf(
+ "%s\t%s\t%s\n",
+ deadletter.uuid.String(),
+ deadletter.timestamp.Format(time.RFC3339),
+ deadletter.consumer,
+ )
+
+ rc, err := listDeadExec(args, queries, r, &w1)
+ g.TErrorIf(err)
+ g.TAssertEqual(w1.String(), expected)
+ g.TAssertEqual(rc, 0)
+
+ replay()
+
+ rc, err = listDeadExec(args, queries, r, &w2)
+ g.TErrorIf(err)
+ g.TAssertEqual(w2.String(), "")
+ g.TAssertEqual(rc, 0)
+ })
+
+ g.Testing("a database failure interrupts the output", func() {
+ var w strings.Builder
+
+ pub()
+ pub()
+ pub()
+ dead()
+ dead()
+ dead()
+
+ deadletter := deadletters[0]
+ expected := fmt.Sprintf(
+ "%s\t%s\t%s\n",
+ deadletter.uuid.String(),
+ deadletter.timestamp.Format(time.RFC3339),
+ deadletter.consumer,
+ )
+
+ errorIndex = 1
+ rc, err := listDeadExec(args, queries, r, &w)
+ g.TAssertEqual(err, allDeadErr)
+ g.TAssertEqual(w.String(), expected)
+ g.TAssertEqual(rc, 1)
+ })
+}
+
+func test_replayExec() {
+ g.TestStart("replayExec()")
+
+ const (
+ topic = "replayExec topic"
+ consumer = "replayExec consumer"
+ )
+ var (
+ w strings.Builder
+ r = strings.NewReader("")
+ args = argsT{
+ topic: topic,
+ consumer: consumer,
+ }
+ )
+
+ var (
+ oneDeadErr error
+ replayErr error
+ messages []messageT
+ deadletters []deadletterT
+ deadMessages []messageT
+ id int64 = 0
+ )
+ queries := queriesT{
+ oneDead: func(string, string) (deadletterT, error) {
+ if oneDeadErr != nil {
+ return deadletterT{}, oneDeadErr
+ }
+
+ if len(deadletters) == 0 {
+ return deadletterT{}, sql.ErrNoRows
+ }
+
+ return deadletters[0], nil
+ },
+ replay: func(guuid.UUID, guuid.UUID) (messageT, error) {
+ if replayErr != nil {
+ return messageT{}, replayErr
+ }
+
+ message := deadMessages[0]
+ messages = append(messages, message)
+ deadletters = deadletters[1:]
+ deadMessages = deadMessages[1:]
+ return message, nil
+ },
+ }
+ pub := func(payload []byte) {
+ id++
+ now := time.Now()
+ message := messageT{
+ id: id,
+ timestamp: now,
+ uuid: guuid.New(),
+ topic: topic,
+ flowID: guuid.New(),
+ payload: payload,
+ }
+ messages = append(messages, message)
+ }
+ commit := func() {
+ messages = messages[1:]
+ }
+ dead := func() {
+ message := messages[0]
+ deadletter := deadletterT{ uuid: guuid.New() }
+
+ messages = messages[1:]
+ deadletters = append(deadletters, deadletter)
+ deadMessages = append(deadMessages, message)
+ }
+ next := func() string {
+ return string(messages[0].payload)
+ }
+
+
+ g.Testing("error when there is nothing to replay", func() {
+ rc, err := replayExec(args, queries, r, &w)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+
+ pub([]byte("first payload"))
+ pub([]byte("second payload"))
+ pub([]byte("third payload"))
+ pub([]byte("fourth payload"))
+
+ rc, err = replayExec(args, queries, r, &w)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ })
+
+ g.Testing("deadletters are replayed in order", func() {
+ dead()
+ commit()
+ dead()
+ commit()
+
+ rc, err := replayExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 0)
+ g.TAssertEqual(next(), "first payload")
+
+ commit()
+
+ rc, err = replayExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 0)
+ g.TAssertEqual(next(), "third payload")
+ })
+
+ g.Testing("oneDead() error is forwarded", func() {
+ oneDeadErr = errors.New("oneDead() error")
+ rc, err := replayExec(args, queries, r, &w)
+ g.TAssertEqual(err, oneDeadErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ oneDeadErr = nil
+ })
+
+ g.Testing("replay() error is also forwarded", func() {
+ pub([]byte{})
+ dead()
+
+ replayErr = errors.New("replay() error")
+ rc, err := replayExec(args, queries, r, &w)
+ g.TAssertEqual(err, replayErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ })
+}
+
+func test_sizeExec() {
+ g.TestStart("sizeExec()")
+
+ const (
+ topic = "sizeExec topic"
+ consumer = "sizeExec consumer"
+ )
+ var (
+ r = strings.NewReader("")
+ args = argsT{
+ topic: topic,
+ consumer: consumer,
+ }
+ )
+
+ var sizeErr error
+ queries := queriesT{
+ size: func(string) (int, error) {
+ if sizeErr != nil {
+ return -1, sizeErr
+ }
+
+ return 123, nil
+ },
+ }
+
+
+ g.Testing("it propagates the error when the query fails", func() {
+ sizeErr = errors.New("size() error")
+ var w strings.Builder
+ rc, err := sizeExec(args, queries, r, &w)
+ g.TAssertEqual(err, sizeErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ sizeErr = nil
+ })
+
+	g.Testing("otherwise it just prints what it was given", func() {
+ var w strings.Builder
+ rc, err := sizeExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(w.String(), "123\n")
+ g.TAssertEqual(rc, 0)
+ })
+}
+
+func test_countExec() {
+ g.TestStart("countExec()")
+
+ const (
+ topic = "countExec topic"
+ consumer = "countExec consumer"
+ )
+ var (
+ r = strings.NewReader("")
+ args = argsT{
+ topic: topic,
+ consumer: consumer,
+ }
+ )
+
+ var countErr error
+ queries := queriesT{
+ count: func(string, string) (int, error) {
+ if countErr != nil {
+ return -1, countErr
+ }
+
+ return 2222, nil
+ },
+ }
+
+
+ g.Testing("it propagates the query error", func() {
+ countErr = errors.New("count() error")
+ var w strings.Builder
+ rc, err := countExec(args, queries, r, &w)
+ g.TAssertEqual(err, countErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ countErr = nil
+ })
+
+ g.Testing("otherwise it prints the given count", func() {
+ var w strings.Builder
+ rc, err := countExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(w.String(), "2222\n")
+ g.TAssertEqual(rc, 0)
+ })
+}
+
+func test_hasDataExec() {
+	g.TestStart("hasDataExec()")
+
+ const (
+ topic = "hasData topic"
+ consumer = "hasData consumer"
+ )
+ var (
+ w strings.Builder
+ r = strings.NewReader("")
+ args = argsT{
+ topic: topic,
+ consumer: consumer,
+ }
+ )
+
+ hasData := true
+ var hasDataErr error
+ queries := queriesT{
+ hasData: func(string, string) (bool, error) {
+ if hasDataErr != nil {
+ return false, hasDataErr
+ }
+
+ return hasData, nil
+ },
+ }
+
+
+ g.Testing("it propagates the query error", func() {
+ hasDataErr = errors.New("hasData() error")
+ rc, err := hasDataExec(args, queries, r, &w)
+ g.TAssertEqual(err, hasDataErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ hasDataErr = nil
+ })
+
+	g.Testing("otherwise it just returns (not prints) the flag", func() {
+ hasData = true
+ rc, err := hasDataExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(rc, 0)
+
+ hasData = false
+ rc, err = hasDataExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(rc, 1)
+
+ g.TAssertEqual(w.String(), "")
+ })
+}
+
+func test_usage() {
+ g.TestStart("usage()")
+
+ g.Testing("it just writes to io.Writer", func() {
+ var w strings.Builder
+ usage("xxx", &w)
+ const message =
+ "Usage: xxx [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n"
+ g.TAssertEqual(w.String(), message)
+ })
+
+	g.Testing("noop when the io.Writer is io.Discard", func() {
+ usage("AN ERROR IF SEEN ANYWHERE!", io.Discard)
+ })
+}
+
+func test_getopt() {
+ g.TestStart("getopt()")
+
+ const warning = "Missing COMMAND.\n"
+ const usage = "Usage: $0 [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n"
+
+ execFn := func(argsT, queriesT, io.Reader, io.Writer) (int, error) {
+ return 0, nil
+ }
+ commandsMap := map[string]commandT {
+ "good": commandT{
+ name: "good",
+ getopt: func(args argsT, _ io.Writer) (argsT, bool) {
+ return args, true
+ },
+ exec: execFn,
+ },
+ "bad": commandT{
+ name: "bad",
+ getopt: func(args argsT, w io.Writer) (argsT, bool) {
+ if len(args.args) == 0 {
+ fmt.Fprintln(
+ w,
+ "no required arg",
+ )
+ return args, false
+ }
+
+ if args.args[0] != "required" {
+ fmt.Fprintln(
+ w,
+ "not correct one",
+ )
+ return args, false
+ }
+
+ args.topic = "a topic"
+ args.consumer = "a consumer"
+ return args, true
+ },
+ exec: execFn,
+ },
+ }
+
+
+ g.Testing("we suppress the default error message", func() {
+ var w strings.Builder
+ argv := []string{"$0", "-h"}
+ _, _, rc := getopt(argv, commandsMap, &w)
+
+ g.TAssertEqual(w.String(), usage)
+ g.TAssertEqual(rc, 2)
+ })
+
+ g.Testing("we get an error on unsupported flag", func() {
+ var w strings.Builder
+ argv := []string{"$0", "-X"}
+ _, _, rc := getopt(argv, commandsMap, &w)
+
+ const message = "flag provided but not defined: -X\n"
+ g.TAssertEqual(w.String(), message + usage)
+ g.TAssertEqual(rc, 2)
+ })
+
+ g.Testing("we also get an error on incorrect usage of flags", func() {
+ var w strings.Builder
+ argv := []string{"$0", "-f"}
+ _, _, rc := getopt(argv, commandsMap, &w)
+
+ const message = "flag needs an argument: -f\n"
+ g.TAssertEqual(w.String(), message + usage)
+ g.TAssertEqual(rc, 2)
+ })
+
+ g.Testing("error when not given a command", func() {
+ var w strings.Builder
+ argv := []string{"$0"}
+ _, _, rc := getopt(argv, commandsMap, &w)
+
+ g.TAssertEqual(w.String(), warning + usage)
+ g.TAssertEqual(rc, 2)
+ })
+
+ g.Testing("error on unknown command", func() {
+ var w strings.Builder
+ argv := []string{"$0", "unknown"}
+ _, _, rc := getopt(argv, commandsMap, &w)
+
+ const message = `Bad COMMAND: "unknown".` + "\n"
+ g.TAssertEqual(w.String(), message + usage)
+ g.TAssertEqual(rc, 2)
+ })
+
+ g.Testing("checks the command usage", func() {
+ var (
+ w1 strings.Builder
+ w2 strings.Builder
+ w3 strings.Builder
+ )
+
+ argv1 := []string{"$0", "bad"}
+ argv2 := []string{"$0", "bad", "arg"}
+ argv3 := []string{"$0", "bad", "required"}
+ _, _, rc1 := getopt(argv1, commandsMap, &w1)
+ _, _, rc2 := getopt(argv2, commandsMap, &w2)
+ args, command, rc3 := getopt(argv3, commandsMap, &w3)
+ expectedArgs := argsT{
+ databasePath: "q.db",
+ prefix: "q",
+ command: "bad",
+ allArgs: argv3,
+ args: argv3[2:],
+ topic: "a topic",
+ consumer: "a consumer",
+ }
+
+ g.TAssertEqual(w1.String(), "no required arg\n" + usage)
+ g.TAssertEqual(w2.String(), "not correct one\n" + usage)
+ g.TAssertEqual(w3.String(), "")
+ g.TAssertEqual(rc1, 2)
+ g.TAssertEqual(rc2, 2)
+ g.TAssertEqual(rc3, 0)
+ g.TAssertEqual(args, expectedArgs)
+ g.TAssertEqual(command.name, "bad")
+ })
+
+	g.Testing("when given a command we get the default values", func() {
+ var w strings.Builder
+ args, command, rc := getopt(
+ []string{"$0", "good"},
+ commandsMap,
+ &w,
+ )
+ expectedArgs := argsT{
+ databasePath: "q.db",
+ prefix: "q",
+ command: "good",
+ allArgs: []string{"$0", "good"},
+ args: []string{},
+ }
+
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 0)
+ g.TAssertEqual(args, expectedArgs)
+ g.TAssertEqual(command.name, "good")
+ })
+
+ g.Testing("we can customize both values", func() {
+ var w strings.Builder
+ argv := []string{"$0", "-f", "F", "-p", "P", "good"}
+ args, command, rc := getopt(argv, commandsMap, &w)
+ expectedArgs := argsT{
+ databasePath: "F",
+ prefix: "P",
+ command: "good",
+ allArgs: argv,
+ args: []string{},
+ }
+
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 0)
+ g.TAssertEqual(args, expectedArgs)
+ g.TAssertEqual(command.name, "good")
+ })
+
+	g.Testing("a command can have its own subcommands and options", func() {
+ var w strings.Builder
+ argv := []string{"$0", "-f", "F", "good", "-f", "-f", "SUB"}
+ args, command, rc := getopt(argv, commandsMap, &w)
+ expectedArgs := argsT{
+ databasePath: "F",
+ prefix: "q",
+ command: "good",
+ allArgs: argv,
+ args: []string{"-f", "-f", "SUB"},
+ }
+
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 0)
+ g.TAssertEqual(args, expectedArgs)
+ g.TAssertEqual(command.name, "good")
+ })
+}
+
+func test_runCommand() {
+ g.TestStart("runCommand()")
+
+ const usage = "Usage: $0 [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n"
+
+
+ g.Testing("returns an error on bad prefix", func() {
+ stdin := strings.NewReader("")
+ var (
+ stdout strings.Builder
+ stderr strings.Builder
+ )
+ args := argsT{
+ prefix: "a bad prefix",
+ command: "in",
+ allArgs: []string{"$0"},
+ args: []string{"some topic name"},
+ }
+ rc := runCommand(args, commands["in"], stdin, &stdout, &stderr)
+
+ g.TAssertEqual(rc, 1)
+ g.TAssertEqual(stdout.String(), "")
+ g.TAssertEqual(stderr.String(), "Invalid table prefix\n")
+ })
+
+	g.Testing("otherwise it builds a queueT and calls the command", func() {
+ stdin := strings.NewReader("")
+ var (
+ stdout1 strings.Builder
+ stdout2 strings.Builder
+ stderr1 strings.Builder
+ stderr2 strings.Builder
+ )
+ args1 := argsT{
+ prefix: defaultPrefix,
+ command: "good",
+ }
+ args2 := argsT{
+ prefix: defaultPrefix,
+ command: "bad",
+ }
+ myErr := errors.New("an error")
+ good := commandT{
+ exec: func(
+ argsT,
+ queriesT,
+ io.Reader,
+ io.Writer,
+ ) (int, error) {
+ return 0, nil
+ },
+ }
+ bad := commandT{
+ exec: func(
+ _ argsT,
+ _ queriesT,
+ _ io.Reader,
+ w io.Writer,
+ ) (int, error) {
+ fmt.Fprintf(w, "some text\n")
+ return 1, myErr
+ },
+ }
+ rc1 := runCommand(args1, good, stdin, &stdout1, &stderr1)
+ rc2 := runCommand(args2, bad, stdin, &stdout2, &stderr2)
+
+ g.TAssertEqual(stdout1.String(), "")
+ g.TAssertEqual(stdout2.String(), "some text\n")
+ g.TAssertEqual(stderr1.String(), "")
+ g.TAssertEqual(stderr2.String(), "an error\n")
+ g.TAssertEqual(rc1, 0)
+ g.TAssertEqual(rc2, 1)
+ })
+}
+
+
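+// dumpQueries prints every query set rendered with the default prefix;
+// tests/queries.sql captures this output.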
+func dumpQueries() {
+ queries := []struct{name string; fn func(string) queryT}{
+ { "createTables", createTablesSQL },
+ { "take", takeSQL },
+ { "publish", publishSQL },
+ { "find", findSQL },
+ { "pending", pendingSQL },
+ { "commit", commitSQL },
+ { "toDead", toDeadSQL },
+ { "replay", replaySQL },
+ { "oneDead", oneDeadSQL },
+ { "allDead", allDeadSQL },
+ { "size", sizeSQL },
+ { "count", countSQL },
+ { "hasData", hasDataSQL },
+ }
+ for _, query := range queries {
+ q := query.fn(defaultPrefix)
+ fmt.Printf("\n-- %s.sql:", query.name)
+ fmt.Printf("\n-- write: %s\n", q.write)
+ fmt.Printf("\n-- read: %s\n", q.read)
+ fmt.Printf("\n-- owner: %s\n", q.owner)
+ }
+}
+
+
+
+func MainTest() {
+ if os.Getenv("TESTING_DUMP_SQL_QUERIES") != "" {
+ dumpQueries()
+ return
+ }
+
+ g.Init()
+ test_defaultPrefix()
+ test_inTx()
+ test_createTables()
+ test_takeStmt()
+ test_publishStmt()
+ test_findStmt()
+ test_nextStmt()
+ test_messageEach()
+ test_pendingStmt()
+ test_commitStmt()
+ test_toDeadStmt()
+ test_replayStmt()
+ test_oneDeadStmt()
+ test_deadletterEach()
+ test_allDeadStmt()
+ test_sizeStmt()
+ test_countStmt()
+ test_hasDataStmt()
+ test_initDB()
+ test_queriesTclose()
+ test_newPinger()
+ test_makeSubscriptionsFunc()
+ test_makeNotifyFn()
+ test_collectClosedWaiters()
+ test_trimEmptyLeaves()
+ test_deleteIfEmpty()
+ test_deleteEmptyTopics()
+ test_removeClosedWaiter()
+ test_reapClosedWaiters()
+ test_everyNthCall()
+ test_runReaper()
+ test_NewWithPrefix()
+ test_New()
+ test_asPublicMessage()
+ test_queueT_Publish()
+ test_registerConsumerFn()
+ test_registerWaiterFn()
+ test_makeConsumeOneFn()
+ test_makeConsumeAllFn()
+ test_makeWaitFn()
+ test_runConsumer()
+ test_tryFinding()
+ test_queueT_Subscribe()
+ test_queueT_WaitFor()
+ test_unsubscribeIfExistsFn()
+ test_queueT_Unsubscribe()
+ test_cleanSubscriptions()
+ test_queueT_Close()
+ test_topicGetopt()
+ test_topicConsumerGetopt()
+ test_inExec()
+ test_outExec()
+ test_commitExec()
+ test_deadExec()
+ test_listDeadExec()
+ test_replayExec()
+ test_sizeExec()
+ test_countExec()
+ test_hasDataExec()
+ test_usage()
+ test_getopt()
+ test_runCommand()
+}
diff --git a/tests/queries.sql b/tests/queries.sql
new file mode 100644
index 0000000..2515778
--- /dev/null
+++ b/tests/queries.sql
@@ -0,0 +1,333 @@
+
+-- createTables.sql:
+-- write:
+ CREATE TABLE IF NOT EXISTS "q_payloads" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')),
+ topic TEXT NOT NULL,
+ payload BLOB NOT NULL
+ ) STRICT;
+ CREATE INDEX IF NOT EXISTS "q_payloads_topic"
+ ON "q_payloads"(topic);
+
+ CREATE TABLE IF NOT EXISTS "q_messages" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')),
+ uuid BLOB NOT NULL UNIQUE,
+ flow_id BLOB NOT NULL,
+ payload_id INTEGER NOT NULL
+ REFERENCES "q_payloads"(id)
+ ) STRICT;
+ CREATE INDEX IF NOT EXISTS "q_messages_flow_id"
+ ON "q_messages"(flow_id);
+
+ CREATE TABLE IF NOT EXISTS "q_offsets" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')),
+ consumer TEXT NOT NULL,
+ message_id INTEGER NOT NULL
+ REFERENCES "q_messages"(id),
+ UNIQUE (consumer, message_id)
+ ) STRICT;
+ CREATE INDEX IF NOT EXISTS "q_offsets_consumer"
+ ON "q_offsets"(consumer);
+
+ CREATE TABLE IF NOT EXISTS "q_deadletters" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ uuid BLOB NOT NULL UNIQUE,
+ consumer TEXT NOT NULL,
+ message_id INTEGER NOT NULL
+ REFERENCES "q_messages"(id),
+ UNIQUE (consumer, message_id)
+ ) STRICT;
+ CREATE INDEX IF NOT EXISTS "q_deadletters_consumer"
+ ON "q_deadletters"(consumer);
+
+ CREATE TABLE IF NOT EXISTS "q_replays" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ deadletter_id INTEGER NOT NULL UNIQUE
+	REFERENCES "q_deadletters"(id),
+ message_id INTEGER NOT NULL UNIQUE
+ REFERENCES "q_messages"(id)
+ ) STRICT;
+
+ CREATE TABLE IF NOT EXISTS "q_owners" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ topic TEXT NOT NULL,
+ consumer TEXT NOT NULL,
+ owner_id INTEGER NOT NULL,
+ UNIQUE (topic, consumer)
+ ) STRICT;
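+
+	-- How the tables above relate: "q_payloads" stores the raw (topic,
+	-- payload) rows; "q_messages" wraps each payload with a unique uuid
+	-- and a flow_id; "q_offsets" records which messages each consumer has
+	-- already handled; "q_deadletters" and "q_replays" track failed
+	-- deliveries and their re-published copies; "q_owners" maps each
+	-- (topic, consumer) pair to the instance currently responsible for it.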
+
+
+-- read:
+
+-- owner:
+
+-- take.sql:
+-- write:
+ INSERT INTO "q_owners" (topic, consumer, owner_id)
+ VALUES (?, ?, ?)
+ ON CONFLICT (topic, consumer) DO
+ UPDATE SET owner_id=excluded.owner_id;
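+
+	-- The upsert lets a new instance take over a (topic, consumer) pair:
+	-- on conflict the previous owner_id is simply overwritten.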
+
+
+-- read:
+
+-- owner:
+
+-- publish.sql:
+-- write:
+ INSERT INTO "q_payloads" (topic, payload)
+ VALUES (?, ?);
+
+ INSERT INTO "q_messages" (uuid, flow_id, payload_id)
+ VALUES (?, ?, last_insert_rowid());
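+
+	-- last_insert_rowid() links the message to the payload row inserted
+	-- just before it, so both statements must run on the same connection,
+	-- presumably within a single transaction.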
+
+
+-- read:
+ SELECT id, timestamp FROM "q_messages"
+ WHERE uuid = ?;
+
+
+-- owner:
+
+-- find.sql:
+-- write:
+
+-- read:
+ SELECT
+ "q_messages".id,
+ "q_messages".timestamp,
+ "q_messages".uuid,
+ "q_payloads".payload
+ FROM "q_messages"
+ JOIN "q_payloads" ON
+ "q_payloads".id = "q_messages".payload_id
+ WHERE
+ "q_payloads".topic = ? AND
+ "q_messages".flow_id = ?
+ ORDER BY "q_messages".id DESC
+ LIMIT 1;
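+
+	-- Only the newest message of the (topic, flow_id) pair is returned,
+	-- hence ORDER BY id DESC with LIMIT 1.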
+
+
+-- owner:
+
+-- pending.sql:
+-- write:
+
+-- read:
+ SELECT
+ "q_messages".id,
+ "q_messages".timestamp,
+ "q_messages".uuid,
+ "q_messages".flow_id,
+ "q_payloads".topic,
+ "q_payloads".payload
+ FROM "q_messages"
+ JOIN "q_payloads" ON
+ "q_payloads".id = "q_messages".payload_id
+ WHERE
+ "q_payloads".topic = ? AND
+ "q_messages".id NOT IN (
+ SELECT message_id FROM "q_offsets"
+ WHERE consumer = ?
+ )
+ ORDER BY "q_messages".id ASC;
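+
+	-- The NOT IN anti-join yields every message in the topic that this
+	-- consumer has no offset for yet, oldest first.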
+
+
+-- owner:
+ SELECT owner_id FROM "q_owners"
+ WHERE
+ topic = ? AND
+ consumer = ?;
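+
+	-- The owner lookup (repeated for commit and toDead below) presumably
+	-- lets the caller confirm it still owns the (topic, consumer) pair
+	-- before acting on the rows it just read.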
+
+
+-- commit.sql:
+-- write:
+ INSERT INTO "q_offsets" (consumer, message_id)
+ VALUES (?, (SELECT id FROM "q_messages" WHERE uuid = ?));
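+
+	-- Committing only appends an offset row; the message itself is never
+	-- updated or deleted.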
+
+
+-- read:
+	SELECT "q_payloads".topic FROM "q_payloads"
+ JOIN "q_messages" ON
+ "q_payloads".id = "q_messages".payload_id
+ WHERE "q_messages".uuid = ?;
+
+
+-- owner:
+ SELECT owner_id FROM "q_owners"
+ WHERE
+ topic = ? AND
+ consumer = ?;
+
+
+-- toDead.sql:
+-- write:
+	INSERT INTO "q_offsets" (consumer, message_id)
+	VALUES (?, (SELECT id FROM "q_messages" WHERE uuid = ?));
+
+ INSERT INTO "q_deadletters" (uuid, consumer, message_id)
+ VALUES (?, ?, (SELECT id FROM "q_messages" WHERE uuid = ?));
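+
+	-- Dead-lettering commits the offset (so the message no longer shows
+	-- up as pending) and records the deadletter under its own uuid.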
+
+
+-- read:
+ SELECT "q_payloads".topic FROM "q_payloads"
+ JOIN "q_messages" ON
+ "q_payloads".id = "q_messages".payload_id
+ WHERE "q_messages".uuid = ?;
+
+
+-- owner:
+ SELECT owner_id FROM "q_owners"
+ WHERE
+ topic = ? AND
+ consumer = ?;
+
+
+-- replay.sql:
+-- write:
+ INSERT INTO "q_messages" (uuid, flow_id, payload_id)
+ SELECT
+ ?,
+ "q_messages".flow_id,
+ "q_messages".payload_id
+ FROM "q_messages"
+ JOIN "q_deadletters" ON
+ "q_messages".id = "q_deadletters".message_id
+ WHERE "q_deadletters".uuid = ?;
+
+ INSERT INTO "q_replays" (deadletter_id, message_id)
+ VALUES (
+ (SELECT id FROM "q_deadletters" WHERE uuid = ?),
+ last_insert_rowid()
+ );
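+
+	-- A replay clones the dead message's flow_id and payload_id under a
+	-- fresh uuid, then links the deadletter to the new message through
+	-- "q_replays", which is what makes oneDead and allDead skip it.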
+
+
+-- read:
+ SELECT
+ "q_messages".id,
+ "q_messages".timestamp,
+ "q_messages".flow_id,
+ "q_payloads".topic,
+ "q_payloads".payload
+ FROM "q_messages"
+ JOIN "q_payloads" ON
+ "q_payloads".id = "q_messages".payload_id
+ WHERE "q_messages".uuid = ?;
+
+
+-- owner:
+
+-- oneDead.sql:
+-- write:
+
+-- read:
+ SELECT
+ "q_deadletters".uuid,
+ "q_offsets".timestamp,
+ "q_messages".uuid
+ FROM "q_deadletters"
+ JOIN "q_offsets" ON
+ "q_deadletters".message_id = "q_offsets".message_id
+ JOIN "q_messages" ON
+ "q_deadletters".message_id = "q_messages".id
+ JOIN "q_payloads" ON
+ "q_messages".payload_id = "q_payloads".id
+ WHERE
+ "q_payloads".topic = ? AND
+ "q_deadletters".consumer = ? AND
+ "q_offsets".consumer = ? AND
+ "q_deadletters".id NOT IN (
+ SELECT deadletter_id FROM "q_replays"
+ )
+ ORDER BY "q_deadletters".id ASC
+ LIMIT 1;
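+
+	-- Deadletters already present in "q_replays" are filtered out;
+	-- allDead below is the same query without the LIMIT and with more
+	-- columns projected.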
+
+
+-- owner:
+
+-- allDead.sql:
+-- write:
+
+-- read:
+ SELECT
+ "q_deadletters".uuid,
+ "q_deadletters".message_id,
+ "q_offsets".timestamp,
+ "q_offsets".consumer,
+ "q_messages".timestamp,
+ "q_messages".uuid,
+ "q_messages".flow_id,
+ "q_payloads".topic,
+ "q_payloads".payload
+ FROM "q_deadletters"
+ JOIN "q_offsets" ON
+ "q_deadletters".message_id = "q_offsets".message_id
+ JOIN "q_messages" ON
+ "q_deadletters".message_id = "q_messages".id
+ JOIN "q_payloads" ON
+ "q_messages".payload_id = "q_payloads".id
+ WHERE
+ "q_payloads".topic = ? AND
+ "q_deadletters".consumer = ? AND
+ "q_offsets".consumer = ? AND
+ "q_deadletters".id NOT IN (
+ SELECT deadletter_id FROM "q_replays"
+ )
+ ORDER BY "q_deadletters".id ASC;
+
+
+-- owner:
+
+-- size.sql:
+-- write:
+
+-- read:
+ SELECT
+	COUNT(1) AS size
+ FROM "q_messages"
+ JOIN "q_payloads" ON
+ "q_messages".payload_id = "q_payloads".id
+ WHERE "q_payloads".topic = ?;
+
+
+-- owner:
+
+-- count.sql:
+-- write:
+
+-- read:
+ SELECT
+	COUNT(1) AS count
+ FROM "q_messages"
+ JOIN "q_offsets" ON
+ "q_messages".id = "q_offsets".message_id
+ JOIN "q_payloads" ON
+ "q_messages".payload_id = "q_payloads".id
+ WHERE
+ "q_payloads".topic = ? AND
+ "q_offsets".consumer = ?;
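+
+	-- Contrast with size.sql above: size counts every message in a topic,
+	-- while count only counts those the given consumer has committed.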
+
+
+-- owner:
+
+-- hasData.sql:
+-- write:
+
+-- read:
+	SELECT 1 AS data
+ FROM "q_messages"
+ JOIN "q_payloads" ON
+ "q_payloads".id = "q_messages".payload_id
+ WHERE
+ "q_payloads".topic = ? AND
+ "q_messages".id NOT IN (
+ SELECT message_id FROM "q_offsets"
+ WHERE consumer = ?
+ )
+ LIMIT 1;
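+
+	-- An existence probe: the same anti-join as pending.sql, reduced to a
+	-- constant projection with LIMIT 1 so it stays cheap.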
+
+
+-- owner: