diff options
80 files changed, 0 insertions, 10341 deletions
diff --git a/.gitignore b/.gitignore deleted file mode 100644 index 114f11d..0000000 --- a/.gitignore +++ /dev/null @@ -1,20 +0,0 @@ -/doc/* -!/doc/*.en.*.adoc -/po/*/*.mo -/src/meta.go -/*.bin -/*.db* -/src/*.a -/src/*.bin -/src/*cgo* -/tests/*.a -/tests/*.bin -/tests/functional/*/*.a -/tests/functional/*/*.bin -/tests/functional/*/*.go.db* -/tests/fuzz/*/*.a -/tests/fuzz/*/*.bin -/tests/benchmarks/*/*.a -/tests/benchmarks/*/*.bin -/tests/benchmarks/*/*.txt -/tests/fuzz/corpus/ diff --git a/Makefile b/Makefile deleted file mode 100644 index 5eec04a..0000000 --- a/Makefile +++ /dev/null @@ -1,209 +0,0 @@ -.POSIX: -DATE = 1970-01-01 -VERSION = 0.1.0 -NAME = fiinha -NAME_UC = $(NAME) -## Installation prefix. Defaults to "/usr". -PREFIX = /usr -BINDIR = $(PREFIX)/bin -LIBDIR = $(PREFIX)/lib -GOLIBDIR = $(LIBDIR)/go -INCLUDEDIR = $(PREFIX)/include -SRCDIR = $(PREFIX)/src/$(NAME) -SHAREDIR = $(PREFIX)/share -LOCALEDIR = $(SHAREDIR)/locale -MANDIR = $(SHAREDIR)/man -EXEC = ./ -## Where to store the installation. Empty by default. -DESTDIR = -LDLIBS = --static -lsqlite3 -lm -GOCFLAGS = -I $(GOLIBDIR) -GOLDFLAGS = -L $(GOLIBDIR) -N = `nproc` - - - -.SUFFIXES: -.SUFFIXES: .go .a .bin .bin-check .adoc .po .mo - -.go.a: - go tool compile -I $(@D) $(GOCFLAGS) -o $@ -p $(*F) \ - `find $< $$(if [ $(*F) != main ]; then \ - echo src/$(NAME).go src/meta.go; fi) | uniq` - -.a.bin: - go tool link -L $(@D) $(GOLDFLAGS) -o $@ --extldflags '$(LDLIBS)' $< - -.adoc: - asciidoctor -b manpage -o $@ $< - -.po.mo: - msgfmt -cfv -o $@ $< - - - -all: -include deps.mk - - -libs.a = $(libs.go:.go=.a) -mains.a = $(mains.go:.go=.a) -mains.bin = $(mains.go:.go=.bin) -functional/lib.a = $(functional/lib.go:.go=.a) -fuzz/lib.a = $(fuzz/lib.go:.go=.a) -benchmarks/lib.a = $(benchmarks/lib.go:.go=.a) -manpages.N.adoc = $(manpages.en.N.adoc) $(manpages.XX.N.adoc) -manpages.N = $(manpages.N.adoc:.adoc=) -sources.mo = $(sources.po:.po=.mo) - -sources = \ - src/$(NAME).go \ - src/meta.go \ - src/main.go \ - - -derived-assets = \ - src/meta.go \ - $(libs.a) \ - $(mains.a) \ - $(mains.bin) \ - $(NAME).bin \ - $(manpages.XX.N.adoc) \ - $(manpages.N) \ - $(sources.mo) \ - -side-assets = \ - $(NAME).db* \ - tests/functional/*/*.go.db* \ - tests/fuzz/corpus/ \ - tests/benchmarks/*/main.txt \ - - - -## Default target. Builds all artifacts required for testing -## and installation. -all: $(derived-assets) - - -$(libs.a): Makefile deps.mk -$(libs.a): src/$(NAME).go src/meta.go - - -$(fuzz/lib.a): - go tool compile $(GOCFLAGS) -o $@ -p $(NAME) -d=libfuzzer \ - $*.go src/$(NAME).go src/meta.go - -src/meta.go: Makefile - echo 'package $(NAME)' > $@ - echo 'const Version = "$(VERSION)"' >> $@ - echo 'const Name = "$(NAME)"' >> $@ - echo 'const LOCALEDIR = "$(LOCALEDIR)"' >> $@ - -$(NAME).bin: src/main.bin - ln -fs src/main.bin $@ - -$(manpages.XX.N.adoc): po/doc/po4a.cfg - po4a --no-update --translate-only $@ po/doc/po4a.cfg - - -.PRECIOUS: tests/queries.sql -tests/queries.sql: tests/main.bin ALWAYS - env TESTING_DUMP_SQL_QUERIES=1 $(EXEC)tests/main.bin | diff -U10 $@ - - -tests.bin-check = \ - tests/main.bin-check \ - $(functional/main.go:.go=.bin-check) \ - -$(tests.bin-check): - $(EXEC)$*.bin - -check-unit: $(tests.bin-check) -check-unit: tests/queries.sql - - -integration-tests = \ - tests/cli-opts.sh \ - tests/integration.sh \ - -.PRECIOUS: $(integration-tests) -$(integration-tests): $(NAME).bin -$(integration-tests): ALWAYS - sh $@ - -check-integration: $(integration-tests) -check-integration: fuzz - - -## Run all tests. Each test suite is isolated, so that a parallel -## build can run tests at the same time. The required artifacts -## are created if missing. -check: check-unit check-integration - - - -FUZZSEC=1 -fuzz/main.bin-check = $(fuzz/main.go:.go=.bin-check) -$(fuzz/main.bin-check): - $(EXEC)$*.bin --test.fuzztime=$(FUZZSEC)s --test.parallel=$N \ - --test.fuzz='.*' --test.fuzzcachedir=tests/fuzz/corpus - -fuzz: $(fuzz/main.bin-check) - - - -benchmarks/main.bin-check = $(benchmarks/main.go:.go=.bin-check) -$(benchmarks/main.bin-check): - printf '%s\n' '$(EXEC)$*.bin' > $*.txt - LANG=POSIX.UTF-8 time -p $(EXEC)$*.bin 2>> $*.txt - printf '%s\n' '$*.txt' - -bench: $(benchmarks/main.bin-check) - - - -i18n-doc: - po4a po/doc/po4a.cfg - -i18n-code: - gotext src/$(NAME).go > po/$(NAME)/$(NAME).pot - po4a po/$(NAME)/po4a.cfg - -i18n: i18n-doc i18n-code - - - -## Remove *all* derived artifacts produced during the build. -## A dedicated test asserts that this is always true. -clean: - rm -rf $(derived-assets) $(side-assets) - - -## Installs into $(DESTDIR)$(PREFIX). Its dependency target -## ensures that all installable artifacts are crafted beforehand. -install: all - mkdir -p \ - '$(DESTDIR)$(BINDIR)' \ - '$(DESTDIR)$(GOLIBDIR)' \ - '$(DESTDIR)$(SRCDIR)' \ - - cp $(NAME).bin '$(DESTDIR)$(BINDIR)'/$(NAME) - cp src/$(NAME).a '$(DESTDIR)$(GOLIBDIR)' - cp $(sources) '$(DESTDIR)$(SRCDIR)' - instool '$(DESTDIR)$(MANDIR)' install man $(manpages.N) - instool '$(DESTDIR)$(LOCALEDIR)' install mo $(sources.mo) - -## Uninstalls from $(DESTDIR)$(PREFIX). This is a perfect mirror -## of the "install" target, and removes *all* that was installed. -## A dedicated test asserts that this is always true. -uninstall: - rm -rf \ - '$(DESTDIR)$(BINDIR)'/$(NAME) \ - '$(DESTDIR)$(GOLIBDIR)'/$(NAME).a \ - '$(DESTDIR)$(SRCDIR)' \ - - instool '$(DESTDIR)$(MANDIR)' uninstall man $(manpages.N) - instool '$(DESTDIR)$(LOCALEDIR)' uninstall mo $(sources.mo) - - - -ALWAYS: diff --git a/deps.mk b/deps.mk deleted file mode 100644 index 12012ff..0000000 --- a/deps.mk +++ /dev/null @@ -1,281 +0,0 @@ -libs.go = \ - src/fiinha.go \ - tests/benchmarks/deadletters/fiinha.go \ - tests/benchmarks/lookup/fiinha.go \ - tests/benchmarks/multiple-consumers/fiinha.go \ - tests/benchmarks/multiple-produces/fiinha.go \ - tests/benchmarks/reaper/fiinha.go \ - tests/benchmarks/replay/fiinha.go \ - tests/benchmarks/single-consumer/fiinha.go \ - tests/benchmarks/single-producer/fiinha.go \ - tests/benchmarks/subscribe/fiinha.go \ - tests/benchmarks/unsubscribe/fiinha.go \ - tests/benchmarks/waiter/fiinha.go \ - tests/fiinha.go \ - tests/functional/consume-one-produce-many/fiinha.go \ - tests/functional/consumer-with-deadletter/fiinha.go \ - tests/functional/custom-prefix/fiinha.go \ - tests/functional/distinct-consumers-separate-instances/fiinha.go \ - tests/functional/flow-id/fiinha.go \ - tests/functional/idempotency/fiinha.go \ - tests/functional/new-instance-takeover/fiinha.go \ - tests/functional/wait-after-publish/fiinha.go \ - tests/functional/waiter/fiinha.go \ - tests/fuzz/api-check/fiinha.go \ - tests/fuzz/cli-check/fiinha.go \ - tests/fuzz/equal-produced-consumed-order-check/fiinha.go \ - tests/fuzz/exactly-once-check/fiinha.go \ - tests/fuzz/queries-check/fiinha.go \ - tests/fuzz/total-order-check/fiinha.go \ - -mains.go = \ - src/main.go \ - tests/benchmarks/deadletters/main.go \ - tests/benchmarks/lookup/main.go \ - tests/benchmarks/multiple-consumers/main.go \ - tests/benchmarks/multiple-produces/main.go \ - tests/benchmarks/reaper/main.go \ - tests/benchmarks/replay/main.go \ - tests/benchmarks/single-consumer/main.go \ - tests/benchmarks/single-producer/main.go \ - tests/benchmarks/subscribe/main.go \ - tests/benchmarks/unsubscribe/main.go \ - tests/benchmarks/waiter/main.go \ - tests/functional/consume-one-produce-many/main.go \ - tests/functional/consumer-with-deadletter/main.go \ - tests/functional/custom-prefix/main.go \ - tests/functional/distinct-consumers-separate-instances/main.go \ - tests/functional/flow-id/main.go \ - tests/functional/idempotency/main.go \ - tests/functional/new-instance-takeover/main.go \ - tests/functional/wait-after-publish/main.go \ - tests/functional/waiter/main.go \ - tests/fuzz/api-check/main.go \ - tests/fuzz/cli-check/main.go \ - tests/fuzz/equal-produced-consumed-order-check/main.go \ - tests/fuzz/exactly-once-check/main.go \ - tests/fuzz/queries-check/main.go \ - tests/fuzz/total-order-check/main.go \ - tests/main.go \ - -manpages.en.N.adoc = \ - doc/fiinha.en.0.adoc \ - -manpages.XX.N.adoc = \ - doc/fiinha.de.0.adoc \ - doc/fiinha.eo.0.adoc \ - doc/fiinha.es.0.adoc \ - doc/fiinha.fr.0.adoc \ - doc/fiinha.pt.0.adoc \ - -sources.po = \ - po/fiinha/de.po \ - po/fiinha/eo.po \ - po/fiinha/es.po \ - po/fiinha/fr.po \ - po/fiinha/pt.po \ - -functional/lib.go = \ - tests/functional/consume-one-produce-many/fiinha.go \ - tests/functional/consumer-with-deadletter/fiinha.go \ - tests/functional/custom-prefix/fiinha.go \ - tests/functional/distinct-consumers-separate-instances/fiinha.go \ - tests/functional/flow-id/fiinha.go \ - tests/functional/idempotency/fiinha.go \ - tests/functional/new-instance-takeover/fiinha.go \ - tests/functional/wait-after-publish/fiinha.go \ - tests/functional/waiter/fiinha.go \ - -functional/main.go = \ - tests/functional/consume-one-produce-many/main.go \ - tests/functional/consumer-with-deadletter/main.go \ - tests/functional/custom-prefix/main.go \ - tests/functional/distinct-consumers-separate-instances/main.go \ - tests/functional/flow-id/main.go \ - tests/functional/idempotency/main.go \ - tests/functional/new-instance-takeover/main.go \ - tests/functional/wait-after-publish/main.go \ - tests/functional/waiter/main.go \ - -fuzz/lib.go = \ - tests/fuzz/api-check/fiinha.go \ - tests/fuzz/cli-check/fiinha.go \ - tests/fuzz/equal-produced-consumed-order-check/fiinha.go \ - tests/fuzz/exactly-once-check/fiinha.go \ - tests/fuzz/queries-check/fiinha.go \ - tests/fuzz/total-order-check/fiinha.go \ - -fuzz/main.go = \ - tests/fuzz/api-check/main.go \ - tests/fuzz/cli-check/main.go \ - tests/fuzz/equal-produced-consumed-order-check/main.go \ - tests/fuzz/exactly-once-check/main.go \ - tests/fuzz/queries-check/main.go \ - tests/fuzz/total-order-check/main.go \ - -benchmarks/lib.go = \ - tests/benchmarks/deadletters/fiinha.go \ - tests/benchmarks/lookup/fiinha.go \ - tests/benchmarks/multiple-consumers/fiinha.go \ - tests/benchmarks/multiple-produces/fiinha.go \ - tests/benchmarks/reaper/fiinha.go \ - tests/benchmarks/replay/fiinha.go \ - tests/benchmarks/single-consumer/fiinha.go \ - tests/benchmarks/single-producer/fiinha.go \ - tests/benchmarks/subscribe/fiinha.go \ - tests/benchmarks/unsubscribe/fiinha.go \ - tests/benchmarks/waiter/fiinha.go \ - -benchmarks/main.go = \ - tests/benchmarks/deadletters/main.go \ - tests/benchmarks/lookup/main.go \ - tests/benchmarks/multiple-consumers/main.go \ - tests/benchmarks/multiple-produces/main.go \ - tests/benchmarks/reaper/main.go \ - tests/benchmarks/replay/main.go \ - tests/benchmarks/single-consumer/main.go \ - tests/benchmarks/single-producer/main.go \ - tests/benchmarks/subscribe/main.go \ - tests/benchmarks/unsubscribe/main.go \ - tests/benchmarks/waiter/main.go \ - -src/fiinha.a: src/fiinha.go -src/main.a: src/main.go -tests/benchmarks/deadletters/fiinha.a: tests/benchmarks/deadletters/fiinha.go -tests/benchmarks/deadletters/main.a: tests/benchmarks/deadletters/main.go -tests/benchmarks/lookup/fiinha.a: tests/benchmarks/lookup/fiinha.go -tests/benchmarks/lookup/main.a: tests/benchmarks/lookup/main.go -tests/benchmarks/multiple-consumers/fiinha.a: tests/benchmarks/multiple-consumers/fiinha.go -tests/benchmarks/multiple-consumers/main.a: tests/benchmarks/multiple-consumers/main.go -tests/benchmarks/multiple-produces/fiinha.a: tests/benchmarks/multiple-produces/fiinha.go -tests/benchmarks/multiple-produces/main.a: tests/benchmarks/multiple-produces/main.go -tests/benchmarks/reaper/fiinha.a: tests/benchmarks/reaper/fiinha.go -tests/benchmarks/reaper/main.a: tests/benchmarks/reaper/main.go -tests/benchmarks/replay/fiinha.a: tests/benchmarks/replay/fiinha.go -tests/benchmarks/replay/main.a: tests/benchmarks/replay/main.go -tests/benchmarks/single-consumer/fiinha.a: tests/benchmarks/single-consumer/fiinha.go -tests/benchmarks/single-consumer/main.a: tests/benchmarks/single-consumer/main.go -tests/benchmarks/single-producer/fiinha.a: tests/benchmarks/single-producer/fiinha.go -tests/benchmarks/single-producer/main.a: tests/benchmarks/single-producer/main.go -tests/benchmarks/subscribe/fiinha.a: tests/benchmarks/subscribe/fiinha.go -tests/benchmarks/subscribe/main.a: tests/benchmarks/subscribe/main.go -tests/benchmarks/unsubscribe/fiinha.a: tests/benchmarks/unsubscribe/fiinha.go -tests/benchmarks/unsubscribe/main.a: tests/benchmarks/unsubscribe/main.go -tests/benchmarks/waiter/fiinha.a: tests/benchmarks/waiter/fiinha.go -tests/benchmarks/waiter/main.a: tests/benchmarks/waiter/main.go -tests/fiinha.a: tests/fiinha.go -tests/functional/consume-one-produce-many/fiinha.a: tests/functional/consume-one-produce-many/fiinha.go -tests/functional/consume-one-produce-many/main.a: tests/functional/consume-one-produce-many/main.go -tests/functional/consumer-with-deadletter/fiinha.a: tests/functional/consumer-with-deadletter/fiinha.go -tests/functional/consumer-with-deadletter/main.a: tests/functional/consumer-with-deadletter/main.go -tests/functional/custom-prefix/fiinha.a: tests/functional/custom-prefix/fiinha.go -tests/functional/custom-prefix/main.a: tests/functional/custom-prefix/main.go -tests/functional/distinct-consumers-separate-instances/fiinha.a: tests/functional/distinct-consumers-separate-instances/fiinha.go -tests/functional/distinct-consumers-separate-instances/main.a: tests/functional/distinct-consumers-separate-instances/main.go -tests/functional/flow-id/fiinha.a: tests/functional/flow-id/fiinha.go -tests/functional/flow-id/main.a: tests/functional/flow-id/main.go -tests/functional/idempotency/fiinha.a: tests/functional/idempotency/fiinha.go -tests/functional/idempotency/main.a: tests/functional/idempotency/main.go -tests/functional/new-instance-takeover/fiinha.a: tests/functional/new-instance-takeover/fiinha.go -tests/functional/new-instance-takeover/main.a: tests/functional/new-instance-takeover/main.go -tests/functional/wait-after-publish/fiinha.a: tests/functional/wait-after-publish/fiinha.go -tests/functional/wait-after-publish/main.a: tests/functional/wait-after-publish/main.go -tests/functional/waiter/fiinha.a: tests/functional/waiter/fiinha.go -tests/functional/waiter/main.a: tests/functional/waiter/main.go -tests/fuzz/api-check/fiinha.a: tests/fuzz/api-check/fiinha.go -tests/fuzz/api-check/main.a: tests/fuzz/api-check/main.go -tests/fuzz/cli-check/fiinha.a: tests/fuzz/cli-check/fiinha.go -tests/fuzz/cli-check/main.a: tests/fuzz/cli-check/main.go -tests/fuzz/equal-produced-consumed-order-check/fiinha.a: tests/fuzz/equal-produced-consumed-order-check/fiinha.go -tests/fuzz/equal-produced-consumed-order-check/main.a: tests/fuzz/equal-produced-consumed-order-check/main.go -tests/fuzz/exactly-once-check/fiinha.a: tests/fuzz/exactly-once-check/fiinha.go -tests/fuzz/exactly-once-check/main.a: tests/fuzz/exactly-once-check/main.go -tests/fuzz/queries-check/fiinha.a: tests/fuzz/queries-check/fiinha.go -tests/fuzz/queries-check/main.a: tests/fuzz/queries-check/main.go -tests/fuzz/total-order-check/fiinha.a: tests/fuzz/total-order-check/fiinha.go -tests/fuzz/total-order-check/main.a: tests/fuzz/total-order-check/main.go -tests/main.a: tests/main.go -src/main.bin: src/main.a -tests/benchmarks/deadletters/main.bin: tests/benchmarks/deadletters/main.a -tests/benchmarks/lookup/main.bin: tests/benchmarks/lookup/main.a -tests/benchmarks/multiple-consumers/main.bin: tests/benchmarks/multiple-consumers/main.a -tests/benchmarks/multiple-produces/main.bin: tests/benchmarks/multiple-produces/main.a -tests/benchmarks/reaper/main.bin: tests/benchmarks/reaper/main.a -tests/benchmarks/replay/main.bin: tests/benchmarks/replay/main.a -tests/benchmarks/single-consumer/main.bin: tests/benchmarks/single-consumer/main.a -tests/benchmarks/single-producer/main.bin: tests/benchmarks/single-producer/main.a -tests/benchmarks/subscribe/main.bin: tests/benchmarks/subscribe/main.a -tests/benchmarks/unsubscribe/main.bin: tests/benchmarks/unsubscribe/main.a -tests/benchmarks/waiter/main.bin: tests/benchmarks/waiter/main.a -tests/functional/consume-one-produce-many/main.bin: tests/functional/consume-one-produce-many/main.a -tests/functional/consumer-with-deadletter/main.bin: tests/functional/consumer-with-deadletter/main.a -tests/functional/custom-prefix/main.bin: tests/functional/custom-prefix/main.a -tests/functional/distinct-consumers-separate-instances/main.bin: tests/functional/distinct-consumers-separate-instances/main.a -tests/functional/flow-id/main.bin: tests/functional/flow-id/main.a -tests/functional/idempotency/main.bin: tests/functional/idempotency/main.a -tests/functional/new-instance-takeover/main.bin: tests/functional/new-instance-takeover/main.a -tests/functional/wait-after-publish/main.bin: tests/functional/wait-after-publish/main.a -tests/functional/waiter/main.bin: tests/functional/waiter/main.a -tests/fuzz/api-check/main.bin: tests/fuzz/api-check/main.a -tests/fuzz/cli-check/main.bin: tests/fuzz/cli-check/main.a -tests/fuzz/equal-produced-consumed-order-check/main.bin: tests/fuzz/equal-produced-consumed-order-check/main.a -tests/fuzz/exactly-once-check/main.bin: tests/fuzz/exactly-once-check/main.a -tests/fuzz/queries-check/main.bin: tests/fuzz/queries-check/main.a -tests/fuzz/total-order-check/main.bin: tests/fuzz/total-order-check/main.a -tests/main.bin: tests/main.a -src/main.bin-check: src/main.bin -tests/benchmarks/deadletters/main.bin-check: tests/benchmarks/deadletters/main.bin -tests/benchmarks/lookup/main.bin-check: tests/benchmarks/lookup/main.bin -tests/benchmarks/multiple-consumers/main.bin-check: tests/benchmarks/multiple-consumers/main.bin -tests/benchmarks/multiple-produces/main.bin-check: tests/benchmarks/multiple-produces/main.bin -tests/benchmarks/reaper/main.bin-check: tests/benchmarks/reaper/main.bin -tests/benchmarks/replay/main.bin-check: tests/benchmarks/replay/main.bin -tests/benchmarks/single-consumer/main.bin-check: tests/benchmarks/single-consumer/main.bin -tests/benchmarks/single-producer/main.bin-check: tests/benchmarks/single-producer/main.bin -tests/benchmarks/subscribe/main.bin-check: tests/benchmarks/subscribe/main.bin -tests/benchmarks/unsubscribe/main.bin-check: tests/benchmarks/unsubscribe/main.bin -tests/benchmarks/waiter/main.bin-check: tests/benchmarks/waiter/main.bin -tests/functional/consume-one-produce-many/main.bin-check: tests/functional/consume-one-produce-many/main.bin -tests/functional/consumer-with-deadletter/main.bin-check: tests/functional/consumer-with-deadletter/main.bin -tests/functional/custom-prefix/main.bin-check: tests/functional/custom-prefix/main.bin -tests/functional/distinct-consumers-separate-instances/main.bin-check: tests/functional/distinct-consumers-separate-instances/main.bin -tests/functional/flow-id/main.bin-check: tests/functional/flow-id/main.bin -tests/functional/idempotency/main.bin-check: tests/functional/idempotency/main.bin -tests/functional/new-instance-takeover/main.bin-check: tests/functional/new-instance-takeover/main.bin -tests/functional/wait-after-publish/main.bin-check: tests/functional/wait-after-publish/main.bin -tests/functional/waiter/main.bin-check: tests/functional/waiter/main.bin -tests/fuzz/api-check/main.bin-check: tests/fuzz/api-check/main.bin -tests/fuzz/cli-check/main.bin-check: tests/fuzz/cli-check/main.bin -tests/fuzz/equal-produced-consumed-order-check/main.bin-check: tests/fuzz/equal-produced-consumed-order-check/main.bin -tests/fuzz/exactly-once-check/main.bin-check: tests/fuzz/exactly-once-check/main.bin -tests/fuzz/queries-check/main.bin-check: tests/fuzz/queries-check/main.bin -tests/fuzz/total-order-check/main.bin-check: tests/fuzz/total-order-check/main.bin -tests/main.bin-check: tests/main.bin -src/main.a: src/$(NAME).a -tests/benchmarks/deadletters/main.a: tests/benchmarks/deadletters/$(NAME).a -tests/benchmarks/lookup/main.a: tests/benchmarks/lookup/$(NAME).a -tests/benchmarks/multiple-consumers/main.a: tests/benchmarks/multiple-consumers/$(NAME).a -tests/benchmarks/multiple-produces/main.a: tests/benchmarks/multiple-produces/$(NAME).a -tests/benchmarks/reaper/main.a: tests/benchmarks/reaper/$(NAME).a -tests/benchmarks/replay/main.a: tests/benchmarks/replay/$(NAME).a -tests/benchmarks/single-consumer/main.a: tests/benchmarks/single-consumer/$(NAME).a -tests/benchmarks/single-producer/main.a: tests/benchmarks/single-producer/$(NAME).a -tests/benchmarks/subscribe/main.a: tests/benchmarks/subscribe/$(NAME).a -tests/benchmarks/unsubscribe/main.a: tests/benchmarks/unsubscribe/$(NAME).a -tests/benchmarks/waiter/main.a: tests/benchmarks/waiter/$(NAME).a -tests/functional/consume-one-produce-many/main.a: tests/functional/consume-one-produce-many/$(NAME).a -tests/functional/consumer-with-deadletter/main.a: tests/functional/consumer-with-deadletter/$(NAME).a -tests/functional/custom-prefix/main.a: tests/functional/custom-prefix/$(NAME).a -tests/functional/distinct-consumers-separate-instances/main.a: tests/functional/distinct-consumers-separate-instances/$(NAME).a -tests/functional/flow-id/main.a: tests/functional/flow-id/$(NAME).a -tests/functional/idempotency/main.a: tests/functional/idempotency/$(NAME).a -tests/functional/new-instance-takeover/main.a: tests/functional/new-instance-takeover/$(NAME).a -tests/functional/wait-after-publish/main.a: tests/functional/wait-after-publish/$(NAME).a -tests/functional/waiter/main.a: tests/functional/waiter/$(NAME).a -tests/fuzz/api-check/main.a: tests/fuzz/api-check/$(NAME).a -tests/fuzz/cli-check/main.a: tests/fuzz/cli-check/$(NAME).a -tests/fuzz/equal-produced-consumed-order-check/main.a: tests/fuzz/equal-produced-consumed-order-check/$(NAME).a -tests/fuzz/exactly-once-check/main.a: tests/fuzz/exactly-once-check/$(NAME).a -tests/fuzz/queries-check/main.a: tests/fuzz/queries-check/$(NAME).a -tests/fuzz/total-order-check/main.a: tests/fuzz/total-order-check/$(NAME).a -tests/main.a: tests/$(NAME).a diff --git a/doc/fiinha.en.0.adoc b/doc/fiinha.en.0.adoc deleted file mode 100644 index 2c4d896..0000000 --- a/doc/fiinha.en.0.adoc +++ /dev/null @@ -1,5 +0,0 @@ -= fiinha(0) - -== NAME - -fiinha - . diff --git a/meta.capim b/meta.capim deleted file mode 100644 index 0288ec1..0000000 --- a/meta.capim +++ /dev/null @@ -1,9 +0,0 @@ -{ - :dependencies { - :build #{ - gobang - golite - gotext - } - } -} diff --git a/mkdeps.sh b/mkdeps.sh deleted file mode 100755 index ae6fffc..0000000 --- a/mkdeps.sh +++ /dev/null @@ -1,49 +0,0 @@ -#!/bin/sh -set -eu - -export LANG=POSIX.UTF-8 - - -libs() { - find src tests -name '*.go' | - grep -Ev '/(main|meta)\.go$' | - grep -Ev '/_cgo_(import|gotypes)\.go$' | - grep -Ev '\.cgo1\.go$' -} - -mains() { - find src tests -name '*.go' | grep '/main\.go$' -} - -docs() { - find doc/*.en.*.adoc -} - -xdocs() { - for l in `find po/doc/*.po | xargs -I% basename % .po`; do - docs | sed 's|/\(.*\)\.en\.\(.*\)$|/\1.'"$l"'.\2|' - done -} - -pos() { - find po/ -name '*.po' | grep -Ev '^po/(doc|tests)/' -} - - -libs | varlist 'libs.go' -mains | varlist 'mains.go' -docs | varlist 'manpages.en.N.adoc' -xdocs | varlist 'manpages.XX.N.adoc' -pos | varlist 'sources.po' - -find tests/functional/*/*.go -not -name main.go | varlist 'functional/lib.go' -find tests/functional/*/main.go | varlist 'functional/main.go' -find tests/fuzz/*/*.go -not -name main.go | varlist 'fuzz/lib.go' -find tests/fuzz/*/main.go | varlist 'fuzz/main.go' -find tests/benchmarks/*/*.go -not -name main.go | varlist 'benchmarks/lib.go' -find tests/benchmarks/*/main.go | varlist 'benchmarks/main.go' - -{ libs; mains; } | sort | sed 's/^\(.*\)\.go$/\1.a:\t\1.go/' -mains | sort | sed 's/^\(.*\)\.go$/\1.bin:\t\1.a/' -mains | sort | sed 's/^\(.*\)\.go$/\1.bin-check:\t\1.bin/' -mains | sort | sed 's|^\(.*\)/main\.go$|\1/main.a:\t\1/$(NAME).a|' diff --git a/po/doc/de.po b/po/doc/de.po deleted file mode 100644 index fd6472f..0000000 --- a/po/doc/de.po +++ /dev/null @@ -1,22 +0,0 @@ -msgid "" -msgstr "" -"Language: de\n" -"Content-Type: text/plain; charset=UTF-8\n" -"Plural-Forms: nplurals=2; plural=(n != 1);\n" - -#. type: Title = -#: doc/fiinha.en.0.adoc:1 -#, no-wrap -msgid "fiinha(0)" -msgstr "" - -#. type: Title == -#: doc/fiinha.en.0.adoc:3 -#, no-wrap -msgid "NAME" -msgstr "" - -#. type: Plain text -#: doc/fiinha.en.0.adoc:5 -msgid "fiinha - ." -msgstr "" diff --git a/po/doc/doc.pot b/po/doc/doc.pot deleted file mode 100644 index 5bc8d37..0000000 --- a/po/doc/doc.pot +++ /dev/null @@ -1,22 +0,0 @@ -#, fuzzy -msgid "" -msgstr "" -"Language: \n" -"Content-Type: text/plain; charset=UTF-8\n" - -#. type: Title = -#: doc/fiinha.en.0.adoc:1 -#, no-wrap -msgid "fiinha(0)" -msgstr "" - -#. type: Title == -#: doc/fiinha.en.0.adoc:3 -#, no-wrap -msgid "NAME" -msgstr "" - -#. type: Plain text -#: doc/fiinha.en.0.adoc:5 -msgid "fiinha - ." -msgstr "" diff --git a/po/doc/eo.po b/po/doc/eo.po deleted file mode 100644 index e9e1f2f..0000000 --- a/po/doc/eo.po +++ /dev/null @@ -1,22 +0,0 @@ -msgid "" -msgstr "" -"Language: eo\n" -"Content-Type: text/plain; charset=UTF-8\n" -"Plural-Forms: nplurals=2; plural=(n != 1);\n" - -#. type: Title = -#: doc/fiinha.en.0.adoc:1 -#, no-wrap -msgid "fiinha(0)" -msgstr "" - -#. type: Title == -#: doc/fiinha.en.0.adoc:3 -#, no-wrap -msgid "NAME" -msgstr "" - -#. type: Plain text -#: doc/fiinha.en.0.adoc:5 -msgid "fiinha - ." -msgstr "" diff --git a/po/doc/es.po b/po/doc/es.po deleted file mode 100644 index cf72172..0000000 --- a/po/doc/es.po +++ /dev/null @@ -1,22 +0,0 @@ -msgid "" -msgstr "" -"Language: es\n" -"Content-Type: text/plain; charset=UTF-8\n" -"Plural-Forms: nplurals=2; plural=(n != 1);\n" - -#. type: Title = -#: doc/fiinha.en.0.adoc:1 -#, no-wrap -msgid "fiinha(0)" -msgstr "" - -#. type: Title == -#: doc/fiinha.en.0.adoc:3 -#, no-wrap -msgid "NAME" -msgstr "" - -#. type: Plain text -#: doc/fiinha.en.0.adoc:5 -msgid "fiinha - ." -msgstr "" diff --git a/po/doc/fr.po b/po/doc/fr.po deleted file mode 100644 index 0a2ca5a..0000000 --- a/po/doc/fr.po +++ /dev/null @@ -1,22 +0,0 @@ -msgid "" -msgstr "" -"Language: fr\n" -"Content-Type: text/plain; charset=UTF-8\n" -"Plural-Forms: nplurals=2; plural=(n > 1);\n" - -#. type: Title = -#: doc/fiinha.en.0.adoc:1 -#, no-wrap -msgid "fiinha(0)" -msgstr "" - -#. type: Title == -#: doc/fiinha.en.0.adoc:3 -#, no-wrap -msgid "NAME" -msgstr "" - -#. type: Plain text -#: doc/fiinha.en.0.adoc:5 -msgid "fiinha - ." -msgstr "" diff --git a/po/doc/note.txt b/po/doc/note.txt deleted file mode 100644 index 45279a4..0000000 --- a/po/doc/note.txt +++ /dev/null @@ -1,5 +0,0 @@ -PO4A-HEADER: mode=eof - - - -// Generated from po4a(1). diff --git a/po/doc/po4a.cfg b/po/doc/po4a.cfg deleted file mode 100644 index aaf1cb1..0000000 --- a/po/doc/po4a.cfg +++ /dev/null @@ -1,5 +0,0 @@ -[options] --keep 0 --master-charset UTF-8 --localized-charset UTF-8 --addendum-charset UTF-8 - -[po_directory] po/doc - -[type: asciidoc] doc/fiinha.en.0.adoc $lang:doc/fiinha.$lang.0.adoc diff --git a/po/doc/pt.po b/po/doc/pt.po deleted file mode 100644 index 65a0b0e..0000000 --- a/po/doc/pt.po +++ /dev/null @@ -1,22 +0,0 @@ -msgid "" -msgstr "" -"Language: pt\n" -"Content-Type: text/plain; charset=UTF-8\n" -"Plural-Forms: nplurals=2; plural=(n != 1);\n" - -#. type: Title = -#: doc/fiinha.en.0.adoc:1 -#, no-wrap -msgid "fiinha(0)" -msgstr "" - -#. type: Title == -#: doc/fiinha.en.0.adoc:3 -#, no-wrap -msgid "NAME" -msgstr "" - -#. type: Plain text -#: doc/fiinha.en.0.adoc:5 -msgid "fiinha - ." -msgstr "" diff --git a/po/fiinha/de.po b/po/fiinha/de.po deleted file mode 100644 index 947f71d..0000000 --- a/po/fiinha/de.po +++ /dev/null @@ -1,5 +0,0 @@ -msgid "" -msgstr "" -"Language: de\n" -"Content-Type: text/plain; charset=UTF-8\n" -"Plural-Forms: nplurals=2; plural=(n != 1);\n" diff --git a/po/fiinha/eo.po b/po/fiinha/eo.po deleted file mode 100644 index 444f1ed..0000000 --- a/po/fiinha/eo.po +++ /dev/null @@ -1,5 +0,0 @@ -msgid "" -msgstr "" -"Language: eo\n" -"Content-Type: text/plain; charset=UTF-8\n" -"Plural-Forms: nplurals=2; plural=(n != 1);\n" diff --git a/po/fiinha/es.po b/po/fiinha/es.po deleted file mode 100644 index 9288fad..0000000 --- a/po/fiinha/es.po +++ /dev/null @@ -1,5 +0,0 @@ -msgid "" -msgstr "" -"Language: es\n" -"Content-Type: text/plain; charset=UTF-8\n" -"Plural-Forms: nplurals=2; plural=(n != 1);\n" diff --git a/po/fiinha/fiinha.pot b/po/fiinha/fiinha.pot deleted file mode 100644 index c9591aa..0000000 --- a/po/fiinha/fiinha.pot +++ /dev/null @@ -1,5 +0,0 @@ -msgid "" -msgstr "" -"Language: \n" -"Content-Type: text/plain; charset=UTF-8\n" - diff --git a/po/fiinha/fr.po b/po/fiinha/fr.po deleted file mode 100644 index 6a18324..0000000 --- a/po/fiinha/fr.po +++ /dev/null @@ -1,5 +0,0 @@ -msgid "" -msgstr "" -"Language: fr\n" -"Content-Type: text/plain; charset=UTF-8\n" -"Plural-Forms: nplurals=2; plural=(n > 1);\n" diff --git a/po/fiinha/po4a.cfg b/po/fiinha/po4a.cfg deleted file mode 100644 index a9de01a..0000000 --- a/po/fiinha/po4a.cfg +++ /dev/null @@ -1,3 +0,0 @@ -[options] --keep 0 --master-charset UTF-8 --localized-charset UTF-8 --addendum-charset UTF-8 - -[po_directory] po/fiinha diff --git a/po/fiinha/pt.po b/po/fiinha/pt.po deleted file mode 100644 index 22c6066..0000000 --- a/po/fiinha/pt.po +++ /dev/null @@ -1,5 +0,0 @@ -msgid "" -msgstr "" -"Language: pt\n" -"Content-Type: text/plain; charset=UTF-8\n" -"Plural-Forms: nplurals=2; plural=(n != 1);\n" diff --git a/src/fiinha.go b/src/fiinha.go deleted file mode 100644 index a819557..0000000 --- a/src/fiinha.go +++ /dev/null @@ -1,2517 +0,0 @@ -package fiinha - -import ( - "database/sql" - "flag" - "fmt" - "io" - "log/slog" - "os" - "sync" - "time" - - "golite" - "uuid" - g "gobang" -) - - - -const ( - defaultPrefix = "fiinha" - reaperSkipCount = 1000 - notOwnerErrorFmt = "%v owns %#v as %#v, not us (%v)" - rollbackErrorFmt = "rollback error: %w; while executing: %w" -) - - - -type dbconfigT struct{ - shared *sql.DB - dbpath string - prefix string - instanceID int -} - -type queryT struct{ - write string - read string - owner string -} - -type queriesT struct{ - take func(string, string) error - publish func(UnsentMessage, uuid.UUID) (messageT, error) - find func(string, uuid.UUID) (messageT, error) - next func(string, string) (messageT, error) - pending func(string, string, func(messageT) error) error - commit func(string, uuid.UUID) error - toDead func(string, uuid.UUID, uuid.UUID) error - replay func(uuid.UUID, uuid.UUID) (messageT, error) - oneDead func(string, string) (deadletterT, error) - allDead func(string, string, func(deadletterT, messageT) error) error - size func(string) (int, error) - count func(string, string) (int, error) - hasData func(string, string) (bool, error) - close func() error -} - -type messageT struct{ - id int64 - timestamp time.Time - uuid uuid.UUID - topic string - flowID uuid.UUID - payload []byte -} - -type UnsentMessage struct{ - Topic string - FlowID uuid.UUID - Payload []byte -} - -type Message struct{ - ID uuid.UUID - Timestamp time.Time - Topic string - FlowID uuid.UUID - Payload []byte -} - -type deadletterT struct{ - uuid uuid.UUID - timestamp time.Time - consumer string - messageID uuid.UUID -} - -type pingerT[T any] struct{ - tryPing func(T) - onPing func(func(T)) - closed func() bool - close func() -} - -type consumerDataT struct{ - topic string - name string -} - -type waiterDataT struct{ - topic string - flowID uuid.UUID - name string - -} - -type consumerT struct{ - data consumerDataT - callback func(Message) error - pinger pingerT[struct{}] - close *func() -} - -type waiterT struct{ - data waiterDataT - pinger pingerT[[]byte] - closed *func() bool - close *func() -} - -type topicSubscriptionT struct{ - consumers map[string]consumerT - waiters map[uuid.UUID]map[string]waiterT -} - -type subscriptionsSetM map[string]topicSubscriptionT - -type subscriptionsT struct { - read func(func(subscriptionsSetM) error) error - write func(func(subscriptionsSetM) error) error -} - -type queueT struct{ - queries queriesT - subscriptions subscriptionsT - pinger pingerT[struct{}] -} - -type argsT struct{ - databasePath string - prefix string - command string - allArgs []string - args []string - topic string - consumer string -} - -type commandT struct{ - name string - getopt func(argsT, io.Writer) (argsT, bool) - exec func(argsT, queriesT, io.Reader, io.Writer) (int, error) -} - -type IQueue interface{ - Publish(UnsentMessage) (Message, error) - Subscribe( string, string, func(Message) error) error - Unsubscribe(string, string) - WaitFor(string, uuid.UUID, string) Waiter - Close() error -} - - - -func tryRollback(tx *sql.Tx, err error) error { - rollbackErr := tx.Rollback() - if rollbackErr != nil { - return fmt.Errorf( - rollbackErrorFmt, - rollbackErr, - err, - ) - } - - return err -} - -func inTx(db *sql.DB, fn func(*sql.Tx) error) error { - tx, err := db.Begin() - if err != nil { - return err - } - - err = fn(tx) - if err != nil { - return tryRollback(tx, err) - } - - err = tx.Commit() - if err != nil { - return tryRollback(tx, err) - } - - return nil -} - -func serialized[A any, B any](callback func(...A) B) (func(...A) B, func()) { - in := make(chan []A) - out := make(chan B) - - closed := false - var ( - closeWg sync.WaitGroup - closeMutex sync.Mutex - ) - closeWg.Add(1) - - go func() { - for input := range in { - out <- callback(input...) - } - close(out) - closeWg.Done() - }() - - fn := func(input ...A) B { - in <- input - return (<- out) - } - - closeFn := func() { - closeMutex.Lock() - defer closeMutex.Unlock() - if closed { - return - } - close(in) - closed = true - closeWg.Wait() - } - - return fn, closeFn -} - -func execSerialized(query string, db *sql.DB) (func(...any) error, func()) { - return serialized(func(args ...any) error { - return inTx(db, func(tx *sql.Tx) error { - _, err := tx.Exec(query, args...) - return err - }) - }) -} - -func createTablesSQL(prefix string) queryT { - const tmpl_write = ` - CREATE TABLE IF NOT EXISTS "%s_payloads" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL DEFAULT (%s), - topic TEXT NOT NULL, - payload BLOB NOT NULL - ) STRICT; - CREATE INDEX IF NOT EXISTS "%s_payloads_topic" - ON "%s_payloads"(topic); - - CREATE TABLE IF NOT EXISTS "%s_messages" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL DEFAULT (%s), - uuid BLOB NOT NULL UNIQUE, - flow_id BLOB NOT NULL, - payload_id INTEGER NOT NULL - REFERENCES "%s_payloads"(id) - ) STRICT; - CREATE INDEX IF NOT EXISTS "%s_messages_flow_id" - ON "%s_messages"(flow_id); - - CREATE TABLE IF NOT EXISTS "%s_offsets" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL DEFAULT (%s), - consumer TEXT NOT NULL, - message_id INTEGER NOT NULL - REFERENCES "%s_messages"(id), - instance_id INTEGER NOT NULL, - UNIQUE (consumer, message_id) - ) STRICT; - CREATE INDEX IF NOT EXISTS "%s_offsets_consumer" - ON "%s_offsets"(consumer); - - CREATE TABLE IF NOT EXISTS "%s_deadletters" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - uuid BLOB NOT NULL UNIQUE, - consumer TEXT NOT NULL, - message_id INTEGER NOT NULL - REFERENCES "%s_messages"(id), - instance_id INTEGER NOT NULL, - UNIQUE (consumer, message_id) - ) STRICT; - CREATE INDEX IF NOT EXISTS "%s_deadletters_consumer" - ON "%s_deadletters"(consumer); - - CREATE TABLE IF NOT EXISTS "%s_replays" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - deadletter_id INTEGER NOT NULL UNIQUE - REFERENCES "%s_deadletters"(id) , - message_id INTEGER NOT NULL UNIQUE - REFERENCES "%s_messages"(id) - ) STRICT; - - CREATE TABLE IF NOT EXISTS "%s_owners" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - topic TEXT NOT NULL, - consumer TEXT NOT NULL, - owner_id INTEGER NOT NULL, - UNIQUE (topic, consumer) - ) STRICT; - - CREATE TRIGGER IF NOT EXISTS "%s_check_instance_owns_topic" - BEFORE INSERT ON "%s_offsets" - WHEN NEW.instance_id != ( - SELECT owner_id FROM "%s_owners" - WHERE topic = ( - SELECT "%s_payloads".topic - FROM "%s_payloads" - JOIN "%s_messages" ON "%s_payloads".id = - "%s_messages".payload_id - WHERE "%s_messages".id = NEW.message_id - ) AND consumer = NEW.consumer - ) - BEGIN - SELECT RAISE( - ABORT, - 'instance does not own topic/consumer combo' - ); - END; - - CREATE TRIGGER IF NOT EXISTS "%s_check_can_publish_deadletter" - BEFORE INSERT ON "%s_deadletters" - WHEN NEW.instance_id != ( - SELECT owner_id FROM "%s_owners" - WHERE topic = ( - SELECT "%s_payloads".topic - FROM "%s_payloads" - JOIN "%s_messages" ON "%s_payloads".id = - "%s_messages".payload_id - WHERE "%s_messages".id = NEW.message_id - ) AND consumer = NEW.consumer - ) - BEGIN - SELECT RAISE( - ABORT, - 'Instance does not own topic/consumer combo' - ); - END; - ` - return queryT{ - write: fmt.Sprintf( - tmpl_write, - prefix, - g.SQLiteNow, - prefix, - prefix, - prefix, - g.SQLiteNow, - prefix, - prefix, - prefix, - prefix, - g.SQLiteNow, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - ), - } -} - -func createTables(db *sql.DB, prefix string) error { - q := createTablesSQL(prefix) - - return inTx(db, func(tx *sql.Tx) error { - _, err := tx.Exec(q.write) - return err - }) -} - -func takeSQL(prefix string) queryT { - const tmpl_write = ` - INSERT INTO "%s_owners" (topic, consumer, owner_id) - VALUES (?, ?, ?) - ON CONFLICT (topic, consumer) DO - UPDATE SET owner_id=excluded.owner_id; - ` - return queryT{ - write: fmt.Sprintf(tmpl_write, prefix), - } -} - -func takeStmt( - cfg dbconfigT, -) (func(string, string) error, func() error, error) { - q := takeSQL(cfg.prefix) - - writeStmt, err := cfg.shared.Prepare(q.write) - if err != nil { - return nil, nil, err - } - - fn := func(topic string, consumer string) error { - _, err := writeStmt.Exec( - topic, - consumer, - cfg.instanceID, - ) - return err - } - - return fn, writeStmt.Close, nil -} - -func publishSQL(prefix string) queryT { - const tmpl_write = ` - INSERT INTO "%s_payloads" (topic, payload) - VALUES (?, ?); - - INSERT INTO "%s_messages" (uuid, flow_id, payload_id) - VALUES (?, ?, last_insert_rowid()); - ` - const tmpl_read = ` - SELECT id, timestamp FROM "%s_messages" - WHERE uuid = ?; - ` - return queryT{ - write: fmt.Sprintf(tmpl_write, prefix, prefix), - read: fmt.Sprintf(tmpl_read, prefix), - } -} - -func publishStmt( - cfg dbconfigT, -) (func(UnsentMessage, uuid.UUID) (messageT, error), func() error, error) { - q := publishSQL(cfg.prefix) - - readStmt, err := cfg.shared.Prepare(q.read) - if err != nil { - return nil, nil, err - } - - privateDB, err := sql.Open(golite.DriverName, cfg.dbpath) - if err != nil { - readStmt.Close() - return nil, nil, err - } - - writeFn, writeFnClose := execSerialized(q.write, privateDB) - - fn := func( - unsentMessage UnsentMessage, - messageID uuid.UUID, - ) (messageT, error) { - message := messageT{ - uuid: messageID, - topic: unsentMessage.Topic, - flowID: unsentMessage.FlowID, - payload: unsentMessage.Payload, - } - - message_id_bytes := messageID[:] - flow_id_bytes := unsentMessage.FlowID[:] - err := writeFn( - unsentMessage.Topic, - unsentMessage.Payload, - message_id_bytes, - flow_id_bytes, - ) - if err != nil { - return messageT{}, err - } - - var timestr string - err = readStmt.QueryRow(message_id_bytes).Scan( - &message.id, - ×tr, - ) - if err != nil { - return messageT{}, err - } - - message.timestamp, err = time.Parse(time.RFC3339Nano, timestr) - if err != nil { - return messageT{}, err - } - - return message, nil - } - - closeFn := func() error { - writeFnClose() - return g.SomeError(privateDB.Close(), readStmt.Close()) - } - - return fn, closeFn, nil -} - -func findSQL(prefix string) queryT { - const tmpl_read = ` - SELECT - "%s_messages".id, - "%s_messages".timestamp, - "%s_messages".uuid, - "%s_payloads".payload - FROM "%s_messages" - JOIN "%s_payloads" ON - "%s_payloads".id = "%s_messages".payload_id - WHERE - "%s_payloads".topic = ? AND - "%s_messages".flow_id = ? - ORDER BY "%s_messages".id DESC - LIMIT 1; - ` - return queryT{ - read: fmt.Sprintf( - tmpl_read, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - ), - } -} - -func findStmt( - cfg dbconfigT, -) (func(string, uuid.UUID) (messageT, error), func() error, error) { - q := findSQL(cfg.prefix) - - readStmt, err := cfg.shared.Prepare(q.read) - if err != nil { - return nil, nil, err - } - - fn := func(topic string, flowID uuid.UUID) (messageT, error) { - message := messageT{ - topic: topic, - flowID: flowID, - } - - var ( - timestr string - message_id_bytes []byte - ) - flow_id_bytes := flowID[:] - err = readStmt.QueryRow(topic, flow_id_bytes).Scan( - &message.id, - ×tr, - &message_id_bytes, - &message.payload, - ) - if err != nil { - return messageT{}, err - } - message.uuid = uuid.UUID(message_id_bytes) - - message.timestamp, err = time.Parse(time.RFC3339Nano, timestr) - if err != nil { - return messageT{}, err - } - - return message, nil - } - - return fn, readStmt.Close, nil -} - -func nextSQL(prefix string) queryT { - const tmpl_read = ` - SELECT - ( - SELECT owner_id FROM "%s_owners" - WHERE - topic = ? AND - consumer = ? - LIMIT 1 - ) AS owner_id, - "%s_messages".id, - "%s_messages".timestamp, - "%s_messages".uuid, - "%s_messages".flow_id, - "%s_payloads".payload - FROM "%s_messages" - JOIN "%s_payloads" ON - "%s_payloads".id = "%s_messages".payload_id - WHERE - "%s_payloads".topic = ? AND - "%s_messages".id NOT IN ( - SELECT message_id FROM "%s_offsets" - WHERE consumer = ? - ) - ORDER BY "%s_messages".id ASC - LIMIT 1; - ` - return queryT{ - read: fmt.Sprintf( - tmpl_read, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - ), - } -} - -func nextStmt( - cfg dbconfigT, -) (func(string, string) (messageT, error), func() error, error) { - q := nextSQL(cfg.prefix) - - readStmt, err := cfg.shared.Prepare(q.read) - if err != nil { - return nil, nil, err - } - - fn := func(topic string, consumer string) (messageT, error) { - message := messageT{ - topic: topic, - } - - var ( - ownerID int - timestr string - message_id_bytes []byte - flow_id_bytes []byte - ) - - err = readStmt.QueryRow(topic, consumer, topic, consumer).Scan( - &ownerID, - &message.id, - ×tr, - &message_id_bytes, - &flow_id_bytes, - &message.payload, - ) - if err != nil { - return messageT{}, err - } - - if ownerID != cfg.instanceID { - err := fmt.Errorf( - notOwnerErrorFmt, - ownerID, - topic, - consumer, - cfg.instanceID, - ) - return messageT{}, err - } - message.uuid = uuid.UUID(message_id_bytes) - message.flowID = uuid.UUID(flow_id_bytes) - - message.timestamp, err = time.Parse(time.RFC3339Nano, timestr) - if err != nil { - return messageT{}, err - } - - return message, nil - } - - return fn, readStmt.Close, nil -} - -func messageEach(rows *sql.Rows, callback func(messageT) error) error { - if rows == nil { - return nil - } - - for rows.Next() { - var ( - message messageT - timestr string - message_id_bytes []byte - flow_id_bytes []byte - ) - err := rows.Scan( - &message.id, - ×tr, - &message_id_bytes, - &flow_id_bytes, - &message.topic, - &message.payload, - ) - if err != nil { - rows.Close() - return err - } - message.uuid = uuid.UUID(message_id_bytes) - message.flowID = uuid.UUID(flow_id_bytes) - - message.timestamp, err = time.Parse(time.RFC3339Nano, timestr) - if err != nil { - rows.Close() - return err - } - - err = callback(message) - if err != nil { - rows.Close() - return err - } - } - - return g.WrapErrors(rows.Err(), rows.Close()) -} - -func pendingSQL(prefix string) queryT { - const tmpl_read = ` - SELECT - "%s_messages".id, - "%s_messages".timestamp, - "%s_messages".uuid, - "%s_messages".flow_id, - "%s_payloads".topic, - "%s_payloads".payload - FROM "%s_messages" - JOIN "%s_payloads" ON - "%s_payloads".id = "%s_messages".payload_id - WHERE - "%s_payloads".topic = ? AND - "%s_messages".id NOT IN ( - SELECT message_id FROM "%s_offsets" - WHERE consumer = ? - ) - ORDER BY "%s_messages".id ASC; - ` - const tmpl_owner = ` - SELECT owner_id FROM "%s_owners" - WHERE - topic = ? AND - consumer = ?; - ` - return queryT{ - read: fmt.Sprintf( - tmpl_read, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - ), - owner: fmt.Sprintf(tmpl_owner, prefix), - } -} - -func pendingStmt( - cfg dbconfigT, -) (func(string, string) (*sql.Rows, error), func() error, error) { - q := pendingSQL(cfg.prefix) - - readStmt, err := cfg.shared.Prepare(q.read) - if err != nil { - return nil, nil, err - } - - ownerStmt, err := cfg.shared.Prepare(q.owner) - if err != nil { - readStmt.Close() - return nil, nil, err - } - - fn := func(topic string, consumer string) (*sql.Rows, error) { - var ownerID int - err := ownerStmt.QueryRow(topic, consumer).Scan(&ownerID) - if err != nil { - return nil, err - } - - // best effort check, the final one is done during - // commit within a transaction - if ownerID != cfg.instanceID { - return nil, nil - } - - return readStmt.Query(topic, consumer) - } - - closeFn := func() error { - return g.SomeFnError(readStmt.Close, ownerStmt.Close) - } - - return fn, closeFn, nil -} - -func commitSQL(prefix string) queryT { - const tmpl_write = ` - INSERT INTO "%s_offsets" (consumer, message_id, instance_id) - VALUES (?, (SELECT id FROM "%s_messages" WHERE uuid = ?), ?); - ` - return queryT{ - write: fmt.Sprintf(tmpl_write, prefix, prefix), - } -} - -func commitStmt( - cfg dbconfigT, -) (func(string, uuid.UUID) error, func() error, error) { - q := commitSQL(cfg.prefix) - - writeStmt, err := cfg.shared.Prepare(q.write) - if err != nil { - return nil, nil, err - } - - fn := func(consumer string, messageID uuid.UUID) error { - message_id_bytes := messageID[:] - _, err = writeStmt.Exec( - consumer, - message_id_bytes, - cfg.instanceID, - ) - return err - } - - return fn, writeStmt.Close, nil -} - -func toDeadSQL(prefix string) queryT { - const tmpl_write = ` - INSERT INTO "%s_offsets" - ( consumer, message_id, instance_id) - VALUES ( ?, (SELECT id FROM "%s_messages" WHERE uuid = ?), ?); - - INSERT INTO "%s_deadletters" - (uuid, consumer, message_id, instance_id) - VALUES (?, ?, (SELECT id FROM "%s_messages" WHERE uuid = ?), ?); - ` - return queryT{ - write: fmt.Sprintf(tmpl_write, prefix, prefix, prefix, prefix), - } -} - -func toDeadStmt( - cfg dbconfigT, -) ( - func(string, uuid.UUID, uuid.UUID) error, - func() error, - error, -) { - q := toDeadSQL(cfg.prefix) - - privateDB, err := sql.Open(golite.DriverName, cfg.dbpath) - if err != nil { - return nil, nil, err - } - - writeFn, writeFnClose := execSerialized(q.write, privateDB) - - fn := func( - consumer string, - messageID uuid.UUID, - deadletterID uuid.UUID, - ) error { - message_id_bytes := messageID[:] - deadletter_id_bytes := deadletterID[:] - return writeFn( - consumer, - message_id_bytes, - cfg.instanceID, - deadletter_id_bytes, - consumer, - message_id_bytes, - cfg.instanceID, - ) - } - - closeFn := func() error { - writeFnClose() - return privateDB.Close() - } - - - return fn, closeFn, nil -} - -func replaySQL(prefix string) queryT { - const tmpl_write = ` - INSERT INTO "%s_messages" (uuid, flow_id, payload_id) - SELECT - ?, - "%s_messages".flow_id, - "%s_messages".payload_id - FROM "%s_messages" - JOIN "%s_deadletters" ON - "%s_messages".id = "%s_deadletters".message_id - WHERE "%s_deadletters".uuid = ?; - - INSERT INTO "%s_replays" (deadletter_id, message_id) - VALUES ( - (SELECT id FROM "%s_deadletters" WHERE uuid = ?), - last_insert_rowid() - ); - ` - const tmpl_read = ` - SELECT - "%s_messages".id, - "%s_messages".timestamp, - "%s_messages".flow_id, - "%s_payloads".topic, - "%s_payloads".payload - FROM "%s_messages" - JOIN "%s_payloads" ON - "%s_payloads".id = "%s_messages".payload_id - WHERE "%s_messages".uuid = ?; - ` - return queryT{ - write: fmt.Sprintf( - tmpl_write, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - ), - read: fmt.Sprintf( - tmpl_read, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - ), - } -} - -func replayStmt( - cfg dbconfigT, -) (func(uuid.UUID, uuid.UUID) (messageT, error), func() error, error) { - q := replaySQL(cfg.prefix) - - readStmt, err := cfg.shared.Prepare(q.read) - if err != nil { - return nil, nil, err - } - - privateDB, err := sql.Open(golite.DriverName, cfg.dbpath) - if err != nil { - readStmt.Close() - return nil, nil, err - } - - writeFn, writeFnClose := execSerialized(q.write, privateDB) - - fn := func( - deadletterID uuid.UUID, - messageID uuid.UUID, - ) (messageT, error) { - deadletter_id_bytes := deadletterID[:] - message_id_bytes := messageID[:] - err := writeFn( - message_id_bytes, - deadletter_id_bytes, - deadletter_id_bytes, - ) - if err != nil { - return messageT{}, err - } - - message := messageT{ - uuid: messageID, - } - - var ( - timestr string - flow_id_bytes []byte - ) - err = readStmt.QueryRow(message_id_bytes).Scan( - &message.id, - ×tr, - &flow_id_bytes, - &message.topic, - &message.payload, - ) - if err != nil { - return messageT{}, err - } - message.flowID = uuid.UUID(flow_id_bytes) - - message.timestamp, err = time.Parse(time.RFC3339Nano, timestr) - if err != nil { - return messageT{}, err - } - - return message, nil - } - - closeFn := func() error { - writeFnClose() - return g.SomeError(privateDB.Close(), readStmt.Close()) - } - - return fn, closeFn, nil -} - -func oneDeadSQL(prefix string) queryT { - const tmpl_read = ` - SELECT - "%s_deadletters".uuid, - "%s_offsets".timestamp, - "%s_messages".uuid - FROM "%s_deadletters" - JOIN "%s_offsets" ON - "%s_deadletters".message_id = "%s_offsets".message_id - JOIN "%s_messages" ON - "%s_deadletters".message_id = "%s_messages".id - JOIN "%s_payloads" ON - "%s_messages".payload_id = "%s_payloads".id - WHERE - "%s_payloads".topic = ? AND - "%s_deadletters".consumer = ? AND - "%s_offsets".consumer = ? AND - "%s_deadletters".id NOT IN ( - SELECT deadletter_id FROM "%s_replays" - ) - ORDER BY "%s_deadletters".id ASC - LIMIT 1; - ` - return queryT{ - read: fmt.Sprintf( - tmpl_read, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - ), - } -} - -func oneDeadStmt( - cfg dbconfigT, -) (func(string, string) (deadletterT, error), func() error, error) { - q := oneDeadSQL(cfg.prefix) - - readStmt, err := cfg.shared.Prepare(q.read) - if err != nil { - return nil, nil, err - } - - fn := func(topic string, consumer string) (deadletterT, error) { - deadletter := deadletterT{ - consumer: consumer, - } - - var ( - deadletter_id_bytes []byte - timestr string - message_id_bytes []byte - ) - err := readStmt.QueryRow(topic, consumer, consumer).Scan( - &deadletter_id_bytes, - ×tr, - &message_id_bytes, - ) - if err != nil { - return deadletterT{}, err - } - deadletter.uuid = uuid.UUID(deadletter_id_bytes) - deadletter.messageID = uuid.UUID(message_id_bytes) - - deadletter.timestamp, err = time.Parse( - time.RFC3339Nano, - timestr, - ) - if err != nil { - return deadletterT{}, err - } - - return deadletter, nil - } - - return fn, readStmt.Close, nil -} - -func deadletterEach( - rows *sql.Rows, - callback func(deadletterT, messageT) error, -) error { - for rows.Next() { - var ( - deadletter deadletterT - deadletter_id_bytes []byte - deadletterTimestr string - message messageT - messageTimestr string - message_id_bytes []byte - flow_id_bytes []byte - ) - err := rows.Scan( - &deadletter_id_bytes, - &message.id, - &deadletterTimestr, - &deadletter.consumer, - &messageTimestr, - &message_id_bytes, - &flow_id_bytes, - &message.topic, - &message.payload, - ) - if err != nil { - rows.Close() - return err - } - - deadletter.uuid = uuid.UUID(deadletter_id_bytes) - deadletter.messageID = uuid.UUID(message_id_bytes) - message.uuid = uuid.UUID(message_id_bytes) - message.flowID = uuid.UUID(flow_id_bytes) - - message.timestamp, err = time.Parse( - time.RFC3339Nano, - messageTimestr, - ) - if err != nil { - rows.Close() - return err - } - - deadletter.timestamp, err = time.Parse( - time.RFC3339Nano, - deadletterTimestr, - ) - if err != nil { - rows.Close() - return err - } - - err = callback(deadletter, message) - if err != nil { - rows.Close() - return err - } - } - - return g.WrapErrors(rows.Err(), rows.Close()) -} - -func allDeadSQL(prefix string) queryT { - const tmpl_read = ` - SELECT - "%s_deadletters".uuid, - "%s_deadletters".message_id, - "%s_offsets".timestamp, - "%s_offsets".consumer, - "%s_messages".timestamp, - "%s_messages".uuid, - "%s_messages".flow_id, - "%s_payloads".topic, - "%s_payloads".payload - FROM "%s_deadletters" - JOIN "%s_offsets" ON - "%s_deadletters".message_id = "%s_offsets".message_id - JOIN "%s_messages" ON - "%s_deadletters".message_id = "%s_messages".id - JOIN "%s_payloads" ON - "%s_messages".payload_id = "%s_payloads".id - WHERE - "%s_payloads".topic = ? AND - "%s_deadletters".consumer = ? AND - "%s_offsets".consumer = ? AND - "%s_deadletters".id NOT IN ( - SELECT deadletter_id FROM "%s_replays" - ) - ORDER BY "%s_deadletters".id ASC; - ` - return queryT{ - read: fmt.Sprintf( - tmpl_read, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - ), - } -} - -func allDeadStmt( - cfg dbconfigT, -) (func(string, string) (*sql.Rows, error), func() error, error) { - q := allDeadSQL(cfg.prefix) - - readStmt, err := cfg.shared.Prepare(q.read) - if err != nil { - return nil, nil, err - } - - fn := func(topic string, consumer string) (*sql.Rows, error) { - return readStmt.Query(topic, consumer, consumer) - } - - return fn, readStmt.Close, nil -} - -func sizeSQL(prefix string) queryT { - const tmpl_read = ` - SELECT - COUNT(1) as size - FROM "%s_messages" - JOIN "%s_payloads" ON - "%s_messages".payload_id = "%s_payloads".id - WHERE "%s_payloads".topic = ?; - ` - return queryT{ - read: fmt.Sprintf( - tmpl_read, - prefix, - prefix, - prefix, - prefix, - prefix, - ), - } -} - - -func sizeStmt( - cfg dbconfigT, -) (func(string) (int, error), func() error, error) { - q := sizeSQL(cfg.prefix) - - readStmt, err := cfg.shared.Prepare(q.read) - if err != nil { - return nil, nil, err - } - - fn := func(topic string) (int, error) { - var size int - err := readStmt.QueryRow(topic).Scan(&size) - if err != nil { - return -1, err - } - - return size, nil - } - - return fn, readStmt.Close, nil -} - -func countSQL(prefix string) queryT { - const tmpl_read = ` - SELECT - COUNT(1) as count - FROM "%s_messages" - JOIN "%s_offsets" ON - "%s_messages".id = "%s_offsets".message_id - JOIN "%s_payloads" ON - "%s_messages".payload_id = "%s_payloads".id - WHERE - "%s_payloads".topic = ? AND - "%s_offsets".consumer = ?; - ` - return queryT{ - read: fmt.Sprintf( - tmpl_read, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - ), - } -} - -func countStmt( - cfg dbconfigT, -) (func(string, string) (int, error), func() error, error) { - q := countSQL(cfg.prefix) - - readStmt, err := cfg.shared.Prepare(q.read) - if err != nil { - return nil, nil, err - } - - fn := func(topic string, consumer string) (int, error) { - var count int - err := readStmt.QueryRow(topic, consumer).Scan(&count) - if err != nil { - return -1, err - } - - return count, nil - } - - return fn, readStmt.Close, nil -} - -func hasDataSQL(prefix string) queryT { - const tmpl_read = ` - SELECT 1 as data - FROM "%s_messages" - JOIN "%s_payloads" ON - "%s_payloads".id = "%s_messages".payload_id - WHERE - "%s_payloads".topic = ? AND - "%s_messages".id NOT IN ( - SELECT message_id FROM "%s_offsets" - WHERE consumer = ? - ) - LIMIT 1; - ` - return queryT{ - read: fmt.Sprintf( - tmpl_read, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - prefix, - ), - } -} - -func hasDataStmt( - cfg dbconfigT, -) (func(string, string) (bool, error), func() error, error) { - q := hasDataSQL(cfg.prefix) - - readStmt, err := cfg.shared.Prepare(q.read) - if err != nil { - return nil, nil, err - } - - fn := func(topic string, consumer string) (bool, error) { - var _x int - err := readStmt.QueryRow(topic, consumer).Scan(&_x) - if err == sql.ErrNoRows { - return false, nil - } - - if err != nil { - return false, err - } - - return true, nil - } - - return fn, readStmt.Close, nil -} - -func initDB( - dbpath string, - prefix string, - notifyFn func(messageT), - instanceID int, -) (queriesT, error) { - err := g.ValidateSQLTablePrefix(prefix) - if err != nil { - return queriesT{}, err - } - - shared, err := sql.Open(golite.DriverName, dbpath) - if err != nil { - return queriesT{}, err - } - - cfg := dbconfigT{ - shared: shared, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - - createTablesErr := createTables(shared, prefix) - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - find, findClose, findErr := findStmt(cfg) - next, nextClose, nextErr := nextStmt(cfg) - pending, pendingClose, pendingErr := pendingStmt(cfg) - commit, commitClose, commitErr := commitStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - replay, replayClose, replayErr := replayStmt(cfg) - oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg) - allDead, allDeadClose, allDeadErr := allDeadStmt(cfg) - size, sizeClose, sizeErr := sizeStmt(cfg) - count, countClose, countErr := countStmt(cfg) - hasData, hasDataClose, hasDataErr := hasDataStmt(cfg) - - err = g.SomeError( - createTablesErr, - takeErr, - publishErr, - findErr, - nextErr, - pendingErr, - commitErr, - toDeadErr, - replayErr, - oneDeadErr, - allDeadErr, - sizeErr, - countErr, - hasDataErr, - ) - if err != nil { - return queriesT{}, err - } - - closeFn := func() error { - return g.SomeFnError( - takeClose, - publishClose, - findClose, - nextClose, - pendingClose, - commitClose, - toDeadClose, - replayClose, - oneDeadClose, - allDeadClose, - sizeClose, - countClose, - hasDataClose, - shared.Close, - ) - } - - var connMutex sync.RWMutex - return queriesT{ - take: func(a string, b string) error { - connMutex.RLock() - defer connMutex.RUnlock() - return take(a, b) - }, - publish: func(a UnsentMessage, b uuid.UUID) (messageT, error) { - var ( - err error - message messageT - ) - { - connMutex.RLock() - defer connMutex.RUnlock() - message, err = publish(a, b) - } - if err != nil { - return messageT{}, err - } - - go notifyFn(message) - return message, nil - }, - find: func(a string, b uuid.UUID) (messageT, error) { - connMutex.RLock() - defer connMutex.RUnlock() - return find(a, b) - }, - next: func(a string, b string) (messageT, error) { - connMutex.RLock() - defer connMutex.RUnlock() - return next(a, b) - }, - pending: func( - a string, - b string, - callback func(messageT) error, - ) error { - var ( - err error - rows *sql.Rows - ) - { - connMutex.RLock() - defer connMutex.RUnlock() - rows, err = pending(a, b) - } - if err != nil { - return err - } - - return messageEach(rows, callback) - }, - commit: func(a string, b uuid.UUID) error { - connMutex.RLock() - defer connMutex.RUnlock() - return commit(a, b) - }, - toDead: func( - a string, - b uuid.UUID, - c uuid.UUID, - ) error { - connMutex.RLock() - defer connMutex.RUnlock() - return toDead(a, b, c) - }, - replay: func(a uuid.UUID, b uuid.UUID) (messageT, error) { - var ( - err error - message messageT - ) - { - connMutex.RLock() - defer connMutex.RUnlock() - message, err = replay(a, b) - } - if err != nil { - return messageT{}, err - } - - go notifyFn(message) - return message, nil - }, - oneDead: func(a string, b string) (deadletterT, error) { - connMutex.RLock() - defer connMutex.RUnlock() - return oneDead(a, b) - }, - allDead: func( - a string, - b string, - callback func(deadletterT, messageT) error, - ) error { - var ( - err error - rows *sql.Rows - ) - { - connMutex.RLock() - defer connMutex.RUnlock() - rows, err = allDead(a, b) - } - if err != nil { - return err - } - - return deadletterEach(rows, callback) - }, - size: func(a string) (int, error) { - connMutex.RLock() - defer connMutex.RUnlock() - return size(a) - }, - count: func(a string, b string) (int, error) { - connMutex.RLock() - defer connMutex.RUnlock() - return count(a, b) - }, - hasData: func(a string, b string) (bool, error) { - connMutex.RLock() - defer connMutex.RUnlock() - return hasData(a, b) - }, - close: func() error { - connMutex.Lock() - defer connMutex.Unlock() - return closeFn() - }, - }, nil -} - - -func newPinger[T any]() pingerT[T] { - channel := make(chan T, 1) - closed := false - var rwmutex sync.RWMutex - return pingerT[T]{ - tryPing: func(x T) { - rwmutex.RLock() - defer rwmutex.RUnlock() - if closed { - return - } - select { - case channel <- x: - default: - } - }, - onPing: func(cb func(T)) { - for x := range channel { - cb(x) - } - }, - closed: func() bool { - rwmutex.RLock() - defer rwmutex.RUnlock() - return closed - }, - close: func() { - rwmutex.Lock() - defer rwmutex.Unlock() - if closed { - return - } - close(channel) - closed = true - }, - } -} - -func makeSubscriptionsFuncs() subscriptionsT { - var rwmutex sync.RWMutex - subscriptions := subscriptionsSetM{} - return subscriptionsT{ - read: func(callback func(subscriptionsSetM) error) error { - rwmutex.RLock() - defer rwmutex.RUnlock() - return callback(subscriptions) - }, - write: func(callback func(subscriptionsSetM) error) error { - rwmutex.Lock() - defer rwmutex.Unlock() - return callback(subscriptions) - }, - } -} - -// Try notifying the consumer that they have data to work with. If they're -// already full, we simply drop the notification, as on each they'll look for -// all pending messages and process them all. So dropping the event here -// doesn't mean not notifying the consumer, but simply acknoledging that the -// existing notifications are enough for them to work with, without letting any -// message slip through. -func makeNotifyFn( - readFn func(func(subscriptionsSetM) error) error, - pinger pingerT[struct{}], -) func(messageT) { - return func(message messageT) { - readFn(func(set subscriptionsSetM) error { - topicSub, ok := set[message.topic] - if !ok { - return nil - } - - for _, consumer := range topicSub.consumers { - consumer.pinger.tryPing(struct{}{}) - } - waiters := topicSub.waiters[message.flowID] - for _, waiter := range waiters { - waiter.pinger.tryPing(message.payload) - } - return nil - }) - pinger.tryPing(struct{}{}) - } -} - -func collectClosedWaiters( - set subscriptionsSetM, -) map[string]map[uuid.UUID][]string { - waiters := map[string]map[uuid.UUID][]string{} - for topic, topicSub := range set { - waiters[topic] = map[uuid.UUID][]string{} - for flowID, waitersByName := range topicSub.waiters { - names := []string{} - for name, waiter := range waitersByName { - if (*waiter.closed)() { - names = append(names, name) - } - } - waiters[topic][flowID] = names - } - } - - return waiters -} - -func trimEmptyLeaves(closedWaiters map[string]map[uuid.UUID][]string) { - for topic, waiters := range closedWaiters { - for flowID, names := range waiters { - if len(names) == 0 { - delete(closedWaiters[topic], flowID) - } - } - if len(waiters) == 0 { - delete(closedWaiters, topic) - } - } -} - -func deleteIfEmpty(set subscriptionsSetM, topic string) { - topicSub, ok := set[topic] - if !ok { - return - } - - emptyConsumers := len(topicSub.consumers) == 0 - emptyWaiters := len(topicSub.waiters) == 0 - if emptyConsumers && emptyWaiters { - delete(set, topic) - } -} - -func deleteEmptyTopics(set subscriptionsSetM) { - for topic, _ := range set { - deleteIfEmpty(set, topic) - } -} - -func removeClosedWaiters( - set subscriptionsSetM, - closedWaiters map[string]map[uuid.UUID][]string, -) { - for topic, waiters := range closedWaiters { - _, ok := set[topic] - if !ok { - continue - } - - for flowID, names := range waiters { - if set[topic].waiters[flowID] == nil { - continue - } - for _, name := range names { - delete(set[topic].waiters[flowID], name) - } - if len(set[topic].waiters[flowID]) == 0 { - delete(set[topic].waiters, flowID) - } - } - } - - deleteEmptyTopics(set) -} - -func reapClosedWaiters( - readFn func(func(subscriptionsSetM) error) error, - writeFn func(func(subscriptionsSetM) error) error, -) { - var closedWaiters map[string]map[uuid.UUID][]string - readFn(func(set subscriptionsSetM) error { - closedWaiters = collectClosedWaiters(set) - return nil - }) - - trimEmptyLeaves(closedWaiters) - if len(closedWaiters) == 0 { - return - } - - writeFn(func(set subscriptionsSetM) error { - removeClosedWaiters(set, closedWaiters) - return nil - }) -} - -func everyNthCall[T any](n int, fn func(T)) func(T) { - i := 0 - return func(x T) { - i++ - if i == n { - i = 0 - fn(x) - } - } -} - -func runReaper( - onPing func(func(struct{})), - readFn func(func(subscriptionsSetM) error) error, - writeFn func(func(subscriptionsSetM) error) error, -) { - onPing(everyNthCall(reaperSkipCount, func(struct{}) { - reapClosedWaiters(readFn, writeFn) - })) -} - -func NewWithPrefix(databasePath string, prefix string) (IQueue, error) { - subscriptions := makeSubscriptionsFuncs() - pinger := newPinger[struct{}]() - notifyFn := makeNotifyFn(subscriptions.read, pinger) - queries, err := initDB(databasePath, prefix, notifyFn, os.Getpid()) - if err != nil { - return queueT{}, err - } - - go runReaper(pinger.onPing, subscriptions.read, subscriptions.write) - - return queueT{ - queries: queries, - subscriptions: subscriptions, - pinger: pinger, - }, nil -} - -func New(databasePath string) (IQueue, error) { - return NewWithPrefix(databasePath, defaultPrefix) -} - -func asPublicMessage(message messageT) Message { - return Message{ - ID: message.uuid, - Timestamp: message.timestamp, - Topic: message.topic, - FlowID: message.flowID, - Payload: message.payload, - } -} - -func (queue queueT) Publish(unsent UnsentMessage) (Message, error) { - message, err := queue.queries.publish(unsent, uuid.New()) - if err != nil { - return Message{}, err - } - - return asPublicMessage(message), nil -} - -func registerConsumerFn(consumer consumerT) func(subscriptionsSetM) error { - topicSub := topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{}, - } - - return func(set subscriptionsSetM) error { - topic := consumer.data.topic - _, ok := set[topic] - if !ok { - set[topic] = topicSub - } - set[topic].consumers[consumer.data.name] = consumer - - return nil - } -} - -func registerWaiterFn(waiter waiterT) func(subscriptionsSetM) error { - topicSub := topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{}, - } - waiters := map[string]waiterT{} - - return func(set subscriptionsSetM) error { - var ( - topic = waiter.data.topic - flowID = waiter.data.flowID - ) - _, ok := set[topic] - if !ok { - set[topic] = topicSub - } - if set[topic].waiters[flowID] == nil { - set[topic].waiters[flowID] = waiters - } - set[topic].waiters[flowID][waiter.data.name] = waiter - return nil - } -} - -func makeConsumeOneFn( - data consumerDataT, - callback func(Message) error, - successFn func(string, uuid.UUID) error, - errorFn func(string, uuid.UUID, uuid.UUID) error, -) func(messageT) error { - return func(message messageT) error { - err := callback(asPublicMessage(message)) - if err != nil { - g.Info( - "consumer failed", "fiinha-consumer", - "topic", data.topic, - "consumer", data.name, - "error", err, - slog.Group( - "message", - "id", message.id, - "flow-id", message.flowID.String(), - ), - ) - - return errorFn(data.name, message.uuid, uuid.New()) - } - - return successFn(data.name, message.uuid) - } -} - -func makeConsumeAllFn( - data consumerDataT, - consumeOneFn func(messageT) error, - eachFn func(string, string, func(messageT) error) error, -) func(struct{}) { - return func(struct{}) { - err := eachFn(data.topic, data.name, consumeOneFn) - if err != nil { - g.Warning( - "eachFn failed", "fiinha-consume-all", - "topic", data.topic, - "consumer", data.name, - "error", err, - "circuit-breaker-enabled?", false, - ) - } - } -} - -func makeWaitFn(channel chan []byte, closeFn func()) func([]byte) { - closed := false - var mutex sync.Mutex - return func(payload []byte) { - mutex.Lock() - defer mutex.Unlock() - if closed { - return - } - - closeFn() - channel <- payload - close(channel) - closed = true - } -} - -func runConsumer(onPing func(func(struct{})), consumeAllFn func(struct{})) { - consumeAllFn(struct{}{}) - onPing(consumeAllFn) -} - -func tryFinding( - findFn func(string, uuid.UUID) (messageT, error), - topic string, - flowID uuid.UUID, - waitFn func([]byte), -) { - message, err := findFn(topic, flowID) - if err != nil { - return - } - - waitFn(message.payload) -} - -func (queue queueT) Subscribe( - topic string, - name string, - callback func(Message) error, -) error { - data := consumerDataT{ - topic: topic, - name: name, - } - pinger := newPinger[struct{}]() - consumer := consumerT{ - data: data, - callback: callback, - pinger: pinger, - close: &pinger.close, - } - consumeOneFn := makeConsumeOneFn( - consumer.data, - consumer.callback, - queue.queries.commit, - queue.queries.toDead, - ) - consumeAllFn := makeConsumeAllFn( - consumer.data, - consumeOneFn, - queue.queries.pending, - ) - - err := queue.queries.take(topic, name) - if err != nil { - return err - } - - queue.subscriptions.write(registerConsumerFn(consumer)) - go runConsumer(pinger.onPing, consumeAllFn) - return nil -} - -type Waiter struct{ - Channel <-chan []byte - Close func() -} - -func (queue queueT) WaitFor( - topic string, - flowID uuid.UUID, - name string, -) Waiter { - data := waiterDataT{ - topic: topic, - flowID: flowID, - name: name, - } - pinger := newPinger[[]byte]() - waiter := waiterT{ - data: data, - pinger: pinger, - closed: &pinger.closed, - close: &pinger.close, - } - channel := make(chan []byte, 1) - waitFn := makeWaitFn(channel, (*waiter.close)) - closeFn := func() { - queue.subscriptions.read(func(set subscriptionsSetM) error { - (*set[topic].waiters[flowID][name].close)() - return nil - }) - } - - queue.subscriptions.write(registerWaiterFn(waiter)) - tryFinding(queue.queries.find, topic, flowID, waitFn) - go pinger.onPing(waitFn) - return Waiter{channel, closeFn} -} - -func unsubscribeIfExistsFn( - topic string, - name string, -) func(subscriptionsSetM) error { - return func(set subscriptionsSetM) error { - topicSub, ok := set[topic] - if !ok { - return nil - } - - consumer, ok := topicSub.consumers[name] - if !ok { - return nil - } - - (*consumer.close)() - delete(set[topic].consumers, name) - deleteIfEmpty(set, topic) - return nil - } -} - -func (queue queueT) Unsubscribe(topic string, name string) { - queue.subscriptions.write(unsubscribeIfExistsFn(topic, name)) -} - -func cleanSubscriptions(set subscriptionsSetM) error { - for _, topicSub := range set { - for _, consumer := range topicSub.consumers { - (*consumer.close)() - } - for _, waiters := range topicSub.waiters { - for _, waiter := range waiters { - (*waiter.close)() - } - } - } - return nil -} - -func (queue queueT) Close() error { - queue.pinger.close() - return g.WrapErrors( - queue.subscriptions.write(cleanSubscriptions), - queue.queries.close(), - ) -} - - -func topicGetopt(args argsT, w io.Writer) (argsT, bool) { - if len(args.args) == 0 { - fmt.Fprintf(w, "Missing TOPIC.\n") - return args, false - } - - args.topic = args.args[0] - return args, true -} - -func topicConsumerGetopt(args argsT, w io.Writer) (argsT, bool) { - fs := flag.NewFlagSet("", flag.ContinueOnError) - fs.Usage = func() {} - fs.SetOutput(w) - - consumer := fs.String( - "C", - "default-consumer", - "The name of the consumer to be used", - ) - - if fs.Parse(args.args) != nil { - return args, false - } - - subArgs := fs.Args() - if len(subArgs) == 0 { - fmt.Fprintf(w, "Missing TOPIC.\n") - return args, false - } - - args.consumer = *consumer - args.topic = subArgs[0] - return args, true -} - -func inExec( - args argsT, - queries queriesT, - r io.Reader, - w io.Writer, -) (int, error) { - payload, err := io.ReadAll(r) - if err != nil { - return 1, err - } - - unsent := UnsentMessage{ - Topic: args.topic, - FlowID: uuid.New(), - Payload: payload, - } - message, err := queries.publish(unsent, uuid.New()) - if err != nil { - return 1, err - } - - fmt.Fprintf(w, "%s\n", message.uuid.String()) - - return 0, nil -} - -func outExec( - args argsT, - queries queriesT, - _ io.Reader, - w io.Writer, -) (int, error) { - err := queries.take(args.topic, args.consumer) - if err != nil { - return 1, err - } - - message, err := queries.next(args.topic, args.consumer) - - if err == sql.ErrNoRows { - return 3, nil - } - - if err != nil { - return 1, err - } - - fmt.Fprintln(w, string(message.payload)) - - return 0, nil -} - -func commitExec( - args argsT, - queries queriesT, - r io.Reader, - w io.Writer, -) (int, error) { - err := queries.take(args.topic, args.consumer) - if err != nil { - return 1, err - } - - message, err := queries.next(args.topic, args.consumer) - if err != nil { - return 1, err - } - - err = queries.commit(args.consumer, message.uuid) - if err != nil { - return 1, err - } - - return 0, nil -} - -func deadExec( - args argsT, - queries queriesT, - r io.Reader, - w io.Writer, -) (int, error) { - err := queries.take(args.topic, args.consumer) - if err != nil { - return 1, err - } - - message, err := queries.next(args.topic, args.consumer) - if err != nil { - return 1, err - } - - err = queries.toDead(args.consumer, message.uuid, uuid.New()) - if err != nil { - return 1, err - } - - return 0, nil -} - -func listDeadExec( - args argsT, - queries queriesT, - r io.Reader, - w io.Writer, -) (int, error) { - eachFn := func(deadletter deadletterT, _ messageT) error { - fmt.Fprintf( - w, - "%s\t%s\t%s\n", - deadletter.uuid.String(), - deadletter.timestamp.Format(time.RFC3339), - deadletter.consumer, - ) - return nil - } - - err := queries.allDead(args.topic, args.consumer, eachFn) - if err != nil { - return 1, err - } - - return 0, nil -} - -func replayExec( - args argsT, - queries queriesT, - r io.Reader, - w io.Writer, -) (int, error) { - deadletter, err := queries.oneDead(args.topic, args.consumer) - if err != nil { - return 1, err - } - - _, err = queries.replay(deadletter.uuid, uuid.New()) - if err != nil { - return 1, err - } - - return 0, nil -} - -func sizeExec( - args argsT, - queries queriesT, - r io.Reader, - w io.Writer, -) (int, error) { - size, err := queries.size(args.topic) - if err != nil { - return 1, err - } - - fmt.Fprintln(w, size) - - return 0, nil -} - -func countExec( - args argsT, - queries queriesT, - r io.Reader, - w io.Writer, -) (int, error) { - count, err := queries.count(args.topic, args.consumer) - if err != nil { - return 1, err - } - - fmt.Fprintln(w, count) - - return 0, nil -} - -func hasDataExec( - args argsT, - queries queriesT, - r io.Reader, - w io.Writer, -) (int, error) { - hasData, err := queries.hasData(args.topic, args.consumer) - if err != nil { - return 1, err - } - - if hasData { - return 0, nil - } else { - return 1, nil - } -} - -func usage(argv0 string, w io.Writer) { - fmt.Fprintf( - w, - "Usage: %s [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n", - argv0, - ) -} - -func getopt( - allArgs []string, - commandsMap map[string]commandT, - w io.Writer, -) (argsT, commandT, int) { - argv0 := allArgs[0] - argv := allArgs[1:] - fs := flag.NewFlagSet("", flag.ContinueOnError) - fs.Usage = func() {} - fs.SetOutput(w) - databasePath := fs.String( - "f", - "fiinha.db", - "The path to the file where the queue is kept", - ) - prefix := fs.String( - "p", - defaultPrefix, - "The fiinha prefix of the table names", - ) - if fs.Parse(argv) != nil { - usage(argv0, w) - return argsT{}, commandT{}, 2 - } - - subArgs := fs.Args() - if len(subArgs) == 0 { - fmt.Fprintf(w, "Missing COMMAND.\n") - usage(argv0, w) - return argsT{}, commandT{}, 2 - } - - args := argsT{ - databasePath: *databasePath, - prefix: *prefix, - command: subArgs[0], - allArgs: allArgs, - args: subArgs[1:], - } - - command := commandsMap[args.command] - if command.name == "" { - fmt.Fprintf(w, "Bad COMMAND: \"%s\".\n", args.command) - usage(argv0, w) - return argsT{}, commandT{}, 2 - } - - args, ok := command.getopt(args, w) - if !ok { - usage(argv0, w) - return argsT{}, commandT{}, 2 - } - - return args, command, 0 -} - -func runCommand( - args argsT, - command commandT, - stdin io.Reader, - stdout io.Writer, - stderr io.Writer, -) int { - iqueue, err := NewWithPrefix(args.databasePath, args.prefix) - if err != nil { - fmt.Fprintln(stderr, err) - return 1 - } - defer iqueue.Close() - - rc, err := command.exec(args, iqueue.(queueT).queries, stdin, stdout) - if err != nil { - fmt.Fprintln(stderr, err) - } - - return rc -} - -var commands = map[string]commandT{ - "in": commandT{ - name: "in", - getopt: topicGetopt, - exec: inExec, - }, - "out": commandT{ - name: "out", - getopt: topicConsumerGetopt, - exec: outExec, - }, - "commit": commandT{ - name: "commit", - getopt: topicConsumerGetopt, - exec: commitExec, - }, - "dead": commandT{ - name: "dead", - getopt: topicConsumerGetopt, - exec: deadExec, - }, - "ls-dead": commandT{ - name: "ls-dead", - getopt: topicConsumerGetopt, - exec: listDeadExec, - }, - "replay": commandT{ - name: "replay", - getopt: topicConsumerGetopt, - exec: replayExec, - }, - "size": commandT{ - name: "size", - getopt: topicGetopt, - exec: sizeExec, - }, - "count": commandT{ - name: "count", - getopt: topicConsumerGetopt, - exec: countExec, - }, - "has-data": commandT{ - name: "has-data", - getopt: topicConsumerGetopt, - exec: hasDataExec, - }, -} - - - -func Main() { - g.Init() - args, command, rc := getopt(os.Args, commands, os.Stderr) - if rc != 0 { - os.Exit(rc) - } - os.Exit(runCommand(args, command, os.Stdin, os.Stdout, os.Stderr)) -} diff --git a/src/main.go b/src/main.go deleted file mode 100644 index af67e07..0000000 --- a/src/main.go +++ /dev/null @@ -1,7 +0,0 @@ -package main - -import "fiinha" - -func main() { - fiinha.Main() -} diff --git a/tests/benchmarks/deadletters/fiinha.go b/tests/benchmarks/deadletters/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/deadletters/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/deadletters/main.go b/tests/benchmarks/deadletters/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/deadletters/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/lookup/fiinha.go b/tests/benchmarks/lookup/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/lookup/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/lookup/main.go b/tests/benchmarks/lookup/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/lookup/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/multiple-consumers/fiinha.go b/tests/benchmarks/multiple-consumers/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/multiple-consumers/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/multiple-consumers/main.go b/tests/benchmarks/multiple-consumers/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/multiple-consumers/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/multiple-produces/fiinha.go b/tests/benchmarks/multiple-produces/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/multiple-produces/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/multiple-produces/main.go b/tests/benchmarks/multiple-produces/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/multiple-produces/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/reaper/fiinha.go b/tests/benchmarks/reaper/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/reaper/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/reaper/main.go b/tests/benchmarks/reaper/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/reaper/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/replay/fiinha.go b/tests/benchmarks/replay/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/replay/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/replay/main.go b/tests/benchmarks/replay/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/replay/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/single-consumer/fiinha.go b/tests/benchmarks/single-consumer/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/single-consumer/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/single-consumer/main.go b/tests/benchmarks/single-consumer/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/single-consumer/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/single-producer/fiinha.go b/tests/benchmarks/single-producer/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/single-producer/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/single-producer/main.go b/tests/benchmarks/single-producer/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/single-producer/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/subscribe/fiinha.go b/tests/benchmarks/subscribe/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/subscribe/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/subscribe/main.go b/tests/benchmarks/subscribe/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/subscribe/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/unsubscribe/fiinha.go b/tests/benchmarks/unsubscribe/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/unsubscribe/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/unsubscribe/main.go b/tests/benchmarks/unsubscribe/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/unsubscribe/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/waiter/fiinha.go b/tests/benchmarks/waiter/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/waiter/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/waiter/main.go b/tests/benchmarks/waiter/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/waiter/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/cli-opts.sh b/tests/cli-opts.sh deleted file mode 100755 index fcb62ca..0000000 --- a/tests/cli-opts.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/sh -set -eu - -exit diff --git a/tests/fiinha.go b/tests/fiinha.go deleted file mode 100644 index 0901190..0000000 --- a/tests/fiinha.go +++ /dev/null @@ -1,5889 +0,0 @@ -package fiinha - -import ( - "bytes" - "database/sql" - "errors" - "fmt" - "io" - "log/slog" - "os" - "reflect" - "sort" - "strings" - "sync" - "time" - - "golite" - "uuid" - g "gobang" -) - - - -var instanceID = os.Getpid() - - - -func test_defaultPrefix() { - g.TestStart("defaultPrefix") - - g.Testing("the defaultPrefix is valid", func() { - g.TErrorIf(g.ValidateSQLTablePrefix(defaultPrefix)) - }) -} - -func test_tryRollback() { - g.TestStart("tryRollback()") - - myErr := errors.New("bottom error") - - db, err := sql.Open(golite.DriverName, golite.InMemory) - g.TErrorIf(err) - defer db.Close() - - - g.Testing("the error is propagated if rollback doesn't fail", func() { - tx, err := db.Begin() - g.TErrorIf(err) - - err = tryRollback(tx, myErr) - g.TAssertEqual(err, myErr) - }) - - g.Testing("a wrapped error when rollback fails", func() { - tx, err := db.Begin() - g.TErrorIf(err) - - err = tx.Commit() - g.TErrorIf(err) - - err = tryRollback(tx, myErr) - g.TAssertEqual(reflect.DeepEqual(err, myErr), false) - g.TAssertEqual(errors.Is(err, myErr), true) - }) -} - -func test_inTx() { - g.TestStart("inTx()") - - db, err := sql.Open(golite.DriverName, golite.InMemory) - g.TErrorIf(err) - defer db.Close() - - - g.Testing("when fn() errors, we propagate it", func() { - myErr := errors.New("to be propagated") - err := inTx(db, func(tx *sql.Tx) error { - return myErr - }) - g.TAssertEqual(err, myErr) - }) - - g.Testing("on nil error we get nil", func() { - err := inTx(db, func(tx *sql.Tx) error { - return nil - }) - g.TErrorIf(err) - }) -} - -func test_serialized() { - // FIXME -} - -func test_execSerialized() { - // FIXME -} - -func test_createTables() { - g.TestStart("createTables()") - - const ( - dbpath = golite.InMemory - prefix = defaultPrefix - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - defer db.Close() - - - g.Testing("tables exist afterwards", func() { - const tmpl_read = ` - SELECT id FROM "%s_messages" LIMIT 1; - ` - qRead := fmt.Sprintf(tmpl_read, prefix) - - _, err := db.Exec(qRead) - g.TErrorNil(err) - - err = createTables(db, prefix) - g.TErrorIf(err) - - _, err = db.Exec(qRead) - g.TErrorIf(err) - }) - - g.Testing("we can do it multiple times", func() { - g.TErrorIf(g.SomeError( - createTables(db, prefix), - createTables(db, prefix), - createTables(db, prefix), - )) - }) -} - -func test_takeStmt() { - g.TestStart("takeStmt()") - - const ( - topic = "take() topic" - consumer = "take() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - g.TErrorIf(takeErr) - defer g.SomeFnError( - takeClose, - db.Close, - ) - - const tmpl = ` - SELECT owner_id from "%s_owners" - WHERE - topic = ? AND - consumer = ?; - ` - sqlOwner := fmt.Sprintf(tmpl, prefix) - - - g.Testing("when there is no owner, we become it", func() { - var ownerID int - err := db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) - g.TAssertEqual(err, sql.ErrNoRows) - - err = take(topic, consumer) - g.TErrorIf(err) - - err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) - g.TErrorIf(err) - g.TAssertEqual(ownerID, instanceID) - }) - - g.Testing("if there is already an owner, we overtake it", func() { - otherCfg := cfg - otherCfg.instanceID = instanceID + 1 - - take, takeClose, takeErr := takeStmt(otherCfg) - g.TErrorIf(takeErr) - defer takeClose() - - var ownerID int - err := db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) - g.TErrorIf(err) - g.TAssertEqual(ownerID, instanceID) - - err = take(topic, consumer) - g.TErrorIf(err) - - err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) - g.TErrorIf(err) - g.TAssertEqual(ownerID, otherCfg.instanceID) - }) - g.Testing("no error if closed more than once", func() { - g.TErrorIf(g.SomeError( - takeClose(), - takeClose(), - takeClose(), - )) - }) -} - -func test_publishStmt() { - g.TestStart("publishStmt()") - - const ( - topic = "publish() topic" - payloadStr = "publish() payload" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - publish, publishClose, publishErr := publishStmt(cfg) - g.TErrorIf(publishErr) - defer g.SomeFnError( - publishClose, - db.Close, - ) - - - g.Testing("we can publish a message", func() { - messageID := uuid.New() - message, err := publish(unsent, messageID) - g.TErrorIf(err) - - g.TAssertEqual(message.id, int64(1)) - g.TAssertEqual(message.uuid, messageID) - g.TAssertEqual(message.topic, topic) - g.TAssertEqual(message.flowID, flowID) - g.TAssertEqual(message.payload, payload) - }) - - g.Testing("we can publish the same message repeatedly", func() { - messageID1 := uuid.New() - messageID2 := uuid.New() - message1, err1 := publish(unsent, messageID1) - message2, err2 := publish(unsent, messageID2) - g.TErrorIf(g.SomeError(err1, err2)) - - g.TAssertEqual(message1.id, message2.id - 1) - g.TAssertEqual(message1.topic, message2.topic) - g.TAssertEqual(message1.flowID, message2.flowID) - g.TAssertEqual(message1.payload, message2.payload) - - g.TAssertEqual(message1.uuid, messageID1) - g.TAssertEqual(message2.uuid, messageID2) - }) - - g.Testing("publishing a message with the same UUID errors", func() { - messageID := uuid.New() - message1, err1 := publish(unsent, messageID) - _, err2 := publish(unsent, messageID) - g.TErrorIf(err1) - - g.TAssertEqual(message1.uuid, messageID) - g.TAssertEqual(message1.topic, topic) - g.TAssertEqual(message1.flowID, flowID) - g.TAssertEqual(message1.payload, payload) - - g.TAssertEqual( - err2.(golite.Error).ExtendedCode, - golite.ErrConstraintUnique, - ) - }) - - g.Testing("no actual closing occurs", func() { - g.TErrorIf(g.SomeError( - publishClose(), - publishClose(), - publishClose(), - )) - }) -} - -func test_findStmt() { - g.TestStart("findStmt()") - - const ( - topic = "find() topic" - payloadStr = "find() payload" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - publish, publishClose, publishErr := publishStmt(cfg) - find, findClose, findErr := findStmt(cfg) - g.TErrorIf(g.SomeError( - publishErr, - findErr, - )) - defer g.SomeFnError( - publishClose, - findClose, - db.Close, - ) - - pub := func(flowID uuid.UUID) uuid.UUID { - unsentWithFlowID := unsent - unsentWithFlowID.FlowID = flowID - messageID := uuid.New() - _, err := publish(unsentWithFlowID, messageID) - g.TErrorIf(err) - return messageID - } - - - g.Testing("we can find a message by topic and flowID", func() { - flowID := uuid.New() - messageID := pub(flowID) - message, err := find(topic, flowID) - g.TErrorIf(err) - - g.TAssertEqual(message.uuid, messageID) - g.TAssertEqual(message.topic, topic) - g.TAssertEqual(message.flowID, flowID) - g.TAssertEqual(message.payload, payload) - }) - - g.Testing("a non-existent message gives us an error", func() { - message, err := find(topic, uuid.New()) - g.TAssertEqual(message, messageT{}) - g.TAssertEqual(err, sql.ErrNoRows) - }) - - g.Testing("findig twice yields the exact same message", func() { - flowID := uuid.New() - messageID := pub(flowID) - message1, err1 := find(topic, flowID) - message2, err2 := find(topic, flowID) - g.TErrorIf(g.SomeError(err1, err2)) - - g.TAssertEqual(message1.uuid, messageID) - g.TAssertEqual(message1, message2) - }) - - g.Testing("returns the latest entry if multiple are available", func() { - flowID := uuid.New() - - _ , err0 := find(topic, flowID) - pub(flowID) - message1, err1 := find(topic, flowID) - pub(flowID) - message2, err2 := find(topic, flowID) - - g.TAssertEqual(err0, sql.ErrNoRows) - g.TErrorIf(g.SomeError(err1, err2)) - g.TAssertEqual(message1.uuid == message2.uuid, false) - g.TAssertEqual(message1.id < message2.id, true) - }) - - g.Testing("no error if closed more than once", func() { - g.TErrorIf(g.SomeError( - findClose(), - findClose(), - findClose(), - )) - }) -} - -func test_nextStmt() { - g.TestStart("nextStmt()") - - const ( - topic = "next() topic" - payloadStr = "next() payload" - consumer = "next() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - next, nextClose, nextErr := nextStmt(cfg) - commit, commitClose, commitErr := commitStmt(cfg) - g.TErrorIf(takeErr) - g.TErrorIf(publishErr) - g.TErrorIf(nextErr) - g.TErrorIf(commitErr) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - nextErr, - commitErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - nextClose, - commitClose, - db.Close, - ) - - pub := func(topic string) messageT { - g.TErrorIf(take(topic, consumer)) - - unsentWithTopic := unsent - unsentWithTopic.Topic = topic - message, err := publish(unsentWithTopic, uuid.New()) - g.TErrorIf(err) - return message - } - - - g.Testing("we get an error on empty topic", func() { - _, err := next(topic, consumer) - g.TAssertEqual(err, sql.ErrNoRows) - }) - - g.Testing("we don't get messages from other topics", func() { - pub("other topic") - _, err := next(topic, consumer) - g.TAssertEqual(err, sql.ErrNoRows) - }) - - g.Testing("we can get the next message", func() { - expectedMessage := pub(topic) - pub(topic) - pub(topic) - message, err := next(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(message, expectedMessage) - }) - - g.Testing("we keep getting the next until we commit", func() { - message1, err1 := next(topic, consumer) - message2, err2 := next(topic, consumer) - g.TErrorIf(commit(consumer, message1.uuid)) - message3, err3 := next(topic, consumer) - g.TErrorIf(g.SomeError(err1, err2, err3)) - - g.TAssertEqual(message1, message2) - g.TAssertEqual(message2.uuid != message3.uuid, true) - }) - - g.Testing("each consumer has its own next message", func() { - g.TErrorIf(take(topic, "other consumer")) - message1, err1 := next(topic, consumer) - message2, err2 := next(topic, "other consumer") - g.TErrorIf(g.SomeError(err1, err2)) - g.TAssertEqual(message1.uuid != message2.uuid, true) - }) - - g.Testing("error when we're not the owner", func() { - otherCfg := cfg - otherCfg.instanceID = instanceID + 1 - - take, takeClose, takeErr := takeStmt(otherCfg) - g.TErrorIf(takeErr) - defer takeClose() - - _, err := next(topic, consumer) - g.TErrorIf(err) - - err = take(topic, consumer) - g.TErrorIf(err) - - _, err = next(topic, consumer) - g.TAssertEqual(err, fmt.Errorf( - notOwnerErrorFmt, - otherCfg.instanceID, - topic, - consumer, - instanceID, - )) - }) - - g.Testing("we can close more than once", func() { - g.TErrorIf(g.SomeError( - nextClose(), - nextClose(), - nextClose(), - )) - }) -} - -func test_messageEach() { - g.TestStart("messageEach()") - - const ( - topic = "messageEach() topic" - payloadStr = "messageEach() payload" - consumer = "messageEach() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - pending, pendingClose, pendingErr := pendingStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - pendingErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - pendingClose, - db.Close, - ) - - pub := func() uuid.UUID { - message, err := publish(unsent, uuid.New()) - g.TErrorIf(err) - return message.uuid - } - g.TErrorIf(take(topic, consumer)) - - - g.Testing("not called on empty set", func() { - rows, err := pending(topic, consumer) - g.TErrorIf(err) - - messageEach(rows, func(messageT) error { - g.Unreachable() - return nil - }) - }) - - g.Testing("the callback is called once for each entry", func() { - messageIDs := []uuid.UUID{ - pub(), - pub(), - pub(), - } - - rows, err := pending(topic, consumer) - g.TErrorIf(err) - - var collectedIDs []uuid.UUID - err = messageEach(rows, func(message messageT) error { - collectedIDs = append(collectedIDs, message.uuid) - return nil - }) - g.TErrorIf(err) - - g.TAssertEqual(collectedIDs, messageIDs) - }) - - g.Testing("we halt if the timestamp is ill-formatted", func() { - messageID := pub() - message_id_bytes := messageID[:] - pub() - pub() - pub() - - const tmplUpdate = ` - UPDATE "%s_messages" - SET timestamp = '01/01/1970' - WHERE uuid = ?; - ` - sqlUpdate := fmt.Sprintf(tmplUpdate, prefix) - _, err := db.Exec(sqlUpdate, message_id_bytes) - g.TErrorIf(err) - - rows, err := pending(topic, consumer) - g.TErrorIf(err) - - n := 0 - err = messageEach(rows, func(messageT) error { - n++ - return nil - }) - - g.TAssertEqual( - err, - &time.ParseError{ - Layout: time.RFC3339Nano, - Value: "01/01/1970", - LayoutElem: "2006", - ValueElem: "01/01/1970", - Message: "", - }, - ) - g.TAssertEqual(n, 3) - - const tmplDelete = ` - DELETE FROM "%s_messages" - WHERE uuid = ?; - ` - sqlDelete := fmt.Sprintf(tmplDelete, prefix) - _, err = db.Exec(sqlDelete, message_id_bytes) - g.TErrorIf(err) - }) - - g.Testing("we halt if the callback returns an error", func() { - myErr := errors.New("callback error early return") - - rows1, err1 := pending(topic, consumer) - g.TErrorIf(err1) - - n1 := 0 - err1 = messageEach(rows1, func(messageT) error { - n1++ - if n1 == 4 { - return myErr - } - return nil - }) - - rows2, err2 := pending(topic, consumer) - g.TErrorIf(err2) - - n2 := 0 - err2 = messageEach(rows2, func(messageT) error { - n2++ - return nil - }) - - g.TAssertEqual(err1, myErr) - g.TErrorIf(err2) - g.TAssertEqual(n1, 4) - g.TAssertEqual(n2, 6) - }) - - g.Testing("noop when given nil for *sql.Rows", func() { - err := messageEach(nil, func(messageT) error { - g.Unreachable() - return nil - }) - g.TErrorIf(err) - }) -} - -func test_pendingStmt() { - g.TestStart("pendingStmt()") - - const ( - topic = "pending() topic" - payloadStr = "pending() payload" - consumer = "pending() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - pending, pendingClose, pendingErr := pendingStmt(cfg) - commit, commitClose, commitErr := commitStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - pendingErr, - commitErr, - toDeadErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - pendingClose, - commitClose, - toDeadClose, - db.Close, - ) - - pub := func(topic string) messageT { - g.TErrorIf(take(topic, consumer)) - unsentWithTopic := unsent - unsentWithTopic.Topic = topic - message, err := publish(unsentWithTopic, uuid.New()) - g.TErrorIf(err) - return message - } - g.TErrorIf(take(topic, consumer)) - - collectPending := func(topic string, consumer string) []messageT { - rows, err := pending(topic, consumer) - g.TErrorIf(err) - - var messages []messageT - err = messageEach(rows, func(message messageT) error { - messages = append(messages, message) - return nil - }) - g.TErrorIf(err) - return messages - } - - - g.Testing("an empty database has 0 pending items", func() { - g.TAssertEqual(len(collectPending(topic, consumer)), 0) - }) - - g.Testing("after publishing we get all messages", func() { - expected := []messageT{ - pub(topic), - pub(topic), - } - - g.TAssertEqualI(collectPending(topic, consumer), expected) - }) - - g.Testing("we get the same messages when calling again", func() { - messages1 := collectPending(topic, consumer) - messages2 := collectPending(topic, consumer) - g.TAssertEqual(len(messages1), 2) - g.TAssertEqualI(messages1, messages2) - }) - - g.Testing("we don't get messages from other topics", func() { - pub("other topic") - - g.TAssertEqual(len(collectPending(topic, consumer)), 2) - g.TAssertEqual(len(collectPending("other topic", consumer)), 1) - }) - - g.Testing("after others commit, pending still returns them", func() { - g.TErrorIf(take(topic, "other consumer")) - messages1 := collectPending(topic, consumer) - g.TAssertEqual(len(messages1), 2) - g.TErrorIf( - commit("other consumer", messages1[0].uuid), - ) - - messages2 := collectPending(topic, consumer) - g.TAssertEqualI(messages1, messages2) - }) - - g.Testing("committing other topic doesn't change current", func() { - messages1 := collectPending(topic, consumer) - g.TAssertEqual(len(messages1), 2) - - message := pub("other topic") - - g.TErrorIf(commit(consumer, message.uuid)) - - messages2 := collectPending(topic, consumer) - g.TAssertEqualI(messages1, messages2) - }) - - g.Testing("after commiting, pending doesn't return them again", func() { - messages1 := collectPending(topic, consumer) - g.TAssertEqual(len(messages1), 2) - - g.TErrorIf(commit(consumer, messages1[0].uuid)) - - messages2 := collectPending(topic, consumer) - g.TAssertEqual(len(messages2), 1) - g.TAssertEqual(messages2[0], messages1[1]) - - g.TErrorIf(commit(consumer, messages1[1].uuid)) - - messages3 := collectPending(topic, consumer) - g.TAssertEqual(len(messages3), 0) - }) - - g.Testing("on deadletter, pending also doesn't return them", func() { - messages0 := collectPending(topic, consumer) - g.TAssertEqual(len(messages0), 0) - - message1 := pub(topic) - message2 := pub(topic) - - messages1 := collectPending(topic, consumer) - g.TAssertEqual(len(messages1), 2) - - err = toDead(consumer, message1.uuid, uuid.New()) - g.TErrorIf(err) - - messages2 := collectPending(topic, consumer) - g.TAssertEqual(len(messages2), 1) - g.TAssertEqual(messages2[0], message2) - - err = toDead(consumer, message2.uuid, uuid.New()) - g.TErrorIf(err) - - messages3 := collectPending(topic, consumer) - g.TAssertEqual(len(messages3), 0) - }) - - g.Testing("if commits are unordered, pending is still sorted", func() { - message1 := pub(topic) - message2 := pub(topic) - message3 := pub(topic) - - g.TAssertEqual(collectPending(topic, consumer), []messageT{ - message1, - message2, - message3, - }) - - g.TErrorIf(commit(consumer, message2.uuid)) - g.TAssertEqual(collectPending(topic, consumer), []messageT{ - message1, - message3, - }) - - g.TErrorIf(commit(consumer, message1.uuid)) - g.TAssertEqual(collectPending(topic, consumer), []messageT{ - message3, - }) - - g.TErrorIf(commit(consumer, message3.uuid)) - g.TAssertEqual(len(collectPending(topic, consumer)), 0) - }) - - g.Testing("when we're not the owners we get nothing", func() { - otherCfg := cfg - otherCfg.instanceID = instanceID + 1 - - take, takeClose, takeErr := takeStmt(otherCfg) - g.TErrorIf(takeErr) - defer takeClose() - - message1 := pub(topic) - message2 := pub(topic) - message3 := pub(topic) - message4 := pub(topic) - message5 := pub(topic) - - expected := []messageT{ - message1, - message2, - message3, - message4, - message5, - } - - g.TAssertEqual(collectPending(topic, consumer), expected) - - err := take(topic, consumer) - g.TErrorIf(err) - - rows, err := pending(topic, consumer) - g.TErrorIf(err) - - err = messageEach(rows, func(messageT) error { - g.Unreachable() - return nil - }) - g.TErrorIf(err) - }) - - g.Testing("no actual closing occurs", func() { - g.TErrorIf(g.SomeError( - pendingClose(), - pendingClose(), - pendingClose(), - )) - }) -} - -func test_commitStmt() { - g.TestStart("commitStmt()") - - const ( - topic = "commit() topic" - payloadStr = "commit() payload" - consumer = "commit() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - commit, commitClose, commitErr := commitStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - commitErr, - toDeadErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - commitClose, - toDeadClose, - db.Close, - ) - - pub := func(topic string) uuid.UUID { - g.TErrorIf(take(topic, consumer)) - - unsentWithTopic := unsent - unsentWithTopic.Topic = topic - message, err := publish(unsentWithTopic, uuid.New()) - g.TErrorIf(err) - return message.uuid - } - - cmt := func(consumer string, messageID uuid.UUID) error { - g.TErrorIf(take(topic, consumer)) - - return commit(consumer, messageID) - } - - - g.Testing("we can't commit twice", func() { - messageID := pub(topic) - - err1 := cmt(consumer, messageID) - err2 := cmt(consumer, messageID) - g.TErrorIf(err1) - g.TAssertEqual( - err2.(golite.Error).ExtendedCode, - golite.ErrConstraintUnique, - ) - }) - - g.Testing("we can't commit non-existent messages", func() { - err := cmt(consumer, uuid.New()) - g.TAssertEqual( - err.(golite.Error).ExtendedCode, - golite.ErrConstraintNotNull, - ) - }) - - g.Testing("multiple consumers may commit a message", func() { - messageID := pub(topic) - - g.TErrorIf(g.SomeError( - cmt(consumer, messageID), - cmt("other consumer", messageID), - cmt("yet another consumer", messageID), - )) - }) - - g.Testing("a consumer can commit to multiple topics", func() { - messageID1 := pub(topic) - messageID2 := pub("other topic") - messageID3 := pub("yet another topic") - - g.TErrorIf(g.SomeError( - cmt(consumer, messageID1), - cmt(consumer, messageID2), - cmt(consumer, messageID3), - )) - }) - - g.Testing("a consumer can consume many messages from a topic", func() { - messageID1 := pub(topic) - messageID2 := pub(topic) - messageID3 := pub(topic) - - g.TErrorIf(g.SomeError( - cmt(consumer, messageID1), - cmt(consumer, messageID2), - cmt(consumer, messageID3), - )) - }) - - g.Testing("we can't commit a dead message", func() { - messageID := pub(topic) - - err1 := toDead(consumer, messageID, uuid.New()) - err2 := cmt(consumer, messageID) - g.TErrorIf(err1) - g.TAssertEqual( - err2.(golite.Error).ExtendedCode, - golite.ErrConstraintUnique, - ) - }) - - g.Testing("error if we don't own the topic/consumer", func() { - otherCfg := cfg - otherCfg.instanceID = instanceID + 1 - - take, takeClose, takeErr := takeStmt(otherCfg) - g.TErrorIf(takeErr) - defer takeClose() - - messageID := pub(topic) - - err := take(topic, consumer) - g.TErrorIf(err) - - err = commit(consumer, messageID) - g.TAssertEqual( - err.(golite.Error).ExtendedCode, - golite.ErrConstraintTrigger, - ) - }) - - g.Testing("no actual closing occurs", func() { - g.TErrorIf(g.SomeError( - commitClose(), - commitClose(), - commitClose(), - )) - }) -} - -func test_toDeadStmt() { - g.TestStart("toDeadStmt()") - - const ( - topic = "toDead() topic" - payloadStr = "toDead() payload" - consumer = "toDead() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - commit, commitClose, commitErr := commitStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - commitErr, - toDeadErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - commitClose, - toDeadClose, - db.Close, - ) - - pub := func(topic string) uuid.UUID { - g.TErrorIf(take(topic, consumer)) - - unsentWithTopic := unsent - unsentWithTopic.Topic = topic - message, err := publish(unsentWithTopic, uuid.New()) - g.TErrorIf(err) - return message.uuid - } - - asDead := func( - consumer string, - messageID uuid.UUID, - deadletterID uuid.UUID, - ) error { - g.TErrorIf(take(topic, consumer)) - return toDead(consumer, messageID, deadletterID) - } - - - g.Testing("we can't mark as dead twice", func() { - messageID := pub(topic) - - err1 := asDead(consumer, messageID, uuid.New()) - err2 := asDead(consumer, messageID, uuid.New()) - g.TErrorIf(err1) - g.TAssertEqual( - err2.(golite.Error).ExtendedCode, - golite.ErrConstraintUnique, - ) - }) - - g.Testing("we can't reuse a deadletter id", func() { - messageID1 := pub(topic) - messageID2 := pub(topic) - deadletterID := uuid.New() - - err1 := asDead(consumer, messageID1, deadletterID) - err2 := asDead(consumer, messageID2, deadletterID) - g.TErrorIf(err1) - g.TAssertEqual( - err2.(golite.Error).ExtendedCode, - golite.ErrConstraintUnique, - ) - - }) - - g.Testing("we can't mark as dead non-existent messages", func() { - err := asDead(consumer, uuid.New(), uuid.New()) - g.TAssertEqual( - err.(golite.Error).ExtendedCode, - golite.ErrConstraintNotNull, - ) - }) - - g.Testing("multiple consumers may mark a message as dead", func() { - messageID := pub(topic) - - g.TErrorIf(g.SomeError( - asDead(consumer, messageID, uuid.New()), - asDead("another consumer", messageID, uuid.New()), - asDead("yet another consumer", messageID, uuid.New()), - )) - }) - - g.Testing("a consumer can mark as dead in multiple topics", func() { - messageID1 := pub(topic) - messageID2 := pub("other topic") - messageID3 := pub("yet other topic") - - g.TErrorIf(g.SomeError( - asDead(consumer, messageID1, uuid.New()), - asDead(consumer, messageID2, uuid.New()), - asDead(consumer, messageID3, uuid.New()), - )) - }) - - g.Testing("a consumer can produce many deadletters in a topic", func() { - messageID1 := pub(topic) - messageID2 := pub(topic) - messageID3 := pub(topic) - - g.TErrorIf(g.SomeError( - asDead(consumer, messageID1, uuid.New()), - asDead(consumer, messageID2, uuid.New()), - asDead(consumer, messageID3, uuid.New()), - )) - }) - - g.Testing("a consumer can intercalate commits and deadletters", func() { - messageID1 := pub(topic) - messageID2 := pub(topic) - messageID3 := pub(topic) - messageID4 := pub(topic) - messageID5 := pub(topic) - - g.TErrorIf(g.SomeError( - asDead(consumer, messageID1, uuid.New()), - commit(consumer, messageID2), - commit(consumer, messageID3), - asDead(consumer, messageID4, uuid.New()), - commit(consumer, messageID5), - )) - }) - - g.Testing("we can't mark a committed message as dead", func() { - messageID := pub(topic) - - err1 := commit(consumer, messageID) - err2 := asDead(consumer, messageID, uuid.New()) - g.TErrorIf(err1) - g.TAssertEqual( - err2.(golite.Error).ExtendedCode, - golite.ErrConstraintUnique, - ) - }) - - g.Testing("error if we don't own the message's consumer/topic", func() { - otherCfg := cfg - otherCfg.instanceID = instanceID + 1 - - messageID1 := pub(topic) - messageID2 := pub(topic) - - take, takeClose, takeErr := takeStmt(otherCfg) - g.TErrorIf(takeErr) - defer takeClose() - - err := toDead(consumer, messageID1, uuid.New()) - g.TErrorIf(err) - - err = take(topic, consumer) - g.TErrorIf(err) - - err = toDead(consumer, messageID2, uuid.New()) - g.TAssertEqual( - err.(golite.Error).ExtendedCode, - golite.ErrConstraintTrigger, - ) - }) - - g.Testing("no actual closing occurs", func() { - g.TErrorIf(g.SomeError( - toDeadClose(), - toDeadClose(), - toDeadClose(), - )) - }) -} - -func test_replayStmt() { - g.TestStart("replayStmt()") - - const ( - topic = "replay() topic" - payloadStr = "replay() payload" - consumer = "replay() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - replay, replayClose, replayErr := replayStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - toDeadErr, - replayErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - toDeadClose, - replayClose, - db.Close, - ) - - pub := func() messageT { - message, err := publish(unsent, uuid.New()) - g.TErrorIf(err) - return message - } - g.TErrorIf(take(topic, consumer)) - - - g.Testing("we can replay a message", func() { - message := pub() - deadletterID := uuid.New() - replayedID := uuid.New() - - err1 := toDead(consumer, message.uuid, deadletterID) - replayed, err2 := replay(deadletterID, replayedID) - g.TErrorIf(g.SomeError(err1, err2)) - - g.TAssertEqual(replayed.uuid, replayedID) - g.TAssertEqual(replayed.id == message.id, false) - g.TAssertEqual(replayed.uuid == message.uuid, false) - }) - - g.Testing("a replayed message keeps its payload", func() { - message := pub() - deadletterID := uuid.New() - err := toDead(consumer, message.uuid, deadletterID) - g.TErrorIf(err) - - replayed, err := replay(deadletterID, uuid.New()) - g.TErrorIf(err) - g.TAssertEqual(message.flowID, replayed.flowID) - g.TAssertEqual(message.payload, replayed.payload) - }) - - g.Testing("we can't replay a dead message twice", func() { - message := pub() - deadletterID := uuid.New() - - err := toDead(consumer, message.uuid, deadletterID) - g.TErrorIf(err) - - _, err1 := replay(deadletterID, uuid.New()) - _, err2 := replay(deadletterID, uuid.New()) - g.TErrorIf(err1) - g.TAssertEqual( - err2.(golite.Error).ExtendedCode, - golite.ErrConstraintUnique, - ) - }) - - g.Testing("we cant replay non-existent messages", func() { - _, err := replay(uuid.New(), uuid.New()) - g.TAssertEqual( - err.(golite.Error).ExtendedCode, - golite.ErrConstraintNotNull, - ) - }) - - g.Testing("messages can die and then be replayed many times", func() { - message := pub() - deadletterID1 := uuid.New() - deadletterID2 := uuid.New() - - err := toDead(consumer, message.uuid, deadletterID1) - g.TErrorIf(err) - - replayed1, err := replay(deadletterID1, uuid.New()) - g.TErrorIf(err) - - err = toDead(consumer, replayed1.uuid, deadletterID2) - g.TErrorIf(err) - - replayed2, err := replay(deadletterID2, uuid.New()) - g.TErrorIf(err) - - g.TAssertEqual(message.flowID, replayed1.flowID) - g.TAssertEqual(replayed1.flowID, replayed2.flowID) - }) - - g.Testing("no actual closing occurs", func() { - g.TErrorIf(g.SomeError( - replayClose(), - replayClose(), - replayClose(), - )) - }) -} - -func test_oneDeadStmt() { - g.TestStart("oneDeadStmt()") - - const ( - topic = "oneDead() topic" - payloadStr = "oneDead() payload" - consumer = "oneDead() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - replay, replayClose, replayErr := replayStmt(cfg) - oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - toDeadErr, - replayErr, - oneDeadErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - toDeadClose, - replayClose, - oneDeadClose, - db.Close, - ) - - pub := func(topic string) uuid.UUID { - g.TErrorIf(take(topic, consumer)) - - unsentWithTopic := unsent - unsentWithTopic.Topic = topic - message, err := publish(unsentWithTopic, uuid.New()) - g.TErrorIf(err) - return message.uuid - } - - - g.Testing("error on missing deadletters", func() { - _, err := oneDead(topic, consumer) - g.TAssertEqual(err, sql.ErrNoRows) - }) - - g.Testing("deadletters on other topics don't show for us", func() { - err := toDead(consumer, pub("other topic"), uuid.New()) - g.TErrorIf(err) - - _, err = oneDead(topic, consumer) - g.TAssertEqual(err, sql.ErrNoRows) - }) - - g.Testing("deadletters for other consumers don't show for use", func() { - g.TErrorIf(take(topic, "other consumer")) - err := toDead("other consumer", pub(topic), uuid.New()) - g.TErrorIf(err) - - _, err = oneDead(topic, consumer) - g.TAssertEqual(err, sql.ErrNoRows) - }) - - g.Testing("after being replayed deadletters aren't returned", func() { - messageID1 := uuid.New() - messageID2 := uuid.New() - messageID3 := uuid.New() - - err1 := toDead(consumer, pub(topic), messageID1) - err2 := toDead(consumer, pub(topic), messageID2) - err3 := toDead(consumer, pub(topic), messageID3) - g.TErrorIf(g.SomeError(err1, err2, err3)) - - deadletter, err := oneDead(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(deadletter.uuid, messageID1) - - _, err = replay(messageID2, uuid.New()) - g.TErrorIf(err) - - deadletter, err = oneDead(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(deadletter.uuid, messageID1) - - _, err = replay(messageID1, uuid.New()) - g.TErrorIf(err) - - deadletter, err = oneDead(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(deadletter.uuid, messageID3) - - _, err = replay(messageID3, uuid.New()) - g.TErrorIf(err) - - _, err = oneDead(topic, consumer) - g.TAssertEqual(err, sql.ErrNoRows) - }) -} - -func test_deadletterEach() { - g.TestStart("deadletterEach") - - const ( - topic = "deadletterEach() topic" - payloadStr = "deadletterEach() payload" - consumer = "deadletterEach() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - allDead, allDeadClose, allDeadErr := allDeadStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - toDeadErr, - allDeadErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - toDeadClose, - allDeadClose, - db.Close, - ) - - pub := func() uuid.UUID { - message, err := publish(unsent, uuid.New()) - g.TErrorIf(err) - return message.uuid - } - - dead := func(messageID uuid.UUID) uuid.UUID { - deadletterID := uuid.New() - err := toDead(consumer, messageID, deadletterID) - g.TErrorIf(err) - - return deadletterID - } - g.TErrorIf(take(topic, consumer)) - - - g.Testing("not called on empty set", func() { - rows, err := allDead(topic, consumer) - g.TErrorIf(err) - - n := 0 - deadletterEach(rows, func(deadletterT, messageT) error { - n++ - return nil - }) - }) - - g.Testing("the callback is called once for each entry", func() { - expected := []uuid.UUID{ - dead(pub()), - dead(pub()), - dead(pub()), - } - - rows, err := allDead(topic, consumer) - g.TErrorIf(err) - - var deadletterIDs []uuid.UUID - deadletterEach(rows, func( - deadletter deadletterT, - _ messageT, - ) error { - deadletterIDs = append(deadletterIDs, deadletter.uuid) - return nil - }) - - g.TAssertEqual(deadletterIDs, expected) - }) - - g.Testing("we halt if the timestamp is ill-formatted", func() { - messageID := pub() - message_id_bytes := messageID[:] - dead(messageID) - dead(pub()) - dead(pub()) - dead(pub()) - dead(pub()) - - const tmplUpdate = ` - UPDATE "%s_offsets" - SET timestamp = '01-01-1970' - WHERE message_id IN ( - SELECT id FROM "%s_messages" WHERE uuid = ? - ); - ` - sqlUpdate := fmt.Sprintf(tmplUpdate, prefix, prefix) - _, err := db.Exec(sqlUpdate, message_id_bytes) - g.TErrorIf(err) - - rows, err := allDead(topic, consumer) - g.TErrorIf(err) - - n := 0 - err = deadletterEach(rows, func(deadletterT, messageT) error { - n++ - return nil - }) - - g.TAssertEqual( - err, - &time.ParseError{ - Layout: time.RFC3339Nano, - Value: "01-01-1970", - LayoutElem: "2006", - ValueElem: "01-01-1970", - Message: "", - }, - ) - g.TAssertEqual(n, 3) - - const tmplDelete = ` - DELETE FROM "%s_offsets" - WHERE message_id IN ( - SELECT id FROM "%s_messages" WHERE uuid = ? - ); - ` - sqlDelete := fmt.Sprintf(tmplDelete, prefix, prefix) - _, err = db.Exec(sqlDelete, message_id_bytes) - g.TErrorIf(err) - }) - - g.Testing("we halt if the callback returns an error", func() { - myErr := errors.New("early return error") - - rows1, err1 := allDead(topic, consumer) - g.TErrorIf(err1) - - n1 := 0 - err1 = deadletterEach(rows1, func(deadletterT, messageT) error { - n1++ - if n1 == 1 { - return myErr - } - return nil - }) - - rows2, err2 := allDead(topic, consumer) - g.TErrorIf(err2) - - n2 := 0 - err = deadletterEach(rows2, func(deadletterT, messageT) error { - n2++ - return nil - }) - - g.TAssertEqual(err1, myErr) - g.TErrorIf(err2) - g.TAssertEqual(n1, 1) - g.TAssertEqual(n2, 7) - }) -} - -func test_allDeadStmt() { - g.TestStart("allDeadStmt()") - - const ( - topic = "allDead() topic" - payloadStr = "allDead() payload" - consumer = "allDead() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - replay, replayClose, replayErr := replayStmt(cfg) - allDead, allDeadClose, allDeadErr := allDeadStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - toDeadErr, - replayErr, - allDeadErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - toDeadClose, - replayClose, - allDeadClose, - db.Close, - ) - - pub := func(topic string) uuid.UUID { - g.TErrorIf(take(topic, consumer)) - - unsentWithTopic := unsent - unsentWithTopic.Topic = topic - message, err := publish(unsentWithTopic, uuid.New()) - g.TErrorIf(err) - return message.uuid - } - - collectAll := func( - topic string, - consumer string, - ) ([]deadletterT, []messageT) { - var ( - deadletters []deadletterT - messages []messageT - ) - eachFn := func( - deadletter deadletterT, - message messageT, - ) error { - deadletters = append(deadletters, deadletter) - messages = append(messages, message) - return nil - } - - rows, err := allDead(topic, consumer) - g.TErrorIf(err) - - err = deadletterEach(rows, eachFn) - g.TErrorIf(err) - - return deadletters, messages - } - - - g.Testing("no entry on empty deadletters", func() { - deadletterIDs, _ := collectAll(topic, consumer) - g.TAssertEqual(len(deadletterIDs), 0) - }) - - g.Testing("deadletters on other topics don't show up", func() { - err := toDead(consumer, pub("other topic"), uuid.New()) - g.TErrorIf(err) - - deadletters, _ := collectAll(topic, consumer) - g.TAssertEqual(len(deadletters), 0) - }) - - g.Testing("deadletters of other consumers don't show up", func() { - g.TErrorIf(take(topic, "other consumer")) - err := toDead("other consumer", pub(topic), uuid.New()) - g.TErrorIf(err) - - deadletterIDs, _ := collectAll(topic, consumer) - g.TAssertEqual(len(deadletterIDs), 0) - }) - - g.Testing("deadletters are given in order", func() { - deadletterIDs := []uuid.UUID{ - uuid.New(), - uuid.New(), - uuid.New(), - } - messageIDs := []uuid.UUID{ - pub(topic), - pub(topic), - pub(topic), - } - - err1 := toDead(consumer, messageIDs[0], deadletterIDs[0]) - err2 := toDead(consumer, messageIDs[1], deadletterIDs[1]) - err3 := toDead(consumer, messageIDs[2], deadletterIDs[2]) - g.TErrorIf(g.SomeError(err1, err2, err3)) - - deadletters, messages := collectAll(topic, consumer) - g.TAssertEqual(deadletters[0].uuid, deadletterIDs[0]) - g.TAssertEqual(deadletters[1].uuid, deadletterIDs[1]) - g.TAssertEqual(deadletters[2].uuid, deadletterIDs[2]) - g.TAssertEqual( messages[0].uuid, messageIDs[0]) - g.TAssertEqual( messages[1].uuid, messageIDs[1]) - g.TAssertEqual( messages[2].uuid, messageIDs[2]) - g.TAssertEqual(len(deadletters), 3) - g.TAssertEqual(len(messages), 3) - }) - - g.Testing("after being replayed, they stop appearing", func() { - deadletters, _ := collectAll(topic, consumer) - g.TAssertEqual(len(deadletters), 3) - - _, err := replay(deadletters[0].uuid, uuid.New()) - g.TErrorIf(err) - collecteds, _ := collectAll(topic, consumer) - g.TAssertEqual(len(collecteds), 2) - - _, err = replay(deadletters[1].uuid, uuid.New()) - g.TErrorIf(err) - collecteds, _ = collectAll(topic, consumer) - g.TAssertEqual(len(collecteds), 1) - - _, err = replay(deadletters[2].uuid, uuid.New()) - g.TErrorIf(err) - collecteds, _ = collectAll(topic, consumer) - g.TAssertEqual(len(collecteds), 0) - }) -} - -func test_sizeStmt() { - g.TestStart("sizeStmt()") - - const ( - topic = "size() topic" - payloadStr = "size() payload" - consumer = "size() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - replay, replayClose, replayErr := replayStmt(cfg) - oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg) - size, sizeClose, sizeErr := sizeStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - toDeadErr, - replayErr, - oneDeadErr, - sizeErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - toDeadClose, - replayClose, - sizeClose, - oneDeadClose, - db.Close, - ) - - pub := func(topic string) uuid.UUID { - g.TErrorIf(take(topic, consumer)) - - unsentWithTopic := unsent - unsentWithTopic.Topic = topic - message, err := publish(unsentWithTopic, uuid.New()) - g.TErrorIf(err) - return message.uuid - } - - - g.Testing("0 on empty topic", func() { - n, err := size(topic) - g.TErrorIf(err) - g.TAssertEqual(n, 0) - }) - - g.Testing("other topics don't fall into our count", func() { - pub("other topic") - - n, err := size(topic) - g.TErrorIf(err) - g.TAssertEqual(n, 0) - }) - - g.Testing("otherwise we just get the sum", func() { - pub(topic) - pub(topic) - pub(topic) - pub(topic) - pub(topic) - - n, err := size(topic) - g.TErrorIf(err) - g.TAssertEqual(n, 5) - }) - - g.Testing("deadletters aren't taken into account", func() { - sixthMessageID := pub(topic) - err := toDead(consumer, sixthMessageID, uuid.New()) - g.TErrorIf(err) - - n, err := size(topic) - g.TErrorIf(err) - g.TAssertEqual(n, 6) - }) - - g.Testing("after replay, deadletters increases the size", func() { - deadletter, err := oneDead(topic, consumer) - g.TErrorIf(err) - - _, err = replay(deadletter.uuid, uuid.New()) - g.TErrorIf(err) - - n, err := size(topic) - g.TErrorIf(err) - g.TAssertEqual(n, 7) - }) -} - -func test_countStmt() { - g.TestStart("countStmt()") - - const ( - topic = "count() topic" - payloadStr = "count() payload" - consumer = "count() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - next, nextClose, nextErr := nextStmt(cfg) - commit, commitClose, commitErr := commitStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - count, countClose, countErr := countStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - nextErr, - commitErr, - toDeadErr, - countErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - nextClose, - commitClose, - toDeadClose, - countClose, - db.Close, - ) - - pub := func(topic string) uuid.UUID { - g.TErrorIf(take(topic, consumer)) - - unsentWithTopic := unsent - unsentWithTopic.Topic = topic - message, err := publish(unsentWithTopic, uuid.New()) - g.TErrorIf(err) - return message.uuid - } - - - g.Testing("0 on empty topic", func() { - n, err := count(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(n, 0) - }) - - g.Testing("other topics don't add to our count", func() { - err := commit(consumer, pub("other topic")) - g.TErrorIf(err) - - n, err := count(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(n, 0) - }) - - g.Testing("other consumers don't influence our count", func() { - g.TErrorIf(take(topic, "other consumer")) - err := commit("other consumer", pub(topic)) - g.TErrorIf(err) - - n, err := count(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(n, 0) - }) - - g.Testing("unconsumed messages don't count", func() { - pub(topic) - pub(topic) - pub(topic) - - n, err := count(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(n, 0) - }) - - g.Testing("consumed messages do count", func() { - message, err := next(topic, consumer) - g.TErrorIf(err) - err = commit(consumer, message.uuid) - g.TErrorIf(err) - - message, err = next(topic, consumer) - g.TErrorIf(err) - err = commit(consumer, message.uuid) - g.TErrorIf(err) - - message, err = next(topic, consumer) - g.TErrorIf(err) - err = commit(consumer, message.uuid) - g.TErrorIf(err) - - n, err := count(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(n, 3) - }) - - g.Testing("deadletters count as consumed", func() { - message, err := next(topic, consumer) - g.TErrorIf(err) - - err = toDead(consumer, message.uuid, uuid.New()) - g.TErrorIf(err) - - n, err := count(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(n, 4) - }) -} - -func test_hasDataStmt() { - g.TestStart("hasDataStmt()") - - const ( - topic = "hasData() topic" - payloadStr = "hasData() payload" - consumer = "hasData() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - next, nextClose, nextErr := nextStmt(cfg) - commit, commitClose, commitErr := commitStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - hasData, hasDataClose, hasDataErr := hasDataStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - nextErr, - commitErr, - toDeadErr, - hasDataErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - nextClose, - commitClose, - toDeadClose, - hasDataClose, - db.Close, - ) - - pub := func(topic string) uuid.UUID { - g.TErrorIf(take(topic, consumer)) - - unsentWithTopic := unsent - unsentWithTopic.Topic = topic - message, err := publish(unsentWithTopic, uuid.New()) - g.TErrorIf(err) - return message.uuid - } - - - g.Testing("false on empty topic", func() { - has, err := hasData(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(has, false) - }) - - g.Testing("other topics don't change the response", func() { - pub("other topic") - - has, err := hasData(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(has, false) - }) - - g.Testing("published messages flip the flag", func() { - pub(topic) - has, err := hasData(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(has, true) - }) - - g.Testing("other consumers don't influence us", func() { - g.TErrorIf(take(topic, "other consumer")) - message, err := next(topic, "other consumer") - g.TErrorIf(err) - - err = commit("other consumer", message.uuid) - g.TErrorIf(err) - - has, err := hasData(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(has, true) - }) - - g.Testing("consuming messages unflips the result", func() { - message, err := next(topic, consumer) - g.TErrorIf(err) - - err = commit(consumer, message.uuid) - g.TErrorIf(err) - - has, err := hasData(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(has, false) - }) - - g.Testing("same for deadletters", func() { - has0, err := hasData(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(has0, false) - - messageID1 := pub(topic) - messageID2 := pub(topic) - - has1, err := hasData(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(has1, true) - - err = toDead(consumer, messageID1, uuid.New()) - g.TErrorIf(err) - err = commit(consumer, messageID2) - g.TErrorIf(err) - - has2, err := hasData(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(has2, false) - }) -} - -func test_initDB() { - g.TestStart("initDB()") - - const ( - topic = "initDB() topic" - payloadStr = "initDB() payload" - consumer = "initDB() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - var messages []messageT - notifyFn := func(message messageT) { - messages = append(messages, message) - } - - queries, err := initDB(dbpath, prefix, notifyFn, instanceID) - g.TErrorIf(err) - defer queries.close() - - g.TErrorIf(queries.take(topic, consumer)) - - - g.Testing("we can perform all the wrapped operations", func() { - messageID := uuid.New() - newMessageID := uuid.New() - deadletterID := uuid.New() - - messageV1, err := queries.publish(unsent, messageID) - g.TErrorIf(err) - - messageV2, err := queries.next(topic, consumer) - g.TErrorIf(err) - - var messagesV3 []messageT - pendingFn := func(message messageT) error { - messagesV3 = append(messagesV3, message) - return nil - } - err = queries.pending(topic, consumer, pendingFn) - g.TErrorIf(err) - g.TAssertEqual(len(messagesV3), 1) - messageV3 := messagesV3[0] - - err = queries.toDead(consumer, messageID, deadletterID) - g.TErrorIf(err) - - deadletterV1, err := queries.oneDead(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(deadletterV1.uuid, deadletterID) - - var ( - deadlettersV2 []deadletterT - messagesV4 []messageT - ) - deadletterFn := func( - deadletter deadletterT, - message messageT, - ) error { - deadlettersV2 = append(deadlettersV2, deadletter) - messagesV4 = append(messagesV4, message) - return nil - } - err = queries.allDead(topic, consumer, deadletterFn) - g.TErrorIf(err) - g.TAssertEqual(len(deadlettersV2), 1) - g.TAssertEqual(deadlettersV2[0].uuid, deadletterID) - g.TAssertEqual(len(messagesV4), 1) - messageV4 := messagesV4[0] - - g.TAssertEqual(messageV1, messageV2) - g.TAssertEqual(messageV1, messageV3) - g.TAssertEqual(messageV1, messageV4) - - newMessageV1, err := queries.replay(deadletterID, newMessageID) - g.TErrorIf(err) - - err = queries.commit(consumer, newMessageID) - g.TErrorIf(err) - - newMessageV0 := messageV1 - newMessageV0.id = newMessageV1.id - newMessageV0.uuid = newMessageID - newMessageV0.timestamp = newMessageV1.timestamp - g.TAssertEqual(newMessageV1, newMessageV0) - - size, err := queries.size(topic) - g.TErrorIf(err) - g.TAssertEqual(size, 2) - - count, err := queries.count(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(count, 2) - - hasData, err := queries.hasData(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(hasData, false) - }) -} - -func test_queriesTclose() { - g.TestStart("queriesT.close()") - - const ( - dbpath = golite.InMemory - prefix = defaultPrefix - ) - - notifyFn := func(messageT) {} - queries, err := initDB(dbpath, prefix, notifyFn, instanceID) - g.TErrorIf(err) - - - g.Testing("closing more than once does not error", func() { - g.TErrorIf(g.SomeError( - queries.close(), - queries.close(), - )) - }) -} - -func test_newPinger() { - g.TestStart("newPinger()") - - g.Testing("onPing() on a closed pinger is a noop", func() { - pinger := newPinger[string]() - pinger.close() - pinger.onPing(func(s string) { - panic(s) - }) - pinger.tryPing("ignored") - }) - - g.Testing("onPing() on a closed pinger with data gets that", func() { - pinger := newPinger[string]() - pinger.tryPing("received") - pinger.tryPing("ignored") - g.TAssertEqual(pinger.closed(), false) - - pinger.close() - g.TAssertEqual(pinger.closed(), true) - - c := make(chan string) - count := 0 - go pinger.onPing(func(s string) { - if count > 0 { - panic(s) - } - count++ - - c <- s - }) - - given := <- c - g.TAssertEqual(given, "received") - }) - - g.Testing("when onPing is late, we loose messages", func() { - pinger := newPinger[string]() - pinger.tryPing("first seen") - pinger.tryPing("second dropped") - pinger.tryPing("third dropped") - - c := make(chan string) - go pinger.onPing(func(s string) { - c <- s - }) - s := <- c - - close(c) - pinger.close() - g.TAssertEqual(s, "first seen") - }) - - g.Testing("if onPing is on time, it may not loose", func() { - pinger := newPinger[string]() - pinger.tryPing("first seen") - - c := make(chan string) - go pinger.onPing(func(s string) { - c <- s - }) - - s1 := <- c - pinger.tryPing("second seen") - s2 := <- c - - close(c) - pinger.close() - g.TAssertEqual(s1, "first seen") - g.TAssertEqual(s2, "second seen") - }) - - g.Testing("if onPing takes too long, it still looses messages", func() { - pinger := newPinger[string]() - pinger.tryPing("first seen") - - c1 := make(chan string) - c2 := make(chan struct{}) - go pinger.onPing(func(s string) { - c1 <- s - c2 <- struct{}{} - }) - - s1 := <- c1 - pinger.tryPing("second seen") - pinger.tryPing("third dropped") - <- c2 - s2 := <- c1 - <- c2 - - close(c2) - close(c1) - pinger.close() - g.TAssertEqual(s1, "first seen") - g.TAssertEqual(s2, "second seen") - }) -} - -func test_makeSubscriptionsFunc() { - g.TestStart("makeSubscriptionsFunc()") - - g.Testing("we can have multiple readers", func() { - subscriptions := makeSubscriptionsFuncs() - - var ( - readStarted sync.WaitGroup - readFinished sync.WaitGroup - ) - c := make(chan struct{}) - readFn := func(subscriptionsSetM) error { - readStarted.Done() - <- c - return nil - } - addReader := func() { - readStarted.Add(1) - readFinished.Add(1) - go func() { - subscriptions.read(readFn) - readFinished.Done() - }() - } - - addReader() - addReader() - addReader() - readStarted.Wait() - c <- struct{}{} - c <- struct{}{} - c <- struct{}{} - readFinished.Wait() - }) - - g.Testing("writers are exclusive", func() { - subscriptions := makeSubscriptionsFuncs() - - var ( - readStarted sync.WaitGroup - readFinished sync.WaitGroup - writeWillStart sync.WaitGroup - writeFinished sync.WaitGroup - ) - c := make(chan string) - readFn := func(subscriptionsSetM) error { - readStarted.Done() - c <- "reader" - return nil - } - addReader := func() { - readStarted.Add(1) - readFinished.Add(1) - go func() { - subscriptions.read(readFn) - readFinished.Done() - }() - } - - writeFn := func(subscriptionsSetM) error { - c <- "writer" - return nil - } - addWriter := func() { - writeWillStart.Add(1) - writeFinished.Add(1) - go func() { - writeWillStart.Done() - subscriptions.write(writeFn) - writeFinished.Done() - }() - } - - addReader() - addReader() - addReader() - readStarted.Wait() - addWriter() - writeWillStart.Wait() - - g.TAssertEqual(<-c, "reader") - g.TAssertEqual(<-c, "reader") - g.TAssertEqual(<-c, "reader") - - readFinished.Wait() - g.TAssertEqual(<-c, "writer") - writeFinished.Wait() - }) -} - -func test_makeNotifyFn() { - g.TestStart("makeNotifyFn()") - - g.Testing("when topic is nil only top pinger gets pinged", func() { - pinger1 := newPinger[struct{}]() - pinger2 := newPinger[[]byte]() - defer pinger1.close() - defer pinger2.close() - - go pinger1.onPing(func(struct{}) { - panic("consumer pinger") - }) - go pinger2.onPing(func(payload []byte) { - panic("waiter pinger") - }) - - flowID := uuid.New() - - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer-1": consumerT{ - pinger: pinger1, - }, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{ - "waiter-1": waiterT{ - pinger: pinger2, - }, - }, - }, - }, - } - subsFn := func(fn func(subscriptionsSetM) error) error { - return fn(set) - } - topPinger := newPinger[struct{}]() - defer topPinger.close() - - var wg sync.WaitGroup - wg.Add(1) - go topPinger.onPing(func(struct{}) { - wg.Done() - }) - - notifyFn := makeNotifyFn(subsFn, topPinger) - - message := messageT{ - uuid: uuid.New(), - topic: "nobody is subscribed to this one", - payload: []byte("nobody with get this payload"), - } - notifyFn(message) - wg.Wait() - }) - - g.Testing("otherwise all interested subscribers get pinged", func() { - const topic = "the topic name" - - pinger1 := newPinger[struct{}]() - pinger2 := newPinger[[]byte]() - defer pinger1.close() - defer pinger2.close() - - var wg sync.WaitGroup - go pinger1.onPing(func(struct{}) { - wg.Done() - }) - go pinger2.onPing(func([]byte) { - wg.Done() - }) - wg.Add(2) - - flowID := uuid.New() - - set := subscriptionsSetM{ - topic: topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer-1": consumerT{ - pinger: pinger1, - }, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{ - "waiter-1": waiterT{ - pinger: pinger2, - }, - }, - }, - }, - } - - subsFn := func(fn func(subscriptionsSetM) error) error { - return fn(set) - } - - topPinger := newPinger[struct{}]() - defer topPinger.close() - go topPinger.onPing(func(struct{}) { - wg.Done() - }) - wg.Add(1) - - notifyFn := makeNotifyFn(subsFn, topPinger) - - message := messageT{ - uuid: uuid.New(), - topic: topic, - flowID: flowID, - payload: []byte("ignored in this test"), - } - notifyFn(message) - wg.Wait() - }) -} - -func test_collectClosedWaiters() { - g.TestStart("collectClosedWaiter()") - - g.Testing("collects all the reports to be closed", func() { - flowID1 := uuid.New() - flowID2 := uuid.New() - flowID3 := uuid.New() - flowID4 := uuid.New() - flowID5 := uuid.New() - - mkwaiter := func(closed bool) waiterT { - fn := func() bool { - return closed - } - return waiterT{ - closed: &fn, - } - } - - set := subscriptionsSetM{ - "topic-1": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{ - "waiter-1": mkwaiter(false), - "waiter-2": mkwaiter(true), - "waiter-3": mkwaiter(true), - }, - flowID2: map[string]waiterT{ - "waiter-4": mkwaiter(true), - "waiter-5": mkwaiter(false), - }, - }, - }, - "topic-2": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID3: map[string]waiterT{ - "waiter-1": mkwaiter(false), - "waiter-2": mkwaiter(false), - }, - flowID4: map[string]waiterT{ - "waiter-3": mkwaiter(true), - "waiter-4": mkwaiter(true), - "waiter-5": mkwaiter(true), - "waiter-6": mkwaiter(true), - }, - }, - }, - "topic-3": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID5: map[string]waiterT{}, - }, - }, - "topic-4": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - expected := map[string]map[uuid.UUID][]string{ - "topic-1": map[uuid.UUID][]string{ - flowID1: []string{ - "waiter-2", - "waiter-3", - }, - flowID2: []string{ - "waiter-4", - }, - }, - "topic-2": map[uuid.UUID][]string{ - flowID3: []string{}, - flowID4: []string{ - "waiter-3", - "waiter-4", - "waiter-5", - "waiter-6", - }, - }, - "topic-3": map[uuid.UUID][]string{ - flowID5: []string{}, - }, - "topic-4": map[uuid.UUID][]string{}, - } - - given := collectClosedWaiters(set) - - // sort names for equality - for _, waitersIndex := range given { - for _, names := range waitersIndex { - sort.Strings(names) - } - } - g.TAssertEqual(given, expected) - }) -} - -func test_trimEmptyLeaves() { - g.TestStart("trimEmptyLeaves()") - - g.Testing("noop on an empty index", func() { - input := map[string]map[uuid.UUID][]string{} - expected := map[string]map[uuid.UUID][]string{} - - trimEmptyLeaves(input) - g.TAssertEqual(input, expected) - }) - - g.Testing("simplifies tree when it can", func() { - flowID1 := uuid.New() - flowID2 := uuid.New() - flowID3 := uuid.New() - flowID4 := uuid.New() - - input := map[string]map[uuid.UUID][]string{ - "topic-1": map[uuid.UUID][]string{ - flowID1: []string{ - "waiter-1", - }, - flowID2: []string{}, - }, - "topic-2": map[uuid.UUID][]string{ - flowID3: []string{}, - flowID4: []string{}, - }, - "topic-3": map[uuid.UUID][]string{}, - } - - expected := map[string]map[uuid.UUID][]string{ - "topic-1": map[uuid.UUID][]string{ - flowID1: []string{ - "waiter-1", - }, - }, - } - - trimEmptyLeaves(input) - g.TAssertEqual(input, expected) - }) - - g.Testing("fully prune tree if possible", func() { - flowID1 := uuid.New() - flowID2 := uuid.New() - flowID3 := uuid.New() - flowID4 := uuid.New() - flowID5 := uuid.New() - - input := map[string]map[uuid.UUID][]string{ - "topic-1": map[uuid.UUID][]string{}, - "topic-2": map[uuid.UUID][]string{}, - "topic-3": map[uuid.UUID][]string{}, - "topic-4": map[uuid.UUID][]string{ - flowID1: []string{}, - }, - "topic-5": map[uuid.UUID][]string{ - flowID2: []string{}, - flowID3: []string{}, - flowID4: []string{}, - flowID5: []string{}, - }, - } - - expected := map[string]map[uuid.UUID][]string{} - - trimEmptyLeaves(input) - g.TAssertEqual(input, expected) - }) -} - -func test_deleteIfEmpty() { - g.TestStart("deleteIfEmpty()") - - g.Testing("noop if there are consumers", func() { - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - }, - } - - expected1 := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - }, - } - expected2 := subscriptionsSetM{} - - deleteIfEmpty(set, "topic") - g.TAssertEqual(set, expected1) - - delete(set["topic"].consumers, "consumer") - deleteIfEmpty(set, "topic") - g.TAssertEqual(set, expected2) - }) - - g.Testing("noop if there are waiters", func() { - flowID := uuid.New() - - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: nil, - }, - }, - } - - expected1 := subscriptionsSetM{ - "topic": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: nil, - }, - }, - } - expected2 := subscriptionsSetM{} - - deleteIfEmpty(set, "topic") - g.TAssertEqual(set, expected1) - - delete(set["topic"].waiters, flowID) - deleteIfEmpty(set, "topic") - g.TAssertEqual(set, expected2) - }) - - g.Testing("otherwise deletes when empty", func() { - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - expected := subscriptionsSetM{} - - deleteIfEmpty(set, "topic") - g.TAssertEqual(set, expected) - }) - - g.Testing("unrelated topics are left untouched", func() { - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - expected := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - deleteIfEmpty(set, "another-topic") - g.TAssertEqual(set, expected) - }) -} - -func test_deleteEmptyTopics() { - g.TestStart("deleteEmptyTopics()") - - g.Testing("cleans up all empty topics from the set", func() { - flowID1 := uuid.New() - flowID2 := uuid.New() - - set := subscriptionsSetM{ - "empty": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - "has-consumers": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - "has-waiters": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: nil, - }, - }, - "has-both": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID2: nil, - }, - }, - "has-neither": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - expected := subscriptionsSetM{ - "has-consumers": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - "has-waiters": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: nil, - }, - }, - "has-both": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID2: nil, - }, - }, - } - - deleteEmptyTopics(set) - g.TAssertEqual(set, expected) - }) -} - -func test_removeClosedWaiter() { - g.TestStart("removeClosedWaiter()") - - g.Testing("removes from set all that we request", func() { - flowID0 := uuid.New() - flowID1 := uuid.New() - flowID2 := uuid.New() - flowID3 := uuid.New() - - set := subscriptionsSetM{ - "topic-1": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{ - "waiter-1": waiterT{}, - "waiter-2": waiterT{}, - }, - flowID2: map[string]waiterT{ - "waiter-3": waiterT{}, - "waiter-4": waiterT{}, - "waiter-5": waiterT{}, - }, - }, - }, - "topic-2": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID3: map[string]waiterT{ - "waiter-6": waiterT{}, - "waiter-7": waiterT{}, - "waiter-8": waiterT{}, - }, - }, - }, - "topic-3": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - input := map[string]map[uuid.UUID][]string{ - "topic-0": map[uuid.UUID][]string{ - flowID0: []string{ - "waiter-0", - }, - }, - "topic-1": map[uuid.UUID][]string{ - flowID1: []string{ - "waiter-2", - }, - flowID2: []string{ - "waiter-3", - "waiter-4", - "waiter-5", - }, - }, - "topic-2": map[uuid.UUID][]string{ - flowID3: []string{ - "waiter-6", - "waiter-7", - "waiter-8", - }, - }, - } - - expected := subscriptionsSetM{ - "topic-1": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{ - "waiter-1": waiterT{}, - }, - }, - }, - } - - removeClosedWaiters(set, input) - g.TAssertEqual(set, expected) - }) - - g.Testing("empty flowIDs from input GET LEAKED", func() { - flowID1 := uuid.New() - flowID2 := uuid.New() - - input := map[string]map[uuid.UUID][]string{ - "topic-2": map[uuid.UUID][]string{ - flowID2: []string{ - "waiter", - }, - }, - } - - set := subscriptionsSetM{ - "topic-1": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{}, - }, - }, - "topic-2": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID2: map[string]waiterT{ - "waiter": waiterT{}, - }, - }, - }, - } - - expected := subscriptionsSetM{ - "topic-1": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{}, - }, - }, - } - - removeClosedWaiters(set, input) - g.TAssertEqual(set, expected) - }) -} - -func test_reapClosedWaiters() { - g.TestStart("reapClosedWaiters()") - - g.Testing("trimEmptyLeaves() prevents removal of empty flows", func() { - var ( - set subscriptionsSetM - readCount = 0 - writeCount = 0 - ) - readFn := func(fn func(subscriptionsSetM) error) error { - readCount++ - return fn(set) - } - writeFn := func(fn func(subscriptionsSetM) error) error { - writeCount++ - return fn(set) - } - - openFn := func() bool { - return false - } - closedFn := func() bool { - return true - } - open := waiterT{ closed: &openFn } - closed := waiterT{ closed: &closedFn } - flowID1 := uuid.New() - flowID2 := uuid.New() - - set = subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{ - "waiter-1": open, - "waiter-2": open, - "waiter-3": open, - }, - flowID2: map[string]waiterT{ - "waiter-4": open, - }, - }, - }, - } - - expected1 := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{ - "waiter-1": open, - "waiter-2": open, - "waiter-3": open, - }, - flowID2: map[string]waiterT{ - "waiter-4": open, - }, - }, - }, - } - - expected2 := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{ - "waiter-2": open, - "waiter-3": open, - }, - flowID2: map[string]waiterT{ - "waiter-4": open, - }, - }, - }, - } - - expected3 := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{ - "waiter-2": open, - "waiter-3": open, - }, - }, - }, - } - - reapClosedWaiters(readFn, writeFn) - g.TAssertEqual(readCount, 1) - g.TAssertEqual(writeCount, 0) - g.TAssertEqual(set, expected1) - - set["topic"].waiters[flowID1]["waiter-1"] = closed - reapClosedWaiters(readFn, writeFn) - g.TAssertEqual(readCount, 2) - g.TAssertEqual(writeCount, 1) - g.TAssertEqual(set, expected2) - - set["topic"].waiters[flowID2]["waiter-4"] = closed - reapClosedWaiters(readFn, writeFn) - g.TAssertEqual(readCount, 3) - g.TAssertEqual(writeCount, 2) - g.TAssertEqual(set, expected3) - }) -} - -func test_everyNthCall() { - g.TestStart("everyNthCall()") - - g.Testing("0 (incorrect) would make fn never be called", func() { - never := everyNthCall[int](0, func(int) { - panic("unreachable") - }) - for i := 0; i < 100; i++ { - never(i) - } - }) - - g.Testing("the first call is delayed to be the nth", func() { - count := 0 - values := []string{} - fn := everyNthCall[string](3, func(s string) { - count++ - values = append(values, s) - }) - - fn("1 ignored") - g.TAssertEqual(count, 0) - fn("2 ignored") - g.TAssertEqual(count, 0) - fn("3 not ignored") - g.TAssertEqual(count, 1) - - fn("4 ignored") - fn("5 ignored") - g.TAssertEqual(count, 1) - - fn("6 not ignored") - fn("7 ignored") - fn("8 ignored") - g.TAssertEqual(count, 2) - - fn("9 not ignored") - g.TAssertEqual(count, 3) - - expected := []string{ - "3 not ignored", - "6 not ignored", - "9 not ignored", - } - g.TAssertEqual(values, expected) - }) -} - -func test_runReaper() { - g.TestStart("runReaper()") - - g.Testing("debounce reapClosedWaiters `reaperSkipCount` times", func() { - set := subscriptionsSetM{} - - var ( - readCount = 0 - writeCount = 0 - ) - readFn := func(fn func(subscriptionsSetM) error) error { - readCount++ - return fn(set) - } - writeFn := func(fn func(subscriptionsSetM) error) error { - writeCount++ - return fn(set) - } - - var iterCount int - onPing := func(fn func(struct{})) { - for i := 0; i < iterCount; i++ { - fn(struct{}{}) - } - } - - iterCount = reaperSkipCount - 1 - runReaper(onPing, readFn, writeFn) - g.TAssertEqual(readCount, 0) - g.TAssertEqual(writeCount, 0) - - iterCount = reaperSkipCount - runReaper(onPing, readFn, writeFn) - g.TAssertEqual(readCount, 1) - g.TAssertEqual(writeCount, 0) - }) -} - -func test_NewWithPrefix() { - g.TestStart("NewWithPrefix()") - - g.Testing("we get an error with a bad prefix", func() { - _, err := NewWithPrefix(golite.InMemory, "a bad prefix") - g.TAssertEqual(err, g.ErrBadSQLTablePrefix) - }) - - g.Testing("otherwise we have a queueT and no errors", func() { - queue, err := NewWithPrefix(golite.InMemory, "good") - g.TErrorIf(err) - queue.Close() - - g.TAssertEqual(queue.(queueT).pinger.closed(), true) - }) -} - -func test_New() { - g.TestStart("New()") - - g.Testing("smoke test that we get a queueT", func() { - queue, err := New(golite.InMemory) - g.TErrorIf(err) - queue.Close() - - g.TAssertEqual(queue.(queueT).pinger.closed(), true) - }) -} - -func test_asPublicMessage() { - g.TestStart("asPublicMessage()") - - g.Testing("it picks the correct fields 🤷", func() { - input := messageT{ - uuid: uuid.New(), - timestamp: time.Now(), - topic: "topic", - flowID: uuid.New(), - payload: []byte("payload"), - } - - expected := Message{ - ID: input.uuid, - Timestamp: input.timestamp, - Topic: input.topic, - FlowID: input.flowID, - Payload: input.payload, - } - - given := asPublicMessage(input) - g.TAssertEqual(given, expected) - }) -} - -func test_queueT_Publish() { - g.TestStart("queueT.Publish()") - - const ( - topic = "queueT.Publish() topic" - payloadStr = "queueT.Publish() payload" - dbpath = golite.InMemory - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - queue, err := New(dbpath) - g.TErrorIf(err) - defer queue.Close() - - - g.Testing("it just publishes and returns", func() { - message, err := queue.Publish(unsent) - g.TErrorIf(err) - - g.TAssertEqual(message.Timestamp == time.Time{}, false) - g.TAssertEqual(message.Topic, topic) - g.TAssertEqual(message.FlowID, flowID) - g.TAssertEqual(message.Payload, payload) - }) - - queue.Close() - g.TAssertEqual(queue.(queueT).pinger.closed(), true) -} - -func test_registerConsumerFn() { - g.TestStart("registerConsumerFn()") - - g.Testing("adds a new topicSubscriptionT{} if needed", func() { - consumer := consumerT{ - data: consumerDataT{ - topic: "topic", - name: "consumer", - }, - } - - set := subscriptionsSetM{} - - expected := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumer, - }, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - registerConsumerFn(consumer)(set) - g.TAssertEqual(set, expected) - }) - - g.Testing("otherwise it just uses what exists", func() { - flowID := uuid.New() - - consumer := consumerT{ - data: consumerDataT{ - topic: "topic", - name: "consumer", - }, - } - - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "other-consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{}, - }, - }, - } - - expected := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumer, - "other-consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{}, - }, - }, - } - - registerConsumerFn(consumer)(set) - g.TAssertEqual(set, expected) - }) - - g.Testing("overwrites existing consumer if desired", func() { - close1 := func() {} - consumer1 := consumerT{ - data: consumerDataT{ - topic: "topic", - name: "consumer", - }, - close: &close1, - } - - close2 := func() {} - consumer2 := consumerT{ - data: consumerDataT{ - topic: "topic", - name: "consumer", - }, - close: &close2, - } - - set := subscriptionsSetM{} - - expected1 := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumer1, - }, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - expected2 := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumer2, - }, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - registerConsumerFn(consumer1)(set) - g.TAssertEqual(set, expected1) - g.TAssertEqual(reflect.DeepEqual(set, expected2), false) - - registerConsumerFn(consumer2)(set) - g.TAssertEqual(set, expected2) - g.TAssertEqual(reflect.DeepEqual(set, expected1), false) - }) -} - -func test_registerWaiterFn() { - g.TestStart("registerWaiterFn()") - - g.Testing("adds a new topicSubscriptionT{} if needed", func() { - flowID := uuid.New() - - waiter := waiterT{ - data: waiterDataT{ - topic: "topic", - flowID: flowID, - name: "waiter", - }, - } - - set := subscriptionsSetM{} - - expected := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{ - "waiter": waiter, - }, - }, - }, - } - - registerWaiterFn(waiter)(set) - g.TAssertEqual(set, expected) - }) - - g.Testing("adds a new waiters map if needed", func() { - flowID := uuid.New() - - waiter := waiterT{ - data: waiterDataT{ - topic: "topic", - flowID: flowID, - name: "waiter", - }, - } - - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - expected := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{ - "waiter": waiter, - }, - }, - }, - } - - registerWaiterFn(waiter)(set) - g.TAssertEqual(set, expected) - }) - - g.Testing("otherwise it just uses what exists", func() { - flowID := uuid.New() - - waiter := waiterT{ - data: waiterDataT{ - topic: "topic", - flowID: flowID, - name: "waiter", - }, - } - - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{ - "other-waiter": waiterT{}, - }, - }, - }, - } - - expected := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{ - "waiter": waiter, - "other-waiter": waiterT{}, - }, - }, - }, - } - - registerWaiterFn(waiter)(set) - g.TAssertEqual(set, expected) - }) - - g.Testing("overwrites existing waiter if desired", func() { - flowID := uuid.New() - - close1 := func() {} - waiter1 := waiterT{ - data: waiterDataT{ - topic: "topic", - flowID: flowID, - name: "waiter", - }, - close: &close1, - } - - close2 := func() {} - waiter2 := waiterT{ - data: waiterDataT{ - topic: "topic", - flowID: flowID, - name: "waiter", - }, - close: &close2, - } - - set := subscriptionsSetM{} - - expected1 := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{ - "waiter": waiter1, - }, - }, - }, - } - - expected2 := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{ - "waiter": waiter2, - }, - }, - }, - } - - registerWaiterFn(waiter1)(set) - g.TAssertEqual(set, expected1) - g.TAssertEqual(reflect.DeepEqual(set, expected2), false) - - registerWaiterFn(waiter2)(set) - g.TAssertEqual(set, expected2) - g.TAssertEqual(reflect.DeepEqual(set, expected1), false) - }) -} - -func test_makeConsumeOneFn() { - g.TestStart("makeConsumeOneFn()") - - savedLogger := slog.Default() - - s := new(strings.Builder) - g.SetLoggerOutput(s) - - var ( - successCount int - errorCount int - callbackErr error - successFnErr error - errorFnErr error - messages []Message - successNames []string - successIDs []uuid.UUID - errorNames []string - errorIDs []uuid.UUID - deadletterIDs []uuid.UUID - ) - - data := consumerDataT{ - topic: "topic", - name: "name", - } - - callback := func(message Message) error { - messages = append(messages, message) - return callbackErr - } - - successFn := func(name string, messageID uuid.UUID) error { - successCount++ - successNames = append(successNames, name) - successIDs = append(successIDs, messageID) - return successFnErr - } - - errorFn := func( - name string, - messageID uuid.UUID, - deadletterID uuid.UUID, - ) error { - errorCount++ - errorNames = append(errorNames, name) - errorIDs = append(errorIDs, messageID) - deadletterIDs = append(deadletterIDs, deadletterID) - - return errorFnErr - } - - consumeOneFn := makeConsumeOneFn( - data, - callback, - successFn, - errorFn, - ) - - message1 := messageT{ uuid: uuid.New() } - message2 := messageT{ uuid: uuid.New() } - message3 := messageT{ uuid: uuid.New() } - message4 := messageT{ uuid: uuid.New() } - - - g.Testing("error from successFn() is propagated", func() { - err := consumeOneFn(message1) - g.TErrorIf(err) - g.TAssertEqual(successCount, 1) - g.TAssertEqual(errorCount, 0) - - successFnErr = errors.New("successFn() error") - err = consumeOneFn(message2) - g.TAssertEqual(err, successFnErr) - g.TAssertEqual(successCount, 2) - g.TAssertEqual(errorCount, 0) - - g.TAssertEqual(s.String(), "") - }) - - g.Testing("error from callback() is logged and dropped", func() { - *s = strings.Builder{} - - callbackErr = errors.New("callback() error") - err := consumeOneFn(message3) - g.TErrorIf(err) - g.TAssertEqual(successCount, 2) - g.TAssertEqual(errorCount, 1) - g.TAssertEqual(s.String() == "", false) - }) - - g.Testing("error from errorFn() is propagated", func() { - *s = strings.Builder{} - - errorFnErr = errors.New("errorFn() error") - err := consumeOneFn(message4) - g.TAssertEqual(err, errorFnErr) - g.TAssertEqual(successCount, 2) - g.TAssertEqual(errorCount, 2) - g.TAssertEqual(s.String() == "", false) - }) - - g.Testing("calls happened with the expected arguments", func() { - expectedMessages := []Message{ - asPublicMessage(message1), - asPublicMessage(message2), - asPublicMessage(message3), - asPublicMessage(message4), - } - - g.TAssertEqual(messages, expectedMessages) - g.TAssertEqual(successNames, []string{ "name", "name" }) - g.TAssertEqual(errorNames, []string{ "name", "name" }) - g.TAssertEqual(successIDs, []uuid.UUID{ - message1.uuid, - message2.uuid, - }) - g.TAssertEqual(errorIDs, []uuid.UUID{ - message3.uuid, - message4.uuid, - }) - g.TAssertEqual(deadletterIDs[0] == message3.uuid, false) - g.TAssertEqual(deadletterIDs[1] == message4.uuid, false) - }) - - slog.SetDefault(savedLogger) -} - -func test_makeConsumeAllFn() { - g.TestStart("makeConsumeAllFn()") - - savedLogger := slog.Default() - - s := new(strings.Builder) - g.SetLoggerOutput(s) - - data := consumerDataT{} - - consumeOneFn := func(messageT) error { - return nil - } - - var eachFnErr error - eachFn := func(string, string, func(messageT) error) error { - return eachFnErr - } - - consumeAllFn := makeConsumeAllFn(data, consumeOneFn, eachFn) - - - g.Testing("silent when eachFn() returns no error", func() { - consumeAllFn(struct{}{}) - g.TAssertEqual(s.String(), "") - }) - - g.Testing("logs warning otherwise", func() { - eachFnErr = errors.New("eachFn() error") - consumeAllFn(struct{}{}) - g.TAssertEqual(s.String() == "", false) - }) - - slog.SetDefault(savedLogger) -} - -func test_makeWaitFn() { - g.TestStart("makeWaitFn()") - - g.Testing("all it does is send the data and close things", func() { - callCount := 0 - closeFn := func() { - callCount++ - } - - c := make(chan []byte, 1) - waitFn := makeWaitFn(c, closeFn) - - payload := []byte("payload") - waitFn(payload) - given := <- c - - g.TAssertEqual(given, payload) - g.TAssertEqual(callCount, 1) - }) - - g.Testing("you can call it twice for cases of race condition", func() { - callCount := 0 - closeFn := func() { - callCount++ - } - - c := make(chan []byte, 1) - waitFn := makeWaitFn(c, closeFn) - - payload := []byte("something") - waitFn(payload) - waitFn(payload) - given1 := <- c - given2 := <- c - - g.TAssertEqual(given1, payload) - g.TAssertEqual(given2, []byte(nil)) - }) -} - -func test_runConsumer() { - g.TestStart("runConsumer()") - - g.Testing("calls consumeAllFn() at least one", func() { - onPing := func(fn func(struct{})) {} - - count := 0 - consumeAllFn := func(struct{}) { - count++ - } - - runConsumer(onPing, consumeAllFn) - g.TAssertEqual(count, 1) - }) - - g.Testing("can call consumeAllFn() (onPing + 1) times", func() { - onPing := func(fn func(struct{})) { - fn(struct{}{}) - fn(struct{}{}) - fn(struct{}{}) - } - - count := 0 - consumeAllFn := func(struct{}) { - count++ - } - - runConsumer(onPing, consumeAllFn) - g.TAssertEqual(count, 4) - }) -} - -func test_tryFinding() { - g.TestStart("tryFinding()") - - g.Testing("noop in case of failure", func() { - myErr := errors.New("find() error") - findFn := func(string, uuid.UUID) (messageT, error) { - return messageT{}, myErr - } - - count := 0 - waitFn := func([]byte) { - count++ - } - - - tryFinding(findFn, "topic", uuid.New(), waitFn) - g.TAssertEqual(count, 0) - }) - - g.Testing("calls waitFn in case of success", func() { - payload := []byte("find() payload") - - findFn := func(string, uuid.UUID) (messageT, error) { - return messageT{ payload: payload }, nil - } - - payloads := [][]byte{} - waitFn := func(payload []byte) { - payloads = append(payloads, payload) - } - - - tryFinding(findFn, "topic", uuid.New(), waitFn) - g.TAssertEqual(payloads, [][]byte{ payload }) - }) -} - -func test_queueT_Subscribe() { - g.TestStart("queueT.Subscribe()") - - set := subscriptionsSetM{} - consumed := []uuid.UUID{} - messages := []messageT{ - messageT{ uuid: uuid.New() }, - messageT{ uuid: uuid.New() }, - } - - var takeErr error - queue := queueT{ - queries: queriesT{ - take: func(string, string) error { - return takeErr - }, - pending: func( - topic string, - consumer string, - fn func(messageT) error, - ) error { - for _, message := range messages { - consumed = append( - consumed, - message.uuid, - ) - _ = fn(message) - } - return nil - }, - commit: func( - consumer string, - messageID uuid.UUID, - ) error { - return nil - }, - toDead: func(string, uuid.UUID, uuid.UUID) error { - g.Unreachable() - return nil - }, - }, - subscriptions: subscriptionsT{ - write: func(fn func(subscriptionsSetM) error) error { - return fn(set) - }, - }, - } - - - g.Testing("registers our callback in the set and calls it", func() { - var wg sync.WaitGroup - wg.Add(2) - - queue.Subscribe("topic", "consumer-1", func(Message) error { - wg.Done() - return nil - }) - defer queue.Unsubscribe("topic", "consumer-1") - - wg.Wait() - g.TAssertEqual(consumed, []uuid.UUID{ - messages[0].uuid, - messages[1].uuid, - }) - }) - - g.Testing("our callback also gets called when pinged", func() { - consumed = []uuid.UUID{} - - var wg sync.WaitGroup - wg.Add(4) - - queue.Subscribe("topic", "consumer-2", func(m Message) error { - wg.Done() - return nil - }) - defer queue.Unsubscribe("topic", "consumer-2") - - consumer := set["topic"].consumers["consumer-2"] - consumer.pinger.tryPing(struct{}{}) - - wg.Wait() - - g.TAssertEqual(consumed, []uuid.UUID{ - messages[0].uuid, - messages[1].uuid, - messages[0].uuid, - messages[1].uuid, - }) - }) - - g.Testing("we try to own the topic", func() { - takeErr = errors.New("queueT.Subscribe() 1") - - err := queue.Subscribe("topic", "name", func(Message) error { - g.Unreachable() - return nil - }) - - g.TAssertEqual(err, takeErr) - takeErr = nil - }) - - g.Testing("if we can't own the topic, we don't get registered", func() { - takeErr = errors.New("queueT.Subscribe() 2") - - err := queue.Subscribe("topic", "name", func(Message) error { - g.Unreachable() - return nil - }) - g.TErrorNil(err) - - expected := subscriptionsSetM{} - g.TAssertEqual(set, expected) - - takeErr = nil - }) -} - -func test_queueT_WaitFor() { - g.TestStart("queueT.WaitFor()") - - findErr := errors.New("find() error") - message := messageT{ - payload: []byte("queueT.WaitFor() payload"), - } - - set := subscriptionsSetM{} - - queue := queueT{ - queries: queriesT{ - find: func( - topic string, - flowID uuid.UUID, - ) (messageT, error) { - return message, findErr - }, - }, - subscriptions: subscriptionsT{ - write: func(fn func(subscriptionsSetM) error) error { - return fn(set) - }, - read: func(fn func(subscriptionsSetM) error) error { - return fn(set) - }, - }, - } - - - g.Testing("registers the waiter in the set", func() { - flowID := uuid.New() - - defer queue.WaitFor("topic", flowID, "waiter-1").Close() - - expected := waiterDataT{ - topic: "topic", - flowID: flowID, - name: "waiter-1", - } - - g.TAssertEqual( - set["topic"].waiters[flowID]["waiter-1"].data, - expected, - ) - }) - - g.Testing("the channel gets a message when waiter is pinged", func() { - flowID := uuid.New() - payload := []byte("sent payload") - - w := queue.WaitFor("topic", flowID, "waiter-2") - defer w.Close() - - waiter := set["topic"].waiters[flowID]["waiter-2"] - waiter.pinger.tryPing(payload) - - given := <- w.Channel - - g.TAssertEqual(given, payload) - g.TAssertEqual((*waiter.closed)(), true) - }) - - g.Testing("we can also WaitFor() after publishing the message", func() { - findErr = nil - flowID := uuid.New() - - w := queue.WaitFor("topic", flowID, "waiter-3") - defer w.Close() - - given := <- w.Channel - - waiter := set["topic"].waiters[flowID]["waiter-3"] - waiter.pinger.tryPing([]byte("ignored")) - - g.TAssertEqual(given, message.payload) - g.TAssertEqual((*waiter.closed)(), true) - }) - - g.Testing("if the data already exists we get it immediatelly", func() { - flowID := uuid.New() - - w := queue.WaitFor("topic", flowID, "waiter-4") - defer w.Close() - - waiter := set["topic"].waiters[flowID]["waiter-4"] - g.TAssertEqual((*waiter.closed)(), true) - - given := <- w.Channel - g.TAssertEqual(given, message.payload) - }) -} - -func test_unsubscribeIfExistsFn() { - g.TestStart("unsubscribeIfExistsFn()") - - g.Testing("noop on empty set", func() { - set := subscriptionsSetM{} - expected := subscriptionsSetM{} - unsubscribeIfExistsFn("topic", "consumer")(set) - g.TAssertEqual(set, expected) - }) - - g.Testing("noop on missing topic", func() { - set := subscriptionsSetM{ - "topic": topicSubscriptionT{}, - } - - expected := subscriptionsSetM{ - "topic": topicSubscriptionT{}, - } - - unsubscribeIfExistsFn("other-topic", "consumer")(set) - g.TAssertEqual(set, expected) - }) - - g.Testing("noop on missing consumer", func() { - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - }, - } - - expected := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - }, - } - - unsubscribeIfExistsFn("topic", "other-consumer")(set) - g.TAssertEqual(set, expected) - }) - - g.Testing("closes consumer and removes it from set", func() { - flowID := uuid.New() - - count := 0 - close := func() { - count++ - } - - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{ - close: &close, - }, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{}, - }, - }, - } - - expected := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{}, - }, - }, - } - - unsubscribeIfExistsFn("topic", "consumer")(set) - g.TAssertEqual(set, expected) - g.TAssertEqual(count, 1) - }) - - g.Testing("empty topics are also removed", func() { - count := 0 - close := func() { - count++ - } - - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{ - close: &close, - }, - }, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - expected := subscriptionsSetM{} - - unsubscribeIfExistsFn("topic", "consumer")(set) - g.TAssertEqual(set, expected) - g.TAssertEqual(count, 1) - }) -} - -func test_queueT_Unsubscribe() { - g.TestStart("queueT.Unsubscribe()") - - g.Testing("calls unsubscribesIfExists() via writeFn()", func() { - closed := false - close := func() { - closed = true - } - - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{ - close: &close, - }, - }, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - expected := subscriptionsSetM{} - - queue := queueT{ - subscriptions: subscriptionsT{ - write: func( - fn func(subscriptionsSetM) error, - ) error { - return fn(set) - }, - }, - } - - queue.Unsubscribe("topic", "consumer") - g.TAssertEqual(set, expected) - g.TAssertEqual(closed, true) - }) -} - -func test_cleanSubscriptions() { - g.TestStart("cleanSubscriptions()") - - g.Testing("all consumers and waiters get close()'d", func() { - flowID1 := uuid.New() - flowID2 := uuid.New() - flowID3 := uuid.New() - - type pairT struct{ - closed func() bool - fn func() - } - - mkclose := func() pairT { - closed := false - return pairT{ - closed: func() bool { - return closed - }, - fn: func() { - closed = true - }, - } - } - - c := mkclose() - g.TAssertEqual(c.closed(), false) - c.fn() - var x bool = c.closed() - g.TAssertEqual(x, true) - - close1 := mkclose() - close2 := mkclose() - close3 := mkclose() - close4 := mkclose() - close5 := mkclose() - close6 := mkclose() - close7 := mkclose() - close8 := mkclose() - - set := subscriptionsSetM{ - "topic-1": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer-1": consumerT{ - close: &close1.fn, - }, - "consumer-2": consumerT{ - close: &close2.fn, - }, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{ - "waiter-1": waiterT{ - close: &close3.fn, - }, - }, - flowID2: map[string]waiterT{ - "waiter-2": waiterT{ - close: &close4.fn, - }, - "waiter-3": waiterT{ - close: &close5.fn, - }, - }, - }, - }, - "topic-2": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer-3": consumerT{ - close: &close6.fn, - }, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID3: map[string]waiterT{ - "waiter-4": waiterT{ - close: &close7.fn, - }, - }, - }, - }, - "topic-3": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - cleanSubscriptions(set) - - given := []bool{ - close1.closed(), - close2.closed(), - close3.closed(), - close4.closed(), - close5.closed(), - close6.closed(), - close7.closed(), - close8.closed(), - } - - expected := []bool{ - true, - true, - true, - true, - true, - true, - true, - false, - } - - g.TAssertEqualI(given, expected) - }) -} - -func test_queueT_Close() { - g.TestStart("queueT.Close()") - - g.Testing("clean pinger, subscriptions and queries", func() { - var ( - pingerCount = 0 - subscriptionsCount = 0 - queriesCount = 0 - - subscriptionsErr = errors.New("subscriptionsT{} error") - queriesErr = errors.New("queriesT{} error") - ) - - queue := queueT{ - queries: queriesT{ - close: func() error{ - queriesCount++ - return queriesErr - }, - }, - subscriptions: subscriptionsT{ - write: func( - func(subscriptionsSetM) error, - ) error { - subscriptionsCount++ - return subscriptionsErr - }, - }, - pinger: pingerT[struct{}]{ - close: func() { - pingerCount++ - }, - }, - } - - err := queue.Close() - g.TAssertEqual(err, g.WrapErrors(subscriptionsErr, queriesErr)) - g.TAssertEqual(pingerCount, 1) - g.TAssertEqual(subscriptionsCount, 1) - g.TAssertEqual(queriesCount, 1) - }) -} - - -func test_topicGetopt() { - g.TestStart("topicGetopt()") - - g.Testing("checks for required positional argument", func() { - var w strings.Builder - argsIn := argsT{ - args: []string{}, - } - - argsOut, ok := topicGetopt(argsIn, &w) - g.TAssertEqual(w.String(), "Missing TOPIC.\n") - g.TAssertEqual(ok, false) - g.TAssertEqual(argsOut, argsIn) - }) - - g.Testing("success otherwise", func() { - var w strings.Builder - argsIn := argsT{ - args: []string{"a topic"}, - } - - argsOut, ok := topicGetopt(argsIn, &w) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(ok, true) - argsIn.topic = "a topic" - g.TAssertEqual(argsOut, argsIn) - }) -} - -func test_topicConsumerGetopt() { - g.TestStart("topicConsumerGetopt()") - - g.Testing("checks for TOPIC argument", func() { - var w strings.Builder - argsIn := argsT{ - args: []string{}, - } - - argsOut, ok := topicConsumerGetopt(argsIn, &w) - g.TAssertEqual(w.String(), "Missing TOPIC.\n") - g.TAssertEqual(ok, false) - g.TAssertEqual(argsOut, argsIn) - }) - - g.Testing("we get an error on unsupported flag", func() { - var w strings.Builder - argsIn := argsT{ - args: []string{"-Z"}, - } - - const message = "flag provided but not defined: -Z\n" - argsOut, ok := topicConsumerGetopt(argsIn, &w) - g.TAssertEqual(w.String(), message) - g.TAssertEqual(ok, false) - g.TAssertEqual(argsOut, argsIn) - }) - - g.Testing("we also get an error on incorrect usage of flags", func() { - var w strings.Builder - argsIn := argsT{ - args: []string{"-C"}, - } - - const message = "flag needs an argument: -C\n" - argsOut, ok := topicConsumerGetopt(argsIn, &w) - g.TAssertEqual(w.String(), message) - g.TAssertEqual(ok, false) - g.TAssertEqual(argsOut, argsIn) - }) - - g.Testing("we can customize the CONSUMER", func() { - var w strings.Builder - argsIn := argsT{ - args: []string{"-C", "custom consumer", "this topic"}, - } - - argsOut, ok := topicConsumerGetopt(argsIn, &w) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(ok, true) - argsIn.topic = "this topic" - argsIn.consumer = "custom consumer" - g.TAssertEqual(argsOut, argsIn) - }) - - g.Testing("otherwise we get the default one", func() { - var w strings.Builder - argsIn := argsT{ - args: []string{"T"}, - } - - argsOut, ok := topicConsumerGetopt(argsIn, &w) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(ok, true) - argsIn.topic = "T" - argsIn.consumer = "default-consumer" - g.TAssertEqual(argsOut, argsIn) - }) -} - -func test_inExec() { - g.TestStart("inExec()") - - const ( - topic = "inExec topic" - payloadStr = "inExec payload" - ) - var ( - payload = []byte(payloadStr) - args = argsT{ - topic: topic, - } - ) - - var ( - publishErr error - messages []messageT - id int64 = 0 - ) - queries := queriesT{ - publish: func( - unsent UnsentMessage, - messageID uuid.UUID, - ) (messageT, error) { - if publishErr != nil { - return messageT{}, publishErr - } - - id++ - now := time.Now() - message := messageT{ - id: id, - timestamp: now, - uuid: messageID, - topic: unsent.Topic, - flowID: unsent.FlowID, - payload: unsent.Payload, - } - messages = append(messages, message) - return message, nil - }, - } - - - g.Testing("messageID to output when successful", func() { - r := bytes.NewReader(payload) - var w strings.Builder - rc, err := inExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(messages[0].topic, topic) - g.TAssertEqual(messages[0].payload, payload) - g.TAssertEqual(rc, 0) - g.TAssertEqual(w.String(), messages[0].uuid.String() + "\n") - }) - - g.Testing("if reading fails, we return the error", func() { - var r *os.File - var w strings.Builder - rc, err := inExec(args, queries, r, &w) - g.TAssertEqual( - err.Error(), - "invalid argument", - ) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - }) - - g.Testing("if publishing fails, we return the error", func() { - publishErr = errors.New("publish() error") - r := strings.NewReader("read but not published") - var w strings.Builder - rc, err := inExec(args, queries, r, &w) - g.TAssertEqual(err, publishErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - }) -} - -func test_outExec() { - g.TestStart("outExec()") - - const ( - topic = "outExec topic" - consumer = "outExec consumer" - ) - var ( - r = strings.NewReader("") - args = argsT{ - topic: topic, - consumer: consumer, - } - ) - - var ( - takeErr error - nextErr error - messages []messageT - id int64 = 0 - ) - queries := queriesT{ - take: func(string, string) error { - return takeErr - }, - next: func(string, string) (messageT, error) { - if nextErr != nil { - return messageT{}, nextErr - } - - if len(messages) == 0 { - return messageT{}, sql.ErrNoRows - } - - return messages[0], nil - }, - } - pub := func(payload []byte) { - id++ - now := time.Now() - message := messageT{ - id: id, - timestamp: now, - uuid: uuid.New(), - topic: topic, - flowID: uuid.New(), - payload: payload, - } - messages = append(messages, message) - } - - - g.Testing("exit code 1 when we can't take", func() { - takeErr = errors.New("outExec() take error") - - var w strings.Builder - - rc, err := outExec(args, queries, r, &w) - g.TAssertEqual(err, takeErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - - takeErr = nil - }) - - g.Testing("exit code 3 when no message is available", func() { - var w strings.Builder - - rc, err := outExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 3) - }) - - g.Testing("we get the same message until we commit", func() { - var ( - w1 strings.Builder - w2 strings.Builder - w3 strings.Builder - ) - args := argsT{ - topic: topic, - consumer: consumer, - } - - pub([]byte("first payload")) - pub([]byte("second payload")) - - rc1, err1 := outExec(args, queries, r, &w1) - rc2, err2 := outExec(args, queries, r, &w2) - messages = messages[1:] - rc3, err3 := outExec(args, queries, r, &w3) - - g.TErrorIf(g.SomeError(err1, err2, err3)) - g.TAssertEqual(w1.String(), "first payload\n") - g.TAssertEqual(w2.String(), "first payload\n") - g.TAssertEqual(w3.String(), "second payload\n") - g.TAssertEqual(rc1, 0) - g.TAssertEqual(rc2, 0) - g.TAssertEqual(rc3, 0) - }) - - g.Testing("we propagate the error when the query fails", func() { - nextErr = errors.New("next() error") - var w strings.Builder - rc, err := outExec(args, queries, r, &w) - g.TAssertEqual(err, nextErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - }) -} - -func test_commitExec() { - g.TestStart("commitExec()") - - const ( - topic = "commitExec topic" - consumer = "commitExec consumer" - ) - var ( - r = strings.NewReader("") - args = argsT{ - topic: topic, - consumer: consumer, - } - ) - - var ( - takeErr error - nextErr error - commitErr error - messages []messageT - id int64 = 0 - ) - queries := queriesT{ - take: func(string, string) error { - return takeErr - }, - next: func(string, string) (messageT, error) { - if nextErr != nil { - return messageT{}, nextErr - } - - if len(messages) == 0 { - return messageT{}, sql.ErrNoRows - } - - return messages[0], nil - }, - commit: func(string, uuid.UUID) error { - if commitErr != nil { - return commitErr - } - - messages = messages[1:] - return nil - }, - } - pub := func(payload []byte) { - id++ - now := time.Now() - message := messageT{ - id: id, - timestamp: now, - uuid: uuid.New(), - topic: topic, - flowID: uuid.New(), - payload: payload, - } - messages = append(messages, message) - } - - - g.Testing("error when we can't take", func() { - takeErr = errors.New("commitExec() take error") - - var w strings.Builder - - rc, err := commitExec(args, queries, r, &w) - g.TAssertEqual(err, takeErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - - takeErr = nil - }) - - g.Testing("error when there is nothing to commit", func() { - var w strings.Builder - - rc, err := commitExec(args, queries, r, &w) - g.TAssertEqual(err, sql.ErrNoRows) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - }) - - g.Testing("messages get committed in order", func() { - var w strings.Builder - - pub([]byte("first payload")) - pub([]byte("second payload")) - pub([]byte("third payload")) - - message1 := messages[0] - g.TAssertEqual(message1.payload, []byte("first payload")) - - rc, err := commitExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(rc, 0) - - message2 := messages[0] - g.TAssertEqual(message2.payload, []byte("second payload")) - - rc, err = commitExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(rc, 0) - - message3 := messages[0] - g.TAssertEqual(message3.payload, []byte("third payload")) - - rc, err = commitExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(rc, 0) - - g.TAssertEqual(len(messages), 0) - }) - - g.Testing("when next() query fails, we propagate its result", func() { - nextErr = errors.New("next() error") - var w strings.Builder - rc, err := commitExec(args, queries, r, &w) - g.TAssertEqual(err, nextErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - nextErr = nil - }) - - g.Testing("we also propagate the error on commit() failure", func() { - commitErr = errors.New("commit() error") - pub([]byte{}) - var w strings.Builder - rc, err := commitExec(args, queries, r, &w) - g.TAssertEqual(err, commitErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - }) -} - -func test_deadExec() { - g.TestStart("deadExec()") - - const ( - topic = "deadExec topic" - consumer = "deadExec consumer" - ) - var ( - r = strings.NewReader("") - args = argsT{ - topic: topic, - consumer: consumer, - } - ) - - var ( - takeErr error - nextErr error - toDeadErr error - messages []messageT - id int64 = 0 - ) - queries := queriesT{ - take: func(string, string) error { - return takeErr - }, - next: func(string, string) (messageT, error) { - if nextErr != nil { - return messageT{}, nextErr - } - - if len(messages) == 0 { - return messageT{}, sql.ErrNoRows - } - - return messages[0], nil - }, - toDead: func( - _ string, - _ uuid.UUID, - deadletterID uuid.UUID, - ) error { - if toDeadErr != nil { - return toDeadErr - } - - messages = messages[1:] - return nil - }, - } - pub := func(payload []byte) { - id++ - now := time.Now() - message := messageT{ - id: id, - timestamp: now, - uuid: uuid.New(), - topic: topic, - flowID: uuid.New(), - payload: payload, - } - messages = append(messages, message) - } - - - g.Testing("error when we can't take", func() { - takeErr = errors.New("deadExec() take error") - - var w strings.Builder - - rc, err := deadExec(args, queries, r, &w) - g.TAssertEqual(err, takeErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - - takeErr = nil - }) - - g.Testing("error when there is nothing to mark as dead", func() { - var w strings.Builder - - rc, err := deadExec(args, queries, r, &w) - g.TAssertEqual(err, sql.ErrNoRows) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - }) - - g.Testing("the latest message becomes a deadletter", func() { - var w strings.Builder - - pub([]byte("first payload")) - pub([]byte("second payload")) - - message1 := messages[0] - g.TAssertEqual(message1.payload, []byte("first payload")) - - rc, err := deadExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(rc, 0) - - message2 := messages[0] - g.TAssertEqual(message2.payload, []byte("second payload")) - - rc, err = deadExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(rc, 0) - - g.TAssertEqual(len(messages), 0) - }) - - g.Testing("next() error is propagated", func() { - nextErr = errors.New("next() error") - var w strings.Builder - rc, err := deadExec(args, queries, r, &w) - g.TAssertEqual(err, nextErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - nextErr = nil - }) - - g.Testing("toDead() error is propagated", func() { - toDeadErr = errors.New("toDead() error") - pub([]byte{}) - var w strings.Builder - rc, err := deadExec(args, queries, r, &w) - g.TAssertEqual(err, toDeadErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - }) -} - -func test_listDeadExec() { - g.TestStart("listDeadExec()") - - const ( - topic = "listDeadExec topic" - consumer = "listDeadExec consumer" - ) - var ( - r = strings.NewReader("") - args = argsT{ - topic: topic, - consumer: consumer, - } - ) - - var ( - messages []messageT - deadletters []deadletterT - id int64 = 0 - errorIndex = -1 - allDeadErr = errors.New("allDead() error") - ) - queries := queriesT{ - allDead: func( - _ string, - _ string, - callback func(deadletterT, messageT) error, - ) error { - for i, deadletter := range deadletters { - if i == errorIndex { - return allDeadErr - } - - callback(deadletter, messageT{}) - } - - return nil - }, - } - pub := func() { - payload := []byte("ignored payload for this test") - id++ - now := time.Now() - message := messageT{ - id: id, - timestamp: now, - uuid: uuid.New(), - topic: topic, - flowID: uuid.New(), - payload: payload, - } - messages = append(messages, message) - } - commit := func() { - messages = messages[1:] - } - dead := func() { - message := messages[0] - now := time.Now() - deadletter := deadletterT{ - uuid: uuid.New(), - timestamp: now, - consumer: consumer, - messageID: message.uuid, - } - - messages = messages[1:] - deadletters = append(deadletters, deadletter) - } - replay := func() { - pub() - deadletters = deadletters[1:] - } - - - g.Testing("nothing is shown if topic is empty", func() { - var w strings.Builder - - rc, err := listDeadExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 0) - }) - - g.Testing("deadletters are printed in order", func() { - var w strings.Builder - - pub() - pub() - pub() - - rc, err := listDeadExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 0) - - dead() - commit() - dead() - - expected := fmt.Sprintf( - "%s\t%s\t%s\n%s\t%s\t%s\n", - deadletters[0].uuid.String(), - deadletters[0].timestamp.Format(time.RFC3339), - deadletters[0].consumer, - deadletters[1].uuid.String(), - deadletters[1].timestamp.Format(time.RFC3339), - deadletters[1].consumer, - ) - - rc, err = listDeadExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(w.String(), expected) - g.TAssertEqual(rc, 0) - }) - - g.Testing("deadletters disappear after being replayed", func() { - var ( - w1 strings.Builder - w2 strings.Builder - ) - - replay() - - deadletter := deadletters[0] - expected := fmt.Sprintf( - "%s\t%s\t%s\n", - deadletter.uuid.String(), - deadletter.timestamp.Format(time.RFC3339), - deadletter.consumer, - ) - - rc, err := listDeadExec(args, queries, r, &w1) - g.TErrorIf(err) - g.TAssertEqual(w1.String(), expected) - g.TAssertEqual(rc, 0) - - replay() - - rc, err = listDeadExec(args, queries, r, &w2) - g.TErrorIf(err) - g.TAssertEqual(w2.String(), "") - g.TAssertEqual(rc, 0) - }) - - g.Testing("a database failure interrupts the output", func() { - var w strings.Builder - - pub() - pub() - pub() - dead() - dead() - dead() - - deadletter := deadletters[0] - expected := fmt.Sprintf( - "%s\t%s\t%s\n", - deadletter.uuid.String(), - deadletter.timestamp.Format(time.RFC3339), - deadletter.consumer, - ) - - errorIndex = 1 - rc, err := listDeadExec(args, queries, r, &w) - g.TAssertEqual(err, allDeadErr) - g.TAssertEqual(w.String(), expected) - g.TAssertEqual(rc, 1) - }) -} - -func test_replayExec() { - g.TestStart("replayExec()") - - const ( - topic = "replayExec topic" - consumer = "replayExec consumer" - ) - var ( - w strings.Builder - r = strings.NewReader("") - args = argsT{ - topic: topic, - consumer: consumer, - } - ) - - var ( - oneDeadErr error - replayErr error - messages []messageT - deadletters []deadletterT - deadMessages []messageT - id int64 = 0 - ) - queries := queriesT{ - oneDead: func(string, string) (deadletterT, error) { - if oneDeadErr != nil { - return deadletterT{}, oneDeadErr - } - - if len(deadletters) == 0 { - return deadletterT{}, sql.ErrNoRows - } - - return deadletters[0], nil - }, - replay: func(uuid.UUID, uuid.UUID) (messageT, error) { - if replayErr != nil { - return messageT{}, replayErr - } - - message := deadMessages[0] - messages = append(messages, message) - deadletters = deadletters[1:] - deadMessages = deadMessages[1:] - return message, nil - }, - } - pub := func(payload []byte) { - id++ - now := time.Now() - message := messageT{ - id: id, - timestamp: now, - uuid: uuid.New(), - topic: topic, - flowID: uuid.New(), - payload: payload, - } - messages = append(messages, message) - } - commit := func() { - messages = messages[1:] - } - dead := func() { - message := messages[0] - deadletter := deadletterT{ uuid: uuid.New() } - - messages = messages[1:] - deadletters = append(deadletters, deadletter) - deadMessages = append(deadMessages, message) - } - next := func() string { - return string(messages[0].payload) - } - - - g.Testing("error when there is nothing to replay", func() { - rc, err := replayExec(args, queries, r, &w) - g.TAssertEqual(err, sql.ErrNoRows) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - - pub([]byte("first payload")) - pub([]byte("second payload")) - pub([]byte("third payload")) - pub([]byte("fourth payload")) - - rc, err = replayExec(args, queries, r, &w) - g.TAssertEqual(err, sql.ErrNoRows) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - }) - - g.Testing("deadletters are replayed in order", func() { - dead() - commit() - dead() - commit() - - rc, err := replayExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 0) - g.TAssertEqual(next(), "first payload") - - commit() - - rc, err = replayExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 0) - g.TAssertEqual(next(), "third payload") - }) - - g.Testing("oneDead() error is forwarded", func() { - oneDeadErr = errors.New("oneDead() error") - rc, err := replayExec(args, queries, r, &w) - g.TAssertEqual(err, oneDeadErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - oneDeadErr = nil - }) - - g.Testing("replay() error is also forwarded", func() { - pub([]byte{}) - dead() - - replayErr = errors.New("replay() error") - rc, err := replayExec(args, queries, r, &w) - g.TAssertEqual(err, replayErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - }) -} - -func test_sizeExec() { - g.TestStart("sizeExec()") - - const ( - topic = "sizeExec topic" - consumer = "sizeExec consumer" - ) - var ( - r = strings.NewReader("") - args = argsT{ - topic: topic, - consumer: consumer, - } - ) - - var sizeErr error - queries := queriesT{ - size: func(string) (int, error) { - if sizeErr != nil { - return -1, sizeErr - } - - return 123, nil - }, - } - - - g.Testing("it propagates the error when the query fails", func() { - sizeErr = errors.New("size() error") - var w strings.Builder - rc, err := sizeExec(args, queries, r, &w) - g.TAssertEqual(err, sizeErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - sizeErr = nil - }) - - g.Testing("otherwise it just prints what is was given", func() { - var w strings.Builder - rc, err := sizeExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(w.String(), "123\n") - g.TAssertEqual(rc, 0) - }) -} - -func test_countExec() { - g.TestStart("countExec()") - - const ( - topic = "countExec topic" - consumer = "countExec consumer" - ) - var ( - r = strings.NewReader("") - args = argsT{ - topic: topic, - consumer: consumer, - } - ) - - var countErr error - queries := queriesT{ - count: func(string, string) (int, error) { - if countErr != nil { - return -1, countErr - } - - return 2222, nil - }, - } - - - g.Testing("it propagates the query error", func() { - countErr = errors.New("count() error") - var w strings.Builder - rc, err := countExec(args, queries, r, &w) - g.TAssertEqual(err, countErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - countErr = nil - }) - - g.Testing("otherwise it prints the given count", func() { - var w strings.Builder - rc, err := countExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(w.String(), "2222\n") - g.TAssertEqual(rc, 0) - }) -} - -func test_hasDataExec() { - g.TestStart("hasData()") - - const ( - topic = "hasData topic" - consumer = "hasData consumer" - ) - var ( - w strings.Builder - r = strings.NewReader("") - args = argsT{ - topic: topic, - consumer: consumer, - } - ) - - hasData := true - var hasDataErr error - queries := queriesT{ - hasData: func(string, string) (bool, error) { - if hasDataErr != nil { - return false, hasDataErr - } - - return hasData, nil - }, - } - - - g.Testing("it propagates the query error", func() { - hasDataErr = errors.New("hasData() error") - rc, err := hasDataExec(args, queries, r, &w) - g.TAssertEqual(err, hasDataErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - hasDataErr = nil - }) - - g.Testing("otherwise if just returns (not prints) the flag", func() { - hasData = true - rc, err := hasDataExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(rc, 0) - - hasData = false - rc, err = hasDataExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(rc, 1) - - g.TAssertEqual(w.String(), "") - }) -} - -func test_usage() { - g.TestStart("usage()") - - g.Testing("it just writes to io.Writer", func() { - var w strings.Builder - usage("xxx", &w) - const message = - "Usage: xxx [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n" - g.TAssertEqual(w.String(), message) - }) - - g.Testing("noop on io.Discard for io.Writer", func() { - usage("AN ERROR IF SEEN ANYWHERE!", io.Discard) - }) -} - -func test_getopt() { - g.TestStart("getopt()") - - const warning = "Missing COMMAND.\n" - const usage = "Usage: $0 [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n" - - commandsMap := map[string]commandT{ - "good": commandT{ - name: "good", - getopt: func(args argsT, _ io.Writer) (argsT, bool) { - return args, true - }, - }, - "bad": commandT{ - name: "bad", - getopt: func(args argsT, w io.Writer) (argsT, bool) { - if len(args.args) == 0 { - fmt.Fprintln(w, "no required arg") - return args, false - } - - if args.args[0] != "required" { - fmt.Fprintln(w, "not correct one") - return args, false - } - - args.topic = "a topic" - args.consumer = "a consumer" - return args, true - }, - }, - } - - - g.Testing("we suppress the default error message", func() { - var w strings.Builder - argv := []string{"$0", "-h"} - _, _, rc := getopt(argv, commandsMap, &w) - - g.TAssertEqual(w.String(), usage) - g.TAssertEqual(rc, 2) - }) - - g.Testing("we get an error on unsupported flag", func() { - var w strings.Builder - argv := []string{"$0", "-X"} - _, _, rc := getopt(argv, commandsMap, &w) - - const message = "flag provided but not defined: -X\n" - g.TAssertEqual(w.String(), message + usage) - g.TAssertEqual(rc, 2) - }) - - g.Testing("we also get an error on incorrect usage of flags", func() { - var w strings.Builder - argv := []string{"$0", "-f"} - _, _, rc := getopt(argv, commandsMap, &w) - - const message = "flag needs an argument: -f\n" - g.TAssertEqual(w.String(), message + usage) - g.TAssertEqual(rc, 2) - }) - - g.Testing("error when not given a command", func() { - var w strings.Builder - argv := []string{"$0"} - _, _, rc := getopt(argv, commandsMap, &w) - - g.TAssertEqual(w.String(), warning + usage) - g.TAssertEqual(rc, 2) - }) - - g.Testing("error on unknown command", func() { - var w strings.Builder - argv := []string{"$0", "unknown"} - _, _, rc := getopt(argv, commandsMap, &w) - - const message = `Bad COMMAND: "unknown".` + "\n" - g.TAssertEqual(w.String(), message + usage) - g.TAssertEqual(rc, 2) - }) - - g.Testing("checks the command usage", func() { - var ( - w1 strings.Builder - w2 strings.Builder - w3 strings.Builder - ) - - argv1 := []string{"$0", "bad"} - argv2 := []string{"$0", "bad", "arg"} - argv3 := []string{"$0", "bad", "required"} - _, _, rc1 := getopt(argv1, commandsMap, &w1) - _, _, rc2 := getopt(argv2, commandsMap, &w2) - args, command, rc3 := getopt(argv3, commandsMap, &w3) - expectedArgs := argsT{ - databasePath: "fiinha.db", - prefix: "fiinha", - command: "bad", - allArgs: argv3, - args: argv3[2:], - topic: "a topic", - consumer: "a consumer", - } - - g.TAssertEqual(w1.String(), "no required arg\n" + usage) - g.TAssertEqual(w2.String(), "not correct one\n" + usage) - g.TAssertEqual(w3.String(), "") - g.TAssertEqual(rc1, 2) - g.TAssertEqual(rc2, 2) - g.TAssertEqual(rc3, 0) - g.TAssertEqual(args, expectedArgs) - g.TAssertEqual(command.name, "bad") - }) - - g.Testing("when given a command we the default values", func() { - var w strings.Builder - args, command, rc := getopt( - []string{"$0", "good"}, - commandsMap, - &w, - ) - expectedArgs := argsT{ - databasePath: "fiinha.db", - prefix: "fiinha", - command: "good", - allArgs: []string{"$0", "good"}, - args: []string{}, - } - - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 0) - g.TAssertEqual(args, expectedArgs) - g.TAssertEqual(command.name, "good") - }) - - g.Testing("we can customize both values", func() { - var w strings.Builder - argv := []string{"$0", "-f", "F", "-p", "P", "good"} - args, command, rc := getopt(argv, commandsMap, &w) - expectedArgs := argsT{ - databasePath: "F", - prefix: "P", - command: "good", - allArgs: argv, - args: []string{}, - } - - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 0) - g.TAssertEqual(args, expectedArgs) - g.TAssertEqual(command.name, "good") - }) - - g.Testing("a command can have its own commands and options", func() { - var w strings.Builder - argv := []string{"$0", "-f", "F", "good", "-f", "-f", "SUB"} - args, command, rc := getopt(argv, commandsMap, &w) - expectedArgs := argsT{ - databasePath: "F", - prefix: "fiinha", - command: "good", - allArgs: argv, - args: []string{"-f", "-f", "SUB"}, - } - - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 0) - g.TAssertEqual(args, expectedArgs) - g.TAssertEqual(command.name, "good") - }) -} - -func test_runCommand() { - g.TestStart("runCommand()") - - g.Testing("returns an error on bad prefix", func() { - stdin := strings.NewReader("") - var ( - stdout strings.Builder - stderr strings.Builder - ) - args := argsT{ - prefix: "a bad prefix", - command: "in", - allArgs: []string{"$0"}, - args: []string{"some topic name"}, - } - rc := runCommand(args, commands["in"], stdin, &stdout, &stderr) - - g.TAssertEqual(rc, 1) - g.TAssertEqual(stdout.String(), "") - g.TAssertEqual(stderr.String(), "Invalid table prefix\n") - }) - - g.Testing("otherwise it build a queueT and calls command", func() { - stdin := strings.NewReader("") - var ( - stdout1 strings.Builder - stdout2 strings.Builder - stderr1 strings.Builder - stderr2 strings.Builder - ) - args1 := argsT{ - prefix: defaultPrefix, - command: "good", - } - args2 := argsT{ - prefix: defaultPrefix, - command: "bad", - } - myErr := errors.New("an error") - good := commandT{ - exec: func( - _ argsT, - _ queriesT, - _ io.Reader, - w io.Writer, - ) (int, error) { - fmt.Fprintf(w, "good text\n") - return 0, nil - }, - } - bad := commandT{ - exec: func( - _ argsT, - _ queriesT, - _ io.Reader, - w io.Writer, - ) (int, error) { - fmt.Fprintf(w, "bad text\n") - return 123, myErr - }, - } - rc1 := runCommand(args1, good, stdin, &stdout1, &stderr1) - rc2 := runCommand(args2, bad, stdin, &stdout2, &stderr2) - - g.TAssertEqual(stdout1.String(), "good text\n") - g.TAssertEqual(stdout2.String(), "bad text\n") - g.TAssertEqual(stderr1.String(), "") - g.TAssertEqual(stderr2.String(), "an error\n") - g.TAssertEqual(rc1, 0) - g.TAssertEqual(rc2, 123) - }) -} - -func test_commands() { - g.TestStart("commands") - - g.Testing("ensure map key and name are in sync", func() { - for key, command := range commands { - g.TAssertEqual(command.name, key) - } - }) -} - - -func dumpQueries() { - queries := []struct{name string; fn func(string) queryT}{ - { "createTables", createTablesSQL }, - { "take", takeSQL }, - { "publish", publishSQL }, - { "find", findSQL }, - { "next", nextSQL }, - { "pending", pendingSQL }, - { "commit", commitSQL }, - { "toDead", toDeadSQL }, - { "replay", replaySQL }, - { "oneDead", oneDeadSQL }, - { "allDead", allDeadSQL }, - { "size", sizeSQL }, - { "count", countSQL }, - { "hasData", hasDataSQL }, - } - for _, query := range queries { - q := query.fn(defaultPrefix) - fmt.Printf("\n-- %s.sql:", query.name) - fmt.Printf("\n-- write:%s\n", q.write) - fmt.Printf("\n-- read:%s\n", q.read) - fmt.Printf("\n-- owner:%s\n", q.owner) - } -} - - - -func MainTest() { - if os.Getenv("TESTING_DUMP_SQL_QUERIES") != "" { - dumpQueries() - return - } - - g.Init() - test_defaultPrefix() - test_serialized() - test_execSerialized() - test_tryRollback() - test_inTx() - test_createTables() - test_takeStmt() - test_publishStmt() - test_findStmt() - test_nextStmt() - test_messageEach() - test_pendingStmt() - test_commitStmt() - test_toDeadStmt() - test_replayStmt() - test_oneDeadStmt() - test_deadletterEach() - test_allDeadStmt() - test_sizeStmt() - test_countStmt() - test_hasDataStmt() - test_initDB() - test_queriesTclose() - test_newPinger() - test_makeSubscriptionsFunc() - test_makeNotifyFn() - test_collectClosedWaiters() - test_trimEmptyLeaves() - test_deleteIfEmpty() - test_deleteEmptyTopics() - test_removeClosedWaiter() - test_reapClosedWaiters() - test_everyNthCall() - test_runReaper() - test_NewWithPrefix() - test_New() - test_asPublicMessage() - test_queueT_Publish() - test_registerConsumerFn() - test_registerWaiterFn() - test_makeConsumeOneFn() - test_makeConsumeAllFn() - test_makeWaitFn() - test_runConsumer() - test_tryFinding() - test_queueT_Subscribe() - test_queueT_WaitFor() - test_unsubscribeIfExistsFn() - test_queueT_Unsubscribe() - test_cleanSubscriptions() - test_queueT_Close() - test_topicGetopt() - test_topicConsumerGetopt() - test_inExec() - test_outExec() - test_commitExec() - test_deadExec() - test_listDeadExec() - test_replayExec() - test_sizeExec() - test_countExec() - test_hasDataExec() - test_usage() - test_getopt() - test_runCommand() - test_commands() -} diff --git a/tests/functional/consume-one-produce-many/fiinha.go b/tests/functional/consume-one-produce-many/fiinha.go deleted file mode 100644 index 6a3ca47..0000000 --- a/tests/functional/consume-one-produce-many/fiinha.go +++ /dev/null @@ -1,5 +0,0 @@ -package fiinha - -func MainTest() { - // FIXME -} diff --git a/tests/functional/consume-one-produce-many/main.go b/tests/functional/consume-one-produce-many/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/consume-one-produce-many/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/consumer-with-deadletter/fiinha.go b/tests/functional/consumer-with-deadletter/fiinha.go deleted file mode 100644 index 292d327..0000000 --- a/tests/functional/consumer-with-deadletter/fiinha.go +++ /dev/null @@ -1,85 +0,0 @@ -package fiinha - -import ( - "errors" - "runtime" - - "uuid" - g "gobang" -) - - - -const ( - topicX = "new-event-x" - topicY = "new-event-y" -) - -var forbidden3Err = errors.New("we don't like 3") - - - -func processNewEventXToY(message Message) (UnsentMessage, error) { - payload := string(message.Payload) - if payload == "event 3" { - return UnsentMessage{}, forbidden3Err - } - - newPayload := []byte("processed " + payload) - unsent := UnsentMessage{ - Topic: topicY, - FlowID: message.FlowID, - Payload: newPayload, - } - return unsent, nil -} - - - -func MainTest() { - g.SetLevel(g.LogLevel_None) - - _, file, _, ok := runtime.Caller(0) - g.TAssertEqualS(ok, true, "can't get filename") - - databasePath := file + ".db" - queue, err := New(databasePath) - g.TErrorIf(err) - defer queue.Close() - - pub := func(payload []byte, flowID uuid.UUID) { - unsent := UnsentMessage{ - Topic: topicX, - FlowID: flowID, - Payload: payload, - } - _, err := queue.Publish(unsent) - g.TErrorIf(err) - } - - - g.Testing("we can WaitFor() a message after a deadletter", func() { - flowID := uuid.New() - - handlerFn := func(message Message) error { - messageY, err := processNewEventXToY(message) - if err != nil { - return err - } - - _, err = queue.Publish(messageY) - return err - } - queue.Subscribe(topicX, "main-worker", handlerFn) - defer queue.Unsubscribe(topicX, "main-worker") - - pub([]byte("event 1"), uuid.New()) - pub([]byte("event 2"), uuid.New()) - pub([]byte("event 3"), uuid.New()) - pub([]byte("event 4"), uuid.New()) - pub([]byte("event 5"), flowID) - - given := <- queue.WaitFor(topicY, flowID, "waiter").Channel - g.TAssertEqual(given, []byte("processed event 5")) - }) -} diff --git a/tests/functional/consumer-with-deadletter/main.go b/tests/functional/consumer-with-deadletter/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/consumer-with-deadletter/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/custom-prefix/fiinha.go b/tests/functional/custom-prefix/fiinha.go deleted file mode 100644 index 6a3ca47..0000000 --- a/tests/functional/custom-prefix/fiinha.go +++ /dev/null @@ -1,5 +0,0 @@ -package fiinha - -func MainTest() { - // FIXME -} diff --git a/tests/functional/custom-prefix/main.go b/tests/functional/custom-prefix/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/custom-prefix/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/distinct-consumers-separate-instances/fiinha.go b/tests/functional/distinct-consumers-separate-instances/fiinha.go deleted file mode 100644 index 6a3ca47..0000000 --- a/tests/functional/distinct-consumers-separate-instances/fiinha.go +++ /dev/null @@ -1,5 +0,0 @@ -package fiinha - -func MainTest() { - // FIXME -} diff --git a/tests/functional/distinct-consumers-separate-instances/main.go b/tests/functional/distinct-consumers-separate-instances/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/distinct-consumers-separate-instances/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/flow-id/fiinha.go b/tests/functional/flow-id/fiinha.go deleted file mode 100644 index 6a3ca47..0000000 --- a/tests/functional/flow-id/fiinha.go +++ /dev/null @@ -1,5 +0,0 @@ -package fiinha - -func MainTest() { - // FIXME -} diff --git a/tests/functional/flow-id/main.go b/tests/functional/flow-id/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/flow-id/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/idempotency/fiinha.go b/tests/functional/idempotency/fiinha.go deleted file mode 100644 index 6a3ca47..0000000 --- a/tests/functional/idempotency/fiinha.go +++ /dev/null @@ -1,5 +0,0 @@ -package fiinha - -func MainTest() { - // FIXME -} diff --git a/tests/functional/idempotency/main.go b/tests/functional/idempotency/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/idempotency/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/new-instance-takeover/fiinha.go b/tests/functional/new-instance-takeover/fiinha.go deleted file mode 100644 index 5e6ad4b..0000000 --- a/tests/functional/new-instance-takeover/fiinha.go +++ /dev/null @@ -1,109 +0,0 @@ -package fiinha - -import ( - "runtime" - "os" - - "uuid" - g "gobang" -) - - - -const topic = "topic" - - - -func pub(queue IQueue, topic string, flowID uuid.UUID) { - unsent := UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: []byte{}, - } - _, err := queue.Publish(unsent) - g.TErrorIf(err) -} - -func handlerFn(publish func(uuid.UUID)) func(Message) error { - return func(message Message) error { - publish(message.FlowID) - return nil - } -} - -func startInstance( - dbpath string, - instanceID int, - name string, -) (IQueue, error) { - iqueue, err := New(dbpath) - g.TErrorIf(err) - queue := iqueue.(queueT) - - notifyFn := makeNotifyFn(queue.subscriptions.read, queue.pinger) - queries, err := initDB(dbpath, defaultPrefix, notifyFn, instanceID) - g.TErrorIf(err) - - err = queue.queries.close() - g.TErrorIf(err) - - queue.queries = queries - - pub_ := func(topic string) func(uuid.UUID) { - return func(flowID uuid.UUID) { - pub(queue, topic, flowID) - } - } - - individual := "individual-" + name - shared := "shared" - - queue.Subscribe(topic, individual, handlerFn(pub_(individual))) - queue.Subscribe(topic, shared, handlerFn(pub_(shared + "-" + name))) - - return queue, nil -} - - - -func MainTest() { - g.Init() - - _, file, _, ok := runtime.Caller(0) - g.TAssertEqualS(ok, true, "can't get filename") - - dbpath := file + ".db" - instanceID1 := os.Getpid() - instanceID2 := instanceID1 + 1 - - flowID1 := uuid.New() - flowID2 := uuid.New() - - g.Testing("new instances take ownership of topic+name combo", func() { - q1, err := startInstance(dbpath, instanceID1, "first") - g.TErrorIf(err) - defer q1.Close() - - pub(q1, topic, uuid.New()) - pub(q1, topic, uuid.New()) - pub(q1, topic, flowID1) - - <- q1.WaitFor("individual-first", flowID1, "w").Channel - <- q1.WaitFor( "shared-first", flowID1, "w").Channel - - q2, err := startInstance(dbpath, instanceID2, "second") - g.TErrorIf(err) - defer q2.Close() - - <- q2.WaitFor("individual-second", flowID1, "w").Channel - - pub(q2, topic, uuid.New()) - pub(q2, topic, uuid.New()) - pub(q2, topic, flowID2) - - // FIXME: notify multiple instances so we can add this: - // <- q2.WaitFor("individual-first", flowID2, "w").Channel - <- q2.WaitFor("individual-second", flowID2, "w").Channel - <- q2.WaitFor( "shared-second", flowID2, "w").Channel - }) -} diff --git a/tests/functional/new-instance-takeover/main.go b/tests/functional/new-instance-takeover/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/new-instance-takeover/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/wait-after-publish/fiinha.go b/tests/functional/wait-after-publish/fiinha.go deleted file mode 100644 index 71b9b56..0000000 --- a/tests/functional/wait-after-publish/fiinha.go +++ /dev/null @@ -1,54 +0,0 @@ -package fiinha - -import ( - "runtime" - - "uuid" - g "gobang" -) - - - -const topic = "topic" - - - -func MainTest() { - _, file, _, ok := runtime.Caller(0) - g.TAssertEqualS(ok, true, "can't get filename") - - databasePath := file + ".db" - queue, err := New(databasePath) - g.TErrorIf(err) - defer queue.Close() - - pub := func(flowID uuid.UUID, payload []byte) { - unsent := UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - _, err := queue.Publish(unsent) - g.TErrorIf(err) - } - - - g.Testing("we can WaitFor() a message before its publishing", func() { - flowID := uuid.New() - waiter := queue.WaitFor(topic, flowID, "waiter").Channel - - pub(flowID, []byte("payload before")) - - given := <- waiter - g.TAssertEqual(given, []byte("payload before")) - }) - - g.Testing("we can also do it after its publishing", func() { - flowID := uuid.New() - - pub(flowID, []byte("payload after")) - - given := <- queue.WaitFor(topic, flowID, "waiter").Channel - g.TAssertEqual(given, []byte("payload after")) - }) -} diff --git a/tests/functional/wait-after-publish/main.go b/tests/functional/wait-after-publish/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/wait-after-publish/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/waiter/fiinha.go b/tests/functional/waiter/fiinha.go deleted file mode 100644 index 6a3ca47..0000000 --- a/tests/functional/waiter/fiinha.go +++ /dev/null @@ -1,5 +0,0 @@ -package fiinha - -func MainTest() { - // FIXME -} diff --git a/tests/functional/waiter/main.go b/tests/functional/waiter/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/waiter/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/fuzz/api-check/fiinha.go b/tests/fuzz/api-check/fiinha.go deleted file mode 100644 index 86801de..0000000 --- a/tests/fuzz/api-check/fiinha.go +++ /dev/null @@ -1,35 +0,0 @@ -package fiinha - -import ( - "os" - "testing" - "testing/internal/testdeps" -) - - - -func api(f *testing.F) { - f.Fuzz(func(t *testing.T, n int) { - // FIXME - if n > 1 { - if n < 2 { - t.Errorf("Failed n: %v\n", n) - } - } - }) -} - - - -func MainTest() { - fuzzTargets := []testing.InternalFuzzTarget{ - { "api", api }, - } - - deps := testdeps.TestDeps{} - tests := []testing.InternalTest {} - benchmarks := []testing.InternalBenchmark{} - examples := []testing.InternalExample {} - m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples) - os.Exit(m.Run()) -} diff --git a/tests/fuzz/api-check/main.go b/tests/fuzz/api-check/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/fuzz/api-check/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/fuzz/cli-check/fiinha.go b/tests/fuzz/cli-check/fiinha.go deleted file mode 100644 index 1cb6f37..0000000 --- a/tests/fuzz/cli-check/fiinha.go +++ /dev/null @@ -1,35 +0,0 @@ -package fiinha - -import ( - "os" - "testing" - "testing/internal/testdeps" -) - - - -func queries(f *testing.F) { - f.Fuzz(func(t *testing.T, n int) { - if n > 154 { - if n < 155 { - t.Errorf("Failed n: %v\n", n) - } - } - }) -} - - - -func MainTest() { - // FIXME - fuzzTargets := []testing.InternalFuzzTarget{ - { "queries", queries }, - } - - deps := testdeps.TestDeps{} - tests := []testing.InternalTest {} - benchmarks := []testing.InternalBenchmark{} - examples := []testing.InternalExample {} - m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples) - os.Exit(m.Run()) -} diff --git a/tests/fuzz/cli-check/main.go b/tests/fuzz/cli-check/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/fuzz/cli-check/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/fuzz/equal-produced-consumed-order-check/fiinha.go b/tests/fuzz/equal-produced-consumed-order-check/fiinha.go deleted file mode 100644 index ef2e72a..0000000 --- a/tests/fuzz/equal-produced-consumed-order-check/fiinha.go +++ /dev/null @@ -1,35 +0,0 @@ -package fiinha - -import ( - "os" - "testing" - "testing/internal/testdeps" -) - - - -func queries(f *testing.F) { - f.Fuzz(func(t *testing.T, n int) { - if n > 154 { - if n < 155 { - t.Errorf("Failed n: %v\n", n) - } - } - }) -} - - - -func MainTest() { - // FIXME: produced order is identical to consumed order - fuzzTargets := []testing.InternalFuzzTarget{ - { "queries", queries }, - } - - deps := testdeps.TestDeps{} - tests := []testing.InternalTest {} - benchmarks := []testing.InternalBenchmark{} - examples := []testing.InternalExample {} - m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples) - os.Exit(m.Run()) -} diff --git a/tests/fuzz/equal-produced-consumed-order-check/main.go b/tests/fuzz/equal-produced-consumed-order-check/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/fuzz/equal-produced-consumed-order-check/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/fuzz/exactly-once-check/fiinha.go b/tests/fuzz/exactly-once-check/fiinha.go deleted file mode 100644 index 6ac1fb1..0000000 --- a/tests/fuzz/exactly-once-check/fiinha.go +++ /dev/null @@ -1,35 +0,0 @@ -package fiinha - -import ( - "os" - "testing" - "testing/internal/testdeps" -) - - - -func queries(f *testing.F) { - f.Fuzz(func(t *testing.T, n int) { - if n > 154 { - if n < 155 { - t.Errorf("Failed n: %v\n", n) - } - } - }) -} - - - -func MainTest() { - // FIXME: a message is consumed exactly once - fuzzTargets := []testing.InternalFuzzTarget{ - { "queries", queries }, - } - - deps := testdeps.TestDeps{} - tests := []testing.InternalTest {} - benchmarks := []testing.InternalBenchmark{} - examples := []testing.InternalExample {} - m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples) - os.Exit(m.Run()) -} diff --git a/tests/fuzz/exactly-once-check/main.go b/tests/fuzz/exactly-once-check/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/fuzz/exactly-once-check/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/fuzz/queries-check/fiinha.go b/tests/fuzz/queries-check/fiinha.go deleted file mode 100644 index 1cb6f37..0000000 --- a/tests/fuzz/queries-check/fiinha.go +++ /dev/null @@ -1,35 +0,0 @@ -package fiinha - -import ( - "os" - "testing" - "testing/internal/testdeps" -) - - - -func queries(f *testing.F) { - f.Fuzz(func(t *testing.T, n int) { - if n > 154 { - if n < 155 { - t.Errorf("Failed n: %v\n", n) - } - } - }) -} - - - -func MainTest() { - // FIXME - fuzzTargets := []testing.InternalFuzzTarget{ - { "queries", queries }, - } - - deps := testdeps.TestDeps{} - tests := []testing.InternalTest {} - benchmarks := []testing.InternalBenchmark{} - examples := []testing.InternalExample {} - m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples) - os.Exit(m.Run()) -} diff --git a/tests/fuzz/queries-check/main.go b/tests/fuzz/queries-check/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/fuzz/queries-check/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/fuzz/total-order-check/fiinha.go b/tests/fuzz/total-order-check/fiinha.go deleted file mode 100644 index cb5aa61..0000000 --- a/tests/fuzz/total-order-check/fiinha.go +++ /dev/null @@ -1,35 +0,0 @@ -package fiinha - -import ( - "os" - "testing" - "testing/internal/testdeps" -) - - - -func queries(f *testing.F) { - f.Fuzz(func(t *testing.T, n int) { - if n > 154 { - if n < 155 { - t.Errorf("Failed n: %v\n", n) - } - } - }) -} - - - -func MainTest() { - // FIXME: a consumer gets the messages in total order - fuzzTargets := []testing.InternalFuzzTarget{ - { "queries", queries }, - } - - deps := testdeps.TestDeps{} - tests := []testing.InternalTest {} - benchmarks := []testing.InternalBenchmark{} - examples := []testing.InternalExample {} - m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples) - os.Exit(m.Run()) -} diff --git a/tests/fuzz/total-order-check/main.go b/tests/fuzz/total-order-check/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/fuzz/total-order-check/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/integration.sh b/tests/integration.sh deleted file mode 100755 index fcb62ca..0000000 --- a/tests/integration.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/sh -set -eu - -exit diff --git a/tests/main.go b/tests/main.go deleted file mode 100644 index 789b267..0000000 --- a/tests/main.go +++ /dev/null @@ -1,7 +0,0 @@ -package main - -import "fiinha" - -func main() { - fiinha.MainTest() -} diff --git a/tests/queries.sql b/tests/queries.sql deleted file mode 100644 index 241f419..0000000 --- a/tests/queries.sql +++ /dev/null @@ -1,387 +0,0 @@ - --- createTables.sql: --- write: - CREATE TABLE IF NOT EXISTS "fiinha_payloads" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')), - topic TEXT NOT NULL, - payload BLOB NOT NULL - ) STRICT; - CREATE INDEX IF NOT EXISTS "fiinha_payloads_topic" - ON "fiinha_payloads"(topic); - - CREATE TABLE IF NOT EXISTS "fiinha_messages" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')), - uuid BLOB NOT NULL UNIQUE, - flow_id BLOB NOT NULL, - payload_id INTEGER NOT NULL - REFERENCES "fiinha_payloads"(id) - ) STRICT; - CREATE INDEX IF NOT EXISTS "fiinha_messages_flow_id" - ON "fiinha_messages"(flow_id); - - CREATE TABLE IF NOT EXISTS "fiinha_offsets" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')), - consumer TEXT NOT NULL, - message_id INTEGER NOT NULL - REFERENCES "fiinha_messages"(id), - instance_id INTEGER NOT NULL, - UNIQUE (consumer, message_id) - ) STRICT; - CREATE INDEX IF NOT EXISTS "fiinha_offsets_consumer" - ON "fiinha_offsets"(consumer); - - CREATE TABLE IF NOT EXISTS "fiinha_deadletters" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - uuid BLOB NOT NULL UNIQUE, - consumer TEXT NOT NULL, - message_id INTEGER NOT NULL - REFERENCES "fiinha_messages"(id), - instance_id INTEGER NOT NULL, - UNIQUE (consumer, message_id) - ) STRICT; - CREATE INDEX IF NOT EXISTS "fiinha_deadletters_consumer" - ON "fiinha_deadletters"(consumer); - - CREATE TABLE IF NOT EXISTS "fiinha_replays" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - deadletter_id INTEGER NOT NULL UNIQUE - REFERENCES "fiinha_deadletters"(id) , - message_id INTEGER NOT NULL UNIQUE - REFERENCES "fiinha_messages"(id) - ) STRICT; - - CREATE TABLE IF NOT EXISTS "fiinha_owners" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - topic TEXT NOT NULL, - consumer TEXT NOT NULL, - owner_id INTEGER NOT NULL, - UNIQUE (topic, consumer) - ) STRICT; - - CREATE TRIGGER IF NOT EXISTS "fiinha_check_instance_owns_topic" - BEFORE INSERT ON "fiinha_offsets" - WHEN NEW.instance_id != ( - SELECT owner_id FROM "fiinha_owners" - WHERE topic = ( - SELECT "fiinha_payloads".topic - FROM "fiinha_payloads" - JOIN "fiinha_messages" ON "fiinha_payloads".id = - "fiinha_messages".payload_id - WHERE "fiinha_messages".id = NEW.message_id - ) AND consumer = NEW.consumer - ) - BEGIN - SELECT RAISE( - ABORT, - 'instance does not own topic/consumer combo' - ); - END; - - CREATE TRIGGER IF NOT EXISTS "fiinha_check_can_publish_deadletter" - BEFORE INSERT ON "fiinha_deadletters" - WHEN NEW.instance_id != ( - SELECT owner_id FROM "fiinha_owners" - WHERE topic = ( - SELECT "fiinha_payloads".topic - FROM "fiinha_payloads" - JOIN "fiinha_messages" ON "fiinha_payloads".id = - "fiinha_messages".payload_id - WHERE "fiinha_messages".id = NEW.message_id - ) AND consumer = NEW.consumer - ) - BEGIN - SELECT RAISE( - ABORT, - 'Instance does not own topic/consumer combo' - ); - END; - - --- read: - --- owner: - --- take.sql: --- write: - INSERT INTO "fiinha_owners" (topic, consumer, owner_id) - VALUES (?, ?, ?) - ON CONFLICT (topic, consumer) DO - UPDATE SET owner_id=excluded.owner_id; - - --- read: - --- owner: - --- publish.sql: --- write: - INSERT INTO "fiinha_payloads" (topic, payload) - VALUES (?, ?); - - INSERT INTO "fiinha_messages" (uuid, flow_id, payload_id) - VALUES (?, ?, last_insert_rowid()); - - --- read: - SELECT id, timestamp FROM "fiinha_messages" - WHERE uuid = ?; - - --- owner: - --- find.sql: --- write: - --- read: - SELECT - "fiinha_messages".id, - "fiinha_messages".timestamp, - "fiinha_messages".uuid, - "fiinha_payloads".payload - FROM "fiinha_messages" - JOIN "fiinha_payloads" ON - "fiinha_payloads".id = "fiinha_messages".payload_id - WHERE - "fiinha_payloads".topic = ? AND - "fiinha_messages".flow_id = ? - ORDER BY "fiinha_messages".id DESC - LIMIT 1; - - --- owner: - --- next.sql: --- write: - --- read: - SELECT - ( - SELECT owner_id FROM "fiinha_owners" - WHERE - topic = ? AND - consumer = ? - LIMIT 1 - ) AS owner_id, - "fiinha_messages".id, - "fiinha_messages".timestamp, - "fiinha_messages".uuid, - "fiinha_messages".flow_id, - "fiinha_payloads".payload - FROM "fiinha_messages" - JOIN "fiinha_payloads" ON - "fiinha_payloads".id = "fiinha_messages".payload_id - WHERE - "fiinha_payloads".topic = ? AND - "fiinha_messages".id NOT IN ( - SELECT message_id FROM "fiinha_offsets" - WHERE consumer = ? - ) - ORDER BY "fiinha_messages".id ASC - LIMIT 1; - - --- owner: - --- pending.sql: --- write: - --- read: - SELECT - "fiinha_messages".id, - "fiinha_messages".timestamp, - "fiinha_messages".uuid, - "fiinha_messages".flow_id, - "fiinha_payloads".topic, - "fiinha_payloads".payload - FROM "fiinha_messages" - JOIN "fiinha_payloads" ON - "fiinha_payloads".id = "fiinha_messages".payload_id - WHERE - "fiinha_payloads".topic = ? AND - "fiinha_messages".id NOT IN ( - SELECT message_id FROM "fiinha_offsets" - WHERE consumer = ? - ) - ORDER BY "fiinha_messages".id ASC; - - --- owner: - SELECT owner_id FROM "fiinha_owners" - WHERE - topic = ? AND - consumer = ?; - - --- commit.sql: --- write: - INSERT INTO "fiinha_offsets" (consumer, message_id, instance_id) - VALUES (?, (SELECT id FROM "fiinha_messages" WHERE uuid = ?), ?); - - --- read: - --- owner: - --- toDead.sql: --- write: - INSERT INTO "fiinha_offsets" - ( consumer, message_id, instance_id) - VALUES ( ?, (SELECT id FROM "fiinha_messages" WHERE uuid = ?), ?); - - INSERT INTO "fiinha_deadletters" - (uuid, consumer, message_id, instance_id) - VALUES (?, ?, (SELECT id FROM "fiinha_messages" WHERE uuid = ?), ?); - - --- read: - --- owner: - --- replay.sql: --- write: - INSERT INTO "fiinha_messages" (uuid, flow_id, payload_id) - SELECT - ?, - "fiinha_messages".flow_id, - "fiinha_messages".payload_id - FROM "fiinha_messages" - JOIN "fiinha_deadletters" ON - "fiinha_messages".id = "fiinha_deadletters".message_id - WHERE "fiinha_deadletters".uuid = ?; - - INSERT INTO "fiinha_replays" (deadletter_id, message_id) - VALUES ( - (SELECT id FROM "fiinha_deadletters" WHERE uuid = ?), - last_insert_rowid() - ); - - --- read: - SELECT - "fiinha_messages".id, - "fiinha_messages".timestamp, - "fiinha_messages".flow_id, - "fiinha_payloads".topic, - "fiinha_payloads".payload - FROM "fiinha_messages" - JOIN "fiinha_payloads" ON - "fiinha_payloads".id = "fiinha_messages".payload_id - WHERE "fiinha_messages".uuid = ?; - - --- owner: - --- oneDead.sql: --- write: - --- read: - SELECT - "fiinha_deadletters".uuid, - "fiinha_offsets".timestamp, - "fiinha_messages".uuid - FROM "fiinha_deadletters" - JOIN "fiinha_offsets" ON - "fiinha_deadletters".message_id = "fiinha_offsets".message_id - JOIN "fiinha_messages" ON - "fiinha_deadletters".message_id = "fiinha_messages".id - JOIN "fiinha_payloads" ON - "fiinha_messages".payload_id = "fiinha_payloads".id - WHERE - "fiinha_payloads".topic = ? AND - "fiinha_deadletters".consumer = ? AND - "fiinha_offsets".consumer = ? AND - "fiinha_deadletters".id NOT IN ( - SELECT deadletter_id FROM "fiinha_replays" - ) - ORDER BY "fiinha_deadletters".id ASC - LIMIT 1; - - --- owner: - --- allDead.sql: --- write: - --- read: - SELECT - "fiinha_deadletters".uuid, - "fiinha_deadletters".message_id, - "fiinha_offsets".timestamp, - "fiinha_offsets".consumer, - "fiinha_messages".timestamp, - "fiinha_messages".uuid, - "fiinha_messages".flow_id, - "fiinha_payloads".topic, - "fiinha_payloads".payload - FROM "fiinha_deadletters" - JOIN "fiinha_offsets" ON - "fiinha_deadletters".message_id = "fiinha_offsets".message_id - JOIN "fiinha_messages" ON - "fiinha_deadletters".message_id = "fiinha_messages".id - JOIN "fiinha_payloads" ON - "fiinha_messages".payload_id = "fiinha_payloads".id - WHERE - "fiinha_payloads".topic = ? AND - "fiinha_deadletters".consumer = ? AND - "fiinha_offsets".consumer = ? AND - "fiinha_deadletters".id NOT IN ( - SELECT deadletter_id FROM "fiinha_replays" - ) - ORDER BY "fiinha_deadletters".id ASC; - - --- owner: - --- size.sql: --- write: - --- read: - SELECT - COUNT(1) as size - FROM "fiinha_messages" - JOIN "fiinha_payloads" ON - "fiinha_messages".payload_id = "fiinha_payloads".id - WHERE "fiinha_payloads".topic = ?; - - --- owner: - --- count.sql: --- write: - --- read: - SELECT - COUNT(1) as count - FROM "fiinha_messages" - JOIN "fiinha_offsets" ON - "fiinha_messages".id = "fiinha_offsets".message_id - JOIN "fiinha_payloads" ON - "fiinha_messages".payload_id = "fiinha_payloads".id - WHERE - "fiinha_payloads".topic = ? AND - "fiinha_offsets".consumer = ?; - - --- owner: - --- hasData.sql: --- write: - --- read: - SELECT 1 as data - FROM "fiinha_messages" - JOIN "fiinha_payloads" ON - "fiinha_payloads".id = "fiinha_messages".payload_id - WHERE - "fiinha_payloads".topic = ? AND - "fiinha_messages".id NOT IN ( - SELECT message_id FROM "fiinha_offsets" - WHERE consumer = ? - ) - LIMIT 1; - - --- owner: |
