diff options
author | EuAndreh <eu@euandre.org> | 2024-09-17 08:01:05 -0300 |
---|---|---|
committer | EuAndreh <eu@euandre.org> | 2024-10-20 07:39:33 -0300 |
commit | ab1795aeb8f00b61c331ac77fdc1011ec14c5253 (patch) | |
tree | 507b72b45f23f8a1bf1a1684a842fef51f1139a8 | |
parent | Init Go project skeleton with golite init (diff) | |
download | fiinha-ab1795aeb8f00b61c331ac77fdc1011ec14c5253.tar.gz fiinha-ab1795aeb8f00b61c331ac77fdc1011ec14c5253.tar.xz |
Initial version: first implementation
62 files changed, 9781 insertions, 184 deletions
@@ -1,7 +1,16 @@ /src/version.go /*.bin -/*.db +/*.db* /src/*.a /src/*.bin /tests/*.a /tests/*.bin +/tests/functional/*/*.a +/tests/functional/*/*.bin +/tests/functional/*/*.go.db* +/tests/fuzz/*/*.a +/tests/fuzz/*/*.bin +/tests/benchmarks/*/*.a +/tests/benchmarks/*/*.bin +/tests/benchmarks/*/*.txt +/tests/fuzz/corpus/ @@ -1,7 +1,7 @@ .POSIX: DATE = 1970-01-01 VERSION = 0.1.0 -NAME = liteq +NAME = q NAME_UC = $(NAME) LANGUAGES = en ## Installation prefix. Defaults to "/usr". @@ -17,7 +17,7 @@ MANDIR = $(SHAREDIR)/man EXEC = ./ ## Where to store the installation. Empty by default. DESTDIR = -LDLIBS = -lsqlite3 +LDLIBS = --static -lsqlite3 -lm GOCFLAGS = -I $(GOLIBDIR) GOLDFLAGS = -L $(GOLIBDIR) @@ -26,17 +26,26 @@ GOLDFLAGS = -L $(GOLIBDIR) .SUFFIXES: .SUFFIXES: .go .a .bin .bin-check +.go.a: + go tool compile $(GOCFLAGS) -I $(@D) -o $@ -p $(*F) \ + `find $< $$(if [ $(*F) != main ]; then \ + echo src/$(NAME).go src/version.go; fi) | uniq` + +.a.bin: + go tool link $(GOLDFLAGS) -L $(@D) -o $@ --extldflags '$(LDLIBS)' $< + all: include deps.mk -objects = \ - src/$(NAME).a \ - src/main.a \ - tests/$(NAME).a \ - tests/main.a \ +libs.a = $(libs.go:.go=.a) +mains.a = $(mains.go:.go=.a) +mains.bin = $(mains.go:.go=.bin) +functional-tests/lib.a = $(functional-tests/lib.go:.go=.a) +fuzz-targets/lib.a = $(fuzz-targets/lib.go:.go=.a) +benchmarks/lib.a = $(benchmarks/lib.go:.go=.a) sources = \ src/$(NAME).go \ @@ -46,13 +55,16 @@ sources = \ derived-assets = \ src/version.go \ - $(objects) \ - src/main.bin \ - tests/main.bin \ + $(libs.a) \ + $(mains.a) \ + $(mains.bin) \ $(NAME).bin \ side-assets = \ - liteq.db \ + $(NAME).db* \ + tests/functional/*/*.go.db* \ + tests/fuzz/corpus/ \ + tests/benchmarks/*/main.txt \ @@ -61,40 +73,35 @@ side-assets = \ all: $(derived-assets) -$(objects): Makefile +$(libs.a): Makefile deps.mk +$(libs.a): src/$(NAME).go src/version.go -src/$(NAME).a: src/$(NAME).go src/version.go - go tool compile $(GOCFLAGS) -o $@ -p $(*F) -I $(@D) $*.go src/version.go -src/main.a: src/main.go src/$(NAME).a -tests/main.a: tests/main.go tests/$(NAME).a -src/main.a tests/main.a: - go tool compile $(GOCFLAGS) -o $@ -p $(*F) -I $(@D) $*.go +$(fuzz-targets/lib.a): + go tool compile $(GOCFLAGS) -o $@ -p $(NAME) -d=libfuzzer \ + $*.go src/$(NAME).go src/version.go -tests/$(NAME).a: tests/$(NAME).go src/$(NAME).go src/version.go - go tool compile $(GOCFLAGS) -o $@ -p $(*F) $*.go src/$(*F).go src/version.go - -src/main.bin: src/main.a -tests/main.bin: tests/main.a -src/main.bin tests/main.bin: - go tool link $(GOLDFLAGS) -o $@ -L $(@D) --extldflags '$(LDLIBS)' $*.a +src/version.go: Makefile + echo 'package $(NAME); const Version = "$(VERSION)"' > $@ $(NAME).bin: src/main.bin ln -fs $? $@ -src/version.go: Makefile - echo 'package $(NAME); var version = "$(VERSION)"' > $@ +.PRECIOUS: tests/queries.sql +tests/queries.sql: tests/main.bin ALWAYS + env TESTING_DUMP_SQL_QUERIES=1 $(EXEC)tests/main.bin | diff -U10 $@ - tests.bin-check = \ - tests/main.bin-check \ + tests/main.bin-check \ + $(functional-tests/main.go:.go=.bin-check) \ -tests/main.bin-check: tests/main.bin $(tests.bin-check): $(EXEC)$*.bin check-unit: $(tests.bin-check) +check-unit: tests/queries.sql integration-tests = \ @@ -107,6 +114,7 @@ $(integration-tests): ALWAYS sh $@ check-integration: $(integration-tests) +check-integration: fuzz ## Run all tests. Each test suite is isolated, so that a parallel @@ -116,6 +124,27 @@ check: check-unit check-integration +FUZZSEC=1 +fuzz-targets/main.bin-check = $(fuzz-targets/main.go:.go=.bin-check) +$(fuzz-targets/main.bin-check): + $(EXEC)$*.bin --test.fuzztime=$(FUZZSEC)s \ + --test.fuzz='.*' --test.fuzzcachedir=tests/fuzz/corpus + +fuzz: $(fuzz-targets/main.bin-check) + + + +benchmarks/main.bin-check = $(benchmarks/main.go:.go=.bin-check) +$(benchmarks/main.bin-check): + rm -f $*.txt + printf '%s\n' '$(EXEC)$*.bin' >> $*.txt + LANG=POSIX.UTF-8 time -p $(EXEC)$*.bin 2>> $*.txt + printf '%s\n' '$*.txt' + +bench: $(benchmarks/main.bin-check) + + + ## Remove *all* derived artifacts produced during the build. ## A dedicated test asserts that this is always true. clean: @@ -0,0 +1,264 @@ +libs.go = \ + src/q.go \ + tests/benchmarks/deadletters/q.go \ + tests/benchmarks/lookup/q.go \ + tests/benchmarks/multiple-consumers/q.go \ + tests/benchmarks/multiple-produces/q.go \ + tests/benchmarks/reaper/q.go \ + tests/benchmarks/replay/q.go \ + tests/benchmarks/single-consumer/q.go \ + tests/benchmarks/single-producer/q.go \ + tests/benchmarks/subscribe/q.go \ + tests/benchmarks/unsubscribe/q.go \ + tests/benchmarks/waiter/q.go \ + tests/functional/consume-one-produce-many/q.go \ + tests/functional/consumer-with-deadletter/q.go \ + tests/functional/custom-prefix/q.go \ + tests/functional/distinct-consumers-separate-instances/q.go \ + tests/functional/flow-id/q.go \ + tests/functional/idempotency/q.go \ + tests/functional/new-instance-takeover/q.go \ + tests/functional/wait-after-publish/q.go \ + tests/functional/waiter/q.go \ + tests/fuzz/api-check/q.go \ + tests/fuzz/cli-check/q.go \ + tests/fuzz/equal-produced-consumed-order-check/q.go \ + tests/fuzz/exactly-once-check/q.go \ + tests/fuzz/queries-check/q.go \ + tests/fuzz/total-order-check/q.go \ + tests/q.go \ + +mains.go = \ + src/main.go \ + tests/benchmarks/deadletters/main.go \ + tests/benchmarks/lookup/main.go \ + tests/benchmarks/multiple-consumers/main.go \ + tests/benchmarks/multiple-produces/main.go \ + tests/benchmarks/reaper/main.go \ + tests/benchmarks/replay/main.go \ + tests/benchmarks/single-consumer/main.go \ + tests/benchmarks/single-producer/main.go \ + tests/benchmarks/subscribe/main.go \ + tests/benchmarks/unsubscribe/main.go \ + tests/benchmarks/waiter/main.go \ + tests/functional/consume-one-produce-many/main.go \ + tests/functional/consumer-with-deadletter/main.go \ + tests/functional/custom-prefix/main.go \ + tests/functional/distinct-consumers-separate-instances/main.go \ + tests/functional/flow-id/main.go \ + tests/functional/idempotency/main.go \ + tests/functional/new-instance-takeover/main.go \ + tests/functional/wait-after-publish/main.go \ + tests/functional/waiter/main.go \ + tests/fuzz/api-check/main.go \ + tests/fuzz/cli-check/main.go \ + tests/fuzz/equal-produced-consumed-order-check/main.go \ + tests/fuzz/exactly-once-check/main.go \ + tests/fuzz/queries-check/main.go \ + tests/fuzz/total-order-check/main.go \ + tests/main.go \ + +functional-tests/libs.go = \ + tests/functional/consume-one-produce-many/q.go \ + tests/functional/consumer-with-deadletter/q.go \ + tests/functional/custom-prefix/q.go \ + tests/functional/distinct-consumers-separate-instances/q.go \ + tests/functional/flow-id/q.go \ + tests/functional/idempotency/q.go \ + tests/functional/new-instance-takeover/q.go \ + tests/functional/wait-after-publish/q.go \ + tests/functional/waiter/q.go \ + +functional-tests/main.go = \ + tests/functional/consume-one-produce-many/main.go \ + tests/functional/consumer-with-deadletter/main.go \ + tests/functional/custom-prefix/main.go \ + tests/functional/distinct-consumers-separate-instances/main.go \ + tests/functional/flow-id/main.go \ + tests/functional/idempotency/main.go \ + tests/functional/new-instance-takeover/main.go \ + tests/functional/wait-after-publish/main.go \ + tests/functional/waiter/main.go \ + +fuzz-targets/lib.go = \ + tests/fuzz/api-check/q.go \ + tests/fuzz/cli-check/q.go \ + tests/fuzz/equal-produced-consumed-order-check/q.go \ + tests/fuzz/exactly-once-check/q.go \ + tests/fuzz/queries-check/q.go \ + tests/fuzz/total-order-check/q.go \ + +fuzz-targets/main.go = \ + tests/fuzz/api-check/main.go \ + tests/fuzz/cli-check/main.go \ + tests/fuzz/equal-produced-consumed-order-check/main.go \ + tests/fuzz/exactly-once-check/main.go \ + tests/fuzz/queries-check/main.go \ + tests/fuzz/total-order-check/main.go \ + +benchmarks/lib.go = \ + tests/benchmarks/deadletters/q.go \ + tests/benchmarks/lookup/q.go \ + tests/benchmarks/multiple-consumers/q.go \ + tests/benchmarks/multiple-produces/q.go \ + tests/benchmarks/reaper/q.go \ + tests/benchmarks/replay/q.go \ + tests/benchmarks/single-consumer/q.go \ + tests/benchmarks/single-producer/q.go \ + tests/benchmarks/subscribe/q.go \ + tests/benchmarks/unsubscribe/q.go \ + tests/benchmarks/waiter/q.go \ + +benchmarks/main.go = \ + tests/benchmarks/deadletters/main.go \ + tests/benchmarks/lookup/main.go \ + tests/benchmarks/multiple-consumers/main.go \ + tests/benchmarks/multiple-produces/main.go \ + tests/benchmarks/reaper/main.go \ + tests/benchmarks/replay/main.go \ + tests/benchmarks/single-consumer/main.go \ + tests/benchmarks/single-producer/main.go \ + tests/benchmarks/subscribe/main.go \ + tests/benchmarks/unsubscribe/main.go \ + tests/benchmarks/waiter/main.go \ + +src/main.a: src/main.go +src/q.a: src/q.go +tests/benchmarks/deadletters/main.a: tests/benchmarks/deadletters/main.go +tests/benchmarks/deadletters/q.a: tests/benchmarks/deadletters/q.go +tests/benchmarks/lookup/main.a: tests/benchmarks/lookup/main.go +tests/benchmarks/lookup/q.a: tests/benchmarks/lookup/q.go +tests/benchmarks/multiple-consumers/main.a: tests/benchmarks/multiple-consumers/main.go +tests/benchmarks/multiple-consumers/q.a: tests/benchmarks/multiple-consumers/q.go +tests/benchmarks/multiple-produces/main.a: tests/benchmarks/multiple-produces/main.go +tests/benchmarks/multiple-produces/q.a: tests/benchmarks/multiple-produces/q.go +tests/benchmarks/reaper/main.a: tests/benchmarks/reaper/main.go +tests/benchmarks/reaper/q.a: tests/benchmarks/reaper/q.go +tests/benchmarks/replay/main.a: tests/benchmarks/replay/main.go +tests/benchmarks/replay/q.a: tests/benchmarks/replay/q.go +tests/benchmarks/single-consumer/main.a: tests/benchmarks/single-consumer/main.go +tests/benchmarks/single-consumer/q.a: tests/benchmarks/single-consumer/q.go +tests/benchmarks/single-producer/main.a: tests/benchmarks/single-producer/main.go +tests/benchmarks/single-producer/q.a: tests/benchmarks/single-producer/q.go +tests/benchmarks/subscribe/main.a: tests/benchmarks/subscribe/main.go +tests/benchmarks/subscribe/q.a: tests/benchmarks/subscribe/q.go +tests/benchmarks/unsubscribe/main.a: tests/benchmarks/unsubscribe/main.go +tests/benchmarks/unsubscribe/q.a: tests/benchmarks/unsubscribe/q.go +tests/benchmarks/waiter/main.a: tests/benchmarks/waiter/main.go +tests/benchmarks/waiter/q.a: tests/benchmarks/waiter/q.go +tests/functional/consume-one-produce-many/main.a: tests/functional/consume-one-produce-many/main.go +tests/functional/consume-one-produce-many/q.a: tests/functional/consume-one-produce-many/q.go +tests/functional/consumer-with-deadletter/main.a: tests/functional/consumer-with-deadletter/main.go +tests/functional/consumer-with-deadletter/q.a: tests/functional/consumer-with-deadletter/q.go +tests/functional/custom-prefix/main.a: tests/functional/custom-prefix/main.go +tests/functional/custom-prefix/q.a: tests/functional/custom-prefix/q.go +tests/functional/distinct-consumers-separate-instances/main.a: tests/functional/distinct-consumers-separate-instances/main.go +tests/functional/distinct-consumers-separate-instances/q.a: tests/functional/distinct-consumers-separate-instances/q.go +tests/functional/flow-id/main.a: tests/functional/flow-id/main.go +tests/functional/flow-id/q.a: tests/functional/flow-id/q.go +tests/functional/idempotency/main.a: tests/functional/idempotency/main.go +tests/functional/idempotency/q.a: tests/functional/idempotency/q.go +tests/functional/new-instance-takeover/main.a: tests/functional/new-instance-takeover/main.go +tests/functional/new-instance-takeover/q.a: tests/functional/new-instance-takeover/q.go +tests/functional/wait-after-publish/main.a: tests/functional/wait-after-publish/main.go +tests/functional/wait-after-publish/q.a: tests/functional/wait-after-publish/q.go +tests/functional/waiter/main.a: tests/functional/waiter/main.go +tests/functional/waiter/q.a: tests/functional/waiter/q.go +tests/fuzz/api-check/main.a: tests/fuzz/api-check/main.go +tests/fuzz/api-check/q.a: tests/fuzz/api-check/q.go +tests/fuzz/cli-check/main.a: tests/fuzz/cli-check/main.go +tests/fuzz/cli-check/q.a: tests/fuzz/cli-check/q.go +tests/fuzz/equal-produced-consumed-order-check/main.a: tests/fuzz/equal-produced-consumed-order-check/main.go +tests/fuzz/equal-produced-consumed-order-check/q.a: tests/fuzz/equal-produced-consumed-order-check/q.go +tests/fuzz/exactly-once-check/main.a: tests/fuzz/exactly-once-check/main.go +tests/fuzz/exactly-once-check/q.a: tests/fuzz/exactly-once-check/q.go +tests/fuzz/queries-check/main.a: tests/fuzz/queries-check/main.go +tests/fuzz/queries-check/q.a: tests/fuzz/queries-check/q.go +tests/fuzz/total-order-check/main.a: tests/fuzz/total-order-check/main.go +tests/fuzz/total-order-check/q.a: tests/fuzz/total-order-check/q.go +tests/main.a: tests/main.go +tests/q.a: tests/q.go +src/main.bin: src/main.a +tests/benchmarks/deadletters/main.bin: tests/benchmarks/deadletters/main.a +tests/benchmarks/lookup/main.bin: tests/benchmarks/lookup/main.a +tests/benchmarks/multiple-consumers/main.bin: tests/benchmarks/multiple-consumers/main.a +tests/benchmarks/multiple-produces/main.bin: tests/benchmarks/multiple-produces/main.a +tests/benchmarks/reaper/main.bin: tests/benchmarks/reaper/main.a +tests/benchmarks/replay/main.bin: tests/benchmarks/replay/main.a +tests/benchmarks/single-consumer/main.bin: tests/benchmarks/single-consumer/main.a +tests/benchmarks/single-producer/main.bin: tests/benchmarks/single-producer/main.a +tests/benchmarks/subscribe/main.bin: tests/benchmarks/subscribe/main.a +tests/benchmarks/unsubscribe/main.bin: tests/benchmarks/unsubscribe/main.a +tests/benchmarks/waiter/main.bin: tests/benchmarks/waiter/main.a +tests/functional/consume-one-produce-many/main.bin: tests/functional/consume-one-produce-many/main.a +tests/functional/consumer-with-deadletter/main.bin: tests/functional/consumer-with-deadletter/main.a +tests/functional/custom-prefix/main.bin: tests/functional/custom-prefix/main.a +tests/functional/distinct-consumers-separate-instances/main.bin: tests/functional/distinct-consumers-separate-instances/main.a +tests/functional/flow-id/main.bin: tests/functional/flow-id/main.a +tests/functional/idempotency/main.bin: tests/functional/idempotency/main.a +tests/functional/new-instance-takeover/main.bin: tests/functional/new-instance-takeover/main.a +tests/functional/wait-after-publish/main.bin: tests/functional/wait-after-publish/main.a +tests/functional/waiter/main.bin: tests/functional/waiter/main.a +tests/fuzz/api-check/main.bin: tests/fuzz/api-check/main.a +tests/fuzz/cli-check/main.bin: tests/fuzz/cli-check/main.a +tests/fuzz/equal-produced-consumed-order-check/main.bin: tests/fuzz/equal-produced-consumed-order-check/main.a +tests/fuzz/exactly-once-check/main.bin: tests/fuzz/exactly-once-check/main.a +tests/fuzz/queries-check/main.bin: tests/fuzz/queries-check/main.a +tests/fuzz/total-order-check/main.bin: tests/fuzz/total-order-check/main.a +tests/main.bin: tests/main.a +src/main.bin-check: src/main.bin +tests/benchmarks/deadletters/main.bin-check: tests/benchmarks/deadletters/main.bin +tests/benchmarks/lookup/main.bin-check: tests/benchmarks/lookup/main.bin +tests/benchmarks/multiple-consumers/main.bin-check: tests/benchmarks/multiple-consumers/main.bin +tests/benchmarks/multiple-produces/main.bin-check: tests/benchmarks/multiple-produces/main.bin +tests/benchmarks/reaper/main.bin-check: tests/benchmarks/reaper/main.bin +tests/benchmarks/replay/main.bin-check: tests/benchmarks/replay/main.bin +tests/benchmarks/single-consumer/main.bin-check: tests/benchmarks/single-consumer/main.bin +tests/benchmarks/single-producer/main.bin-check: tests/benchmarks/single-producer/main.bin +tests/benchmarks/subscribe/main.bin-check: tests/benchmarks/subscribe/main.bin +tests/benchmarks/unsubscribe/main.bin-check: tests/benchmarks/unsubscribe/main.bin +tests/benchmarks/waiter/main.bin-check: tests/benchmarks/waiter/main.bin +tests/functional/consume-one-produce-many/main.bin-check: tests/functional/consume-one-produce-many/main.bin +tests/functional/consumer-with-deadletter/main.bin-check: tests/functional/consumer-with-deadletter/main.bin +tests/functional/custom-prefix/main.bin-check: tests/functional/custom-prefix/main.bin +tests/functional/distinct-consumers-separate-instances/main.bin-check: tests/functional/distinct-consumers-separate-instances/main.bin +tests/functional/flow-id/main.bin-check: tests/functional/flow-id/main.bin +tests/functional/idempotency/main.bin-check: tests/functional/idempotency/main.bin +tests/functional/new-instance-takeover/main.bin-check: tests/functional/new-instance-takeover/main.bin +tests/functional/wait-after-publish/main.bin-check: tests/functional/wait-after-publish/main.bin +tests/functional/waiter/main.bin-check: tests/functional/waiter/main.bin +tests/fuzz/api-check/main.bin-check: tests/fuzz/api-check/main.bin +tests/fuzz/cli-check/main.bin-check: tests/fuzz/cli-check/main.bin +tests/fuzz/equal-produced-consumed-order-check/main.bin-check: tests/fuzz/equal-produced-consumed-order-check/main.bin +tests/fuzz/exactly-once-check/main.bin-check: tests/fuzz/exactly-once-check/main.bin +tests/fuzz/queries-check/main.bin-check: tests/fuzz/queries-check/main.bin +tests/fuzz/total-order-check/main.bin-check: tests/fuzz/total-order-check/main.bin +tests/main.bin-check: tests/main.bin +src/main.a: src/$(NAME).a +tests/benchmarks/deadletters/main.a: tests/benchmarks/deadletters/$(NAME).a +tests/benchmarks/lookup/main.a: tests/benchmarks/lookup/$(NAME).a +tests/benchmarks/multiple-consumers/main.a: tests/benchmarks/multiple-consumers/$(NAME).a +tests/benchmarks/multiple-produces/main.a: tests/benchmarks/multiple-produces/$(NAME).a +tests/benchmarks/reaper/main.a: tests/benchmarks/reaper/$(NAME).a +tests/benchmarks/replay/main.a: tests/benchmarks/replay/$(NAME).a +tests/benchmarks/single-consumer/main.a: tests/benchmarks/single-consumer/$(NAME).a +tests/benchmarks/single-producer/main.a: tests/benchmarks/single-producer/$(NAME).a +tests/benchmarks/subscribe/main.a: tests/benchmarks/subscribe/$(NAME).a +tests/benchmarks/unsubscribe/main.a: tests/benchmarks/unsubscribe/$(NAME).a +tests/benchmarks/waiter/main.a: tests/benchmarks/waiter/$(NAME).a +tests/functional/consume-one-produce-many/main.a: tests/functional/consume-one-produce-many/$(NAME).a +tests/functional/consumer-with-deadletter/main.a: tests/functional/consumer-with-deadletter/$(NAME).a +tests/functional/custom-prefix/main.a: tests/functional/custom-prefix/$(NAME).a +tests/functional/distinct-consumers-separate-instances/main.a: tests/functional/distinct-consumers-separate-instances/$(NAME).a +tests/functional/flow-id/main.a: tests/functional/flow-id/$(NAME).a +tests/functional/idempotency/main.a: tests/functional/idempotency/$(NAME).a +tests/functional/new-instance-takeover/main.a: tests/functional/new-instance-takeover/$(NAME).a +tests/functional/wait-after-publish/main.a: tests/functional/wait-after-publish/$(NAME).a +tests/functional/waiter/main.a: tests/functional/waiter/$(NAME).a +tests/fuzz/api-check/main.a: tests/fuzz/api-check/$(NAME).a +tests/fuzz/cli-check/main.a: tests/fuzz/cli-check/$(NAME).a +tests/fuzz/equal-produced-consumed-order-check/main.a: tests/fuzz/equal-produced-consumed-order-check/$(NAME).a +tests/fuzz/exactly-once-check/main.a: tests/fuzz/exactly-once-check/$(NAME).a +tests/fuzz/queries-check/main.a: tests/fuzz/queries-check/$(NAME).a +tests/fuzz/total-order-check/main.a: tests/fuzz/total-order-check/$(NAME).a +tests/main.a: tests/$(NAME).a @@ -2,3 +2,28 @@ set -eu export LANG=POSIX.UTF-8 + + +libs() { + find src tests -name '*.go' | grep -v '/main\.go$' | + grep -v '/version\.go$' +} + +mains() { + find src tests -name '*.go' | grep '/main\.go$' +} + +libs | varlist 'libs.go' +mains | varlist 'mains.go' + +find tests/functional/*/*.go -not -name main.go | varlist 'functional-tests/libs.go' +find tests/functional/*/main.go | varlist 'functional-tests/main.go' +find tests/fuzz/*/*.go -not -name main.go | varlist 'fuzz-targets/lib.go' +find tests/fuzz/*/main.go | varlist 'fuzz-targets/main.go' +find tests/benchmarks/*/*.go -not -name main.go | varlist 'benchmarks/lib.go' +find tests/benchmarks/*/main.go | varlist 'benchmarks/main.go' + +{ libs; mains; } | sort | sed 's/^\(.*\)\.go$/\1.a:\t\1.go/' +mains | sort | sed 's/^\(.*\)\.go$/\1.bin:\t\1.a/' +mains | sort | sed 's/^\(.*\)\.go$/\1.bin-check:\t\1.bin/' +mains | sort | sed 's|^\(.*\)/main\.go$|\1/main.a:\t\1/$(NAME).a|' diff --git a/src/liteq.go b/src/liteq.go deleted file mode 100644 index 2eeff34..0000000 --- a/src/liteq.go +++ /dev/null @@ -1,117 +0,0 @@ -package liteq - -import ( - "database/sql" - "flag" - "io/ioutil" - "log/slog" - "os" - "sort" - - g "gobang" - "golite" -) - - - -func InitMigrations(db *sql.DB) { - _, err := db.Exec(` - CREATE TABLE IF NOT EXISTS migrations ( - filename TEXT PRIMARY KEY - ); - `) - g.FatalIf(err) -} - -const MIGRATIONS_DIR = "src/sql/migrations/" -func PendingMigrations(db *sql.DB) []string { - files, err := ioutil.ReadDir(MIGRATIONS_DIR) - g.FatalIf(err) - - set := make(map[string]bool) - for _, file := range files { - set[file.Name()] = true - } - - rows, err := db.Query(`SELECT filename FROM migrations;`) - g.FatalIf(err) - defer rows.Close() - - for rows.Next() { - var filename string - err := rows.Scan(&filename) - g.FatalIf(err) - delete(set, filename) - } - g.FatalIf(rows.Err()) - - difference := make([]string, 0) - for filename := range set { - difference = append(difference, filename) - } - - sort.Sort(sort.StringSlice(difference)) - return difference -} - -func RunMigrations(db *sql.DB) { - InitMigrations(db) - - stmt, err := db.Prepare(`INSERT INTO migrations (filename) VALUES (?);`) - g.FatalIf(err) - defer stmt.Close() - - for _, filename := range PendingMigrations(db) { - g.Info("Running migration file", "exec-migration-file", - "filename", filename, - ) - - tx, err := db.Begin() - g.FatalIf(err) - - sql, err := os.ReadFile(MIGRATIONS_DIR + filename) - g.FatalIf(err) - - _, err = tx.Exec(string(sql)) - g.FatalIf(err) - - _, err = tx.Stmt(stmt).Exec(filename) - g.FatalIf(err) - - err = tx.Commit() - g.FatalIf(err) - } -} - -func initDB(databasePath string) *sql.DB { - db, err := sql.Open("sqlite3", databasePath) - g.FatalIf(err) - RunMigrations(db) - return db -} - -func run(db *sql.DB) { -} - - - -var ( - databasePath = flag.String( - "f", - "q.db", - "The path to the database file", - ) -) - - -func Main() { - g.Init(slog.Group( - "versions", - "gobang", g.Version, - "golite", golite.Version, - "this", version, - )) - flag.Parse() - db := initDB(*databasePath) - run(db) -} diff --git a/src/main.go b/src/main.go index 8d9a05e..51faffa 100644 --- a/src/main.go +++ b/src/main.go @@ -1,7 +1,7 @@ package main -import "liteq" +import "q" func main() { - liteq.Main() + q.Main() } diff --git a/src/q.go b/src/q.go new file mode 100644 index 0000000..6eeefe6 --- /dev/null +++ b/src/q.go @@ -0,0 +1,2495 @@ +package q +import ( + "context" + "database/sql" + "flag" + "fmt" + "io" + "log/slog" + "os" + "sync" + "time" + + _ "acudego" + "guuid" + g "gobang" +) + + + +const ( + defaultPrefix = "q" + reaperSkipCount = 1000 + notOwnerErrorFmt = "%v owns %#v as %#v, not us (%v)" + noLongerOwnerErrorFmt = "we (%v) no longer own %#v as %#v, but %v does" + rollbackErrorFmt = "rollback error: %w; while executing: %w" +) + + + +type dbI interface{ + findOne(q string, args []any, bindings []any) error + exec(q string) +} + +type queryT struct{ + write string + read string + owner string +} + +type queriesT struct{ + take func(string, string) error + publish func(UnsentMessage, guuid.UUID) (messageT, error) + find func(string, guuid.UUID) (messageT, error) + next func(string, string) (messageT, error) + pending func(string, string, func(messageT) error) error + commit func(string, guuid.UUID) error + toDead func(string, guuid.UUID, guuid.UUID) error + replay func(guuid.UUID, guuid.UUID) (messageT, error) + oneDead func(string, string) (deadletterT, error) + allDead func(string, string, func(deadletterT, messageT) error) error + size func(string) (int, error) + count func(string, string) (int, error) + hasData func(string, string) (bool, error) + close func() error +} + +type messageT struct{ + id int64 + timestamp time.Time + uuid guuid.UUID + topic string + flowID guuid.UUID + payload []byte +} + +type UnsentMessage struct{ + Topic string + FlowID guuid.UUID + Payload []byte +} + +type Message struct{ + ID guuid.UUID + Timestamp time.Time + Topic string + FlowID guuid.UUID + Payload []byte +} + +type deadletterT struct{ + uuid guuid.UUID + timestamp time.Time + consumer string + messageID guuid.UUID +} + +type pingerT[T any] struct{ + tryPing func(T) + onPing func(func(T)) + closed func() bool + close func() +} + +type consumerDataT struct{ + topic string + name string +} + +type waiterDataT struct{ + topic string + flowID guuid.UUID + name string + +} + +type consumerT struct{ + data consumerDataT + callback func(Message) error + pinger pingerT[struct{}] + close *func() +} + +type waiterT struct{ + data waiterDataT + pinger pingerT[[]byte] + closed *func() bool + close *func() +} + +type topicSubscriptionT struct{ + consumers map[string]consumerT + waiters map[guuid.UUID]map[string]waiterT +} + +type subscriptionsSetM map[string]topicSubscriptionT + +type subscriptionsT struct { + read func(func(subscriptionsSetM) error) error + write func(func(subscriptionsSetM) error) error +} + +type queueT struct{ + queries queriesT + subscriptions subscriptionsT + pinger pingerT[struct{}] +} + +type argsT struct{ + databasePath string + prefix string + command string + allArgs []string + args []string + topic string + consumer string +} + +type commandT struct{ + name string + getopt func(argsT, io.Writer) (argsT, bool) + exec func(argsT, queriesT, io.Reader, io.Writer) (int, error) +} + +type IQueue interface{ + Publish(UnsentMessage) (Message, error) + Subscribe( string, string, func(Message) error) error + Unsubscribe(string, string) + WaitFor(string, guuid.UUID, string) Waiter + Close() error +} + + + +func closeNoop() error { + return nil +} + +func tryRollback(db *sql.DB, ctx context.Context, err error) error { + _, rollbackErr := db.ExecContext(ctx, "ROLLBACK;") + if rollbackErr != nil { + return fmt.Errorf( + rollbackErrorFmt, + rollbackErr, + err, + ) + } + + return err +} + +func inTx(db *sql.DB, fn func(context.Context) error) error { + ctx := context.Background() + + _, err := db.ExecContext(ctx, "BEGIN IMMEDIATE;") + if err != nil { + return err + } + + err = fn(ctx) + if err != nil { + return tryRollback(db, ctx, err) + } + + _, err = db.ExecContext(ctx, "COMMIT;") + if err != nil { + return tryRollback(db, ctx, err) + } + + return nil +} + +func createTablesSQL(prefix string) queryT { + const tmpl_write = ` + CREATE TABLE IF NOT EXISTS "%s_payloads" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (%s), + topic TEXT NOT NULL, + payload BLOB NOT NULL + ) STRICT; + CREATE INDEX IF NOT EXISTS "%s_payloads_topic" + ON "%s_payloads"(topic); + + CREATE TABLE IF NOT EXISTS "%s_messages" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (%s), + uuid BLOB NOT NULL UNIQUE, + flow_id BLOB NOT NULL, + payload_id INTEGER NOT NULL + REFERENCES "%s_payloads"(id) + ) STRICT; + CREATE INDEX IF NOT EXISTS "%s_messages_flow_id" + ON "%s_messages"(flow_id); + + CREATE TABLE IF NOT EXISTS "%s_offsets" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (%s), + consumer TEXT NOT NULL, + message_id INTEGER NOT NULL + REFERENCES "%s_messages"(id), + UNIQUE (consumer, message_id) + ) STRICT; + CREATE INDEX IF NOT EXISTS "%s_offsets_consumer" + ON "%s_offsets"(consumer); + + CREATE TABLE IF NOT EXISTS "%s_deadletters" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + uuid BLOB NOT NULL UNIQUE, + consumer TEXT NOT NULL, + message_id INTEGER NOT NULL + REFERENCES "%s_messages"(id), + UNIQUE (consumer, message_id) + ) STRICT; + CREATE INDEX IF NOT EXISTS "%s_deadletters_consumer" + ON "%s_deadletters"(consumer); + + CREATE TABLE IF NOT EXISTS "%s_replays" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + deadletter_id INTEGER NOT NULL UNIQUE + REFERENCES "%s_deadletters"(id) , + message_id INTEGER NOT NULL UNIQUE + REFERENCES "%s_messages"(id) + ) STRICT; + + CREATE TABLE IF NOT EXISTS "%s_owners" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + topic TEXT NOT NULL, + consumer TEXT NOT NULL, + owner_id INTEGER NOT NULL, + UNIQUE (topic, consumer) + ) STRICT; + ` + return queryT{ + write: fmt.Sprintf( + tmpl_write, + prefix, + g.SQLiteNow, + prefix, + prefix, + prefix, + g.SQLiteNow, + prefix, + prefix, + prefix, + prefix, + g.SQLiteNow, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + ), + } +} + +func createTables(db *sql.DB, prefix string) error { + q := createTablesSQL(prefix) + + return inTx(db, func(ctx context.Context) error { + _, err := db.ExecContext(ctx, q.write) + return err + }) +} + +func takeSQL(prefix string) queryT { + const tmpl_write = ` + INSERT INTO "%s_owners" (topic, consumer, owner_id) + VALUES (?, ?, ?) + ON CONFLICT (topic, consumer) DO + UPDATE SET owner_id=excluded.owner_id; + ` + return queryT{ + write: fmt.Sprintf(tmpl_write, prefix), + } +} + +func takeStmt( + db *sql.DB, + prefix string, + instanceID int, +) (func(string, string) error, func() error, error) { + q := takeSQL(prefix) + + fn := func(topic string, consumer string) error { + return inTx(db, func(ctx context.Context) error { + _, err := db.ExecContext( + ctx, + q.write, + topic, + consumer, + instanceID, + ) + return err + }) + } + + return fn, closeNoop, nil +} + +func publishSQL(prefix string) queryT { + const tmpl_write = ` + INSERT INTO "%s_payloads" (topic, payload) + VALUES (?, ?); + + INSERT INTO "%s_messages" (uuid, flow_id, payload_id) + VALUES (?, ?, last_insert_rowid()); + ` + const tmpl_read = ` + SELECT id, timestamp FROM "%s_messages" + WHERE uuid = ?; + ` + return queryT{ + write: fmt.Sprintf(tmpl_write, prefix, prefix), + read: fmt.Sprintf(tmpl_read, prefix), + } +} + +func publishStmt( + db *sql.DB, + prefix string, + _ int, +) (func(UnsentMessage, guuid.UUID) (messageT, error), func() error, error) { + q := publishSQL(prefix) + + readStmt, err := db.Prepare(q.read) + if err != nil { + return nil, nil, err + } + + fn := func( + unsentMessage UnsentMessage, + messageID guuid.UUID, + ) (messageT, error) { + message := messageT{ + uuid: messageID, + topic: unsentMessage.Topic, + flowID: unsentMessage.FlowID, + payload: unsentMessage.Payload, + } + + message_id_bytes := messageID[:] + flow_id_bytes := unsentMessage.FlowID[:] + _, err := db.Exec( + q.write, + unsentMessage.Topic, + unsentMessage.Payload, + message_id_bytes, + flow_id_bytes, + ) + if err != nil { + return messageT{}, err + } + + var timestr string + err = readStmt.QueryRow(message_id_bytes).Scan( + &message.id, + ×tr, + ) + if err != nil { + return messageT{}, err + } + + message.timestamp, err = time.Parse(time.RFC3339Nano, timestr) + if err != nil { + return messageT{}, err + } + + return message, nil + } + + return fn, readStmt.Close, nil +} + +func findSQL(prefix string) queryT { + const tmpl_read = ` + SELECT + "%s_messages".id, + "%s_messages".timestamp, + "%s_messages".uuid, + "%s_payloads".payload + FROM "%s_messages" + JOIN "%s_payloads" ON + "%s_payloads".id = "%s_messages".payload_id + WHERE + "%s_payloads".topic = ? AND + "%s_messages".flow_id = ? + ORDER BY "%s_messages".id DESC + LIMIT 1; + ` + return queryT{ + read: fmt.Sprintf( + tmpl_read, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + ), + } +} + +func findStmt( + db *sql.DB, + prefix string, + _ int, +) (func(string, guuid.UUID) (messageT, error), func() error, error) { + q := findSQL(prefix) + + readStmt, err := db.Prepare(q.read) + if err != nil { + return nil, nil, err + } + + fn := func(topic string, flowID guuid.UUID) (messageT, error) { + message := messageT{ + topic: topic, + flowID: flowID, + } + + var ( + timestr string + message_id_bytes []byte + ) + flow_id_bytes := flowID[:] + err = readStmt.QueryRow(topic, flow_id_bytes).Scan( + &message.id, + ×tr, + &message_id_bytes, + &message.payload, + ) + if err != nil { + return messageT{}, err + } + message.uuid = guuid.UUID(message_id_bytes) + + message.timestamp, err = time.Parse(time.RFC3339Nano, timestr) + if err != nil { + return messageT{}, err + } + + return message, nil + } + + return fn, readStmt.Close, nil +} + +func nextSQL(prefix string) queryT { + const tmpl_read = ` + SELECT + "%s_messages".id, + "%s_messages".timestamp, + "%s_messages".uuid, + "%s_messages".flow_id, + "%s_payloads".payload + FROM "%s_messages" + JOIN "%s_payloads" ON + "%s_payloads".id = "%s_messages".payload_id + WHERE + "%s_payloads".topic = ? AND + "%s_messages".id NOT IN ( + SELECT message_id FROM "%s_offsets" + WHERE consumer = ? + ) + ORDER BY "%s_messages".id ASC + LIMIT 1; + ` + const tmpl_owner = ` + SELECT owner_id FROM "%s_owners" + WHERE + topic = ? AND + consumer = ?; + ` + return queryT{ + read: fmt.Sprintf( + tmpl_read, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + ), + owner: fmt.Sprintf(tmpl_owner, prefix), + } +} + +func nextStmt( + db *sql.DB, + prefix string, + instanceID int, +) (func(string, string) (messageT, error), func() error, error) { + q := nextSQL(prefix) + + fn := func(topic string, consumer string) (messageT, error) { + message := messageT{ + topic: topic, + } + + var ( + err error + ownerID int + timestr string + message_id_bytes []byte + flow_id_bytes []byte + ) + tx, err := db.Begin() + if err != nil { + return messageT{}, err + } + defer tx.Rollback() + + err = tx.QueryRow(q.owner, topic, consumer).Scan(&ownerID) + if err != nil { + return messageT{}, err + } + + if ownerID != instanceID { + err := fmt.Errorf( + notOwnerErrorFmt, + ownerID, + topic, + consumer, + instanceID, + ) + return messageT{}, err + } + + err = tx.QueryRow(q.read, topic, consumer).Scan( + &message.id, + ×tr, + &message_id_bytes, + &flow_id_bytes, + &message.payload, + ) + if err != nil { + return messageT{}, err + } + message.uuid = guuid.UUID(message_id_bytes) + message.flowID = guuid.UUID(flow_id_bytes) + + message.timestamp, err = time.Parse(time.RFC3339Nano, timestr) + if err != nil { + return messageT{}, err + } + + return message, nil + } + + return fn, closeNoop, nil +} + +func messageEach(rows *sql.Rows, callback func(messageT) error) error { + if rows == nil { + return nil + } + + for rows.Next() { + var ( + message messageT + timestr string + message_id_bytes []byte + flow_id_bytes []byte + ) + err := rows.Scan( + &message.id, + ×tr, + &message_id_bytes, + &flow_id_bytes, + &message.topic, + &message.payload, + ) + if err != nil { + return g.WrapErrors(rows.Close(), err) + } + message.uuid = guuid.UUID(message_id_bytes) + message.flowID = guuid.UUID(flow_id_bytes) + + message.timestamp, err = time.Parse(time.RFC3339Nano, timestr) + if err != nil { + return g.WrapErrors(rows.Close(), err) + } + + err = callback(message) + if err != nil { + return g.WrapErrors(rows.Close(), err) + } + } + + return g.WrapErrors(rows.Err(), rows.Close()) +} + +func pendingSQL(prefix string) queryT { + const tmpl_read = ` + SELECT + "%s_messages".id, + "%s_messages".timestamp, + "%s_messages".uuid, + "%s_messages".flow_id, + "%s_payloads".topic, + "%s_payloads".payload + FROM "%s_messages" + JOIN "%s_payloads" ON + "%s_payloads".id = "%s_messages".payload_id + WHERE + "%s_payloads".topic = ? AND + "%s_messages".id NOT IN ( + SELECT message_id FROM "%s_offsets" + WHERE consumer = ? + ) + ORDER BY "%s_messages".id ASC; + ` + const tmpl_owner = ` + SELECT owner_id FROM "%s_owners" + WHERE + topic = ? AND + consumer = ?; + ` + return queryT{ + read: fmt.Sprintf( + tmpl_read, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + ), + owner: fmt.Sprintf(tmpl_owner, prefix), + } +} + +func pendingStmt( + db *sql.DB, + prefix string, + instanceID int, +) (func(string, string) (*sql.Rows, error), func() error, error) { + q := pendingSQL(prefix) + + readStmt, err := db.Prepare(q.read) + if err != nil { + return nil, nil, err + } + + ownerStmt, err := db.Prepare(q.owner) + if err != nil { + return nil, nil, err + } + + fn := func(topic string, consumer string) (*sql.Rows, error) { + var ownerID int + err := ownerStmt.QueryRow(topic, consumer).Scan(&ownerID) + if err != nil { + return nil, err + } + + // best effort check, the final one is done during + // commit within a transaction + if ownerID != instanceID { + return nil, nil + } + + return readStmt.Query(topic, consumer) + } + + closeFn := func() error { + return g.SomeFnError(readStmt.Close, ownerStmt.Close) + } + + return fn, closeFn, nil +} + +func commitSQL(prefix string) queryT { + const tmpl_write = ` + INSERT INTO "%s_offsets" (consumer, message_id) + VALUES (?, (SELECT id FROM "%s_messages" WHERE uuid = ?)); + ` + const tmpl_read = ` + SELECT "%s_payloads".topic from "%s_payloads" + JOIN "%s_messages" ON + "%s_payloads".id = "%s_messages".payload_id + WHERE "%s_messages".uuid = ?; + ` + const tmpl_owner = ` + SELECT owner_id FROM "%s_owners" + WHERE + topic = ? AND + consumer = ?; + ` + return queryT{ + write: fmt.Sprintf(tmpl_write, prefix, prefix), + read: fmt.Sprintf( + tmpl_read, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + ), + owner: fmt.Sprintf(tmpl_owner, prefix), + } +} + +func commitStmt( + db *sql.DB, + prefix string, + instanceID int, +) (func(string, guuid.UUID) error, func() error, error) { + q := commitSQL(prefix) + + fn := func(consumer string, messageID guuid.UUID) error { + message_id_bytes := messageID[:] + return inTx(db, func(ctx context.Context) error { + var topic string + err := db.QueryRowContext( + ctx, + q.read, + message_id_bytes, + ).Scan(&topic) + if err != nil { + return err + } + + var ownerID int + err = db.QueryRowContext( + ctx, + q.owner, + topic, + consumer, + ).Scan(&ownerID) + if err != nil { + return err + } + + if ownerID != instanceID { + return fmt.Errorf( + noLongerOwnerErrorFmt, + instanceID, + topic, + consumer, + ownerID, + ) + } + + _, err = db.ExecContext(ctx, q.write, consumer, message_id_bytes) + return err + }) + } + + return fn, closeNoop, nil +} + +func toDeadSQL(prefix string) queryT { + const tmpl_write = ` + INSERT INTO "%s_offsets" ( consumer, message_id) + VALUES ( ?, (SELECT id FROM "%s_messages" WHERE uuid = ?)); + + INSERT INTO "%s_deadletters" (uuid, consumer, message_id) + VALUES (?, ?, (SELECT id FROM "%s_messages" WHERE uuid = ?)); + ` + const tmpl_read = ` + SELECT "%s_payloads".topic FROM "%s_payloads" + JOIN "%s_messages" ON + "%s_payloads".id = "%s_messages".payload_id + WHERE "%s_messages".uuid = ?; + ` + const tmpl_owner = ` + SELECT owner_id FROM "%s_owners" + WHERE + topic = ? AND + consumer = ?; + ` + return queryT{ + write: fmt.Sprintf(tmpl_write, prefix, prefix, prefix, prefix), + read: fmt.Sprintf( + tmpl_read, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + ), + owner: fmt.Sprintf(tmpl_owner, prefix), + } +} + +func toDeadStmt( + db *sql.DB, + prefix string, + instanceID int, +) ( + func(string, guuid.UUID, guuid.UUID) error, + func() error, + error, +) { + q := toDeadSQL(prefix) + + fn := func( + consumer string, + messageID guuid.UUID, + deadletterID guuid.UUID, + ) error { + message_id_bytes := messageID[:] + deadletter_id_bytes := deadletterID[:] + return inTx(db, func(ctx context.Context) error { + var topic string + err := db.QueryRowContext( + ctx, + q.read, + message_id_bytes, + ).Scan(&topic) + if err != nil { + return err + } + + var ownerID int + err = db.QueryRowContext( + ctx, + q.owner, + topic, + consumer, + ).Scan(&ownerID) + if err != nil { + return err + } + + if ownerID != instanceID { + return fmt.Errorf( + noLongerOwnerErrorFmt, + instanceID, + topic, + consumer, + ownerID, + ) + } + + _, err = db.ExecContext( + ctx, + q.write, + consumer, + message_id_bytes, + deadletter_id_bytes, + consumer, + message_id_bytes, + ) + return err + }) + } + + return fn, closeNoop, nil +} + +func replaySQL(prefix string) queryT { + const tmpl_write = ` + INSERT INTO "%s_messages" (uuid, flow_id, payload_id) + SELECT + ?, + "%s_messages".flow_id, + "%s_messages".payload_id + FROM "%s_messages" + JOIN "%s_deadletters" ON + "%s_messages".id = "%s_deadletters".message_id + WHERE "%s_deadletters".uuid = ?; + + INSERT INTO "%s_replays" (deadletter_id, message_id) + VALUES ( + (SELECT id FROM "%s_deadletters" WHERE uuid = ?), + last_insert_rowid() + ); + ` + const tmpl_read = ` + SELECT + "%s_messages".id, + "%s_messages".timestamp, + "%s_messages".flow_id, + "%s_payloads".topic, + "%s_payloads".payload + FROM "%s_messages" + JOIN "%s_payloads" ON + "%s_payloads".id = "%s_messages".payload_id + WHERE "%s_messages".uuid = ?; + ` + return queryT{ + write: fmt.Sprintf( + tmpl_write, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + ), + read: fmt.Sprintf( + tmpl_read, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + ), + } +} + +func replayStmt( + db *sql.DB, + prefix string, + _ int, +) (func(guuid.UUID, guuid.UUID) (messageT, error), func() error, error) { + q := replaySQL(prefix) + + readStmt, err := db.Prepare(q.read) + if err != nil { + return nil, nil, err + } + + fn := func( + deadletterID guuid.UUID, + messageID guuid.UUID, + ) (messageT, error) { + deadletter_id_bytes := deadletterID[:] + message_id_bytes := messageID[:] + err := inTx(db, func(ctx context.Context) error { + _, err := db.ExecContext( + ctx, + q.write, + message_id_bytes, + deadletter_id_bytes, + deadletter_id_bytes, + ) + return err + }) + if err != nil { + return messageT{}, err + } + + message := messageT{ + uuid: messageID, + } + + var ( + timestr string + flow_id_bytes []byte + ) + err = readStmt.QueryRow(message_id_bytes).Scan( + &message.id, + ×tr, + &flow_id_bytes, + &message.topic, + &message.payload, + ) + if err != nil { + return messageT{}, err + } + message.flowID = guuid.UUID(flow_id_bytes) + + message.timestamp, err = time.Parse(time.RFC3339Nano, timestr) + if err != nil { + return messageT{}, err + } + + return message, nil + } + + return fn, readStmt.Close, nil +} + +func oneDeadSQL(prefix string) queryT { + const tmpl_read = ` + SELECT + "%s_deadletters".uuid, + "%s_offsets".timestamp, + "%s_messages".uuid + FROM "%s_deadletters" + JOIN "%s_offsets" ON + "%s_deadletters".message_id = "%s_offsets".message_id + JOIN "%s_messages" ON + "%s_deadletters".message_id = "%s_messages".id + JOIN "%s_payloads" ON + "%s_messages".payload_id = "%s_payloads".id + WHERE + "%s_payloads".topic = ? AND + "%s_deadletters".consumer = ? AND + "%s_offsets".consumer = ? AND + "%s_deadletters".id NOT IN ( + SELECT deadletter_id FROM "%s_replays" + ) + ORDER BY "%s_deadletters".id ASC + LIMIT 1; + ` + return queryT{ + read: fmt.Sprintf( + tmpl_read, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + ), + } +} + +func oneDeadStmt( + db *sql.DB, + prefix string, + _ int, +) (func(string, string) (deadletterT, error), func() error, error) { + q := oneDeadSQL(prefix) + + readStmt, err := db.Prepare(q.read) + if err != nil { + return nil, nil, err + } + + fn := func(topic string, consumer string) (deadletterT, error) { + deadletter := deadletterT{ + consumer: consumer, + } + + var ( + deadletter_id_bytes []byte + timestr string + message_id_bytes []byte + ) + err := readStmt.QueryRow(topic, consumer, consumer).Scan( + &deadletter_id_bytes, + ×tr, + &message_id_bytes, + ) + if err != nil { + return deadletterT{}, err + } + deadletter.uuid = guuid.UUID(deadletter_id_bytes) + deadletter.messageID = guuid.UUID(message_id_bytes) + + deadletter.timestamp, err = time.Parse( + time.RFC3339Nano, + timestr, + ) + if err != nil { + return deadletterT{}, err + } + + return deadletter, nil + } + + return fn, readStmt.Close, nil +} + +func deadletterEach( + rows *sql.Rows, + callback func(deadletterT, messageT) error, +) error { + for rows.Next() { + var ( + deadletter deadletterT + deadletter_id_bytes []byte + deadletterTimestr string + message messageT + messageTimestr string + message_id_bytes []byte + flow_id_bytes []byte + ) + err := rows.Scan( + &deadletter_id_bytes, + &message.id, + &deadletterTimestr, + &deadletter.consumer, + &messageTimestr, + &message_id_bytes, + &flow_id_bytes, + &message.topic, + &message.payload, + ) + if err != nil { + return g.WrapErrors(rows.Close(), err) + } + + deadletter.uuid = guuid.UUID(deadletter_id_bytes) + deadletter.messageID = guuid.UUID(message_id_bytes) + message.uuid = guuid.UUID(message_id_bytes) + message.flowID = guuid.UUID(flow_id_bytes) + + message.timestamp, err = time.Parse( + time.RFC3339Nano, + messageTimestr, + ) + if err != nil { + return g.WrapErrors(rows.Close(), err) + } + + deadletter.timestamp, err = time.Parse( + time.RFC3339Nano, + deadletterTimestr, + ) + if err != nil { + return g.WrapErrors(rows.Close(), err) + } + + err = callback(deadletter, message) + if err != nil { + return g.WrapErrors(rows.Close(), err) + } + } + + return g.WrapErrors(rows.Err(), rows.Close()) +} + +func allDeadSQL(prefix string) queryT { + const tmpl_read = ` + SELECT + "%s_deadletters".uuid, + "%s_deadletters".message_id, + "%s_offsets".timestamp, + "%s_offsets".consumer, + "%s_messages".timestamp, + "%s_messages".uuid, + "%s_messages".flow_id, + "%s_payloads".topic, + "%s_payloads".payload + FROM "%s_deadletters" + JOIN "%s_offsets" ON + "%s_deadletters".message_id = "%s_offsets".message_id + JOIN "%s_messages" ON + "%s_deadletters".message_id = "%s_messages".id + JOIN "%s_payloads" ON + "%s_messages".payload_id = "%s_payloads".id + WHERE + "%s_payloads".topic = ? AND + "%s_deadletters".consumer = ? AND + "%s_offsets".consumer = ? AND + "%s_deadletters".id NOT IN ( + SELECT deadletter_id FROM "%s_replays" + ) + ORDER BY "%s_deadletters".id ASC; + ` + return queryT{ + read: fmt.Sprintf( + tmpl_read, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + ), + } +} + +func allDeadStmt( + db *sql.DB, + prefix string, + _ int, +) (func(string, string) (*sql.Rows, error), func() error, error) { + q := allDeadSQL(prefix) + + readStmt, err := db.Prepare(q.read) + if err != nil { + return nil, nil, err + } + + fn := func(topic string, consumer string) (*sql.Rows, error) { + return readStmt.Query(topic, consumer, consumer) + } + + return fn, readStmt.Close, nil +} + +func sizeSQL(prefix string) queryT { + const tmpl_read = ` + SELECT + COUNT(1) as size + FROM "%s_messages" + JOIN "%s_payloads" ON + "%s_messages".payload_id = "%s_payloads".id + WHERE "%s_payloads".topic = ?; + ` + return queryT{ + read: fmt.Sprintf( + tmpl_read, + prefix, + prefix, + prefix, + prefix, + prefix, + ), + } +} + + +func sizeStmt( + db *sql.DB, + prefix string, + _ int, +) (func(string) (int, error), func() error, error) { + q := sizeSQL(prefix) + + readStmt, err := db.Prepare(q.read) + if err != nil { + return nil, nil, err + } + + fn := func(topic string) (int, error) { + var size int + err := readStmt.QueryRow(topic).Scan(&size) + if err != nil { + return -1, err + } + + return size, nil + } + + return fn, readStmt.Close, nil +} + +func countSQL(prefix string) queryT { + const tmpl_read = ` + SELECT + COUNT(1) as count + FROM "%s_messages" + JOIN "%s_offsets" ON + "%s_messages".id = "%s_offsets".message_id + JOIN "%s_payloads" ON + "%s_messages".payload_id = "%s_payloads".id + WHERE + "%s_payloads".topic = ? AND + "%s_offsets".consumer = ?; + ` + return queryT{ + read: fmt.Sprintf( + tmpl_read, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + ), + } +} + +func countStmt( + db *sql.DB, + prefix string, + _ int, +) (func(string, string) (int, error), func() error, error) { + q := countSQL(prefix) + + readStmt, err := db.Prepare(q.read) + if err != nil { + return nil, nil, err + } + + fn := func(topic string, consumer string) (int, error) { + var count int + err := readStmt.QueryRow(topic, consumer).Scan(&count) + if err != nil { + return -1, err + } + + return count, nil + } + + return fn, readStmt.Close, nil +} + +func hasDataSQL(prefix string) queryT { + const tmpl_read = ` + SELECT 1 as data + FROM "%s_messages" + JOIN "%s_payloads" ON + "%s_payloads".id = "%s_messages".payload_id + WHERE + "%s_payloads".topic = ? AND + "%s_messages".id NOT IN ( + SELECT message_id FROM "%s_offsets" + WHERE consumer = ? + ) + LIMIT 1; + ` + return queryT{ + read: fmt.Sprintf( + tmpl_read, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + ), + } +} + +func hasDataStmt( + db *sql.DB, + prefix string, + _ int, +) (func(string, string) (bool, error), func() error, error) { + q := hasDataSQL(prefix) + + readStmt, err := db.Prepare(q.read) + if err != nil { + return nil, nil, err + } + + fn := func(topic string, consumer string) (bool, error) { + var _x int + err := readStmt.QueryRow(topic, consumer).Scan(&_x) + if err == sql.ErrNoRows { + return false, nil + } + + if err != nil { + return false, err + } + + return true, nil + } + + return fn, readStmt.Close, nil +} + +func initDB( + db *sql.DB, + prefix string, + notifyFn func(messageT), + instanceID int, +) (queriesT, error) { + createTablesErr := createTables(db, prefix) + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + find, findClose, findErr := findStmt(db, prefix, instanceID) + next, nextClose, nextErr := nextStmt(db, prefix, instanceID) + pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID) + commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + replay, replayClose, replayErr := replayStmt(db, prefix, instanceID) + oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID) + allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID) + size, sizeClose, sizeErr := sizeStmt(db, prefix, instanceID) + count, countClose, countErr := countStmt(db, prefix, instanceID) + hasData, hasDataClose, hasDataErr := hasDataStmt(db, prefix, instanceID) + + err := g.SomeError( + createTablesErr, + takeErr, + publishErr, + findErr, + nextErr, + pendingErr, + commitErr, + toDeadErr, + replayErr, + oneDeadErr, + allDeadErr, + sizeErr, + countErr, + hasDataErr, + ) + if err != nil { + return queriesT{}, err + } + + close := func() error { + return g.SomeFnError( + takeClose, + publishClose, + findClose, + nextClose, + pendingClose, + commitClose, + toDeadClose, + replayClose, + oneDeadClose, + allDeadClose, + sizeClose, + countClose, + hasDataClose, + ) + } + + var connMutex sync.RWMutex + return queriesT{ + take: func(a string, b string) error { + connMutex.RLock() + defer connMutex.RUnlock() + return take(a, b) + }, + publish: func(a UnsentMessage, b guuid.UUID) (messageT, error) { + var ( + err error + message messageT + ) + { + connMutex.RLock() + defer connMutex.RUnlock() + message, err = publish(a, b) + } + if err != nil { + return messageT{}, err + } + + go notifyFn(message) + return message, nil + }, + find: func(a string, b guuid.UUID) (messageT, error) { + connMutex.RLock() + defer connMutex.RUnlock() + return find(a, b) + }, + next: func(a string, b string) (messageT, error) { + connMutex.RLock() + defer connMutex.RUnlock() + return next(a, b) + }, + pending: func( + a string, + b string, + callback func(messageT) error, + ) error { + var ( + err error + rows *sql.Rows + ) + { + connMutex.RLock() + defer connMutex.RUnlock() + rows, err = pending(a, b) + } + if err != nil { + return err + } + + return messageEach(rows, callback) + }, + commit: func(a string, b guuid.UUID) error { + connMutex.RLock() + defer connMutex.RUnlock() + return commit(a, b) + }, + toDead: func( + a string, + b guuid.UUID, + c guuid.UUID, + ) error { + connMutex.RLock() + defer connMutex.RUnlock() + return toDead(a, b, c) + }, + replay: func(a guuid.UUID, b guuid.UUID) (messageT, error) { + var ( + err error + message messageT + ) + { + connMutex.RLock() + defer connMutex.RUnlock() + message, err = replay(a, b) + } + if err != nil { + return messageT{}, err + } + + go notifyFn(message) + return message, nil + }, + oneDead: func(a string, b string) (deadletterT, error) { + connMutex.RLock() + defer connMutex.RUnlock() + return oneDead(a, b) + }, + allDead: func( + a string, + b string, + callback func(deadletterT, messageT) error, + ) error { + var ( + err error + rows *sql.Rows + ) + { + connMutex.RLock() + defer connMutex.RUnlock() + rows, err = allDead(a, b) + } + if err != nil { + return err + } + + return deadletterEach(rows, callback) + }, + size: func(a string) (int, error) { + connMutex.RLock() + defer connMutex.RUnlock() + return size(a) + }, + count: func(a string, b string) (int, error) { + connMutex.RLock() + defer connMutex.RUnlock() + return count(a, b) + }, + hasData: func(a string, b string) (bool, error) { + connMutex.RLock() + defer connMutex.RUnlock() + return hasData(a, b) + }, + close: func() error { + connMutex.Lock() + defer connMutex.Unlock() + return close() + }, + }, nil +} + + +func newPinger[T any]() pingerT[T] { + channel := make(chan T, 1) + closed := false + var rwmutex sync.RWMutex + return pingerT[T]{ + tryPing: func(x T) { + rwmutex.RLock() + defer rwmutex.RUnlock() + if closed { + return + } + select { + case channel <- x: + default: + } + }, + onPing: func(cb func(T)) { + for x := range channel { + cb(x) + } + }, + closed: func() bool { + rwmutex.RLock() + defer rwmutex.RUnlock() + return closed + }, + close: func() { + rwmutex.Lock() + defer rwmutex.Unlock() + if closed { + return + } + close(channel) + closed = true + }, + } +} + +func makeSubscriptionsFuncs() subscriptionsT { + var rwmutex sync.RWMutex + subscriptions := subscriptionsSetM{} + return subscriptionsT{ + read: func(callback func(subscriptionsSetM) error) error { + rwmutex.RLock() + defer rwmutex.RUnlock() + return callback(subscriptions) + }, + write: func(callback func(subscriptionsSetM) error) error { + rwmutex.Lock() + defer rwmutex.Unlock() + return callback(subscriptions) + }, + } +} + +// Try notifying the consumer that they have data to work with. If they're +// already full, we simply drop the notification, as on each they'll look for +// all pending messages and process them all. So dropping the event here +// doesn't mean not notifying the consumer, but simply acknoledging that the +// existing notifications are enough for them to work with, without letting any +// message slip through. +func makeNotifyFn( + readFn func(func(subscriptionsSetM) error) error, + pinger pingerT[struct{}], +) func(messageT) { + return func(message messageT) { + readFn(func(set subscriptionsSetM) error { + topicSub, ok := set[message.topic] + if !ok { + return nil + } + + for _, consumer := range topicSub.consumers { + consumer.pinger.tryPing(struct{}{}) + } + waiters := topicSub.waiters[message.flowID] + for _, waiter := range waiters { + waiter.pinger.tryPing(message.payload) + } + return nil + }) + pinger.tryPing(struct{}{}) + } +} + +func collectClosedWaiters( + set subscriptionsSetM, +) map[string]map[guuid.UUID][]string { + waiters := map[string]map[guuid.UUID][]string{} + for topic, topicSub := range set { + waiters[topic] = map[guuid.UUID][]string{} + for flowID, waitersByName := range topicSub.waiters { + names := []string{} + for name, waiter := range waitersByName { + if (*waiter.closed)() { + names = append(names, name) + } + } + waiters[topic][flowID] = names + } + } + + return waiters +} + +func trimEmptyLeaves(closedWaiters map[string]map[guuid.UUID][]string) { + for topic, waiters := range closedWaiters { + for flowID, names := range waiters { + if len(names) == 0 { + delete(closedWaiters[topic], flowID) + } + } + if len(waiters) == 0 { + delete(closedWaiters, topic) + } + } +} + +func deleteIfEmpty(set subscriptionsSetM, topic string) { + topicSub, ok := set[topic] + if !ok { + return + } + + emptyConsumers := len(topicSub.consumers) == 0 + emptyWaiters := len(topicSub.waiters) == 0 + if emptyConsumers && emptyWaiters { + delete(set, topic) + } +} + +func deleteEmptyTopics(set subscriptionsSetM) { + for topic, _ := range set { + deleteIfEmpty(set, topic) + } +} + +func removeClosedWaiters( + set subscriptionsSetM, + closedWaiters map[string]map[guuid.UUID][]string, +) { + for topic, waiters := range closedWaiters { + _, ok := set[topic] + if !ok { + continue + } + + for flowID, names := range waiters { + if set[topic].waiters[flowID] == nil { + continue + } + for _, name := range names { + delete(set[topic].waiters[flowID], name) + } + if len(set[topic].waiters[flowID]) == 0 { + delete(set[topic].waiters, flowID) + } + } + } + + deleteEmptyTopics(set) +} + +func reapClosedWaiters( + readFn func(func(subscriptionsSetM) error) error, + writeFn func(func(subscriptionsSetM) error) error, +) { + var closedWaiters map[string]map[guuid.UUID][]string + readFn(func(set subscriptionsSetM) error { + closedWaiters = collectClosedWaiters(set) + return nil + }) + + trimEmptyLeaves(closedWaiters) + if len(closedWaiters) == 0 { + return + } + + writeFn(func(set subscriptionsSetM) error { + removeClosedWaiters(set, closedWaiters) + return nil + }) +} + +func everyNthCall[T any](n int, fn func(T)) func(T) { + i := 0 + return func(x T) { + i++ + if i == n { + i = 0 + fn(x) + } + } +} + +func runReaper( + onPing func(func(struct{})), + readFn func(func(subscriptionsSetM) error) error, + writeFn func(func(subscriptionsSetM) error) error, +) { + onPing(everyNthCall(reaperSkipCount, func(struct{}) { + reapClosedWaiters(readFn, writeFn) + })) +} + +func NewWithPrefix(db *sql.DB, prefix string) (IQueue, error) { + err := g.ValidateSQLTablePrefix(prefix) + if err != nil { + return queueT{}, err + } + + subscriptions := makeSubscriptionsFuncs() + pinger := newPinger[struct{}]() + notifyFn := makeNotifyFn(subscriptions.read, pinger) + queries, err := initDB(db, prefix, notifyFn, os.Getpid()) + if err != nil { + return queueT{}, err + } + + go runReaper(pinger.onPing, subscriptions.read, subscriptions.write) + + return queueT{ + queries: queries, + subscriptions: subscriptions, + pinger: pinger, + }, nil +} + +func New(db *sql.DB) (IQueue, error) { + return NewWithPrefix(db, defaultPrefix) +} + +func asPublicMessage(message messageT) Message { + return Message{ + ID: message.uuid, + Timestamp: message.timestamp, + Topic: message.topic, + FlowID: message.flowID, + Payload: message.payload, + } +} + +func (queue queueT) Publish(unsent UnsentMessage) (Message, error) { + message, err := queue.queries.publish(unsent, guuid.New()) + return asPublicMessage(message), err +} + +func registerConsumerFn(consumer consumerT) func(subscriptionsSetM) error { + topicSub := topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{}, + } + + return func(set subscriptionsSetM) error { + topic := consumer.data.topic + _, ok := set[topic] + if !ok { + set[topic] = topicSub + } + set[topic].consumers[consumer.data.name] = consumer + + return nil + } +} + +func registerWaiterFn(waiter waiterT) func(subscriptionsSetM) error { + topicSub := topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{}, + } + waiters := map[string]waiterT{} + + return func(set subscriptionsSetM) error { + var ( + topic = waiter.data.topic + flowID = waiter.data.flowID + ) + _, ok := set[topic] + if !ok { + set[topic] = topicSub + } + if set[topic].waiters[flowID] == nil { + set[topic].waiters[flowID] = waiters + } + set[topic].waiters[flowID][waiter.data.name] = waiter + return nil + } +} + +func makeConsumeOneFn( + data consumerDataT, + callback func(Message) error, + successFn func(string, guuid.UUID) error, + errorFn func(string, guuid.UUID, guuid.UUID) error, +) func(messageT) error { + return func(message messageT) error { + err := callback(asPublicMessage(message)) + if err != nil { + g.Info( + "consumer failed", "q-consumer", + "topic", data.topic, + "consumer", data.name, + "error", err, + slog.Group( + "message", + "id", message.id, + "flow-id", message.flowID.String(), + ), + ) + + return errorFn(data.name, message.uuid, guuid.New()) + } + + return successFn(data.name, message.uuid) + } +} + +func makeConsumeAllFn( + data consumerDataT, + consumeOneFn func(messageT) error, + eachFn func(string, string, func(messageT) error) error, +) func(struct{}) { + return func(struct{}) { + err := eachFn(data.topic, data.name, consumeOneFn) + if err != nil { + g.Warning( + "eachFn failed", "q-consume-all", + "topic", data.topic, + "consumer", data.name, + "error", err, + "circuit-breaker-enabled?", false, + ) + } + } +} + +func makeWaitFn(channel chan []byte, closeFn func()) func([]byte) { + closed := false + var mutex sync.Mutex + return func(payload []byte) { + mutex.Lock() + defer mutex.Unlock() + if closed { + return + } + + closeFn() + channel <- payload + close(channel) + closed = true + } +} + +func runConsumer(onPing func(func(struct{})), consumeAllFn func(struct{})) { + consumeAllFn(struct{}{}) + onPing(consumeAllFn) +} + +func tryFinding( + findFn func(string, guuid.UUID) (messageT, error), + topic string, + flowID guuid.UUID, + waitFn func([]byte), +) { + message, err := findFn(topic, flowID) + if err != nil { + return + } + + waitFn(message.payload) +} + +func (queue queueT) Subscribe( + topic string, + name string, + callback func(Message) error, +) error { + data := consumerDataT{ + topic: topic, + name: name, + } + pinger := newPinger[struct{}]() + consumer := consumerT{ + data: data, + callback: callback, + pinger: pinger, + close: &pinger.close, + } + consumeOneFn := makeConsumeOneFn( + consumer.data, + consumer.callback, + queue.queries.commit, + queue.queries.toDead, + ) + consumeAllFn := makeConsumeAllFn( + consumer.data, + consumeOneFn, + queue.queries.pending, + ) + + err := queue.queries.take(topic, name) + if err != nil { + return err + } + + queue.subscriptions.write(registerConsumerFn(consumer)) + go runConsumer(pinger.onPing, consumeAllFn) + return nil +} + +type Waiter struct{ + Channel <-chan []byte + Close func() +} + +func (queue queueT) WaitFor( + topic string, + flowID guuid.UUID, + name string, +) Waiter { + data := waiterDataT{ + topic: topic, + flowID: flowID, + name: name, + } + pinger := newPinger[[]byte]() + waiter := waiterT{ + data: data, + pinger: pinger, + closed: &pinger.closed, + close: &pinger.close, + } + channel := make(chan []byte, 1) + waitFn := makeWaitFn(channel, (*waiter.close)) + closeFn := func() { + queue.subscriptions.read(func(set subscriptionsSetM) error { + (*set[topic].waiters[flowID][name].close)() + return nil + }) + } + + queue.subscriptions.write(registerWaiterFn(waiter)) + tryFinding(queue.queries.find, topic, flowID, waitFn) + go pinger.onPing(waitFn) + return Waiter{channel, closeFn} +} + +func unsubscribeIfExistsFn( + topic string, + name string, +) func(subscriptionsSetM) error { + return func(set subscriptionsSetM) error { + topicSub, ok := set[topic] + if !ok { + return nil + } + + consumer, ok := topicSub.consumers[name] + if !ok { + return nil + } + + (*consumer.close)() + delete(set[topic].consumers, name) + deleteIfEmpty(set, topic) + return nil + } +} + +func (queue queueT) Unsubscribe(topic string, name string) { + queue.subscriptions.write(unsubscribeIfExistsFn(topic, name)) +} + +func cleanSubscriptions(set subscriptionsSetM) error { + for _, topicSub := range set { + for _, consumer := range topicSub.consumers { + (*consumer.close)() + } + for _, waiters := range topicSub.waiters { + for _, waiter := range waiters { + (*waiter.close)() + } + } + } + return nil +} + +func (queue queueT) Close() error { + queue.pinger.close() + return g.WrapErrors( + queue.subscriptions.write(cleanSubscriptions), + queue.queries.close(), + ) +} + + +func topicGetopt(args argsT, w io.Writer) (argsT, bool) { + if len(args.args) == 0 { + fmt.Fprintf(w, "Missing TOPIC.\n") + return args, false + } + + args.topic = args.args[0] + return args, true +} + +func topicConsumerGetopt(args argsT, w io.Writer) (argsT, bool) { + fs := flag.NewFlagSet("", flag.ContinueOnError) + fs.Usage = func() {} + fs.SetOutput(w) + + consumer := fs.String( + "C", + "default-consumer", + "The name of the consumer to be used", + ) + + if fs.Parse(args.args) != nil { + return args, false + } + + subArgs := fs.Args() + if len(subArgs) == 0 { + fmt.Fprintf(w, "Missing TOPIC.\n") + return args, false + } + + args.consumer = *consumer + args.topic = subArgs[0] + return args, true +} + +func inExec( + args argsT, + queries queriesT, + r io.Reader, + w io.Writer, +) (int, error) { + payload, err := io.ReadAll(r) + if err != nil { + return 1, err + } + + unsent := UnsentMessage{ + Topic: args.topic, + FlowID: guuid.New(), + Payload: payload, + } + message, err := queries.publish(unsent, guuid.New()) + if err != nil { + return 1, err + } + + fmt.Fprintf(w, "%s\n", message.uuid.String()) + + return 0, nil +} + +func outExec( + args argsT, + queries queriesT, + _ io.Reader, + w io.Writer, +) (int, error) { + err := queries.take(args.topic, args.consumer) + if err != nil { + return 1, err + } + + message, err := queries.next(args.topic, args.consumer) + + if err == sql.ErrNoRows { + return 3, nil + } + + if err != nil { + return 1, err + } + + fmt.Fprintln(w, string(message.payload)) + + return 0, nil +} + +func commitExec( + args argsT, + queries queriesT, + r io.Reader, + w io.Writer, +) (int, error) { + err := queries.take(args.topic, args.consumer) + if err != nil { + return 1, err + } + + message, err := queries.next(args.topic, args.consumer) + if err != nil { + return 1, err + } + + err = queries.commit(args.consumer, message.uuid) + if err != nil { + return 1, err + } + + return 0, nil +} + +func deadExec( + args argsT, + queries queriesT, + r io.Reader, + w io.Writer, +) (int, error) { + err := queries.take(args.topic, args.consumer) + if err != nil { + return 1, err + } + + message, err := queries.next(args.topic, args.consumer) + if err != nil { + return 1, err + } + + err = queries.toDead(args.consumer, message.uuid, guuid.New()) + if err != nil { + return 1, err + } + + return 0, nil +} + +func listDeadExec( + args argsT, + queries queriesT, + r io.Reader, + w io.Writer, +) (int, error) { + eachFn := func(deadletter deadletterT, _ messageT) error { + fmt.Fprintf( + w, + "%s\t%s\t%s\n", + deadletter.uuid.String(), + deadletter.timestamp.Format(time.RFC3339), + deadletter.consumer, + ) + return nil + } + + err := queries.allDead(args.topic, args.consumer, eachFn) + if err != nil { + return 1, err + } + + return 0, nil +} + +func replayExec( + args argsT, + queries queriesT, + r io.Reader, + w io.Writer, +) (int, error) { + deadletter, err := queries.oneDead(args.topic, args.consumer) + if err != nil { + return 1, err + } + + _, err = queries.replay(deadletter.uuid, guuid.New()) + if err != nil { + return 1, err + } + + return 0, nil +} + +func sizeExec( + args argsT, + queries queriesT, + r io.Reader, + w io.Writer, +) (int, error) { + size, err := queries.size(args.topic) + if err != nil { + return 1, err + } + + fmt.Fprintln(w, size) + + return 0, nil +} + +func countExec( + args argsT, + queries queriesT, + r io.Reader, + w io.Writer, +) (int, error) { + count, err := queries.count(args.topic, args.consumer) + if err != nil { + return 1, err + } + + fmt.Fprintln(w, count) + + return 0, nil +} + +func hasDataExec( + args argsT, + queries queriesT, + r io.Reader, + w io.Writer, +) (int, error) { + hasData, err := queries.hasData(args.topic, args.consumer) + if err != nil { + return 1, err + } + + if hasData { + return 0, nil + } else { + return 1, nil + } +} + +func usage(argv0 string, w io.Writer) { + fmt.Fprintf( + w, + "Usage: %s [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n", + argv0, + ) +} + +func getopt( + allArgs []string, + commandsMap map[string]commandT, + w io.Writer, +) (argsT, commandT, int) { + argv0 := allArgs[0] + argv := allArgs[1:] + fs := flag.NewFlagSet("", flag.ContinueOnError) + fs.Usage = func() {} + fs.SetOutput(w) + databasePath := fs.String( + "f", + "q.db", + "The path to the file where the queue is kept", + ) + prefix := fs.String( + "p", + defaultPrefix, + "The q prefix of the table names", + ) + if fs.Parse(argv) != nil { + usage(argv0, w) + return argsT{}, commandT{}, 2 + } + + subArgs := fs.Args() + if len(subArgs) == 0 { + fmt.Fprintf(w, "Missing COMMAND.\n") + usage(argv0, w) + return argsT{}, commandT{}, 2 + } + + args := argsT{ + databasePath: *databasePath, + prefix: *prefix, + command: subArgs[0], + allArgs: allArgs, + args: subArgs[1:], + } + + command := commandsMap[args.command] + if command.name == "" { + fmt.Fprintf(w, "Bad COMMAND: \"%s\".\n", args.command) + usage(allArgs[0], w) + return argsT{}, commandT{}, 2 + } + + args, ok := command.getopt(args, w) + if !ok { + usage(allArgs[0], w) + return argsT{}, commandT{}, 2 + } + + return args, command, 0 +} + +func runCommand( + args argsT, + command commandT, + stdin io.Reader, + stdout io.Writer, + stderr io.Writer, +) int { + db, err := sql.Open("acude", args.databasePath) + if err != nil { + fmt.Fprintln(stderr, err) + return 1 + } + defer db.Close() + + iqueue, err := NewWithPrefix(db, args.prefix) + if err != nil { + fmt.Fprintln(stderr, err) + return 1 + } + defer iqueue.Close() + + rc, err := command.exec(args, iqueue.(queueT).queries, stdin, stdout) + if err != nil { + fmt.Fprintln(stderr, err) + } + + return rc +} + +var commands = map[string]commandT { + "in": commandT{ + name: "in", + getopt: topicGetopt, + exec: inExec, + }, + "out": commandT{ + name: "out", + getopt: topicConsumerGetopt, + exec: outExec, + }, + "commit": commandT{ + name: "commit", + getopt: topicConsumerGetopt, + exec: commitExec, + }, + "dead": commandT{ + name: "dead", + getopt: topicConsumerGetopt, + exec: deadExec, + }, + "ls-dead": commandT{ + name: "ls-dead", + getopt: topicConsumerGetopt, + exec: listDeadExec, + }, + "replay": commandT{ + name: "replay", + getopt: topicConsumerGetopt, + exec: replayExec, + }, + "size": commandT{ + name: "size", + getopt: topicGetopt, + exec: sizeExec, + }, + "count": commandT{ + name: "count", + getopt: topicConsumerGetopt, + exec: countExec, + }, + "has-data": commandT{ + name: "has-data", + getopt: topicConsumerGetopt, + exec: hasDataExec, + }, +} + + + +func Main() { + g.Init() + args, command, rc := getopt(os.Args, commands, os.Stderr) + if rc != 0 { + os.Exit(2) + } + os.Exit(runCommand(args, command, os.Stdin, os.Stdout, os.Stderr)) +} diff --git a/tests/benchmarks/deadletters/main.go b/tests/benchmarks/deadletters/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/benchmarks/deadletters/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/deadletters/q.go b/tests/benchmarks/deadletters/q.go new file mode 100644 index 0000000..0c203bb --- /dev/null +++ b/tests/benchmarks/deadletters/q.go @@ -0,0 +1,24 @@ +package q + +import ( + "flag" + "time" +) + + + +var nFlag = flag.Int( + "n", + 1_000, + "The number of iterations to execute", +) + +func MainTest() { + // FIXME + flag.Parse() + n := *nFlag + + for i := 0; i < n; i++ { + time.Sleep(time.Millisecond * 1) + } +} diff --git a/tests/benchmarks/lookup/main.go b/tests/benchmarks/lookup/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/benchmarks/lookup/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/lookup/q.go b/tests/benchmarks/lookup/q.go new file mode 100644 index 0000000..0c203bb --- /dev/null +++ b/tests/benchmarks/lookup/q.go @@ -0,0 +1,24 @@ +package q + +import ( + "flag" + "time" +) + + + +var nFlag = flag.Int( + "n", + 1_000, + "The number of iterations to execute", +) + +func MainTest() { + // FIXME + flag.Parse() + n := *nFlag + + for i := 0; i < n; i++ { + time.Sleep(time.Millisecond * 1) + } +} diff --git a/tests/benchmarks/multiple-consumers/main.go b/tests/benchmarks/multiple-consumers/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/benchmarks/multiple-consumers/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/multiple-consumers/q.go b/tests/benchmarks/multiple-consumers/q.go new file mode 100644 index 0000000..0c203bb --- /dev/null +++ b/tests/benchmarks/multiple-consumers/q.go @@ -0,0 +1,24 @@ +package q + +import ( + "flag" + "time" +) + + + +var nFlag = flag.Int( + "n", + 1_000, + "The number of iterations to execute", +) + +func MainTest() { + // FIXME + flag.Parse() + n := *nFlag + + for i := 0; i < n; i++ { + time.Sleep(time.Millisecond * 1) + } +} diff --git a/tests/benchmarks/multiple-produces/main.go b/tests/benchmarks/multiple-produces/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/benchmarks/multiple-produces/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/multiple-produces/q.go b/tests/benchmarks/multiple-produces/q.go new file mode 100644 index 0000000..0c203bb --- /dev/null +++ b/tests/benchmarks/multiple-produces/q.go @@ -0,0 +1,24 @@ +package q + +import ( + "flag" + "time" +) + + + +var nFlag = flag.Int( + "n", + 1_000, + "The number of iterations to execute", +) + +func MainTest() { + // FIXME + flag.Parse() + n := *nFlag + + for i := 0; i < n; i++ { + time.Sleep(time.Millisecond * 1) + } +} diff --git a/tests/benchmarks/reaper/main.go b/tests/benchmarks/reaper/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/benchmarks/reaper/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/reaper/q.go b/tests/benchmarks/reaper/q.go new file mode 100644 index 0000000..0c203bb --- /dev/null +++ b/tests/benchmarks/reaper/q.go @@ -0,0 +1,24 @@ +package q + +import ( + "flag" + "time" +) + + + +var nFlag = flag.Int( + "n", + 1_000, + "The number of iterations to execute", +) + +func MainTest() { + // FIXME + flag.Parse() + n := *nFlag + + for i := 0; i < n; i++ { + time.Sleep(time.Millisecond * 1) + } +} diff --git a/tests/benchmarks/replay/main.go b/tests/benchmarks/replay/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/benchmarks/replay/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/replay/q.go b/tests/benchmarks/replay/q.go new file mode 100644 index 0000000..0c203bb --- /dev/null +++ b/tests/benchmarks/replay/q.go @@ -0,0 +1,24 @@ +package q + +import ( + "flag" + "time" +) + + + +var nFlag = flag.Int( + "n", + 1_000, + "The number of iterations to execute", +) + +func MainTest() { + // FIXME + flag.Parse() + n := *nFlag + + for i := 0; i < n; i++ { + time.Sleep(time.Millisecond * 1) + } +} diff --git a/tests/benchmarks/single-consumer/main.go b/tests/benchmarks/single-consumer/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/benchmarks/single-consumer/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/single-consumer/q.go b/tests/benchmarks/single-consumer/q.go new file mode 100644 index 0000000..0c203bb --- /dev/null +++ b/tests/benchmarks/single-consumer/q.go @@ -0,0 +1,24 @@ +package q + +import ( + "flag" + "time" +) + + + +var nFlag = flag.Int( + "n", + 1_000, + "The number of iterations to execute", +) + +func MainTest() { + // FIXME + flag.Parse() + n := *nFlag + + for i := 0; i < n; i++ { + time.Sleep(time.Millisecond * 1) + } +} diff --git a/tests/benchmarks/single-producer/main.go b/tests/benchmarks/single-producer/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/benchmarks/single-producer/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/single-producer/q.go b/tests/benchmarks/single-producer/q.go new file mode 100644 index 0000000..0c203bb --- /dev/null +++ b/tests/benchmarks/single-producer/q.go @@ -0,0 +1,24 @@ +package q + +import ( + "flag" + "time" +) + + + +var nFlag = flag.Int( + "n", + 1_000, + "The number of iterations to execute", +) + +func MainTest() { + // FIXME + flag.Parse() + n := *nFlag + + for i := 0; i < n; i++ { + time.Sleep(time.Millisecond * 1) + } +} diff --git a/tests/benchmarks/subscribe/main.go b/tests/benchmarks/subscribe/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/benchmarks/subscribe/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/subscribe/q.go b/tests/benchmarks/subscribe/q.go new file mode 100644 index 0000000..0c203bb --- /dev/null +++ b/tests/benchmarks/subscribe/q.go @@ -0,0 +1,24 @@ +package q + +import ( + "flag" + "time" +) + + + +var nFlag = flag.Int( + "n", + 1_000, + "The number of iterations to execute", +) + +func MainTest() { + // FIXME + flag.Parse() + n := *nFlag + + for i := 0; i < n; i++ { + time.Sleep(time.Millisecond * 1) + } +} diff --git a/tests/benchmarks/unsubscribe/main.go b/tests/benchmarks/unsubscribe/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/benchmarks/unsubscribe/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/unsubscribe/q.go b/tests/benchmarks/unsubscribe/q.go new file mode 100644 index 0000000..0c203bb --- /dev/null +++ b/tests/benchmarks/unsubscribe/q.go @@ -0,0 +1,24 @@ +package q + +import ( + "flag" + "time" +) + + + +var nFlag = flag.Int( + "n", + 1_000, + "The number of iterations to execute", +) + +func MainTest() { + // FIXME + flag.Parse() + n := *nFlag + + for i := 0; i < n; i++ { + time.Sleep(time.Millisecond * 1) + } +} diff --git a/tests/benchmarks/waiter/main.go b/tests/benchmarks/waiter/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/benchmarks/waiter/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/waiter/q.go b/tests/benchmarks/waiter/q.go new file mode 100644 index 0000000..0c203bb --- /dev/null +++ b/tests/benchmarks/waiter/q.go @@ -0,0 +1,24 @@ +package q + +import ( + "flag" + "time" +) + + + +var nFlag = flag.Int( + "n", + 1_000, + "The number of iterations to execute", +) + +func MainTest() { + // FIXME + flag.Parse() + n := *nFlag + + for i := 0; i < n; i++ { + time.Sleep(time.Millisecond * 1) + } +} diff --git a/tests/functional/consume-one-produce-many/main.go b/tests/functional/consume-one-produce-many/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/functional/consume-one-produce-many/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/functional/consume-one-produce-many/q.go b/tests/functional/consume-one-produce-many/q.go new file mode 100644 index 0000000..48d66d3 --- /dev/null +++ b/tests/functional/consume-one-produce-many/q.go @@ -0,0 +1,5 @@ +package q + +func MainTest() { + // FIXME +} diff --git a/tests/functional/consumer-with-deadletter/main.go b/tests/functional/consumer-with-deadletter/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/functional/consumer-with-deadletter/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/functional/consumer-with-deadletter/q.go b/tests/functional/consumer-with-deadletter/q.go new file mode 100644 index 0000000..e1462d7 --- /dev/null +++ b/tests/functional/consumer-with-deadletter/q.go @@ -0,0 +1,97 @@ +package q + +import ( + "database/sql" + "errors" + "os" + "runtime" + + _ "acudego" + g "gobang" + "guuid" +) + + + +const ( + topicX = "new-event-x" + topicY = "new-event-y" +) + +var forbidden3Err = errors.New("we don't like 3") + + + +func processNewEventXToY(message Message) (UnsentMessage, error) { + payload := string(message.Payload) + if payload == "event 3" { + return UnsentMessage{}, forbidden3Err + } + + newPayload := []byte("processed " + payload) + unsent := UnsentMessage{ + Topic: topicY, + FlowID: message.FlowID, + Payload: newPayload, + } + return unsent, nil +} + + + +func MainTest() { + g.SetLevel(g.LevelNone) + + _, file, _, ok := runtime.Caller(0) + g.TAssertEqualS(ok, true, "can't get filename") + + databasePath := file + ".db" + os.Remove(databasePath) + os.Remove(databasePath + "-shm") + os.Remove(databasePath + "-wal") + + db, err := sql.Open("acude", databasePath) + g.TErrorIf(err) + defer db.Close() + + queue, err := New(db) + g.TErrorIf(err) + defer queue.Close() + + + pub := func(payload []byte, flowID guuid.UUID) { + unsent := UnsentMessage{ + Topic: topicX, + FlowID: flowID, + Payload: payload, + } + _, err := queue.Publish(unsent) + g.TErrorIf(err) + } + + + g.Testing("we can WaitFor() a message after a deadletter", func() { + flowID := guuid.New() + + handlerFn := func(message Message) error { + messageY, err := processNewEventXToY(message) + if err != nil { + return err + } + + _, err = queue.Publish(messageY) + return err + } + queue.Subscribe(topicX, "main-worker", handlerFn) + defer queue.Unsubscribe(topicX, "main-worker") + + pub([]byte("event 1"), guuid.New()) + pub([]byte("event 2"), guuid.New()) + pub([]byte("event 3"), guuid.New()) + pub([]byte("event 4"), guuid.New()) + pub([]byte("event 5"), flowID) + + given := <- queue.WaitFor(topicY, flowID, "waiter").Channel + g.TAssertEqual(given, []byte("processed event 5")) + }) +} diff --git a/tests/functional/custom-prefix/main.go b/tests/functional/custom-prefix/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/functional/custom-prefix/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/functional/custom-prefix/q.go b/tests/functional/custom-prefix/q.go new file mode 100644 index 0000000..48d66d3 --- /dev/null +++ b/tests/functional/custom-prefix/q.go @@ -0,0 +1,5 @@ +package q + +func MainTest() { + // FIXME +} diff --git a/tests/functional/distinct-consumers-separate-instances/main.go b/tests/functional/distinct-consumers-separate-instances/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/functional/distinct-consumers-separate-instances/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/functional/distinct-consumers-separate-instances/q.go b/tests/functional/distinct-consumers-separate-instances/q.go new file mode 100644 index 0000000..48d66d3 --- /dev/null +++ b/tests/functional/distinct-consumers-separate-instances/q.go @@ -0,0 +1,5 @@ +package q + +func MainTest() { + // FIXME +} diff --git a/tests/functional/flow-id/main.go b/tests/functional/flow-id/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/functional/flow-id/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/functional/flow-id/q.go b/tests/functional/flow-id/q.go new file mode 100644 index 0000000..48d66d3 --- /dev/null +++ b/tests/functional/flow-id/q.go @@ -0,0 +1,5 @@ +package q + +func MainTest() { + // FIXME +} diff --git a/tests/functional/idempotency/main.go b/tests/functional/idempotency/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/functional/idempotency/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/functional/idempotency/q.go b/tests/functional/idempotency/q.go new file mode 100644 index 0000000..48d66d3 --- /dev/null +++ b/tests/functional/idempotency/q.go @@ -0,0 +1,5 @@ +package q + +func MainTest() { + // FIXME +} diff --git a/tests/functional/new-instance-takeover/main.go b/tests/functional/new-instance-takeover/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/functional/new-instance-takeover/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/functional/new-instance-takeover/q.go b/tests/functional/new-instance-takeover/q.go new file mode 100644 index 0000000..a678415 --- /dev/null +++ b/tests/functional/new-instance-takeover/q.go @@ -0,0 +1,127 @@ +package q + +import ( +"fmt" + "database/sql" + "runtime" + "os" + + g "gobang" + "guuid" +) + + + +const topic = "topic" + + + +func pub(queue IQueue, topic string, flowID guuid.UUID) { + unsent := UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: []byte{}, + } + _, err := queue.Publish(unsent) + g.TErrorIf(err) +} + +func handlerFn(publish func(guuid.UUID)) func(Message) error { + return func(message Message) error { + publish(message.FlowID) + return nil + } +} + +func startInstance( + databasePath string, + instanceID int, + name string, +) (*sql.DB, IQueue, error) { + db, err := sql.Open("acude", databasePath) + g.TErrorIf(err) + + iqueue, err := New(db) + g.TErrorIf(err) + queue := iqueue.(queueT) + + notifyFn := makeNotifyFn(queue.subscriptions.read, queue.pinger) + queries, err := initDB(db, defaultPrefix, notifyFn, instanceID) + g.TErrorIf(err) + + err = queue.queries.close() + g.TErrorIf(err) + + queue.queries = queries + + pub_ := func(topic string) func(guuid.UUID) { + return func(flowID guuid.UUID) { + pub(queue, topic, flowID) + } + } + + individual := "individual-" + name + shared := "shared" + + queue.Subscribe(topic, individual, handlerFn(pub_(individual))) + queue.Subscribe(topic, shared, handlerFn(pub_(shared + "-" + name))) + + return db, queue, nil +} + + + +func MainTest() { + // https://sqlite.org/forum/forumpost/2507664507 + g.Init() + + _, file, _, ok := runtime.Caller(0) + g.TAssertEqualS(ok, true, "can't get filename") + + dbpath := file + ".db" + dbpath = "/mnt/dois/andreh/t.db" + os.Remove(dbpath) + os.Remove(dbpath + "-shm") + os.Remove(dbpath + "-wal") + + instanceID1 := os.Getpid() + instanceID2 := instanceID1 + 1 + + flowID1 := guuid.New() + flowID2 := guuid.New() + + g.Testing("new instances take ownership of topic+name combo", func() { + if false { + fmt.Fprintf(os.Stderr, "(PID %d + 1) ", instanceID1) + } + + db, q1, err := startInstance(dbpath, instanceID1, "first") + g.TErrorIf(err) + defer db.Close() + defer q1.Close() + + pub(q1, topic, guuid.New()) + pub(q1, topic, guuid.New()) + pub(q1, topic, flowID1) + + <- q1.WaitFor("individual-first", flowID1, "w").Channel + <- q1.WaitFor( "shared-first", flowID1, "w").Channel + // println("waited 1") + + db, q2, err := startInstance(dbpath, instanceID2, "second") + g.TErrorIf(err) + defer db.Close() + defer q2.Close() + + <- q2.WaitFor("individual-second", flowID1, "w").Channel + + pub(q2, topic, guuid.New()) + pub(q2, topic, guuid.New()) + pub(q2, topic, flowID2) + + // FIXME: notify multiple instances so we can add this: + // <- q2.WaitFor("individual-first", flowID2, "w").Channel + <- q2.WaitFor("individual-second", flowID2, "w").Channel + <- q2.WaitFor( "shared-second", flowID2, "w").Channel + }) +} diff --git a/tests/functional/wait-after-publish/main.go b/tests/functional/wait-after-publish/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/functional/wait-after-publish/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/functional/wait-after-publish/q.go b/tests/functional/wait-after-publish/q.go new file mode 100644 index 0000000..701258a --- /dev/null +++ b/tests/functional/wait-after-publish/q.go @@ -0,0 +1,64 @@ +package q + +import ( + "database/sql" + "os" + "runtime" + + g "gobang" + "guuid" +) + + + +const topic = "topic" + + + +func MainTest() { + _, file, _, ok := runtime.Caller(0) + g.TAssertEqualS(ok, true, "can't get filename") + + databasePath := file + ".db" + os.Remove(databasePath) + os.Remove(databasePath + "-shm") + os.Remove(databasePath + "-wal") + + db, err := sql.Open("acude", databasePath) + g.TErrorIf(err) + defer db.Close() + + queue, err := New(db) + g.TErrorIf(err) + defer queue.Close() + + pub := func(flowID guuid.UUID, payload []byte) { + unsent := UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + _, err := queue.Publish(unsent) + g.TErrorIf(err) + } + + + g.Testing("we can WaitFor() a message before its publishing", func() { + flowID := guuid.New() + waiter := queue.WaitFor(topic, flowID, "waiter").Channel + + pub(flowID, []byte("payload before")) + + given := <- waiter + g.TAssertEqual(given, []byte("payload before")) + }) + + g.Testing("we can also do it after its publishing", func() { + flowID := guuid.New() + + pub(flowID, []byte("payload after")) + + given := <- queue.WaitFor(topic, flowID, "waiter").Channel + g.TAssertEqual(given, []byte("payload after")) + }) +} diff --git a/tests/functional/waiter/main.go b/tests/functional/waiter/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/functional/waiter/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/functional/waiter/q.go b/tests/functional/waiter/q.go new file mode 100644 index 0000000..48d66d3 --- /dev/null +++ b/tests/functional/waiter/q.go @@ -0,0 +1,5 @@ +package q + +func MainTest() { + // FIXME +} diff --git a/tests/fuzz/api-check/main.go b/tests/fuzz/api-check/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/fuzz/api-check/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/fuzz/api-check/q.go b/tests/fuzz/api-check/q.go new file mode 100644 index 0000000..f8b2ab4 --- /dev/null +++ b/tests/fuzz/api-check/q.go @@ -0,0 +1,35 @@ +package q + +import ( + "os" + "testing" + "testing/internal/testdeps" +) + + + +func api(f *testing.F) { + f.Fuzz(func(t *testing.T, n int) { + // FIXME + if n > 1 { + if n < 2 { + t.Errorf("Failed n: %v\n", n) + } + } + }) +} + + + +func MainTest() { + fuzzTargets := []testing.InternalFuzzTarget{ + { "api", api }, + } + + deps := testdeps.TestDeps{} + tests := []testing.InternalTest {} + benchmarks := []testing.InternalBenchmark{} + examples := []testing.InternalExample {} + m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples) + os.Exit(m.Run()) +} diff --git a/tests/fuzz/cli-check/main.go b/tests/fuzz/cli-check/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/fuzz/cli-check/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/fuzz/cli-check/q.go b/tests/fuzz/cli-check/q.go new file mode 100644 index 0000000..5c872e5 --- /dev/null +++ b/tests/fuzz/cli-check/q.go @@ -0,0 +1,35 @@ +package q + +import ( + "os" + "testing" + "testing/internal/testdeps" +) + + + +func queries(f *testing.F) { + f.Fuzz(func(t *testing.T, n int) { + if n > 154 { + if n < 155 { + t.Errorf("Failed n: %v\n", n) + } + } + }) +} + + + +func MainTest() { + // FIXME + fuzzTargets := []testing.InternalFuzzTarget{ + { "queries", queries }, + } + + deps := testdeps.TestDeps{} + tests := []testing.InternalTest {} + benchmarks := []testing.InternalBenchmark{} + examples := []testing.InternalExample {} + m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples) + os.Exit(m.Run()) +} diff --git a/tests/fuzz/equal-produced-consumed-order-check/main.go b/tests/fuzz/equal-produced-consumed-order-check/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/fuzz/equal-produced-consumed-order-check/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/fuzz/equal-produced-consumed-order-check/q.go b/tests/fuzz/equal-produced-consumed-order-check/q.go new file mode 100644 index 0000000..8b95aef --- /dev/null +++ b/tests/fuzz/equal-produced-consumed-order-check/q.go @@ -0,0 +1,35 @@ +package q + +import ( + "os" + "testing" + "testing/internal/testdeps" +) + + + +func queries(f *testing.F) { + f.Fuzz(func(t *testing.T, n int) { + if n > 154 { + if n < 155 { + t.Errorf("Failed n: %v\n", n) + } + } + }) +} + + + +func MainTest() { + // FIXME: produced order is identical to consumed order + fuzzTargets := []testing.InternalFuzzTarget{ + { "queries", queries }, + } + + deps := testdeps.TestDeps{} + tests := []testing.InternalTest {} + benchmarks := []testing.InternalBenchmark{} + examples := []testing.InternalExample {} + m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples) + os.Exit(m.Run()) +} diff --git a/tests/fuzz/exactly-once-check/main.go b/tests/fuzz/exactly-once-check/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/fuzz/exactly-once-check/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/fuzz/exactly-once-check/q.go b/tests/fuzz/exactly-once-check/q.go new file mode 100644 index 0000000..d483ad8 --- /dev/null +++ b/tests/fuzz/exactly-once-check/q.go @@ -0,0 +1,35 @@ +package q + +import ( + "os" + "testing" + "testing/internal/testdeps" +) + + + +func queries(f *testing.F) { + f.Fuzz(func(t *testing.T, n int) { + if n > 154 { + if n < 155 { + t.Errorf("Failed n: %v\n", n) + } + } + }) +} + + + +func MainTest() { + // FIXME: a message is consumed exactly once + fuzzTargets := []testing.InternalFuzzTarget{ + { "queries", queries }, + } + + deps := testdeps.TestDeps{} + tests := []testing.InternalTest {} + benchmarks := []testing.InternalBenchmark{} + examples := []testing.InternalExample {} + m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples) + os.Exit(m.Run()) +} diff --git a/tests/fuzz/queries-check/main.go b/tests/fuzz/queries-check/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/fuzz/queries-check/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/fuzz/queries-check/q.go b/tests/fuzz/queries-check/q.go new file mode 100644 index 0000000..5c872e5 --- /dev/null +++ b/tests/fuzz/queries-check/q.go @@ -0,0 +1,35 @@ +package q + +import ( + "os" + "testing" + "testing/internal/testdeps" +) + + + +func queries(f *testing.F) { + f.Fuzz(func(t *testing.T, n int) { + if n > 154 { + if n < 155 { + t.Errorf("Failed n: %v\n", n) + } + } + }) +} + + + +func MainTest() { + // FIXME + fuzzTargets := []testing.InternalFuzzTarget{ + { "queries", queries }, + } + + deps := testdeps.TestDeps{} + tests := []testing.InternalTest {} + benchmarks := []testing.InternalBenchmark{} + examples := []testing.InternalExample {} + m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples) + os.Exit(m.Run()) +} diff --git a/tests/fuzz/total-order-check/main.go b/tests/fuzz/total-order-check/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/fuzz/total-order-check/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/fuzz/total-order-check/q.go b/tests/fuzz/total-order-check/q.go new file mode 100644 index 0000000..67af544 --- /dev/null +++ b/tests/fuzz/total-order-check/q.go @@ -0,0 +1,35 @@ +package q + +import ( + "os" + "testing" + "testing/internal/testdeps" +) + + + +func queries(f *testing.F) { + f.Fuzz(func(t *testing.T, n int) { + if n > 154 { + if n < 155 { + t.Errorf("Failed n: %v\n", n) + } + } + }) +} + + + +func MainTest() { + // FIXME: a consumer gets the messages in total order + fuzzTargets := []testing.InternalFuzzTarget{ + { "queries", queries }, + } + + deps := testdeps.TestDeps{} + tests := []testing.InternalTest {} + benchmarks := []testing.InternalBenchmark{} + examples := []testing.InternalExample {} + m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples) + os.Exit(m.Run()) +} diff --git a/tests/liteq.go b/tests/liteq.go deleted file mode 100644 index 86d1bef..0000000 --- a/tests/liteq.go +++ /dev/null @@ -1,35 +0,0 @@ -package q - -import ( - "os" - "testing" - "testing/internal/testdeps" - - g "gobang" -) - - - -func TestX(t *testing.T) { - g.AssertEqual(1, 1) -} - - - -func MainTest() { - tests := []testing.InternalTest { - { "TestX", TestX }, - } - - benchmarks := []testing.InternalBenchmark {} - fuzzTargets := []testing.InternalFuzzTarget {} - examples := []testing.InternalExample {} - m := testing.MainStart( - testdeps.TestDeps {}, - tests, - benchmarks, - fuzzTargets, - examples, - ) - os.Exit(m.Run()) -} diff --git a/tests/q.go b/tests/q.go new file mode 100644 index 0000000..6b9e422 --- /dev/null +++ b/tests/q.go @@ -0,0 +1,5776 @@ +package q + +import ( + "bytes" + "database/sql" + "errors" + "fmt" + "io" + "log/slog" + "os" + "reflect" + "sort" + "strings" + "sync" + "time" + + "acudego" + "guuid" + g "gobang" +) + + + +func test_defaultPrefix() { + g.TestStart("defaultPrefix") + + g.Testing("the defaultPrefix is valid", func() { + g.TErrorIf(g.ValidateSQLTablePrefix(defaultPrefix)) + }) +} + +func test_inTx() { + /* + // FIXME + g.TestStart("inTx()") + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + defer db.Close() + + + g.Testing("when fn() errors, we propagate it", func() { + myError := errors.New("to be propagated") + err := inTx(db, func(tx *sql.Tx) error { + return myError + }) + g.TAssertEqual(err, myError) + }) + + g.Testing("on nil error we get nil", func() { + err := inTx(db, func(tx *sql.Tx) error { + return nil + }) + g.TErrorIf(err) + }) + */ +} + +func test_createTables() { + g.TestStart("createTables()") + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + defer db.Close() + + + g.Testing("tables exist afterwards", func() { + tmpl := ` + SELECT id FROM "%s_messages" LIMIT 1; + ` + q := fmt.Sprintf(tmpl, defaultPrefix) + + _, err := db.Exec(q) + g.TErrorNil(err) + + err = createTables(db, defaultPrefix) + g.TErrorIf(err) + + _, err = db.Exec(q) + g.TErrorIf(err) + }) + + g.Testing("we can do it multiple times", func() { + g.TErrorIf(g.SomeError( + createTables(db, defaultPrefix), + createTables(db, defaultPrefix), + createTables(db, defaultPrefix), + )) + }) +} + +func test_takeStmt() { + g.TestStart("takeStmt()") + + const ( + topic = "take() topic" + consumer = "take() consumer" + prefix = defaultPrefix + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + g.TErrorIf(takeErr) + defer g.SomeFnError( + takeClose, + db.Close, + ) + + const tmpl = ` + SELECT owner_id from "%s_owners" + WHERE + topic = ? AND + consumer = ?; + ` + sqlOwner := fmt.Sprintf(tmpl, prefix) + + + g.Testing("when there is no owner, we become it", func() { + var ownerID int + err := db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) + g.TAssertEqual(err, sql.ErrNoRows) + + err = take(topic, consumer) + g.TErrorIf(err) + + err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) + g.TErrorIf(err) + g.TAssertEqual(ownerID, instanceID) + }) + + g.Testing("if there is already an owner, we overtake it", func() { + otherID := instanceID + 1 + + take, takeClose, takeErr := takeStmt(db, prefix, otherID) + g.TErrorIf(takeErr) + defer takeClose() + + var ownerID int + err := db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) + g.TErrorIf(err) + g.TAssertEqual(ownerID, instanceID) + + err = take(topic, consumer) + g.TErrorIf(err) + + err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) + g.TErrorIf(err) + g.TAssertEqual(ownerID, otherID) + }) + g.Testing("no error if closed more than once", func() { + g.TErrorIf(g.SomeError( + takeClose(), + takeClose(), + takeClose(), + )) + }) +} + +func test_publishStmt() { + g.TestStart("publishStmt()") + + const ( + topic = "publish() topic" + payloadStr = "publish() payload" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + publishErr, + )) + defer g.SomeFnError( + publishClose, + db.Close, + ) + + + g.Testing("we can publish a message", func() { + messageID := guuid.New() + message, err := publish(unsent, messageID) + g.TErrorIf(err) + + g.TAssertEqual(message.id, int64(1)) + g.TAssertEqual(message.uuid, messageID) + g.TAssertEqual(message.topic, topic) + g.TAssertEqual(message.flowID, flowID) + g.TAssertEqual(message.payload, payload) + }) + + g.Testing("we can publish the same message repeatedly", func() { + messageID1 := guuid.New() + messageID2 := guuid.New() + message1, err1 := publish(unsent, messageID1) + message2, err2 := publish(unsent, messageID2) + g.TErrorIf(g.SomeError(err1, err2)) + + g.TAssertEqual(message1.id, message2.id - 1) + g.TAssertEqual(message1.topic, message2.topic) + g.TAssertEqual(message1.flowID, message2.flowID) + g.TAssertEqual(message1.payload, message2.payload) + + g.TAssertEqual(message1.uuid, messageID1) + g.TAssertEqual(message2.uuid, messageID2) + }) + + g.Testing("publishing a message with the same UUID errors", func() { + messageID := guuid.New() + message1, err1 := publish(unsent, messageID) + _, err2 := publish(unsent, messageID) + g.TErrorIf(err1) + + g.TAssertEqual(message1.uuid, messageID) + g.TAssertEqual(message1.topic, topic) + g.TAssertEqual(message1.flowID, flowID) + g.TAssertEqual(message1.payload, payload) + + g.TAssertEqual( + err2.(acudego.Error).ExtendedCode, + acudego.ErrConstraintUnique, + ) + }) + + g.Testing("no actual closing occurs", func() { + g.TErrorIf(g.SomeError( + publishClose(), + publishClose(), + publishClose(), + )) + }) +} + +func test_findStmt() { + g.TestStart("findStmt()") + + const ( + topic = "find() topic" + payloadStr = "find() payload" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + find, findClose, findErr := findStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + publishErr, + findErr, + )) + defer g.SomeFnError( + publishClose, + findClose, + db.Close, + ) + + pub := func(flowID guuid.UUID) guuid.UUID { + unsentWithFlowID := unsent + unsentWithFlowID.FlowID = flowID + messageID := guuid.New() + _, err := publish(unsentWithFlowID, messageID) + g.TErrorIf(err) + return messageID + } + + + g.Testing("we can find a message by topic and flowID", func() { + flowID := guuid.New() + messageID := pub(flowID) + message, err := find(topic, flowID) + g.TErrorIf(err) + + g.TAssertEqual(message.uuid, messageID) + g.TAssertEqual(message.topic, topic) + g.TAssertEqual(message.flowID, flowID) + g.TAssertEqual(message.payload, payload) + }) + + g.Testing("a non-existent message gives us an error", func() { + message, err := find(topic, guuid.New()) + g.TAssertEqual(message, messageT{}) + g.TAssertEqual(err, sql.ErrNoRows) + }) + + g.Testing("findig twice yields the exact same message", func() { + flowID := guuid.New() + messageID := pub(flowID) + message1, err1 := find(topic, flowID) + message2, err2 := find(topic, flowID) + g.TErrorIf(g.SomeError(err1, err2)) + + g.TAssertEqual(message1.uuid, messageID) + g.TAssertEqual(message1, message2) + }) + + g.Testing("returns the latest entry if multiple are available", func() { + flowID := guuid.New() + + _ , err0 := find(topic, flowID) + pub(flowID) + message1, err1 := find(topic, flowID) + pub(flowID) + message2, err2 := find(topic, flowID) + + g.TAssertEqual(err0, sql.ErrNoRows) + g.TErrorIf(g.SomeError(err1, err2)) + g.TAssertEqual(message1.uuid == message2.uuid, false) + g.TAssertEqual(message1.id < message2.id, true) + }) + + g.Testing("no error if closed more than once", func() { + g.TErrorIf(g.SomeError( + findClose(), + findClose(), + findClose(), + )) + }) +} + +func test_nextStmt() { + g.TestStart("nextStmt()") + + const ( + topic = "next() topic" + payloadStr = "next() payload" + consumer = "next() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + next, nextClose, nextErr := nextStmt(db, prefix, instanceID) + commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + nextErr, + commitErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + nextClose, + commitClose, + db.Close, + ) + + pub := func(topic string) messageT { + g.TErrorIf(take(topic, consumer)) + + unsentWithTopic := unsent + unsentWithTopic.Topic = topic + message, err := publish(unsentWithTopic, guuid.New()) + g.TErrorIf(err) + return message + } + + + g.Testing("we get an error on empty topic", func() { + _, err := next(topic, consumer) + g.TAssertEqual(err, sql.ErrNoRows) + }) + + g.Testing("we don't get messages from other topics", func() { + pub("other topic") + _, err := next(topic, consumer) + g.TAssertEqual(err, sql.ErrNoRows) + }) + + g.Testing("we can get the next message", func() { + expectedMessage := pub(topic) + pub(topic) + pub(topic) + message, err := next(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(message, expectedMessage) + }) + + g.Testing("we keep getting the next until we commit", func() { + message1, err1 := next(topic, consumer) + message2, err2 := next(topic, consumer) + g.TErrorIf(commit(consumer, message1.uuid)) + message3, err3 := next(topic, consumer) + g.TErrorIf(g.SomeError(err1, err2, err3)) + + g.TAssertEqual(message1, message2) + g.TAssertEqual(message2.uuid != message3.uuid, true) + }) + + g.Testing("each consumer has its own next message", func() { + g.TErrorIf(take(topic, "other consumer")) + message1, err1 := next(topic, consumer) + message2, err2 := next(topic, "other consumer") + g.TErrorIf(g.SomeError(err1, err2)) + g.TAssertEqual(message1.uuid != message2.uuid, true) + }) + + g.Testing("error when we're not the owner", func() { + otherID := instanceID + 1 + + take, takeClose, takeErr := takeStmt(db, prefix, otherID) + g.TErrorIf(takeErr) + defer takeClose() + + _, err := next(topic, consumer) + g.TErrorIf(err) + + err = take(topic, consumer) + g.TErrorIf(err) + + _, err = next(topic, consumer) + g.TAssertEqual(err, fmt.Errorf( + notOwnerErrorFmt, + otherID, + topic, + consumer, + instanceID, + )) + }) + + g.Testing("we can close more than once", func() { + g.TErrorIf(g.SomeError( + nextClose(), + nextClose(), + nextClose(), + )) + }) +} + +func test_messageEach() { + g.TestStart("messageEach()") + + const ( + topic = "messageEach() topic" + payloadStr = "messageEach() payload" + consumer = "messageEach() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + pendingErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + pendingClose, + db.Close, + ) + + pub := func() guuid.UUID { + message, err := publish(unsent, guuid.New()) + g.TErrorIf(err) + return message.uuid + } + g.TErrorIf(take(topic, consumer)) + + + g.Testing("not called on empty set", func() { + rows, err := pending(topic, consumer) + g.TErrorIf(err) + + messageEach(rows, func(messageT) error { + g.Unreachable() + return nil + }) + }) + + g.Testing("the callback is called once for each entry", func() { + messageIDs := []guuid.UUID{ + pub(), + pub(), + pub(), + } + + rows, err := pending(topic, consumer) + g.TErrorIf(err) + + var collectedIDs []guuid.UUID + err = messageEach(rows, func(message messageT) error { + collectedIDs = append(collectedIDs, message.uuid) + return nil + }) + g.TErrorIf(err) + + g.TAssertEqual(collectedIDs, messageIDs) + }) + + g.Testing("we halt if the timestamp is ill-formatted", func() { + messageID := pub() + message_id_bytes := messageID[:] + pub() + pub() + pub() + + const tmplUpdate = ` + UPDATE "%s_messages" + SET timestamp = '01/01/1970' + WHERE uuid = ?; + ` + sqlUpdate := fmt.Sprintf(tmplUpdate, prefix) + _, err := db.Exec(sqlUpdate, message_id_bytes) + g.TErrorIf(err) + + rows, err := pending(topic, consumer) + g.TErrorIf(err) + + n := 0 + err = messageEach(rows, func(messageT) error { + n++ + return nil + }) + + g.TAssertEqual( + err, + &time.ParseError{ + Layout: time.RFC3339Nano, + Value: "01/01/1970", + LayoutElem: "2006", + ValueElem: "01/01/1970", + Message: "", + }, + ) + g.TAssertEqual(n, 3) + + const tmplDelete = ` + DELETE FROM "%s_messages" + WHERE uuid = ?; + ` + sqlDelete := fmt.Sprintf(tmplDelete, prefix) + _, err = db.Exec(sqlDelete, message_id_bytes) + g.TErrorIf(err) + }) + + g.Testing("we halt if the callback returns an error", func() { + myErr := errors.New("callback error early return") + + rows1, err1 := pending(topic, consumer) + g.TErrorIf(err1) + + n1 := 0 + err1 = messageEach(rows1, func(messageT) error { + n1++ + if n1 == 4 { + return myErr + } + return nil + }) + + rows2, err2 := pending(topic, consumer) + g.TErrorIf(err2) + + n2 := 0 + err2 = messageEach(rows2, func(messageT) error { + n2++ + return nil + }) + + g.TAssertEqual(err1, myErr) + g.TErrorIf(err2) + g.TAssertEqual(n1, 4) + g.TAssertEqual(n2, 6) + }) + + g.Testing("noop when given nil for *sql.Rows", func() { + err := messageEach(nil, func(messageT) error { + g.Unreachable() + return nil + }) + g.TErrorIf(err) + }) +} + +func test_pendingStmt() { + g.TestStart("pendingStmt()") + + const ( + topic = "pending() topic" + payloadStr = "pending() payload" + consumer = "pending() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID) + commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + pendingErr, + commitErr, + toDeadErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + pendingClose, + commitClose, + toDeadClose, + db.Close, + ) + + pub := func(topic string) messageT { + g.TErrorIf(take(topic, consumer)) + unsentWithTopic := unsent + unsentWithTopic.Topic = topic + message, err := publish(unsentWithTopic, guuid.New()) + g.TErrorIf(err) + return message + } + g.TErrorIf(take(topic, consumer)) + + collectPending := func(topic string, consumer string) []messageT { + rows, err := pending(topic, consumer) + g.TErrorIf(err) + + var messages []messageT + err = messageEach(rows, func(message messageT) error { + messages = append(messages, message) + return nil + }) + g.TErrorIf(err) + return messages + } + + + g.Testing("an empty database has 0 pending items", func() { + g.TAssertEqual(len(collectPending(topic, consumer)), 0) + }) + + g.Testing("after publishing we get all messages", func() { + expected := []messageT{ + pub(topic), + pub(topic), + } + + g.TAssertEqualI(collectPending(topic, consumer), expected) + }) + + g.Testing("we get the same messages when calling again", func() { + messages1 := collectPending(topic, consumer) + messages2 := collectPending(topic, consumer) + g.TAssertEqual(len(messages1), 2) + g.TAssertEqualI(messages1, messages2) + }) + + g.Testing("we don't get messages from other topics", func() { + pub("other topic") + + g.TAssertEqual(len(collectPending(topic, consumer)), 2) + g.TAssertEqual(len(collectPending("other topic", consumer)), 1) + }) + + g.Testing("after others commit, pending still returns them", func() { + g.TErrorIf(take(topic, "other consumer")) + messages1 := collectPending(topic, consumer) + g.TAssertEqual(len(messages1), 2) + g.TErrorIf( + commit("other consumer", messages1[0].uuid), + ) + + messages2 := collectPending(topic, consumer) + g.TAssertEqualI(messages1, messages2) + }) + + g.Testing("committing other topic doesn't change current", func() { + messages1 := collectPending(topic, consumer) + g.TAssertEqual(len(messages1), 2) + + message := pub("other topic") + + g.TErrorIf(commit(consumer, message.uuid)) + + messages2 := collectPending(topic, consumer) + g.TAssertEqualI(messages1, messages2) + }) + + g.Testing("after commiting, pending doesn't return them again", func() { + messages1 := collectPending(topic, consumer) + g.TAssertEqual(len(messages1), 2) + + g.TErrorIf(commit(consumer, messages1[0].uuid)) + + messages2 := collectPending(topic, consumer) + g.TAssertEqual(len(messages2), 1) + g.TAssertEqual(messages2[0], messages1[1]) + + g.TErrorIf(commit(consumer, messages1[1].uuid)) + + messages3 := collectPending(topic, consumer) + g.TAssertEqual(len(messages3), 0) + }) + + g.Testing("on deadletter, pending also doesn't return them", func() { + messages0 := collectPending(topic, consumer) + g.TAssertEqual(len(messages0), 0) + + message1 := pub(topic) + message2 := pub(topic) + + messages1 := collectPending(topic, consumer) + g.TAssertEqual(len(messages1), 2) + + err = toDead(consumer, message1.uuid, guuid.New()) + g.TErrorIf(err) + + messages2 := collectPending(topic, consumer) + g.TAssertEqual(len(messages2), 1) + g.TAssertEqual(messages2[0], message2) + + err = toDead(consumer, message2.uuid, guuid.New()) + g.TErrorIf(err) + + messages3 := collectPending(topic, consumer) + g.TAssertEqual(len(messages3), 0) + }) + + g.Testing("if commits are unordered, pending is still sorted", func() { + message1 := pub(topic) + message2 := pub(topic) + message3 := pub(topic) + + g.TAssertEqual(collectPending(topic, consumer), []messageT{ + message1, + message2, + message3, + }) + + g.TErrorIf(commit(consumer, message2.uuid)) + g.TAssertEqual(collectPending(topic, consumer), []messageT{ + message1, + message3, + }) + + g.TErrorIf(commit(consumer, message1.uuid)) + g.TAssertEqual(collectPending(topic, consumer), []messageT{ + message3, + }) + + g.TErrorIf(commit(consumer, message3.uuid)) + g.TAssertEqual(len(collectPending(topic, consumer)), 0) + }) + + g.Testing("when we're not the owners we get nothing", func() { + otherID := instanceID + 1 + + take, takeClose, takeErr := takeStmt(db, prefix, otherID) + g.TErrorIf(takeErr) + defer takeClose() + + message1 := pub(topic) + message2 := pub(topic) + message3 := pub(topic) + message4 := pub(topic) + message5 := pub(topic) + + expected := []messageT{ + message1, + message2, + message3, + message4, + message5, + } + + g.TAssertEqual(collectPending(topic, consumer), expected) + + err := take(topic, consumer) + g.TErrorIf(err) + + rows, err := pending(topic, consumer) + g.TErrorIf(err) + + err = messageEach(rows, func(messageT) error { + g.Unreachable() + return nil + }) + g.TErrorIf(err) + }) + + g.Testing("no actual closing occurs", func() { + g.TErrorIf(g.SomeError( + pendingClose(), + pendingClose(), + pendingClose(), + )) + }) +} + +func test_commitStmt() { + g.TestStart("commitStmt()") + + const ( + topic = "commit() topic" + payloadStr = "commit() payload" + consumer = "commit() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + commitErr, + toDeadErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + commitClose, + toDeadClose, + db.Close, + ) + + pub := func(topic string) guuid.UUID { + g.TErrorIf(take(topic, consumer)) + + unsentWithTopic := unsent + unsentWithTopic.Topic = topic + message, err := publish(unsentWithTopic, guuid.New()) + g.TErrorIf(err) + return message.uuid + } + + cmt := func(consumer string, messageID guuid.UUID) error { + g.TErrorIf(take(topic, consumer)) + + return commit(consumer, messageID) + } + + + g.Testing("we can't commit twice", func() { + messageID := pub(topic) + + err1 := cmt(consumer, messageID) + err2 := cmt(consumer, messageID) + g.TErrorIf(err1) + g.TAssertEqual( + err2.(acudego.Error).ExtendedCode, + acudego.ErrConstraintUnique, + ) + }) + + g.Testing("we can't commit non-existent messages", func() { + err := cmt(consumer, guuid.New()) + g.TAssertEqual(err, sql.ErrNoRows) + }) + + g.Testing("multiple consumers may commit a message", func() { + messageID := pub(topic) + + g.TErrorIf(g.SomeError( + cmt(consumer, messageID), + cmt("other consumer", messageID), + cmt("yet another consumer", messageID), + )) + }) + + g.Testing("a consumer can commit to multiple topics", func() { + messageID1 := pub(topic) + messageID2 := pub("other topic") + messageID3 := pub("yet another topic") + + g.TErrorIf(g.SomeError( + cmt(consumer, messageID1), + cmt(consumer, messageID2), + cmt(consumer, messageID3), + )) + }) + + g.Testing("a consumer can consume many messages from a topic", func() { + messageID1 := pub(topic) + messageID2 := pub(topic) + messageID3 := pub(topic) + + g.TErrorIf(g.SomeError( + cmt(consumer, messageID1), + cmt(consumer, messageID2), + cmt(consumer, messageID3), + )) + }) + + g.Testing("we can't commit a dead message", func() { + messageID := pub(topic) + + err1 := toDead(consumer, messageID, guuid.New()) + err2 := cmt(consumer, messageID) + g.TErrorIf(err1) + g.TAssertEqual( + err2.(acudego.Error).ExtendedCode, + acudego.ErrConstraintUnique, + ) + }) + + g.Testing("error if we don't own the topic/consumer", func() { + otherID := instanceID + 1 + take, takeClose, takeErr := takeStmt(db, prefix, otherID) + g.TErrorIf(takeErr) + defer takeClose() + + messageID := pub(topic) + + err := take(topic, consumer) + g.TErrorIf(err) + + err = commit(consumer, messageID) + g.TAssertEqual(err, fmt.Errorf( + noLongerOwnerErrorFmt, + instanceID, + topic, + consumer, + otherID, + )) + }) + + g.Testing("no actual closing occurs", func() { + g.TErrorIf(g.SomeError( + commitClose(), + commitClose(), + commitClose(), + )) + }) +} + +func test_toDeadStmt() { + g.TestStart("toDeadStmt()") + + const ( + topic = "toDead() topic" + payloadStr = "toDead() payload" + consumer = "toDead() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + commitErr, + toDeadErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + commitClose, + toDeadClose, + db.Close, + ) + + pub := func(topic string) guuid.UUID { + g.TErrorIf(take(topic, consumer)) + + unsentWithTopic := unsent + unsentWithTopic.Topic = topic + message, err := publish(unsentWithTopic, guuid.New()) + g.TErrorIf(err) + return message.uuid + } + + asDead := func( + consumer string, + messageID guuid.UUID, + deadletterID guuid.UUID, + ) error { + g.TErrorIf(take(topic, consumer)) + return toDead(consumer, messageID, deadletterID) + } + + + g.Testing("we can't mark as dead twice", func() { + messageID := pub(topic) + + err1 := asDead(consumer, messageID, guuid.New()) + err2 := asDead(consumer, messageID, guuid.New()) + g.TErrorIf(err1) + g.TAssertEqual( + err2.(acudego.Error).ExtendedCode, + acudego.ErrConstraintUnique, + ) + }) + + g.Testing("we can't reuse a deadletter id", func() { + messageID1 := pub(topic) + messageID2 := pub(topic) + deadletterID := guuid.New() + + err1 := asDead(consumer, messageID1, deadletterID) + err2 := asDead(consumer, messageID2, deadletterID) + g.TErrorIf(err1) + g.TAssertEqual( + err2.(acudego.Error).ExtendedCode, + acudego.ErrConstraintUnique, + ) + + }) + + g.Testing("we can't mark as dead non-existent messages", func() { + err := asDead(consumer, guuid.New(), guuid.New()) + g.TAssertEqual(err, sql.ErrNoRows) + }) + + g.Testing("multiple consumers may mark a message as dead", func() { + messageID := pub(topic) + + g.TErrorIf(g.SomeError( + asDead(consumer, messageID, guuid.New()), + asDead("another consumer", messageID, guuid.New()), + asDead("yet another consumer", messageID, guuid.New()), + )) + }) + + g.Testing("a consumer can mark as dead in multiple topics", func() { + messageID1 := pub(topic) + messageID2 := pub("other topic") + messageID3 := pub("yet other topic") + + g.TErrorIf(g.SomeError( + asDead(consumer, messageID1, guuid.New()), + asDead(consumer, messageID2, guuid.New()), + asDead(consumer, messageID3, guuid.New()), + )) + }) + + g.Testing("a consumer can produce many deadletters in a topic", func() { + messageID1 := pub(topic) + messageID2 := pub(topic) + messageID3 := pub(topic) + + g.TErrorIf(g.SomeError( + asDead(consumer, messageID1, guuid.New()), + asDead(consumer, messageID2, guuid.New()), + asDead(consumer, messageID3, guuid.New()), + )) + }) + + g.Testing("a consumer can intercalate commits and deadletters", func() { + messageID1 := pub(topic) + messageID2 := pub(topic) + messageID3 := pub(topic) + messageID4 := pub(topic) + messageID5 := pub(topic) + + g.TErrorIf(g.SomeError( + asDead(consumer, messageID1, guuid.New()), + commit(consumer, messageID2), + commit(consumer, messageID3), + asDead(consumer, messageID4, guuid.New()), + commit(consumer, messageID5), + )) + }) + + g.Testing("we can't mark a committed message as dead", func() { + messageID := pub(topic) + + err1 := commit(consumer, messageID) + err2 := asDead(consumer, messageID, guuid.New()) + g.TErrorIf(err1) + g.TAssertEqual( + err2.(acudego.Error).ExtendedCode, + acudego.ErrConstraintUnique, + ) + }) + + g.Testing("error if we don't own the message's consumer/topic", func() { + otherID := instanceID + 1 + messageID1 := pub(topic) + messageID2 := pub(topic) + + take, takeClose, takeErr := takeStmt(db, prefix, otherID) + g.TErrorIf(takeErr) + defer takeClose() + + err := toDead(consumer, messageID1, guuid.New()) + g.TErrorIf(err) + + err = take(topic, consumer) + g.TErrorIf(err) + + err = toDead(consumer, messageID2, guuid.New()) + g.TAssertEqual(err, fmt.Errorf( + noLongerOwnerErrorFmt, + instanceID, + topic, + consumer, + otherID, + )) + }) + + g.Testing("no actual closing occurs", func() { + g.TErrorIf(g.SomeError( + toDeadClose(), + toDeadClose(), + toDeadClose(), + )) + }) +} + +func test_replayStmt() { + g.TestStart("replayStmt()") + + const ( + topic = "replay() topic" + payloadStr = "replay() payload" + consumer = "replay() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + replay, replayClose, replayErr := replayStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + toDeadErr, + replayErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + toDeadClose, + replayClose, + db.Close, + ) + + pub := func() messageT { + message, err := publish(unsent, guuid.New()) + g.TErrorIf(err) + return message + } + g.TErrorIf(take(topic, consumer)) + + + g.Testing("we can replay a message", func() { + message := pub() + deadletterID := guuid.New() + replayedID := guuid.New() + + err1 := toDead(consumer, message.uuid, deadletterID) + replayed, err2 := replay(deadletterID, replayedID) + g.TErrorIf(g.SomeError(err1, err2)) + + g.TAssertEqual(replayed.uuid, replayedID) + g.TAssertEqual(replayed.id == message.id, false) + g.TAssertEqual(replayed.uuid == message.uuid, false) + }) + + g.Testing("a replayed message keeps its payload", func() { + message := pub() + deadletterID := guuid.New() + err := toDead(consumer, message.uuid, deadletterID) + g.TErrorIf(err) + + replayed, err := replay(deadletterID, guuid.New()) + g.TErrorIf(err) + g.TAssertEqual(message.flowID, replayed.flowID) + g.TAssertEqual(message.payload, replayed.payload) + }) + + g.Testing("we can't replay a dead message twice", func() { + message := pub() + deadletterID := guuid.New() + + err := toDead(consumer, message.uuid, deadletterID) + g.TErrorIf(err) + + _, err1 := replay(deadletterID, guuid.New()) + _, err2 := replay(deadletterID, guuid.New()) + g.TErrorIf(err1) + g.TAssertEqual( + err2.(acudego.Error).ExtendedCode, + acudego.ErrConstraintUnique, + ) + }) + + g.Testing("we cant replay non-existent messages", func() { + _, err := replay(guuid.New(), guuid.New()) + g.TAssertEqual( + err.(acudego.Error).ExtendedCode, + acudego.ErrConstraintNotNull, + ) + }) + + g.Testing("messages can die and then be replayed many times", func() { + message := pub() + deadletterID1 := guuid.New() + deadletterID2 := guuid.New() + + err := toDead(consumer, message.uuid, deadletterID1) + g.TErrorIf(err) + + replayed1, err := replay(deadletterID1, guuid.New()) + g.TErrorIf(err) + + err = toDead(consumer, replayed1.uuid, deadletterID2) + g.TErrorIf(err) + + replayed2, err := replay(deadletterID2, guuid.New()) + g.TErrorIf(err) + + g.TAssertEqual(message.flowID, replayed1.flowID) + g.TAssertEqual(replayed1.flowID, replayed2.flowID) + }) + + g.Testing("no actual closing occurs", func() { + g.TErrorIf(g.SomeError( + replayClose(), + replayClose(), + replayClose(), + )) + }) +} + +func test_oneDeadStmt() { + g.TestStart("oneDeadStmt()") + + const ( + topic = "oneDead() topic" + payloadStr = "oneDead() payload" + consumer = "oneDead() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + replay, replayClose, replayErr := replayStmt(db, prefix, instanceID) + oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + toDeadErr, + replayErr, + oneDeadErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + toDeadClose, + replayClose, + oneDeadClose, + db.Close, + ) + + pub := func(topic string) guuid.UUID { + g.TErrorIf(take(topic, consumer)) + + unsentWithTopic := unsent + unsentWithTopic.Topic = topic + message, err := publish(unsentWithTopic, guuid.New()) + g.TErrorIf(err) + return message.uuid + } + + + g.Testing("error on missing deadletters", func() { + _, err := oneDead(topic, consumer) + g.TAssertEqual(err, sql.ErrNoRows) + }) + + g.Testing("deadletters on other topics don't show for us", func() { + err := toDead(consumer, pub("other topic"), guuid.New()) + g.TErrorIf(err) + + _, err = oneDead(topic, consumer) + g.TAssertEqual(err, sql.ErrNoRows) + }) + + g.Testing("deadletters for other consumers don't show for use", func() { + g.TErrorIf(take(topic, "other consumer")) + err := toDead("other consumer", pub(topic), guuid.New()) + g.TErrorIf(err) + + _, err = oneDead(topic, consumer) + g.TAssertEqual(err, sql.ErrNoRows) + }) + + g.Testing("after being replayed deadletters aren't returned", func() { + messageID1 := guuid.New() + messageID2 := guuid.New() + messageID3 := guuid.New() + + err1 := toDead(consumer, pub(topic), messageID1) + err2 := toDead(consumer, pub(topic), messageID2) + err3 := toDead(consumer, pub(topic), messageID3) + g.TErrorIf(g.SomeError(err1, err2, err3)) + + deadletter, err := oneDead(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(deadletter.uuid, messageID1) + + _, err = replay(messageID2, guuid.New()) + g.TErrorIf(err) + + deadletter, err = oneDead(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(deadletter.uuid, messageID1) + + _, err = replay(messageID1, guuid.New()) + g.TErrorIf(err) + + deadletter, err = oneDead(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(deadletter.uuid, messageID3) + + _, err = replay(messageID3, guuid.New()) + g.TErrorIf(err) + + _, err = oneDead(topic, consumer) + g.TAssertEqual(err, sql.ErrNoRows) + }) +} + +func test_deadletterEach() { + g.TestStart("deadletterEach") + + const ( + topic = "deadletterEach() topic" + payloadStr = "deadletterEach() payload" + consumer = "deadletterEach() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + toDeadErr, + allDeadErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + toDeadClose, + allDeadClose, + db.Close, + ) + + pub := func() guuid.UUID { + message, err := publish(unsent, guuid.New()) + g.TErrorIf(err) + return message.uuid + } + + dead := func(messageID guuid.UUID) guuid.UUID { + deadletterID := guuid.New() + err := toDead(consumer, messageID, deadletterID) + g.TErrorIf(err) + + return deadletterID + } + g.TErrorIf(take(topic, consumer)) + + + g.Testing("not called on empty set", func() { + rows, err := allDead(topic, consumer) + g.TErrorIf(err) + + n := 0 + deadletterEach(rows, func(deadletterT, messageT) error { + n++ + return nil + }) + }) + + g.Testing("the callback is called once for each entry", func() { + expected := []guuid.UUID{ + dead(pub()), + dead(pub()), + dead(pub()), + } + + rows, err := allDead(topic, consumer) + g.TErrorIf(err) + + var deadletterIDs []guuid.UUID + deadletterEach(rows, func( + deadletter deadletterT, + _ messageT, + ) error { + deadletterIDs = append(deadletterIDs, deadletter.uuid) + return nil + }) + + g.TAssertEqual(deadletterIDs, expected) + }) + + g.Testing("we halt if the timestamp is ill-formatted", func() { + messageID := pub() + message_id_bytes := messageID[:] + dead(messageID) + dead(pub()) + dead(pub()) + dead(pub()) + dead(pub()) + + const tmplUpdate = ` + UPDATE "%s_offsets" + SET timestamp = '01-01-1970' + WHERE message_id IN ( + SELECT id FROM "%s_messages" WHERE uuid = ? + ); + ` + sqlUpdate := fmt.Sprintf(tmplUpdate, prefix, prefix) + _, err := db.Exec(sqlUpdate, message_id_bytes) + g.TErrorIf(err) + + rows, err := allDead(topic, consumer) + g.TErrorIf(err) + + n := 0 + err = deadletterEach(rows, func(deadletterT, messageT) error { + n++ + return nil + }) + + g.TAssertEqual( + err, + &time.ParseError{ + Layout: time.RFC3339Nano, + Value: "01-01-1970", + LayoutElem: "2006", + ValueElem: "01-01-1970", + Message: "", + }, + ) + g.TAssertEqual(n, 3) + + const tmplDelete = ` + DELETE FROM "%s_offsets" + WHERE message_id IN ( + SELECT id FROM "%s_messages" WHERE uuid = ? + ); + ` + sqlDelete := fmt.Sprintf(tmplDelete, prefix, prefix) + _, err = db.Exec(sqlDelete, message_id_bytes) + g.TErrorIf(err) + }) + + g.Testing("we halt if the callback returns an error", func() { + myErr := errors.New("early return error") + + rows1, err1 := allDead(topic, consumer) + g.TErrorIf(err1) + + n1 := 0 + err1 = deadletterEach(rows1, func(deadletterT, messageT) error { + n1++ + if n1 == 1 { + return myErr + } + return nil + }) + + rows2, err2 := allDead(topic, consumer) + g.TErrorIf(err2) + + n2 := 0 + err = deadletterEach(rows2, func(deadletterT, messageT) error { + n2++ + return nil + }) + + g.TAssertEqual(err1, myErr) + g.TErrorIf(err2) + g.TAssertEqual(n1, 1) + g.TAssertEqual(n2, 7) + }) +} + +func test_allDeadStmt() { + g.TestStart("allDeadStmt()") + + const ( + topic = "allDead() topic" + payloadStr = "allDead() payload" + consumer = "allDead() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + replay, replayClose, replayErr := replayStmt(db, prefix, instanceID) + allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + toDeadErr, + replayErr, + allDeadErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + toDeadClose, + replayClose, + allDeadClose, + db.Close, + ) + + pub := func(topic string) guuid.UUID { + g.TErrorIf(take(topic, consumer)) + + unsentWithTopic := unsent + unsentWithTopic.Topic = topic + message, err := publish(unsentWithTopic, guuid.New()) + g.TErrorIf(err) + return message.uuid + } + + collectAll := func( + topic string, + consumer string, + ) ([]deadletterT, []messageT) { + var ( + deadletters []deadletterT + messages []messageT + ) + eachFn := func( + deadletter deadletterT, + message messageT, + ) error { + deadletters = append(deadletters, deadletter) + messages = append(messages, message) + return nil + } + + rows, err := allDead(topic, consumer) + g.TErrorIf(err) + + err = deadletterEach(rows, eachFn) + g.TErrorIf(err) + + return deadletters, messages + } + + + g.Testing("no entry on empty deadletters", func() { + deadletterIDs, _ := collectAll(topic, consumer) + g.TAssertEqual(len(deadletterIDs), 0) + }) + + g.Testing("deadletters on other topics don't show up", func() { + err := toDead(consumer, pub("other topic"), guuid.New()) + g.TErrorIf(err) + + deadletters, _ := collectAll(topic, consumer) + g.TAssertEqual(len(deadletters), 0) + }) + + g.Testing("deadletters of other consumers don't show up", func() { + g.TErrorIf(take(topic, "other consumer")) + err := toDead("other consumer", pub(topic), guuid.New()) + g.TErrorIf(err) + + deadletterIDs, _ := collectAll(topic, consumer) + g.TAssertEqual(len(deadletterIDs), 0) + }) + + g.Testing("deadletters are given in order", func() { + deadletterIDs := []guuid.UUID{ + guuid.New(), + guuid.New(), + guuid.New(), + } + messageIDs := []guuid.UUID{ + pub(topic), + pub(topic), + pub(topic), + } + + err1 := toDead(consumer, messageIDs[0], deadletterIDs[0]) + err2 := toDead(consumer, messageIDs[1], deadletterIDs[1]) + err3 := toDead(consumer, messageIDs[2], deadletterIDs[2]) + g.TErrorIf(g.SomeError(err1, err2, err3)) + + deadletters, messages := collectAll(topic, consumer) + g.TAssertEqual(deadletters[0].uuid, deadletterIDs[0]) + g.TAssertEqual(deadletters[1].uuid, deadletterIDs[1]) + g.TAssertEqual(deadletters[2].uuid, deadletterIDs[2]) + g.TAssertEqual( messages[0].uuid, messageIDs[0]) + g.TAssertEqual( messages[1].uuid, messageIDs[1]) + g.TAssertEqual( messages[2].uuid, messageIDs[2]) + g.TAssertEqual(len(deadletters), 3) + g.TAssertEqual(len(messages), 3) + }) + + g.Testing("after being replayed, they stop appearing", func() { + deadletters, _ := collectAll(topic, consumer) + g.TAssertEqual(len(deadletters), 3) + + _, err := replay(deadletters[0].uuid, guuid.New()) + g.TErrorIf(err) + collecteds, _ := collectAll(topic, consumer) + g.TAssertEqual(len(collecteds), 2) + + _, err = replay(deadletters[1].uuid, guuid.New()) + g.TErrorIf(err) + collecteds, _ = collectAll(topic, consumer) + g.TAssertEqual(len(collecteds), 1) + + _, err = replay(deadletters[2].uuid, guuid.New()) + g.TErrorIf(err) + collecteds, _ = collectAll(topic, consumer) + g.TAssertEqual(len(collecteds), 0) + }) +} + +func test_sizeStmt() { + g.TestStart("sizeStmt()") + + const ( + topic = "size() topic" + payloadStr = "size() payload" + consumer = "size() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + replay, replayClose, replayErr := replayStmt(db, prefix, instanceID) + oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID) + size, sizeClose, sizeErr := sizeStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + toDeadErr, + replayErr, + oneDeadErr, + sizeErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + toDeadClose, + replayClose, + sizeClose, + oneDeadClose, + db.Close, + ) + + pub := func(topic string) guuid.UUID { + g.TErrorIf(take(topic, consumer)) + + unsentWithTopic := unsent + unsentWithTopic.Topic = topic + message, err := publish(unsentWithTopic, guuid.New()) + g.TErrorIf(err) + return message.uuid + } + + + g.Testing("0 on empty topic", func() { + n, err := size(topic) + g.TErrorIf(err) + g.TAssertEqual(n, 0) + }) + + g.Testing("other topics don't fall into our count", func() { + pub("other topic") + + n, err := size(topic) + g.TErrorIf(err) + g.TAssertEqual(n, 0) + }) + + g.Testing("otherwise we just get the sum", func() { + pub(topic) + pub(topic) + pub(topic) + pub(topic) + pub(topic) + + n, err := size(topic) + g.TErrorIf(err) + g.TAssertEqual(n, 5) + }) + + g.Testing("deadletters aren't taken into account", func() { + sixthMessageID := pub(topic) + err := toDead(consumer, sixthMessageID, guuid.New()) + g.TErrorIf(err) + + n, err := size(topic) + g.TErrorIf(err) + g.TAssertEqual(n, 6) + }) + + g.Testing("after replay, deadletters increases the size", func() { + deadletter, err := oneDead(topic, consumer) + g.TErrorIf(err) + + _, err = replay(deadletter.uuid, guuid.New()) + g.TErrorIf(err) + + n, err := size(topic) + g.TErrorIf(err) + g.TAssertEqual(n, 7) + }) +} + +func test_countStmt() { + g.TestStart("countStmt()") + + const ( + topic = "count() topic" + payloadStr = "count() payload" + consumer = "count() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + next, nextClose, nextErr := nextStmt(db, prefix, instanceID) + commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + count, countClose, countErr := countStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + nextErr, + commitErr, + toDeadErr, + countErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + nextClose, + commitClose, + toDeadClose, + countClose, + db.Close, + ) + + pub := func(topic string) guuid.UUID { + g.TErrorIf(take(topic, consumer)) + + unsentWithTopic := unsent + unsentWithTopic.Topic = topic + message, err := publish(unsentWithTopic, guuid.New()) + g.TErrorIf(err) + return message.uuid + } + + + g.Testing("0 on empty topic", func() { + n, err := count(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(n, 0) + }) + + g.Testing("other topics don't add to our count", func() { + err := commit(consumer, pub("other topic")) + g.TErrorIf(err) + + n, err := count(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(n, 0) + }) + + g.Testing("other consumers don't influence our count", func() { + g.TErrorIf(take(topic, "other consumer")) + err := commit("other consumer", pub(topic)) + g.TErrorIf(err) + + n, err := count(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(n, 0) + }) + + g.Testing("unconsumed messages don't count", func() { + pub(topic) + pub(topic) + pub(topic) + + n, err := count(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(n, 0) + }) + + g.Testing("consumed messages do count", func() { + message, err := next(topic, consumer) + g.TErrorIf(err) + err = commit(consumer, message.uuid) + g.TErrorIf(err) + + message, err = next(topic, consumer) + g.TErrorIf(err) + err = commit(consumer, message.uuid) + g.TErrorIf(err) + + message, err = next(topic, consumer) + g.TErrorIf(err) + err = commit(consumer, message.uuid) + g.TErrorIf(err) + + n, err := count(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(n, 3) + }) + + g.Testing("deadletters count as consumed", func() { + message, err := next(topic, consumer) + g.TErrorIf(err) + + err = toDead(consumer, message.uuid, guuid.New()) + g.TErrorIf(err) + + n, err := count(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(n, 4) + }) +} + +func test_hasDataStmt() { + g.TestStart("hasDataStmt()") + + const ( + topic = "hasData() topic" + payloadStr = "hasData() payload" + consumer = "hasData() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + next, nextClose, nextErr := nextStmt(db, prefix, instanceID) + commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + hasData, hasDataClose, hasDataErr := hasDataStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + nextErr, + commitErr, + toDeadErr, + hasDataErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + nextClose, + commitClose, + toDeadClose, + hasDataClose, + db.Close, + ) + + pub := func(topic string) guuid.UUID { + g.TErrorIf(take(topic, consumer)) + + unsentWithTopic := unsent + unsentWithTopic.Topic = topic + message, err := publish(unsentWithTopic, guuid.New()) + g.TErrorIf(err) + return message.uuid + } + + + g.Testing("false on empty topic", func() { + has, err := hasData(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(has, false) + }) + + g.Testing("other topics don't change the response", func() { + pub("other topic") + + has, err := hasData(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(has, false) + }) + + g.Testing("published messages flip the flag", func() { + pub(topic) + has, err := hasData(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(has, true) + }) + + g.Testing("other consumers don't influence us", func() { + g.TErrorIf(take(topic, "other consumer")) + message, err := next(topic, "other consumer") + g.TErrorIf(err) + + err = commit("other consumer", message.uuid) + g.TErrorIf(err) + + has, err := hasData(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(has, true) + }) + + g.Testing("consuming messages unflips the result", func() { + message, err := next(topic, consumer) + g.TErrorIf(err) + + err = commit(consumer, message.uuid) + g.TErrorIf(err) + + has, err := hasData(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(has, false) + }) + + g.Testing("same for deadletters", func() { + has0, err := hasData(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(has0, false) + + messageID1 := pub(topic) + messageID2 := pub(topic) + + has1, err := hasData(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(has1, true) + + err = toDead(consumer, messageID1, guuid.New()) + g.TErrorIf(err) + err = commit(consumer, messageID2) + g.TErrorIf(err) + + has2, err := hasData(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(has2, false) + }) +} + +func test_initDB() { + g.TestStart("initDB()") + + const ( + topic = "initDB() topic" + payloadStr = "initDB() payload" + consumer = "initDB() consumer" + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + defer db.Close() + + var messages []messageT + notifyFn := func(message messageT) { + messages = append(messages, message) + } + + instanceID := os.Getpid() + queries, err := initDB(db, defaultPrefix, notifyFn, instanceID) + g.TErrorIf(err) + defer queries.close() + + g.TErrorIf(queries.take(topic, consumer)) + + + g.Testing("we can perform all the wrapped operations", func() { + messageID := guuid.New() + newMessageID := guuid.New() + deadletterID := guuid.New() + + messageV1, err := queries.publish(unsent, messageID) + g.TErrorIf(err) + + messageV2, err := queries.next(topic, consumer) + g.TErrorIf(err) + + var messagesV3 []messageT + pendingFn := func(message messageT) error { + messagesV3 = append(messagesV3, message) + return nil + } + err = queries.pending(topic, consumer, pendingFn) + g.TErrorIf(err) + g.TAssertEqual(len(messagesV3), 1) + messageV3 := messagesV3[0] + + err = queries.toDead(consumer, messageID, deadletterID) + g.TErrorIf(err) + + deadletterV1, err := queries.oneDead(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(deadletterV1.uuid, deadletterID) + + var ( + deadlettersV2 []deadletterT + messagesV4 []messageT + ) + deadletterFn := func( + deadletter deadletterT, + message messageT, + ) error { + deadlettersV2 = append(deadlettersV2, deadletter) + messagesV4 = append(messagesV4, message) + return nil + } + err = queries.allDead(topic, consumer, deadletterFn) + g.TErrorIf(err) + g.TAssertEqual(len(deadlettersV2), 1) + g.TAssertEqual(deadlettersV2[0].uuid, deadletterID) + g.TAssertEqual(len(messagesV4), 1) + messageV4 := messagesV4[0] + + g.TAssertEqual(messageV1, messageV2) + g.TAssertEqual(messageV1, messageV3) + g.TAssertEqual(messageV1, messageV4) + + newMessageV1, err := queries.replay(deadletterID, newMessageID) + g.TErrorIf(err) + + err = queries.commit(consumer, newMessageID) + g.TErrorIf(err) + + newMessageV0 := messageV1 + newMessageV0.id = newMessageV1.id + newMessageV0.uuid = newMessageID + newMessageV0.timestamp = newMessageV1.timestamp + g.TAssertEqual(newMessageV1, newMessageV0) + + size, err := queries.size(topic) + g.TErrorIf(err) + g.TAssertEqual(size, 2) + + count, err := queries.count(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(count, 2) + + hasData, err := queries.hasData(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(hasData, false) + }) +} + +func test_queriesTclose() { + g.TestStart("queriesT.close()") + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + defer db.Close() + + instanceID := os.Getpid() + queries, err := initDB(db, defaultPrefix, func(messageT) {}, instanceID) + g.TErrorIf(err) + + + g.Testing("after closing, we can't run queries", func() { + unsent := UnsentMessage{ Payload: []byte{}, } + _, err := queries.publish(unsent, guuid.New()) + g.TErrorIf(err) + g.TErrorIf(db.Close()) + + err = queries.close() + g.TErrorIf(err) + + _, err = queries.publish(unsent, guuid.New()) + g.TAssertEqual( + err.Error(), + "sql: database is closed", + ) + }) + + g.Testing("closing mre than once does not error", func() { + g.TErrorIf(g.SomeError( + queries.close(), + queries.close(), + )) + }) +} + + +func test_newPinger() { + g.TestStart("newPinger()") + + g.Testing("onPing() on a closed pinger is a noop", func() { + pinger := newPinger[string]() + pinger.close() + pinger.onPing(func(s string) { + panic(s) + }) + pinger.tryPing("ignored") + }) + + g.Testing("onPing() on a closed pinger with data gets that", func() { + pinger := newPinger[string]() + pinger.tryPing("received") + pinger.tryPing("ignored") + g.TAssertEqual(pinger.closed(), false) + + pinger.close() + g.TAssertEqual(pinger.closed(), true) + + c := make(chan string) + count := 0 + go pinger.onPing(func(s string) { + if count > 0 { + panic(s) + } + count++ + + c <- s + }) + + given := <- c + g.TAssertEqual(given, "received") + }) + + g.Testing("when onPing is late, we loose messages", func() { + pinger := newPinger[string]() + pinger.tryPing("first seen") + pinger.tryPing("second dropped") + pinger.tryPing("third dropped") + + c := make(chan string) + go pinger.onPing(func(s string) { + c <- s + }) + s := <- c + + close(c) + pinger.close() + g.TAssertEqual(s, "first seen") + }) + + g.Testing("if onPing is on time, it may not loose", func() { + pinger := newPinger[string]() + pinger.tryPing("first seen") + + c := make(chan string) + go pinger.onPing(func(s string) { + c <- s + }) + + s1 := <- c + pinger.tryPing("second seen") + s2 := <- c + + close(c) + pinger.close() + g.TAssertEqual(s1, "first seen") + g.TAssertEqual(s2, "second seen") + }) + + g.Testing("if onPing takes too long, it still looses messages", func() { + pinger := newPinger[string]() + pinger.tryPing("first seen") + + c1 := make(chan string) + c2 := make(chan struct{}) + go pinger.onPing(func(s string) { + c1 <- s + c2 <- struct{}{} + }) + + s1 := <- c1 + pinger.tryPing("second seen") + pinger.tryPing("third dropped") + <- c2 + s2 := <- c1 + <- c2 + + close(c2) + close(c1) + pinger.close() + g.TAssertEqual(s1, "first seen") + g.TAssertEqual(s2, "second seen") + }) +} + +func test_makeSubscriptionsFunc() { + g.TestStart("makeSubscriptionsFunc()") + + g.Testing("we can have multiple readers", func() { + subscriptions := makeSubscriptionsFuncs() + + var ( + readStarted sync.WaitGroup + readFinished sync.WaitGroup + ) + c := make(chan struct{}) + readFn := func(subscriptionsSetM) error { + readStarted.Done() + <- c + return nil + } + addReader := func() { + readStarted.Add(1) + readFinished.Add(1) + go func() { + subscriptions.read(readFn) + readFinished.Done() + }() + } + + addReader() + addReader() + addReader() + readStarted.Wait() + c <- struct{}{} + c <- struct{}{} + c <- struct{}{} + readFinished.Wait() + }) + + g.Testing("writers are exclusive", func() { + subscriptions := makeSubscriptionsFuncs() + + var ( + readStarted sync.WaitGroup + readFinished sync.WaitGroup + writeWillStart sync.WaitGroup + writeFinished sync.WaitGroup + ) + c := make(chan string) + readFn := func(subscriptionsSetM) error { + readStarted.Done() + c <- "reader" + return nil + } + addReader := func() { + readStarted.Add(1) + readFinished.Add(1) + go func() { + subscriptions.read(readFn) + readFinished.Done() + }() + } + + writeFn := func(subscriptionsSetM) error { + c <- "writer" + return nil + } + addWriter := func() { + writeWillStart.Add(1) + writeFinished.Add(1) + go func() { + writeWillStart.Done() + subscriptions.write(writeFn) + writeFinished.Done() + }() + } + + addReader() + addReader() + addReader() + readStarted.Wait() + addWriter() + writeWillStart.Wait() + + g.TAssertEqual(<-c, "reader") + g.TAssertEqual(<-c, "reader") + g.TAssertEqual(<-c, "reader") + + readFinished.Wait() + g.TAssertEqual(<-c, "writer") + writeFinished.Wait() + }) +} + +func test_makeNotifyFn() { + g.TestStart("makeNotifyFn()") + + g.Testing("when topic is nil only top pinger gets pinged", func() { + pinger1 := newPinger[struct{}]() + pinger2 := newPinger[[]byte]() + defer pinger1.close() + defer pinger2.close() + + go pinger1.onPing(func(struct{}) { + panic("consumer pinger") + }) + go pinger2.onPing(func(payload []byte) { + panic("waiter pinger") + }) + + flowID := guuid.New() + + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer-1": consumerT{ + pinger: pinger1, + }, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{ + "waiter-1": waiterT{ + pinger: pinger2, + }, + }, + }, + }, + } + subsFn := func(fn func(subscriptionsSetM) error) error { + return fn(set) + } + topPinger := newPinger[struct{}]() + defer topPinger.close() + + var wg sync.WaitGroup + wg.Add(1) + go topPinger.onPing(func(struct{}) { + wg.Done() + }) + + notifyFn := makeNotifyFn(subsFn, topPinger) + + message := messageT{ + uuid: guuid.New(), + topic: "nobody is subscribed to this one", + payload: []byte("nobody with get this payload"), + } + notifyFn(message) + wg.Wait() + }) + + g.Testing("otherwise all interested subscribers get pinged", func() { + const topic = "the topic name" + + pinger1 := newPinger[struct{}]() + pinger2 := newPinger[[]byte]() + defer pinger1.close() + defer pinger2.close() + + var wg sync.WaitGroup + go pinger1.onPing(func(struct{}) { + wg.Done() + }) + go pinger2.onPing(func([]byte) { + wg.Done() + }) + wg.Add(2) + + flowID := guuid.New() + + set := subscriptionsSetM{ + topic: topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer-1": consumerT{ + pinger: pinger1, + }, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{ + "waiter-1": waiterT{ + pinger: pinger2, + }, + }, + }, + }, + } + + subsFn := func(fn func(subscriptionsSetM) error) error { + return fn(set) + } + + topPinger := newPinger[struct{}]() + defer topPinger.close() + go topPinger.onPing(func(struct{}) { + wg.Done() + }) + wg.Add(1) + + notifyFn := makeNotifyFn(subsFn, topPinger) + + message := messageT{ + uuid: guuid.New(), + topic: topic, + flowID: flowID, + payload: []byte("ignored in this test"), + } + notifyFn(message) + wg.Wait() + }) +} + +func test_collectClosedWaiters() { + g.TestStart("collectClosedWaiter()") + + g.Testing("collects all the reports to be closed", func() { + flowID1 := guuid.New() + flowID2 := guuid.New() + flowID3 := guuid.New() + flowID4 := guuid.New() + flowID5 := guuid.New() + + mkwaiter := func(closed bool) waiterT { + fn := func() bool { + return closed + } + return waiterT{ + closed: &fn, + } + } + + set := subscriptionsSetM{ + "topic-1": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{ + "waiter-1": mkwaiter(false), + "waiter-2": mkwaiter(true), + "waiter-3": mkwaiter(true), + }, + flowID2: map[string]waiterT{ + "waiter-4": mkwaiter(true), + "waiter-5": mkwaiter(false), + }, + }, + }, + "topic-2": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID3: map[string]waiterT{ + "waiter-1": mkwaiter(false), + "waiter-2": mkwaiter(false), + }, + flowID4: map[string]waiterT{ + "waiter-3": mkwaiter(true), + "waiter-4": mkwaiter(true), + "waiter-5": mkwaiter(true), + "waiter-6": mkwaiter(true), + }, + }, + }, + "topic-3": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID5: map[string]waiterT{}, + }, + }, + "topic-4": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + expected := map[string]map[guuid.UUID][]string{ + "topic-1": map[guuid.UUID][]string{ + flowID1: []string{ + "waiter-2", + "waiter-3", + }, + flowID2: []string{ + "waiter-4", + }, + }, + "topic-2": map[guuid.UUID][]string{ + flowID3: []string{}, + flowID4: []string{ + "waiter-3", + "waiter-4", + "waiter-5", + "waiter-6", + }, + }, + "topic-3": map[guuid.UUID][]string{ + flowID5: []string{}, + }, + "topic-4": map[guuid.UUID][]string{}, + } + + given := collectClosedWaiters(set) + + // sort names for equality + for _, waitersIndex := range given { + for _, names := range waitersIndex { + sort.Strings(names) + } + } + g.TAssertEqual(given, expected) + }) +} + +func test_trimEmptyLeaves() { + g.TestStart("trimEmptyLeaves()") + + g.Testing("noop on an empty index", func() { + input := map[string]map[guuid.UUID][]string{} + expected := map[string]map[guuid.UUID][]string{} + + trimEmptyLeaves(input) + g.TAssertEqual(input, expected) + }) + + g.Testing("simplifies tree when it can", func() { + flowID1 := guuid.New() + flowID2 := guuid.New() + flowID3 := guuid.New() + flowID4 := guuid.New() + + input := map[string]map[guuid.UUID][]string{ + "topic-1": map[guuid.UUID][]string{ + flowID1: []string{ + "waiter-1", + }, + flowID2: []string{}, + }, + "topic-2": map[guuid.UUID][]string{ + flowID3: []string{}, + flowID4: []string{}, + }, + "topic-3": map[guuid.UUID][]string{}, + } + + expected := map[string]map[guuid.UUID][]string{ + "topic-1": map[guuid.UUID][]string{ + flowID1: []string{ + "waiter-1", + }, + }, + } + + trimEmptyLeaves(input) + g.TAssertEqual(input, expected) + }) + + g.Testing("fully prune tree if possible", func() { + flowID1 := guuid.New() + flowID2 := guuid.New() + flowID3 := guuid.New() + flowID4 := guuid.New() + flowID5 := guuid.New() + + input := map[string]map[guuid.UUID][]string{ + "topic-1": map[guuid.UUID][]string{}, + "topic-2": map[guuid.UUID][]string{}, + "topic-3": map[guuid.UUID][]string{}, + "topic-4": map[guuid.UUID][]string{ + flowID1: []string{}, + }, + "topic-5": map[guuid.UUID][]string{ + flowID2: []string{}, + flowID3: []string{}, + flowID4: []string{}, + flowID5: []string{}, + }, + } + + expected := map[string]map[guuid.UUID][]string{} + + trimEmptyLeaves(input) + g.TAssertEqual(input, expected) + }) +} + +func test_deleteIfEmpty() { + g.TestStart("deleteIfEmpty()") + + g.Testing("noop if there are consumers", func() { + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + }, + } + + expected1 := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + }, + } + expected2 := subscriptionsSetM{} + + deleteIfEmpty(set, "topic") + g.TAssertEqual(set, expected1) + + delete(set["topic"].consumers, "consumer") + deleteIfEmpty(set, "topic") + g.TAssertEqual(set, expected2) + }) + + g.Testing("noop if there are waiters", func() { + flowID := guuid.New() + + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: nil, + }, + }, + } + + expected1 := subscriptionsSetM{ + "topic": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: nil, + }, + }, + } + expected2 := subscriptionsSetM{} + + deleteIfEmpty(set, "topic") + g.TAssertEqual(set, expected1) + + delete(set["topic"].waiters, flowID) + deleteIfEmpty(set, "topic") + g.TAssertEqual(set, expected2) + }) + + g.Testing("otherwise deletes when empty", func() { + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + expected := subscriptionsSetM{} + + deleteIfEmpty(set, "topic") + g.TAssertEqual(set, expected) + }) + + g.Testing("unrelated topics are left untouched", func() { + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + expected := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + deleteIfEmpty(set, "another-topic") + g.TAssertEqual(set, expected) + }) +} + +func test_deleteEmptyTopics() { + g.TestStart("deleteEmptyTopics()") + + g.Testing("cleans up all empty topics from the set", func() { + flowID1 := guuid.New() + flowID2 := guuid.New() + + set := subscriptionsSetM{ + "empty": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + "has-consumers": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + "has-waiters": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: nil, + }, + }, + "has-both": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID2: nil, + }, + }, + "has-neither": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + expected := subscriptionsSetM{ + "has-consumers": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + "has-waiters": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: nil, + }, + }, + "has-both": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID2: nil, + }, + }, + } + + deleteEmptyTopics(set) + g.TAssertEqual(set, expected) + }) +} + +func test_removeClosedWaiter() { + g.TestStart("removeClosedWaiter()") + + g.Testing("removes from set all that we request", func() { + flowID0 := guuid.New() + flowID1 := guuid.New() + flowID2 := guuid.New() + flowID3 := guuid.New() + + set := subscriptionsSetM{ + "topic-1": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{ + "waiter-1": waiterT{}, + "waiter-2": waiterT{}, + }, + flowID2: map[string]waiterT{ + "waiter-3": waiterT{}, + "waiter-4": waiterT{}, + "waiter-5": waiterT{}, + }, + }, + }, + "topic-2": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID3: map[string]waiterT{ + "waiter-6": waiterT{}, + "waiter-7": waiterT{}, + "waiter-8": waiterT{}, + }, + }, + }, + "topic-3": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + input := map[string]map[guuid.UUID][]string{ + "topic-0": map[guuid.UUID][]string{ + flowID0: []string{ + "waiter-0", + }, + }, + "topic-1": map[guuid.UUID][]string{ + flowID1: []string{ + "waiter-2", + }, + flowID2: []string{ + "waiter-3", + "waiter-4", + "waiter-5", + }, + }, + "topic-2": map[guuid.UUID][]string{ + flowID3: []string{ + "waiter-6", + "waiter-7", + "waiter-8", + }, + }, + } + + expected := subscriptionsSetM{ + "topic-1": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{ + "waiter-1": waiterT{}, + }, + }, + }, + } + + removeClosedWaiters(set, input) + g.TAssertEqual(set, expected) + }) + + g.Testing("empty flowIDs from input GET LEAKED", func() { + flowID1 := guuid.New() + flowID2 := guuid.New() + + input := map[string]map[guuid.UUID][]string{ + "topic-2": map[guuid.UUID][]string{ + flowID2: []string{ + "waiter", + }, + }, + } + + set := subscriptionsSetM{ + "topic-1": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{}, + }, + }, + "topic-2": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID2: map[string]waiterT{ + "waiter": waiterT{}, + }, + }, + }, + } + + expected := subscriptionsSetM{ + "topic-1": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{}, + }, + }, + } + + removeClosedWaiters(set, input) + g.TAssertEqual(set, expected) + }) +} + +func test_reapClosedWaiters() { + g.TestStart("reapClosedWaiters()") + + g.Testing("trimEmptyLeaves() prevents removal of empty flows", func() { + var ( + set subscriptionsSetM + readCount = 0 + writeCount = 0 + ) + readFn := func(fn func(subscriptionsSetM) error) error { + readCount++ + return fn(set) + } + writeFn := func(fn func(subscriptionsSetM) error) error { + writeCount++ + return fn(set) + } + + openFn := func() bool { + return false + } + closedFn := func() bool { + return true + } + open := waiterT{ closed: &openFn } + closed := waiterT{ closed: &closedFn } + flowID1 := guuid.New() + flowID2 := guuid.New() + + set = subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{ + "waiter-1": open, + "waiter-2": open, + "waiter-3": open, + }, + flowID2: map[string]waiterT{ + "waiter-4": open, + }, + }, + }, + } + + expected1 := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{ + "waiter-1": open, + "waiter-2": open, + "waiter-3": open, + }, + flowID2: map[string]waiterT{ + "waiter-4": open, + }, + }, + }, + } + + expected2 := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{ + "waiter-2": open, + "waiter-3": open, + }, + flowID2: map[string]waiterT{ + "waiter-4": open, + }, + }, + }, + } + + expected3 := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{ + "waiter-2": open, + "waiter-3": open, + }, + }, + }, + } + + reapClosedWaiters(readFn, writeFn) + g.TAssertEqual(readCount, 1) + g.TAssertEqual(writeCount, 0) + g.TAssertEqual(set, expected1) + + set["topic"].waiters[flowID1]["waiter-1"] = closed + reapClosedWaiters(readFn, writeFn) + g.TAssertEqual(readCount, 2) + g.TAssertEqual(writeCount, 1) + g.TAssertEqual(set, expected2) + + set["topic"].waiters[flowID2]["waiter-4"] = closed + reapClosedWaiters(readFn, writeFn) + g.TAssertEqual(readCount, 3) + g.TAssertEqual(writeCount, 2) + g.TAssertEqual(set, expected3) + }) +} + +func test_everyNthCall() { + g.TestStart("everyNthCall()") + + g.Testing("0 (incorrect) would make fn never be called", func() { + never := everyNthCall[int](0, func(int) { + panic("unreachable") + }) + for i := 0; i < 100; i++ { + never(i) + } + }) + + g.Testing("the first call is delayed to be the nth", func() { + count := 0 + values := []string{} + fn := everyNthCall[string](3, func(s string) { + count++ + values = append(values, s) + }) + + fn("1 ignored") + g.TAssertEqual(count, 0) + fn("2 ignored") + g.TAssertEqual(count, 0) + fn("3 not ignored") + g.TAssertEqual(count, 1) + + fn("4 ignored") + fn("5 ignored") + g.TAssertEqual(count, 1) + + fn("6 not ignored") + fn("7 ignored") + fn("8 ignored") + g.TAssertEqual(count, 2) + + fn("9 not ignored") + g.TAssertEqual(count, 3) + + expected := []string{ + "3 not ignored", + "6 not ignored", + "9 not ignored", + } + g.TAssertEqual(values, expected) + }) +} + +func test_runReaper() { + g.TestStart("runReaper()") + + g.Testing("debounce reapClosedWaiters `reaperSkipCount` times", func() { + set := subscriptionsSetM{} + + var ( + readCount = 0 + writeCount = 0 + ) + readFn := func(fn func(subscriptionsSetM) error) error { + readCount++ + return fn(set) + } + writeFn := func(fn func(subscriptionsSetM) error) error { + writeCount++ + return fn(set) + } + + var iterCount int + onPing := func(fn func(struct{})) { + for i := 0; i < iterCount; i++ { + fn(struct{}{}) + } + } + + iterCount = reaperSkipCount - 1 + runReaper(onPing, readFn, writeFn) + g.TAssertEqual(readCount, 0) + g.TAssertEqual(writeCount, 0) + + iterCount = reaperSkipCount + runReaper(onPing, readFn, writeFn) + g.TAssertEqual(readCount, 1) + g.TAssertEqual(writeCount, 0) + }) +} + +func test_NewWithPrefix() { + g.TestStart("NewWithPrefix()") + + g.Testing("we get an error with a bad prefix", func() { + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + defer db.Close() + + _, err = NewWithPrefix(db, "a bad prefix") + g.TAssertEqual(err, g.ErrBadSQLTablePrefix) + }) + + g.Testing("otherwise we have a queueT and no errors", func() { + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + defer db.Close() + + queue, err := NewWithPrefix(db, "good") + g.TErrorIf(err) + queue.Close() + + g.TAssertEqual(queue.(queueT).pinger.closed(), true) + }) +} + +func test_New() { + g.TestStart("New()") + + g.Testing("smoke test that we get a queueT", func() { + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + defer db.Close() + + queue, err := New(db) + g.TErrorIf(err) + queue.Close() + + g.TAssertEqual(queue.(queueT).pinger.closed(), true) + }) +} + +func test_asPublicMessage() { + g.TestStart("asPublicMessage()") + + g.Testing("it picks the correct fields 🤷", func() { + input := messageT{ + uuid: guuid.New(), + timestamp: time.Now(), + topic: "topic", + flowID: guuid.New(), + payload: []byte("payload"), + } + + expected := Message{ + ID: input.uuid, + Timestamp: input.timestamp, + Topic: input.topic, + FlowID: input.flowID, + Payload: input.payload, + } + + given := asPublicMessage(input) + g.TAssertEqual(given, expected) + }) +} + +func test_queueT_Publish() { + g.TestStart("queueT.Publish()") + + const ( + topic = "queueT.Publish() topic" + payloadStr = "queueT.Publish() payload" + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + defer db.Close() + + queue, err := New(db) + g.TErrorIf(err) + defer queue.Close() + + + g.Testing("it just publishes and returns", func() { + message, err := queue.Publish(unsent) + g.TErrorIf(err) + + g.TAssertEqual(message.Timestamp == time.Time{}, false) + g.TAssertEqual(message.Topic, topic) + g.TAssertEqual(message.FlowID, flowID) + g.TAssertEqual(message.Payload, payload) + }) + + queue.Close() + g.TAssertEqual(queue.(queueT).pinger.closed(), true) +} + +func test_registerConsumerFn() { + g.TestStart("registerConsumerFn()") + + g.Testing("adds a new topicSubscriptionT{} if needed", func() { + consumer := consumerT{ + data: consumerDataT{ + topic: "topic", + name: "consumer", + }, + } + + set := subscriptionsSetM{} + + expected := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumer, + }, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + registerConsumerFn(consumer)(set) + g.TAssertEqual(set, expected) + }) + + g.Testing("otherwise it just uses what exists", func() { + flowID := guuid.New() + + consumer := consumerT{ + data: consumerDataT{ + topic: "topic", + name: "consumer", + }, + } + + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "other-consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{}, + }, + }, + } + + expected := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumer, + "other-consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{}, + }, + }, + } + + registerConsumerFn(consumer)(set) + g.TAssertEqual(set, expected) + }) + + g.Testing("overwrites existing consumer if desired", func() { + close1 := func() {} + consumer1 := consumerT{ + data: consumerDataT{ + topic: "topic", + name: "consumer", + }, + close: &close1, + } + + close2 := func() {} + consumer2 := consumerT{ + data: consumerDataT{ + topic: "topic", + name: "consumer", + }, + close: &close2, + } + + set := subscriptionsSetM{} + + expected1 := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumer1, + }, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + expected2 := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumer2, + }, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + registerConsumerFn(consumer1)(set) + g.TAssertEqual(set, expected1) + g.TAssertEqual(reflect.DeepEqual(set, expected2), false) + + registerConsumerFn(consumer2)(set) + g.TAssertEqual(set, expected2) + g.TAssertEqual(reflect.DeepEqual(set, expected1), false) + }) +} + +func test_registerWaiterFn() { + g.TestStart("registerWaiterFn()") + + g.Testing("adds a new topicSubscriptionT{} if needed", func() { + flowID := guuid.New() + + waiter := waiterT{ + data: waiterDataT{ + topic: "topic", + flowID: flowID, + name: "waiter", + }, + } + + set := subscriptionsSetM{} + + expected := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{ + "waiter": waiter, + }, + }, + }, + } + + registerWaiterFn(waiter)(set) + g.TAssertEqual(set, expected) + }) + + g.Testing("adds a new waiters map if needed", func() { + flowID := guuid.New() + + waiter := waiterT{ + data: waiterDataT{ + topic: "topic", + flowID: flowID, + name: "waiter", + }, + } + + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + expected := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{ + "waiter": waiter, + }, + }, + }, + } + + registerWaiterFn(waiter)(set) + g.TAssertEqual(set, expected) + }) + + g.Testing("otherwise it just uses what exists", func() { + flowID := guuid.New() + + waiter := waiterT{ + data: waiterDataT{ + topic: "topic", + flowID: flowID, + name: "waiter", + }, + } + + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{ + "other-waiter": waiterT{}, + }, + }, + }, + } + + expected := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{ + "waiter": waiter, + "other-waiter": waiterT{}, + }, + }, + }, + } + + registerWaiterFn(waiter)(set) + g.TAssertEqual(set, expected) + }) + + g.Testing("overwrites existing waiter if desired", func() { + flowID := guuid.New() + + close1 := func() {} + waiter1 := waiterT{ + data: waiterDataT{ + topic: "topic", + flowID: flowID, + name: "waiter", + }, + close: &close1, + } + + close2 := func() {} + waiter2 := waiterT{ + data: waiterDataT{ + topic: "topic", + flowID: flowID, + name: "waiter", + }, + close: &close2, + } + + set := subscriptionsSetM{} + + expected1 := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{ + "waiter": waiter1, + }, + }, + }, + } + + expected2 := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{ + "waiter": waiter2, + }, + }, + }, + } + + registerWaiterFn(waiter1)(set) + g.TAssertEqual(set, expected1) + g.TAssertEqual(reflect.DeepEqual(set, expected2), false) + + registerWaiterFn(waiter2)(set) + g.TAssertEqual(set, expected2) + g.TAssertEqual(reflect.DeepEqual(set, expected1), false) + }) +} + +func test_makeConsumeOneFn() { + g.TestStart("makeConsumeOneFn()") + + savedLogger := slog.Default() + + s := new(strings.Builder) + g.SetLoggerOutput(s) + + var ( + successCount int + errorCount int + callbackErr error + successFnErr error + errorFnErr error + messages []Message + successNames []string + successIDs []guuid.UUID + errorNames []string + errorIDs []guuid.UUID + deadletterIDs []guuid.UUID + ) + + data := consumerDataT{ + topic: "topic", + name: "name", + } + + callback := func(message Message) error { + messages = append(messages, message) + return callbackErr + } + + successFn := func(name string, messageID guuid.UUID) error { + successCount++ + successNames = append(successNames, name) + successIDs = append(successIDs, messageID) + return successFnErr + } + + errorFn := func( + name string, + messageID guuid.UUID, + deadletterID guuid.UUID, + ) error { + errorCount++ + errorNames = append(errorNames, name) + errorIDs = append(errorIDs, messageID) + deadletterIDs = append(deadletterIDs, deadletterID) + + return errorFnErr + } + + consumeOneFn := makeConsumeOneFn( + data, + callback, + successFn, + errorFn, + ) + + message1 := messageT{ uuid: guuid.New() } + message2 := messageT{ uuid: guuid.New() } + message3 := messageT{ uuid: guuid.New() } + message4 := messageT{ uuid: guuid.New() } + + + g.Testing("error from successFn() is propagated", func() { + err := consumeOneFn(message1) + g.TErrorIf(err) + g.TAssertEqual(successCount, 1) + g.TAssertEqual(errorCount, 0) + + successFnErr = errors.New("successFn() error") + err = consumeOneFn(message2) + g.TAssertEqual(err, successFnErr) + g.TAssertEqual(successCount, 2) + g.TAssertEqual(errorCount, 0) + + g.TAssertEqual(s.String(), "") + }) + + g.Testing("error from callback() is logged and dropped", func() { + *s = strings.Builder{} + + callbackErr = errors.New("callback() error") + err := consumeOneFn(message3) + g.TErrorIf(err) + g.TAssertEqual(successCount, 2) + g.TAssertEqual(errorCount, 1) + g.TAssertEqual(s.String() == "", false) + }) + + g.Testing("error from errorFn() is propagated", func() { + *s = strings.Builder{} + + errorFnErr = errors.New("errorFn() error") + err := consumeOneFn(message4) + g.TAssertEqual(err, errorFnErr) + g.TAssertEqual(successCount, 2) + g.TAssertEqual(errorCount, 2) + g.TAssertEqual(s.String() == "", false) + }) + + g.Testing("calls happened with the expected arguments", func() { + expectedMessages := []Message{ + asPublicMessage(message1), + asPublicMessage(message2), + asPublicMessage(message3), + asPublicMessage(message4), + } + + g.TAssertEqual(messages, expectedMessages) + g.TAssertEqual(successNames, []string{ "name", "name" }) + g.TAssertEqual(errorNames, []string{ "name", "name" }) + g.TAssertEqual(successIDs, []guuid.UUID{ + message1.uuid, + message2.uuid, + }) + g.TAssertEqual(errorIDs, []guuid.UUID{ + message3.uuid, + message4.uuid, + }) + g.TAssertEqual(deadletterIDs[0] == message3.uuid, false) + g.TAssertEqual(deadletterIDs[1] == message4.uuid, false) + }) + + slog.SetDefault(savedLogger) +} + +func test_makeConsumeAllFn() { + g.TestStart("makeConsumeAllFn()") + + savedLogger := slog.Default() + + s := new(strings.Builder) + g.SetLoggerOutput(s) + + data := consumerDataT{} + + consumeOneFn := func(messageT) error { + return nil + } + + var eachFnErr error + eachFn := func(string, string, func(messageT) error) error { + return eachFnErr + } + + consumeAllFn := makeConsumeAllFn(data, consumeOneFn, eachFn) + + + g.Testing("silent when eachFn() returns no error", func() { + consumeAllFn(struct{}{}) + g.TAssertEqual(s.String(), "") + }) + + g.Testing("logs warning otherwise", func() { + eachFnErr = errors.New("eachFn() error") + consumeAllFn(struct{}{}) + g.TAssertEqual(s.String() == "", false) + }) + + slog.SetDefault(savedLogger) +} + +func test_makeWaitFn() { + g.TestStart("makeWaitFn()") + + g.Testing("all it does is send the data and close things", func() { + callCount := 0 + closeFn := func() { + callCount++ + } + + c := make(chan []byte, 1) + waitFn := makeWaitFn(c, closeFn) + + payload := []byte("payload") + waitFn(payload) + given := <- c + + g.TAssertEqual(given, payload) + g.TAssertEqual(callCount, 1) + }) + + g.Testing("you can call it twice for cases of race condition", func() { + callCount := 0 + closeFn := func() { + callCount++ + } + + c := make(chan []byte, 1) + waitFn := makeWaitFn(c, closeFn) + + payload := []byte("something") + waitFn(payload) + waitFn(payload) + given1 := <- c + given2 := <- c + + g.TAssertEqual(given1, payload) + g.TAssertEqual(given2, []byte(nil)) + }) +} + +func test_runConsumer() { + g.TestStart("runConsumer()") + + g.Testing("calls consumeAllFn() at least one", func() { + onPing := func(fn func(struct{})) {} + + count := 0 + consumeAllFn := func(struct{}) { + count++ + } + + runConsumer(onPing, consumeAllFn) + g.TAssertEqual(count, 1) + }) + + g.Testing("can call consumeAllFn() (onPing + 1) times", func() { + onPing := func(fn func(struct{})) { + fn(struct{}{}) + fn(struct{}{}) + fn(struct{}{}) + } + + count := 0 + consumeAllFn := func(struct{}) { + count++ + } + + runConsumer(onPing, consumeAllFn) + g.TAssertEqual(count, 4) + }) +} + +func test_tryFinding() { + g.TestStart("tryFinding()") + + g.Testing("noop in case of failure", func() { + myErr := errors.New("find() error") + findFn := func(string, guuid.UUID) (messageT, error) { + return messageT{}, myErr + } + + count := 0 + waitFn := func([]byte) { + count++ + } + + + tryFinding(findFn, "topic", guuid.New(), waitFn) + g.TAssertEqual(count, 0) + }) + + g.Testing("calls waitFn in case of success", func() { + payload := []byte("find() payload") + + findFn := func(string, guuid.UUID) (messageT, error) { + return messageT{ payload: payload }, nil + } + + payloads := [][]byte{} + waitFn := func(payload []byte) { + payloads = append(payloads, payload) + } + + + tryFinding(findFn, "topic", guuid.New(), waitFn) + g.TAssertEqual(payloads, [][]byte{ payload }) + }) +} + +func test_queueT_Subscribe() { + g.TestStart("queueT.Subscribe()") + + set := subscriptionsSetM{} + consumed := []guuid.UUID{} + messages := []messageT{ + messageT{ uuid: guuid.New() }, + messageT{ uuid: guuid.New() }, + } + + var takeErr error + queue := queueT{ + queries: queriesT{ + take: func(string, string) error { + return takeErr + }, + pending: func( + topic string, + consumer string, + fn func(messageT) error, + ) error { + for _, message := range messages { + consumed = append( + consumed, + message.uuid, + ) + _ = fn(message) + } + return nil + }, + commit: func( + consumer string, + messageID guuid.UUID, + ) error { + return nil + }, + toDead: func(string, guuid.UUID, guuid.UUID) error { + g.Unreachable() + return nil + }, + }, + subscriptions: subscriptionsT{ + write: func(fn func(subscriptionsSetM) error) error { + return fn(set) + }, + }, + } + + + g.Testing("registers our callback in the set and calls it", func() { + var wg sync.WaitGroup + wg.Add(2) + + queue.Subscribe("topic", "consumer-1", func(Message) error { + wg.Done() + return nil + }) + defer queue.Unsubscribe("topic", "consumer-1") + + wg.Wait() + g.TAssertEqual(consumed, []guuid.UUID{ + messages[0].uuid, + messages[1].uuid, + }) + }) + + g.Testing("our callback also gets called when pinged", func() { + consumed = []guuid.UUID{} + + var wg sync.WaitGroup + wg.Add(4) + + queue.Subscribe("topic", "consumer-2", func(m Message) error { + wg.Done() + return nil + }) + defer queue.Unsubscribe("topic", "consumer-2") + + consumer := set["topic"].consumers["consumer-2"] + consumer.pinger.tryPing(struct{}{}) + + wg.Wait() + + g.TAssertEqual(consumed, []guuid.UUID{ + messages[0].uuid, + messages[1].uuid, + messages[0].uuid, + messages[1].uuid, + }) + }) + + g.Testing("we try to own the topic", func() { + takeErr = errors.New("queueT.Subscribe() 1") + + err := queue.Subscribe("topic", "name", func(Message) error { + g.Unreachable() + return nil + }) + + g.TAssertEqual(err, takeErr) + takeErr = nil + }) + + g.Testing("if we can't own the topic, we don't get registered", func() { + takeErr = errors.New("queueT.Subscribe() 2") + + err := queue.Subscribe("topic", "name", func(Message) error { + g.Unreachable() + return nil + }) + g.TErrorNil(err) + + expected := subscriptionsSetM{} + g.TAssertEqual(set, expected) + + takeErr = nil + }) +} + +func test_queueT_WaitFor() { + g.TestStart("queueT.WaitFor()") + + findErr := errors.New("find() error") + message := messageT{ + payload: []byte("queueT.WaitFor() payload"), + } + + set := subscriptionsSetM{} + + queue := queueT{ + queries: queriesT{ + find: func( + topic string, + flowID guuid.UUID, + ) (messageT, error) { + return message, findErr + }, + }, + subscriptions: subscriptionsT{ + write: func(fn func(subscriptionsSetM) error) error { + return fn(set) + }, + read: func(fn func(subscriptionsSetM) error) error { + return fn(set) + }, + }, + } + + + g.Testing("registers the waiter in the set", func() { + flowID := guuid.New() + + defer queue.WaitFor("topic", flowID, "waiter-1").Close() + + expected := waiterDataT{ + topic: "topic", + flowID: flowID, + name: "waiter-1", + } + + g.TAssertEqual( + set["topic"].waiters[flowID]["waiter-1"].data, + expected, + ) + }) + + g.Testing("the channel gets a message when waiter is pinged", func() { + flowID := guuid.New() + payload := []byte("sent payload") + + w := queue.WaitFor("topic", flowID, "waiter-2") + defer w.Close() + + waiter := set["topic"].waiters[flowID]["waiter-2"] + waiter.pinger.tryPing(payload) + + given := <- w.Channel + + g.TAssertEqual(given, payload) + g.TAssertEqual((*waiter.closed)(), true) + }) + + g.Testing("we can also WaitFor() after publishing the message", func() { + findErr = nil + flowID := guuid.New() + + w := queue.WaitFor("topic", flowID, "waiter-3") + defer w.Close() + + given := <- w.Channel + + waiter := set["topic"].waiters[flowID]["waiter-3"] + waiter.pinger.tryPing([]byte("ignored")) + + g.TAssertEqual(given, message.payload) + g.TAssertEqual((*waiter.closed)(), true) + }) + + g.Testing("if the data already exists we get it immediatelly", func() { + flowID := guuid.New() + + w := queue.WaitFor("topic", flowID, "waiter-4") + defer w.Close() + + waiter := set["topic"].waiters[flowID]["waiter-4"] + g.TAssertEqual((*waiter.closed)(), true) + + given := <- w.Channel + g.TAssertEqual(given, message.payload) + }) +} + +func test_unsubscribeIfExistsFn() { + g.TestStart("unsubscribeIfExistsFn()") + + g.Testing("noop on empty set", func() { + set := subscriptionsSetM{} + expected := subscriptionsSetM{} + unsubscribeIfExistsFn("topic", "consumer")(set) + g.TAssertEqual(set, expected) + }) + + g.Testing("noop on missing topic", func() { + set := subscriptionsSetM{ + "topic": topicSubscriptionT{}, + } + + expected := subscriptionsSetM{ + "topic": topicSubscriptionT{}, + } + + unsubscribeIfExistsFn("other-topic", "consumer")(set) + g.TAssertEqual(set, expected) + }) + + g.Testing("noop on missing consumer", func() { + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + }, + } + + expected := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + }, + } + + unsubscribeIfExistsFn("topic", "other-consumer")(set) + g.TAssertEqual(set, expected) + }) + + g.Testing("closes consumer and removes it from set", func() { + flowID := guuid.New() + + count := 0 + close := func() { + count++ + } + + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{ + close: &close, + }, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{}, + }, + }, + } + + expected := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{}, + }, + }, + } + + unsubscribeIfExistsFn("topic", "consumer")(set) + g.TAssertEqual(set, expected) + g.TAssertEqual(count, 1) + }) + + g.Testing("empty topics are also removed", func() { + count := 0 + close := func() { + count++ + } + + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{ + close: &close, + }, + }, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + expected := subscriptionsSetM{} + + unsubscribeIfExistsFn("topic", "consumer")(set) + g.TAssertEqual(set, expected) + g.TAssertEqual(count, 1) + }) +} + +func test_queueT_Unsubscribe() { + g.TestStart("queueT.Unsubscribe()") + + g.Testing("calls unsubscribesIfExists() via writeFn()", func() { + closed := false + close := func() { + closed = true + } + + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{ + close: &close, + }, + }, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + expected := subscriptionsSetM{} + + queue := queueT{ + subscriptions: subscriptionsT{ + write: func( + fn func(subscriptionsSetM) error, + ) error { + return fn(set) + }, + }, + } + + queue.Unsubscribe("topic", "consumer") + g.TAssertEqual(set, expected) + g.TAssertEqual(closed, true) + }) +} + +func test_cleanSubscriptions() { + g.TestStart("cleanSubscriptions()") + + g.Testing("all consumers and waiters get close()'d", func() { + flowID1 := guuid.New() + flowID2 := guuid.New() + flowID3 := guuid.New() + + type pairT struct{ + closed func() bool + fn func() + } + + mkclose := func() pairT { + closed := false + return pairT{ + closed: func() bool { + return closed + }, + fn: func() { + closed = true + }, + } + } + + c := mkclose() + g.TAssertEqual(c.closed(), false) + c.fn() + var x bool = c.closed() + g.TAssertEqual(x, true) + + close1 := mkclose() + close2 := mkclose() + close3 := mkclose() + close4 := mkclose() + close5 := mkclose() + close6 := mkclose() + close7 := mkclose() + close8 := mkclose() + + set := subscriptionsSetM{ + "topic-1": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer-1": consumerT{ + close: &close1.fn, + }, + "consumer-2": consumerT{ + close: &close2.fn, + }, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{ + "waiter-1": waiterT{ + close: &close3.fn, + }, + }, + flowID2: map[string]waiterT{ + "waiter-2": waiterT{ + close: &close4.fn, + }, + "waiter-3": waiterT{ + close: &close5.fn, + }, + }, + }, + }, + "topic-2": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer-3": consumerT{ + close: &close6.fn, + }, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID3: map[string]waiterT{ + "waiter-4": waiterT{ + close: &close7.fn, + }, + }, + }, + }, + "topic-3": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + cleanSubscriptions(set) + + given := []bool{ + close1.closed(), + close2.closed(), + close3.closed(), + close4.closed(), + close5.closed(), + close6.closed(), + close7.closed(), + close8.closed(), + } + + expected := []bool{ + true, + true, + true, + true, + true, + true, + true, + false, + } + + g.TAssertEqualI(given, expected) + }) +} + +func test_queueT_Close() { + g.TestStart("queueT.Close()") + + g.Testing("clean pinger, subscriptions and queries", func() { + var ( + pingerCount = 0 + subscriptionsCount = 0 + queriesCount = 0 + + subscriptionsErr = errors.New("subscriptionsT{} error") + queriesErr = errors.New("queriesT{} error") + ) + queue := queueT{ + queries: queriesT{ + close: func() error{ + queriesCount++ + return queriesErr + }, + }, + subscriptions: subscriptionsT{ + write: func( + func(subscriptionsSetM) error, + ) error { + subscriptionsCount++ + return subscriptionsErr + }, + }, + pinger: pingerT[struct{}]{ + close: func() { + pingerCount++ + }, + }, + } + + err := queue.Close() + g.TAssertEqual(err, g.WrapErrors(subscriptionsErr, queriesErr)) + g.TAssertEqual(pingerCount, 1) + g.TAssertEqual(subscriptionsCount, 1) + g.TAssertEqual(queriesCount, 1) + }) +} + + +func test_topicGetopt() { + g.TestStart("topicGetopt()") + + g.Testing("checks for required positional argument", func() { + var w strings.Builder + argsIn := argsT{ + args: []string{}, + } + + argsOut, ok := topicGetopt(argsIn, &w) + g.TAssertEqual(w.String(), "Missing TOPIC.\n") + g.TAssertEqual(ok, false) + g.TAssertEqual(argsOut, argsIn) + }) + + g.Testing("success otherwise", func() { + var w strings.Builder + argsIn := argsT{ + args: []string{"a topic"}, + } + + argsOut, ok := topicGetopt(argsIn, &w) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(ok, true) + argsIn.topic = "a topic" + g.TAssertEqual(argsOut, argsIn) + }) +} + +func test_topicConsumerGetopt() { + g.TestStart("topicConsumerGetopt()") + + g.Testing("checks for TOPIC argument", func() { + var w strings.Builder + argsIn := argsT{ + args: []string{}, + } + + argsOut, ok := topicConsumerGetopt(argsIn, &w) + g.TAssertEqual(w.String(), "Missing TOPIC.\n") + g.TAssertEqual(ok, false) + g.TAssertEqual(argsOut, argsIn) + }) + + g.Testing("we get an error on unsupported flag", func() { + var w strings.Builder + argsIn := argsT{ + args: []string{"-Z"}, + } + + const message = "flag provided but not defined: -Z\n" + argsOut, ok := topicConsumerGetopt(argsIn, &w) + g.TAssertEqual(w.String(), message) + g.TAssertEqual(ok, false) + g.TAssertEqual(argsOut, argsIn) + }) + + g.Testing("we also get an error on incorrect usage of flags", func() { + var w strings.Builder + argsIn := argsT{ + args: []string{"-C"}, + } + + const message = "flag needs an argument: -C\n" + argsOut, ok := topicConsumerGetopt(argsIn, &w) + g.TAssertEqual(w.String(), message) + g.TAssertEqual(ok, false) + g.TAssertEqual(argsOut, argsIn) + }) + + g.Testing("we can customize the CONSUMER", func() { + var w strings.Builder + argsIn := argsT{ + args: []string{"-C", "custom consumer", "this topic"}, + } + + argsOut, ok := topicConsumerGetopt(argsIn, &w) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(ok, true) + argsIn.topic = "this topic" + argsIn.consumer = "custom consumer" + g.TAssertEqual(argsOut, argsIn) + }) + + g.Testing("otherwise we get the default one", func() { + var w strings.Builder + argsIn := argsT{ + args: []string{"T"}, + } + + argsOut, ok := topicConsumerGetopt(argsIn, &w) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(ok, true) + argsIn.topic = "T" + argsIn.consumer = "default-consumer" + g.TAssertEqual(argsOut, argsIn) + }) +} + +func test_inExec() { + g.TestStart("inExec()") + + const ( + topic = "inExec topic" + payloadStr = "inExec payload" + ) + var ( + payload = []byte(payloadStr) + args = argsT{ + topic: topic, + } + ) + + var ( + publishErr error + messages []messageT + id int64 = 0 + ) + queries := queriesT{ + publish: func( + unsent UnsentMessage, + messageID guuid.UUID, + ) (messageT, error) { + if publishErr != nil { + return messageT{}, publishErr + } + + id++ + now := time.Now() + message := messageT{ + id: id, + timestamp: now, + uuid: messageID, + topic: unsent.Topic, + flowID: unsent.FlowID, + payload: unsent.Payload, + } + messages = append(messages, message) + return message, nil + }, + } + + + g.Testing("messageID to output when successful", func() { + r := bytes.NewReader(payload) + var w strings.Builder + rc, err := inExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(messages[0].topic, topic) + g.TAssertEqual(messages[0].payload, payload) + g.TAssertEqual(rc, 0) + g.TAssertEqual(w.String(), messages[0].uuid.String() + "\n") + }) + + g.Testing("if reading fails, we return the error", func() { + var r *os.File + var w strings.Builder + rc, err := inExec(args, queries, r, &w) + g.TAssertEqual( + err.Error(), + "invalid argument", + ) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + }) + + g.Testing("if publishing fails, we return the error", func() { + publishErr = errors.New("publish() error") + r := strings.NewReader("read but not published") + var w strings.Builder + rc, err := inExec(args, queries, r, &w) + g.TAssertEqual(err, publishErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + }) +} + +func test_outExec() { + g.TestStart("outExec()") + + const ( + topic = "outExec topic" + consumer = "outExec consumer" + ) + var ( + r = strings.NewReader("") + args = argsT{ + topic: topic, + consumer: consumer, + } + ) + + var ( + takeErr error + nextErr error + messages []messageT + id int64 = 0 + ) + queries := queriesT{ + take: func(string, string) error { + return takeErr + }, + next: func(string, string) (messageT, error) { + if nextErr != nil { + return messageT{}, nextErr + } + + if len(messages) == 0 { + return messageT{}, sql.ErrNoRows + } + + return messages[0], nil + }, + } + pub := func(payload []byte) { + id++ + now := time.Now() + message := messageT{ + id: id, + timestamp: now, + uuid: guuid.New(), + topic: topic, + flowID: guuid.New(), + payload: payload, + } + messages = append(messages, message) + } + + + g.Testing("exit code 1 when we can't take", func() { + takeErr = errors.New("outExec() take error") + + var w strings.Builder + + rc, err := outExec(args, queries, r, &w) + g.TAssertEqual(err, takeErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + + takeErr = nil + }) + + g.Testing("exit code 3 when no message is available", func() { + var w strings.Builder + + rc, err := outExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 3) + }) + + g.Testing("we get the same message until we commit", func() { + var ( + w1 strings.Builder + w2 strings.Builder + w3 strings.Builder + ) + args := argsT{ + topic: topic, + consumer: consumer, + } + + pub([]byte("first payload")) + pub([]byte("second payload")) + + rc1, err1 := outExec(args, queries, r, &w1) + rc2, err2 := outExec(args, queries, r, &w2) + messages = messages[1:] + rc3, err3 := outExec(args, queries, r, &w3) + + g.TErrorIf(g.SomeError(err1, err2, err3)) + g.TAssertEqual(w1.String(), "first payload\n") + g.TAssertEqual(w2.String(), "first payload\n") + g.TAssertEqual(w3.String(), "second payload\n") + g.TAssertEqual(rc1, 0) + g.TAssertEqual(rc2, 0) + g.TAssertEqual(rc3, 0) + }) + + g.Testing("we propagate the error when the query fails", func() { + nextErr = errors.New("next() error") + var w strings.Builder + rc, err := outExec(args, queries, r, &w) + g.TAssertEqual(err, nextErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + }) +} + +func test_commitExec() { + g.TestStart("commitExec()") + + const ( + topic = "commitExec topic" + consumer = "commitExec consumer" + ) + var ( + r = strings.NewReader("") + args = argsT{ + topic: topic, + consumer: consumer, + } + ) + + var ( + takeErr error + nextErr error + commitErr error + messages []messageT + id int64 = 0 + ) + queries := queriesT{ + take: func(string, string) error { + return takeErr + }, + next: func(string, string) (messageT, error) { + if nextErr != nil { + return messageT{}, nextErr + } + + if len(messages) == 0 { + return messageT{}, sql.ErrNoRows + } + + return messages[0], nil + }, + commit: func(string, guuid.UUID) error { + if commitErr != nil { + return commitErr + } + + messages = messages[1:] + return nil + }, + } + pub := func(payload []byte) { + id++ + now := time.Now() + message := messageT{ + id: id, + timestamp: now, + uuid: guuid.New(), + topic: topic, + flowID: guuid.New(), + payload: payload, + } + messages = append(messages, message) + } + + + g.Testing("error when we can't take", func() { + takeErr = errors.New("commitExec() take error") + + var w strings.Builder + + rc, err := commitExec(args, queries, r, &w) + g.TAssertEqual(err, takeErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + + takeErr = nil + }) + + g.Testing("error when there is nothing to commit", func() { + var w strings.Builder + + rc, err := commitExec(args, queries, r, &w) + g.TAssertEqual(err, sql.ErrNoRows) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + }) + + g.Testing("messages get committed in order", func() { + var w strings.Builder + + pub([]byte("first payload")) + pub([]byte("second payload")) + pub([]byte("third payload")) + + message1 := messages[0] + g.TAssertEqual(message1.payload, []byte("first payload")) + + rc, err := commitExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(rc, 0) + + message2 := messages[0] + g.TAssertEqual(message2.payload, []byte("second payload")) + + rc, err = commitExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(rc, 0) + + message3 := messages[0] + g.TAssertEqual(message3.payload, []byte("third payload")) + + rc, err = commitExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(rc, 0) + + g.TAssertEqual(len(messages), 0) + }) + + g.Testing("when next() query fails, we propagate its result", func() { + nextErr = errors.New("next() error") + var w strings.Builder + rc, err := commitExec(args, queries, r, &w) + g.TAssertEqual(err, nextErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + nextErr = nil + }) + + g.Testing("we also propagate the error on commit() failure", func() { + commitErr = errors.New("commit() error") + pub([]byte{}) + var w strings.Builder + rc, err := commitExec(args, queries, r, &w) + g.TAssertEqual(err, commitErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + }) +} + +func test_deadExec() { + g.TestStart("deadExec()") + + const ( + topic = "deadExec topic" + consumer = "deadExec consumer" + ) + var ( + r = strings.NewReader("") + args = argsT{ + topic: topic, + consumer: consumer, + } + ) + + var ( + takeErr error + nextErr error + toDeadErr error + messages []messageT + id int64 = 0 + ) + queries := queriesT{ + take: func(string, string) error { + return takeErr + }, + next: func(string, string) (messageT, error) { + if nextErr != nil { + return messageT{}, nextErr + } + + if len(messages) == 0 { + return messageT{}, sql.ErrNoRows + } + + return messages[0], nil + }, + toDead: func( + _ string, + _ guuid.UUID, + deadletterID guuid.UUID, + ) error { + if toDeadErr != nil { + return toDeadErr + } + + messages = messages[1:] + return nil + }, + } + pub := func(payload []byte) { + id++ + now := time.Now() + message := messageT{ + id: id, + timestamp: now, + uuid: guuid.New(), + topic: topic, + flowID: guuid.New(), + payload: payload, + } + messages = append(messages, message) + } + + + g.Testing("error when we can't take", func() { + takeErr = errors.New("deadExec() take error") + + var w strings.Builder + + rc, err := deadExec(args, queries, r, &w) + g.TAssertEqual(err, takeErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + + takeErr = nil + }) + + g.Testing("error when there is nothing to mark as dead", func() { + var w strings.Builder + + rc, err := deadExec(args, queries, r, &w) + g.TAssertEqual(err, sql.ErrNoRows) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + }) + + g.Testing("the latest message becomes a deadletter", func() { + var w strings.Builder + + pub([]byte("first payload")) + pub([]byte("second payload")) + + message1 := messages[0] + g.TAssertEqual(message1.payload, []byte("first payload")) + + rc, err := deadExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(rc, 0) + + message2 := messages[0] + g.TAssertEqual(message2.payload, []byte("second payload")) + + rc, err = deadExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(rc, 0) + + g.TAssertEqual(len(messages), 0) + }) + + g.Testing("next() error is propagated", func() { + nextErr = errors.New("next() error") + var w strings.Builder + rc, err := deadExec(args, queries, r, &w) + g.TAssertEqual(err, nextErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + nextErr = nil + }) + + g.Testing("toDead() error is propagated", func() { + toDeadErr = errors.New("toDead() error") + pub([]byte{}) + var w strings.Builder + rc, err := deadExec(args, queries, r, &w) + g.TAssertEqual(err, toDeadErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + }) +} + +func test_listDeadExec() { + g.TestStart("listDeadExec()") + + const ( + topic = "listDeadExec topic" + consumer = "listDeadExec consumer" + ) + var ( + r = strings.NewReader("") + args = argsT{ + topic: topic, + consumer: consumer, + } + ) + + var ( + messages []messageT + deadletters []deadletterT + id int64 = 0 + errorIndex = -1 + allDeadErr = errors.New("allDead() error") + ) + queries := queriesT{ + allDead: func( + _ string, + _ string, + callback func(deadletterT, messageT) error, + ) error { + for i, deadletter := range deadletters { + if i == errorIndex { + return allDeadErr + } + + callback(deadletter, messageT{}) + } + + return nil + }, + } + pub := func() { + payload := []byte("ignored payload for this test") + id++ + now := time.Now() + message := messageT{ + id: id, + timestamp: now, + uuid: guuid.New(), + topic: topic, + flowID: guuid.New(), + payload: payload, + } + messages = append(messages, message) + } + commit := func() { + messages = messages[1:] + } + dead := func() { + message := messages[0] + now := time.Now() + deadletter := deadletterT{ + uuid: guuid.New(), + timestamp: now, + consumer: consumer, + messageID: message.uuid, + } + + messages = messages[1:] + deadletters = append(deadletters, deadletter) + } + replay := func() { + pub() + deadletters = deadletters[1:] + } + + + g.Testing("nothing is shown if topic is empty", func() { + var w strings.Builder + + rc, err := listDeadExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 0) + }) + + g.Testing("deadletters are printed in order", func() { + var w strings.Builder + + pub() + pub() + pub() + + rc, err := listDeadExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 0) + + dead() + commit() + dead() + + expected := fmt.Sprintf( + "%s\t%s\t%s\n%s\t%s\t%s\n", + deadletters[0].uuid.String(), + deadletters[0].timestamp.Format(time.RFC3339), + deadletters[0].consumer, + deadletters[1].uuid.String(), + deadletters[1].timestamp.Format(time.RFC3339), + deadletters[1].consumer, + ) + + rc, err = listDeadExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(w.String(), expected) + g.TAssertEqual(rc, 0) + }) + + g.Testing("deadletters disappear after being replayed", func() { + var ( + w1 strings.Builder + w2 strings.Builder + ) + + replay() + + deadletter := deadletters[0] + expected := fmt.Sprintf( + "%s\t%s\t%s\n", + deadletter.uuid.String(), + deadletter.timestamp.Format(time.RFC3339), + deadletter.consumer, + ) + + rc, err := listDeadExec(args, queries, r, &w1) + g.TErrorIf(err) + g.TAssertEqual(w1.String(), expected) + g.TAssertEqual(rc, 0) + + replay() + + rc, err = listDeadExec(args, queries, r, &w2) + g.TErrorIf(err) + g.TAssertEqual(w2.String(), "") + g.TAssertEqual(rc, 0) + }) + + g.Testing("a database failure interrupts the output", func() { + var w strings.Builder + + pub() + pub() + pub() + dead() + dead() + dead() + + deadletter := deadletters[0] + expected := fmt.Sprintf( + "%s\t%s\t%s\n", + deadletter.uuid.String(), + deadletter.timestamp.Format(time.RFC3339), + deadletter.consumer, + ) + + errorIndex = 1 + rc, err := listDeadExec(args, queries, r, &w) + g.TAssertEqual(err, allDeadErr) + g.TAssertEqual(w.String(), expected) + g.TAssertEqual(rc, 1) + }) +} + +func test_replayExec() { + g.TestStart("replayExec()") + + const ( + topic = "replayExec topic" + consumer = "replayExec consumer" + ) + var ( + w strings.Builder + r = strings.NewReader("") + args = argsT{ + topic: topic, + consumer: consumer, + } + ) + + var ( + oneDeadErr error + replayErr error + messages []messageT + deadletters []deadletterT + deadMessages []messageT + id int64 = 0 + ) + queries := queriesT{ + oneDead: func(string, string) (deadletterT, error) { + if oneDeadErr != nil { + return deadletterT{}, oneDeadErr + } + + if len(deadletters) == 0 { + return deadletterT{}, sql.ErrNoRows + } + + return deadletters[0], nil + }, + replay: func(guuid.UUID, guuid.UUID) (messageT, error) { + if replayErr != nil { + return messageT{}, replayErr + } + + message := deadMessages[0] + messages = append(messages, message) + deadletters = deadletters[1:] + deadMessages = deadMessages[1:] + return message, nil + }, + } + pub := func(payload []byte) { + id++ + now := time.Now() + message := messageT{ + id: id, + timestamp: now, + uuid: guuid.New(), + topic: topic, + flowID: guuid.New(), + payload: payload, + } + messages = append(messages, message) + } + commit := func() { + messages = messages[1:] + } + dead := func() { + message := messages[0] + deadletter := deadletterT{ uuid: guuid.New() } + + messages = messages[1:] + deadletters = append(deadletters, deadletter) + deadMessages = append(deadMessages, message) + } + next := func() string { + return string(messages[0].payload) + } + + + g.Testing("error when there is nothing to replay", func() { + rc, err := replayExec(args, queries, r, &w) + g.TAssertEqual(err, sql.ErrNoRows) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + + pub([]byte("first payload")) + pub([]byte("second payload")) + pub([]byte("third payload")) + pub([]byte("fourth payload")) + + rc, err = replayExec(args, queries, r, &w) + g.TAssertEqual(err, sql.ErrNoRows) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + }) + + g.Testing("deadletters are replayed in order", func() { + dead() + commit() + dead() + commit() + + rc, err := replayExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 0) + g.TAssertEqual(next(), "first payload") + + commit() + + rc, err = replayExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 0) + g.TAssertEqual(next(), "third payload") + }) + + g.Testing("oneDead() error is forwarded", func() { + oneDeadErr = errors.New("oneDead() error") + rc, err := replayExec(args, queries, r, &w) + g.TAssertEqual(err, oneDeadErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + oneDeadErr = nil + }) + + g.Testing("replay() error is also forwarded", func() { + pub([]byte{}) + dead() + + replayErr = errors.New("replay() error") + rc, err := replayExec(args, queries, r, &w) + g.TAssertEqual(err, replayErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + }) +} + +func test_sizeExec() { + g.TestStart("sizeExec()") + + const ( + topic = "sizeExec topic" + consumer = "sizeExec consumer" + ) + var ( + r = strings.NewReader("") + args = argsT{ + topic: topic, + consumer: consumer, + } + ) + + var sizeErr error + queries := queriesT{ + size: func(string) (int, error) { + if sizeErr != nil { + return -1, sizeErr + } + + return 123, nil + }, + } + + + g.Testing("it propagates the error when the query fails", func() { + sizeErr = errors.New("size() error") + var w strings.Builder + rc, err := sizeExec(args, queries, r, &w) + g.TAssertEqual(err, sizeErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + sizeErr = nil + }) + + g.Testing("otherwise it just prints what is was given", func() { + var w strings.Builder + rc, err := sizeExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(w.String(), "123\n") + g.TAssertEqual(rc, 0) + }) +} + +func test_countExec() { + g.TestStart("countExec()") + + const ( + topic = "countExec topic" + consumer = "countExec consumer" + ) + var ( + r = strings.NewReader("") + args = argsT{ + topic: topic, + consumer: consumer, + } + ) + + var countErr error + queries := queriesT{ + count: func(string, string) (int, error) { + if countErr != nil { + return -1, countErr + } + + return 2222, nil + }, + } + + + g.Testing("it propagates the query error", func() { + countErr = errors.New("count() error") + var w strings.Builder + rc, err := countExec(args, queries, r, &w) + g.TAssertEqual(err, countErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + countErr = nil + }) + + g.Testing("otherwise it prints the given count", func() { + var w strings.Builder + rc, err := countExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(w.String(), "2222\n") + g.TAssertEqual(rc, 0) + }) +} + +func test_hasDataExec() { + g.TestStart("hasData()") + + const ( + topic = "hasData topic" + consumer = "hasData consumer" + ) + var ( + w strings.Builder + r = strings.NewReader("") + args = argsT{ + topic: topic, + consumer: consumer, + } + ) + + hasData := true + var hasDataErr error + queries := queriesT{ + hasData: func(string, string) (bool, error) { + if hasDataErr != nil { + return false, hasDataErr + } + + return hasData, nil + }, + } + + + g.Testing("it propagates the query error", func() { + hasDataErr = errors.New("hasData() error") + rc, err := hasDataExec(args, queries, r, &w) + g.TAssertEqual(err, hasDataErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + hasDataErr = nil + }) + + g.Testing("otherwise if just returns (not prints) the flag", func() { + hasData = true + rc, err := hasDataExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(rc, 0) + + hasData = false + rc, err = hasDataExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(rc, 1) + + g.TAssertEqual(w.String(), "") + }) +} + +func test_usage() { + g.TestStart("usage()") + + g.Testing("it just writes to io.Writer", func() { + var w strings.Builder + usage("xxx", &w) + const message = + "Usage: xxx [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n" + g.TAssertEqual(w.String(), message) + }) + + g.Testing("noop on io.Discard for io.Writer", func() { + usage("AN ERROR IF SEEN ANYWHERE!", io.Discard) + }) +} + +func test_getopt() { + g.TestStart("getopt()") + + const warning = "Missing COMMAND.\n" + const usage = "Usage: $0 [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n" + + execFn := func(argsT, queriesT, io.Reader, io.Writer) (int, error) { + return 0, nil + } + commandsMap := map[string]commandT { + "good": commandT{ + name: "good", + getopt: func(args argsT, _ io.Writer) (argsT, bool) { + return args, true + }, + exec: execFn, + }, + "bad": commandT{ + name: "bad", + getopt: func(args argsT, w io.Writer) (argsT, bool) { + if len(args.args) == 0 { + fmt.Fprintln( + w, + "no required arg", + ) + return args, false + } + + if args.args[0] != "required" { + fmt.Fprintln( + w, + "not correct one", + ) + return args, false + } + + args.topic = "a topic" + args.consumer = "a consumer" + return args, true + }, + exec: execFn, + }, + } + + + g.Testing("we suppress the default error message", func() { + var w strings.Builder + argv := []string{"$0", "-h"} + _, _, rc := getopt(argv, commandsMap, &w) + + g.TAssertEqual(w.String(), usage) + g.TAssertEqual(rc, 2) + }) + + g.Testing("we get an error on unsupported flag", func() { + var w strings.Builder + argv := []string{"$0", "-X"} + _, _, rc := getopt(argv, commandsMap, &w) + + const message = "flag provided but not defined: -X\n" + g.TAssertEqual(w.String(), message + usage) + g.TAssertEqual(rc, 2) + }) + + g.Testing("we also get an error on incorrect usage of flags", func() { + var w strings.Builder + argv := []string{"$0", "-f"} + _, _, rc := getopt(argv, commandsMap, &w) + + const message = "flag needs an argument: -f\n" + g.TAssertEqual(w.String(), message + usage) + g.TAssertEqual(rc, 2) + }) + + g.Testing("error when not given a command", func() { + var w strings.Builder + argv := []string{"$0"} + _, _, rc := getopt(argv, commandsMap, &w) + + g.TAssertEqual(w.String(), warning + usage) + g.TAssertEqual(rc, 2) + }) + + g.Testing("error on unknown command", func() { + var w strings.Builder + argv := []string{"$0", "unknown"} + _, _, rc := getopt(argv, commandsMap, &w) + + const message = `Bad COMMAND: "unknown".` + "\n" + g.TAssertEqual(w.String(), message + usage) + g.TAssertEqual(rc, 2) + }) + + g.Testing("checks the command usage", func() { + var ( + w1 strings.Builder + w2 strings.Builder + w3 strings.Builder + ) + + argv1 := []string{"$0", "bad"} + argv2 := []string{"$0", "bad", "arg"} + argv3 := []string{"$0", "bad", "required"} + _, _, rc1 := getopt(argv1, commandsMap, &w1) + _, _, rc2 := getopt(argv2, commandsMap, &w2) + args, command, rc3 := getopt(argv3, commandsMap, &w3) + expectedArgs := argsT{ + databasePath: "q.db", + prefix: "q", + command: "bad", + allArgs: argv3, + args: argv3[2:], + topic: "a topic", + consumer: "a consumer", + } + + g.TAssertEqual(w1.String(), "no required arg\n" + usage) + g.TAssertEqual(w2.String(), "not correct one\n" + usage) + g.TAssertEqual(w3.String(), "") + g.TAssertEqual(rc1, 2) + g.TAssertEqual(rc2, 2) + g.TAssertEqual(rc3, 0) + g.TAssertEqual(args, expectedArgs) + g.TAssertEqual(command.name, "bad") + }) + + g.Testing("when given a command we the default values", func() { + var w strings.Builder + args, command, rc := getopt( + []string{"$0", "good"}, + commandsMap, + &w, + ) + expectedArgs := argsT{ + databasePath: "q.db", + prefix: "q", + command: "good", + allArgs: []string{"$0", "good"}, + args: []string{}, + } + + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 0) + g.TAssertEqual(args, expectedArgs) + g.TAssertEqual(command.name, "good") + }) + + g.Testing("we can customize both values", func() { + var w strings.Builder + argv := []string{"$0", "-f", "F", "-p", "P", "good"} + args, command, rc := getopt(argv, commandsMap, &w) + expectedArgs := argsT{ + databasePath: "F", + prefix: "P", + command: "good", + allArgs: argv, + args: []string{}, + } + + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 0) + g.TAssertEqual(args, expectedArgs) + g.TAssertEqual(command.name, "good") + }) + + g.Testing("a command can have its own commands and options", func() { + var w strings.Builder + argv := []string{"$0", "-f", "F", "good", "-f", "-f", "SUB"} + args, command, rc := getopt(argv, commandsMap, &w) + expectedArgs := argsT{ + databasePath: "F", + prefix: "q", + command: "good", + allArgs: argv, + args: []string{"-f", "-f", "SUB"}, + } + + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 0) + g.TAssertEqual(args, expectedArgs) + g.TAssertEqual(command.name, "good") + }) +} + +func test_runCommand() { + g.TestStart("runCommand()") + + const usage = "Usage: $0 [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n" + + + g.Testing("returns an error on bad prefix", func() { + stdin := strings.NewReader("") + var ( + stdout strings.Builder + stderr strings.Builder + ) + args := argsT{ + prefix: "a bad prefix", + command: "in", + allArgs: []string{"$0"}, + args: []string{"some topic name"}, + } + rc := runCommand(args, commands["in"], stdin, &stdout, &stderr) + + g.TAssertEqual(rc, 1) + g.TAssertEqual(stdout.String(), "") + g.TAssertEqual(stderr.String(), "Invalid table prefix\n") + }) + + g.Testing("otherwise it build a queueT and calls command", func() { + stdin := strings.NewReader("") + var ( + stdout1 strings.Builder + stdout2 strings.Builder + stderr1 strings.Builder + stderr2 strings.Builder + ) + args1 := argsT{ + prefix: defaultPrefix, + command: "good", + } + args2 := argsT{ + prefix: defaultPrefix, + command: "bad", + } + myErr := errors.New("an error") + good := commandT{ + exec: func( + argsT, + queriesT, + io.Reader, + io.Writer, + ) (int, error) { + return 0, nil + }, + } + bad := commandT{ + exec: func( + _ argsT, + _ queriesT, + _ io.Reader, + w io.Writer, + ) (int, error) { + fmt.Fprintf(w, "some text\n") + return 1, myErr + }, + } + rc1 := runCommand(args1, good, stdin, &stdout1, &stderr1) + rc2 := runCommand(args2, bad, stdin, &stdout2, &stderr2) + + g.TAssertEqual(stdout1.String(), "") + g.TAssertEqual(stdout2.String(), "some text\n") + g.TAssertEqual(stderr1.String(), "") + g.TAssertEqual(stderr2.String(), "an error\n") + g.TAssertEqual(rc1, 0) + g.TAssertEqual(rc2, 1) + }) +} + + +func dumpQueries() { + queries := []struct{name string; fn func(string) queryT}{ + { "createTables", createTablesSQL }, + { "take", takeSQL }, + { "publish", publishSQL }, + { "find", findSQL }, + { "pending", pendingSQL }, + { "commit", commitSQL }, + { "toDead", toDeadSQL }, + { "replay", replaySQL }, + { "oneDead", oneDeadSQL }, + { "allDead", allDeadSQL }, + { "size", sizeSQL }, + { "count", countSQL }, + { "hasData", hasDataSQL }, + } + for _, query := range queries { + q := query.fn(defaultPrefix) + fmt.Printf("\n-- %s.sql:", query.name) + fmt.Printf("\n-- write: %s\n", q.write) + fmt.Printf("\n-- read: %s\n", q.read) + fmt.Printf("\n-- owner: %s\n", q.owner) + } +} + + + +func MainTest() { + if os.Getenv("TESTING_DUMP_SQL_QUERIES") != "" { + dumpQueries() + return + } + + g.Init() + test_defaultPrefix() + test_inTx() + test_createTables() + test_takeStmt() + test_publishStmt() + test_findStmt() + test_nextStmt() + test_messageEach() + test_pendingStmt() + test_commitStmt() + test_toDeadStmt() + test_replayStmt() + test_oneDeadStmt() + test_deadletterEach() + test_allDeadStmt() + test_sizeStmt() + test_countStmt() + test_hasDataStmt() + test_initDB() + test_queriesTclose() + test_newPinger() + test_makeSubscriptionsFunc() + test_makeNotifyFn() + test_collectClosedWaiters() + test_trimEmptyLeaves() + test_deleteIfEmpty() + test_deleteEmptyTopics() + test_removeClosedWaiter() + test_reapClosedWaiters() + test_everyNthCall() + test_runReaper() + test_NewWithPrefix() + test_New() + test_asPublicMessage() + test_queueT_Publish() + test_registerConsumerFn() + test_registerWaiterFn() + test_makeConsumeOneFn() + test_makeConsumeAllFn() + test_makeWaitFn() + test_runConsumer() + test_tryFinding() + test_queueT_Subscribe() + test_queueT_WaitFor() + test_unsubscribeIfExistsFn() + test_queueT_Unsubscribe() + test_cleanSubscriptions() + test_queueT_Close() + test_topicGetopt() + test_topicConsumerGetopt() + test_inExec() + test_outExec() + test_commitExec() + test_deadExec() + test_listDeadExec() + test_replayExec() + test_sizeExec() + test_countExec() + test_hasDataExec() + test_usage() + test_getopt() + test_runCommand() +} diff --git a/tests/queries.sql b/tests/queries.sql new file mode 100644 index 0000000..2515778 --- /dev/null +++ b/tests/queries.sql @@ -0,0 +1,333 @@ + +-- createTables.sql: +-- write: + CREATE TABLE IF NOT EXISTS "q_payloads" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')), + topic TEXT NOT NULL, + payload BLOB NOT NULL + ) STRICT; + CREATE INDEX IF NOT EXISTS "q_payloads_topic" + ON "q_payloads"(topic); + + CREATE TABLE IF NOT EXISTS "q_messages" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')), + uuid BLOB NOT NULL UNIQUE, + flow_id BLOB NOT NULL, + payload_id INTEGER NOT NULL + REFERENCES "q_payloads"(id) + ) STRICT; + CREATE INDEX IF NOT EXISTS "q_messages_flow_id" + ON "q_messages"(flow_id); + + CREATE TABLE IF NOT EXISTS "q_offsets" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')), + consumer TEXT NOT NULL, + message_id INTEGER NOT NULL + REFERENCES "q_messages"(id), + UNIQUE (consumer, message_id) + ) STRICT; + CREATE INDEX IF NOT EXISTS "q_offsets_consumer" + ON "q_offsets"(consumer); + + CREATE TABLE IF NOT EXISTS "q_deadletters" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + uuid BLOB NOT NULL UNIQUE, + consumer TEXT NOT NULL, + message_id INTEGER NOT NULL + REFERENCES "q_messages"(id), + UNIQUE (consumer, message_id) + ) STRICT; + CREATE INDEX IF NOT EXISTS "q_deadletters_consumer" + ON "q_deadletters"(consumer); + + CREATE TABLE IF NOT EXISTS "q_replays" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + deadletter_id INTEGER NOT NULL UNIQUE + REFERENCES "q_deadletters"(id) , + message_id INTEGER NOT NULL UNIQUE + REFERENCES "q_messages"(id) + ) STRICT; + + CREATE TABLE IF NOT EXISTS "q_owners" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + topic TEXT NOT NULL, + consumer TEXT NOT NULL, + owner_id INTEGER NOT NULL, + UNIQUE (topic, consumer) + ) STRICT; + + +-- read: + +-- owner: + +-- take.sql: +-- write: + INSERT INTO "q_owners" (topic, consumer, owner_id) + VALUES (?, ?, ?) + ON CONFLICT (topic, consumer) DO + UPDATE SET owner_id=excluded.owner_id; + + +-- read: + +-- owner: + +-- publish.sql: +-- write: + INSERT INTO "q_payloads" (topic, payload) + VALUES (?, ?); + + INSERT INTO "q_messages" (uuid, flow_id, payload_id) + VALUES (?, ?, last_insert_rowid()); + + +-- read: + SELECT id, timestamp FROM "q_messages" + WHERE uuid = ?; + + +-- owner: + +-- find.sql: +-- write: + +-- read: + SELECT + "q_messages".id, + "q_messages".timestamp, + "q_messages".uuid, + "q_payloads".payload + FROM "q_messages" + JOIN "q_payloads" ON + "q_payloads".id = "q_messages".payload_id + WHERE + "q_payloads".topic = ? AND + "q_messages".flow_id = ? + ORDER BY "q_messages".id DESC + LIMIT 1; + + +-- owner: + +-- pending.sql: +-- write: + +-- read: + SELECT + "q_messages".id, + "q_messages".timestamp, + "q_messages".uuid, + "q_messages".flow_id, + "q_payloads".topic, + "q_payloads".payload + FROM "q_messages" + JOIN "q_payloads" ON + "q_payloads".id = "q_messages".payload_id + WHERE + "q_payloads".topic = ? AND + "q_messages".id NOT IN ( + SELECT message_id FROM "q_offsets" + WHERE consumer = ? + ) + ORDER BY "q_messages".id ASC; + + +-- owner: + SELECT owner_id FROM "q_owners" + WHERE + topic = ? AND + consumer = ?; + + +-- commit.sql: +-- write: + INSERT INTO "q_offsets" (consumer, message_id) + VALUES (?, (SELECT id FROM "q_messages" WHERE uuid = ?)); + + +-- read: + SELECT "q_payloads".topic from "q_payloads" + JOIN "q_messages" ON + "q_payloads".id = "q_messages".payload_id + WHERE "q_messages".uuid = ?; + + +-- owner: + SELECT owner_id FROM "q_owners" + WHERE + topic = ? AND + consumer = ?; + + +-- toDead.sql: +-- write: + INSERT INTO "q_offsets" ( consumer, message_id) + VALUES ( ?, (SELECT id FROM "q_messages" WHERE uuid = ?)); + + INSERT INTO "q_deadletters" (uuid, consumer, message_id) + VALUES (?, ?, (SELECT id FROM "q_messages" WHERE uuid = ?)); + + +-- read: + SELECT "q_payloads".topic FROM "q_payloads" + JOIN "q_messages" ON + "q_payloads".id = "q_messages".payload_id + WHERE "q_messages".uuid = ?; + + +-- owner: + SELECT owner_id FROM "q_owners" + WHERE + topic = ? AND + consumer = ?; + + +-- replay.sql: +-- write: + INSERT INTO "q_messages" (uuid, flow_id, payload_id) + SELECT + ?, + "q_messages".flow_id, + "q_messages".payload_id + FROM "q_messages" + JOIN "q_deadletters" ON + "q_messages".id = "q_deadletters".message_id + WHERE "q_deadletters".uuid = ?; + + INSERT INTO "q_replays" (deadletter_id, message_id) + VALUES ( + (SELECT id FROM "q_deadletters" WHERE uuid = ?), + last_insert_rowid() + ); + + +-- read: + SELECT + "q_messages".id, + "q_messages".timestamp, + "q_messages".flow_id, + "q_payloads".topic, + "q_payloads".payload + FROM "q_messages" + JOIN "q_payloads" ON + "q_payloads".id = "q_messages".payload_id + WHERE "q_messages".uuid = ?; + + +-- owner: + +-- oneDead.sql: +-- write: + +-- read: + SELECT + "q_deadletters".uuid, + "q_offsets".timestamp, + "q_messages".uuid + FROM "q_deadletters" + JOIN "q_offsets" ON + "q_deadletters".message_id = "q_offsets".message_id + JOIN "q_messages" ON + "q_deadletters".message_id = "q_messages".id + JOIN "q_payloads" ON + "q_messages".payload_id = "q_payloads".id + WHERE + "q_payloads".topic = ? AND + "q_deadletters".consumer = ? AND + "q_offsets".consumer = ? AND + "q_deadletters".id NOT IN ( + SELECT deadletter_id FROM "q_replays" + ) + ORDER BY "q_deadletters".id ASC + LIMIT 1; + + +-- owner: + +-- allDead.sql: +-- write: + +-- read: + SELECT + "q_deadletters".uuid, + "q_deadletters".message_id, + "q_offsets".timestamp, + "q_offsets".consumer, + "q_messages".timestamp, + "q_messages".uuid, + "q_messages".flow_id, + "q_payloads".topic, + "q_payloads".payload + FROM "q_deadletters" + JOIN "q_offsets" ON + "q_deadletters".message_id = "q_offsets".message_id + JOIN "q_messages" ON + "q_deadletters".message_id = "q_messages".id + JOIN "q_payloads" ON + "q_messages".payload_id = "q_payloads".id + WHERE + "q_payloads".topic = ? AND + "q_deadletters".consumer = ? AND + "q_offsets".consumer = ? AND + "q_deadletters".id NOT IN ( + SELECT deadletter_id FROM "q_replays" + ) + ORDER BY "q_deadletters".id ASC; + + +-- owner: + +-- size.sql: +-- write: + +-- read: + SELECT + COUNT(1) as size + FROM "q_messages" + JOIN "q_payloads" ON + "q_messages".payload_id = "q_payloads".id + WHERE "q_payloads".topic = ?; + + +-- owner: + +-- count.sql: +-- write: + +-- read: + SELECT + COUNT(1) as count + FROM "q_messages" + JOIN "q_offsets" ON + "q_messages".id = "q_offsets".message_id + JOIN "q_payloads" ON + "q_messages".payload_id = "q_payloads".id + WHERE + "q_payloads".topic = ? AND + "q_offsets".consumer = ?; + + +-- owner: + +-- hasData.sql: +-- write: + +-- read: + SELECT 1 as data + FROM "q_messages" + JOIN "q_payloads" ON + "q_payloads".id = "q_messages".payload_id + WHERE + "q_payloads".topic = ? AND + "q_messages".id NOT IN ( + SELECT message_id FROM "q_offsets" + WHERE consumer = ? + ) + LIMIT 1; + + +-- owner: |