aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEuAndreh <eu@euandre.org>2025-12-05 19:57:18 -0300
committerEuAndreh <eu@euandre.org>2025-12-05 19:57:18 -0300
commit30a25019a627e23ae6696455a280985f4844d9a8 (patch)
tree4b825dc642cb6eb9a060e54bf8d69288fbee4904
parentUpdate to latest project skeleton: i18n and fine-grained fuzzing (diff)
downloadfiinha-30a25019a627e23ae6696455a280985f4844d9a8.tar.gz
fiinha-30a25019a627e23ae6696455a280985f4844d9a8.tar.xz
rm -rf *: setup change to Clojure
-rw-r--r--.gitignore20
-rw-r--r--Makefile209
-rw-r--r--deps.mk281
-rw-r--r--doc/fiinha.en.0.adoc5
-rw-r--r--meta.capim9
-rwxr-xr-xmkdeps.sh49
-rw-r--r--po/doc/de.po22
-rw-r--r--po/doc/doc.pot22
-rw-r--r--po/doc/eo.po22
-rw-r--r--po/doc/es.po22
-rw-r--r--po/doc/fr.po22
-rw-r--r--po/doc/note.txt5
-rw-r--r--po/doc/po4a.cfg5
-rw-r--r--po/doc/pt.po22
-rw-r--r--po/fiinha/de.po5
-rw-r--r--po/fiinha/eo.po5
-rw-r--r--po/fiinha/es.po5
-rw-r--r--po/fiinha/fiinha.pot5
-rw-r--r--po/fiinha/fr.po5
-rw-r--r--po/fiinha/po4a.cfg3
-rw-r--r--po/fiinha/pt.po5
-rw-r--r--src/fiinha.go2517
-rw-r--r--src/main.go7
-rw-r--r--tests/benchmarks/deadletters/fiinha.go24
l---------tests/benchmarks/deadletters/main.go1
-rw-r--r--tests/benchmarks/lookup/fiinha.go24
l---------tests/benchmarks/lookup/main.go1
-rw-r--r--tests/benchmarks/multiple-consumers/fiinha.go24
l---------tests/benchmarks/multiple-consumers/main.go1
-rw-r--r--tests/benchmarks/multiple-produces/fiinha.go24
l---------tests/benchmarks/multiple-produces/main.go1
-rw-r--r--tests/benchmarks/reaper/fiinha.go24
l---------tests/benchmarks/reaper/main.go1
-rw-r--r--tests/benchmarks/replay/fiinha.go24
l---------tests/benchmarks/replay/main.go1
-rw-r--r--tests/benchmarks/single-consumer/fiinha.go24
l---------tests/benchmarks/single-consumer/main.go1
-rw-r--r--tests/benchmarks/single-producer/fiinha.go24
l---------tests/benchmarks/single-producer/main.go1
-rw-r--r--tests/benchmarks/subscribe/fiinha.go24
l---------tests/benchmarks/subscribe/main.go1
-rw-r--r--tests/benchmarks/unsubscribe/fiinha.go24
l---------tests/benchmarks/unsubscribe/main.go1
-rw-r--r--tests/benchmarks/waiter/fiinha.go24
l---------tests/benchmarks/waiter/main.go1
-rwxr-xr-xtests/cli-opts.sh4
-rw-r--r--tests/fiinha.go5889
-rw-r--r--tests/functional/consume-one-produce-many/fiinha.go5
l---------tests/functional/consume-one-produce-many/main.go1
-rw-r--r--tests/functional/consumer-with-deadletter/fiinha.go85
l---------tests/functional/consumer-with-deadletter/main.go1
-rw-r--r--tests/functional/custom-prefix/fiinha.go5
l---------tests/functional/custom-prefix/main.go1
-rw-r--r--tests/functional/distinct-consumers-separate-instances/fiinha.go5
l---------tests/functional/distinct-consumers-separate-instances/main.go1
-rw-r--r--tests/functional/flow-id/fiinha.go5
l---------tests/functional/flow-id/main.go1
-rw-r--r--tests/functional/idempotency/fiinha.go5
l---------tests/functional/idempotency/main.go1
-rw-r--r--tests/functional/new-instance-takeover/fiinha.go109
l---------tests/functional/new-instance-takeover/main.go1
-rw-r--r--tests/functional/wait-after-publish/fiinha.go54
l---------tests/functional/wait-after-publish/main.go1
-rw-r--r--tests/functional/waiter/fiinha.go5
l---------tests/functional/waiter/main.go1
-rw-r--r--tests/fuzz/api-check/fiinha.go35
l---------tests/fuzz/api-check/main.go1
-rw-r--r--tests/fuzz/cli-check/fiinha.go35
l---------tests/fuzz/cli-check/main.go1
-rw-r--r--tests/fuzz/equal-produced-consumed-order-check/fiinha.go35
l---------tests/fuzz/equal-produced-consumed-order-check/main.go1
-rw-r--r--tests/fuzz/exactly-once-check/fiinha.go35
l---------tests/fuzz/exactly-once-check/main.go1
-rw-r--r--tests/fuzz/queries-check/fiinha.go35
l---------tests/fuzz/queries-check/main.go1
-rw-r--r--tests/fuzz/total-order-check/fiinha.go35
l---------tests/fuzz/total-order-check/main.go1
-rwxr-xr-xtests/integration.sh4
-rw-r--r--tests/main.go7
-rw-r--r--tests/queries.sql387
80 files changed, 0 insertions, 10341 deletions
diff --git a/.gitignore b/.gitignore
deleted file mode 100644
index 114f11d..0000000
--- a/.gitignore
+++ /dev/null
@@ -1,20 +0,0 @@
-/doc/*
-!/doc/*.en.*.adoc
-/po/*/*.mo
-/src/meta.go
-/*.bin
-/*.db*
-/src/*.a
-/src/*.bin
-/src/*cgo*
-/tests/*.a
-/tests/*.bin
-/tests/functional/*/*.a
-/tests/functional/*/*.bin
-/tests/functional/*/*.go.db*
-/tests/fuzz/*/*.a
-/tests/fuzz/*/*.bin
-/tests/benchmarks/*/*.a
-/tests/benchmarks/*/*.bin
-/tests/benchmarks/*/*.txt
-/tests/fuzz/corpus/
diff --git a/Makefile b/Makefile
deleted file mode 100644
index 5eec04a..0000000
--- a/Makefile
+++ /dev/null
@@ -1,209 +0,0 @@
-.POSIX:
-DATE = 1970-01-01
-VERSION = 0.1.0
-NAME = fiinha
-NAME_UC = $(NAME)
-## Installation prefix. Defaults to "/usr".
-PREFIX = /usr
-BINDIR = $(PREFIX)/bin
-LIBDIR = $(PREFIX)/lib
-GOLIBDIR = $(LIBDIR)/go
-INCLUDEDIR = $(PREFIX)/include
-SRCDIR = $(PREFIX)/src/$(NAME)
-SHAREDIR = $(PREFIX)/share
-LOCALEDIR = $(SHAREDIR)/locale
-MANDIR = $(SHAREDIR)/man
-EXEC = ./
-## Where to store the installation. Empty by default.
-DESTDIR =
-LDLIBS = --static -lsqlite3 -lm
-GOCFLAGS = -I $(GOLIBDIR)
-GOLDFLAGS = -L $(GOLIBDIR)
-N = `nproc`
-
-
-
-.SUFFIXES:
-.SUFFIXES: .go .a .bin .bin-check .adoc .po .mo
-
-.go.a:
- go tool compile -I $(@D) $(GOCFLAGS) -o $@ -p $(*F) \
- `find $< $$(if [ $(*F) != main ]; then \
- echo src/$(NAME).go src/meta.go; fi) | uniq`
-
-.a.bin:
- go tool link -L $(@D) $(GOLDFLAGS) -o $@ --extldflags '$(LDLIBS)' $<
-
-.adoc:
- asciidoctor -b manpage -o $@ $<
-
-.po.mo:
- msgfmt -cfv -o $@ $<
-
-
-
-all:
-include deps.mk
-
-
-libs.a = $(libs.go:.go=.a)
-mains.a = $(mains.go:.go=.a)
-mains.bin = $(mains.go:.go=.bin)
-functional/lib.a = $(functional/lib.go:.go=.a)
-fuzz/lib.a = $(fuzz/lib.go:.go=.a)
-benchmarks/lib.a = $(benchmarks/lib.go:.go=.a)
-manpages.N.adoc = $(manpages.en.N.adoc) $(manpages.XX.N.adoc)
-manpages.N = $(manpages.N.adoc:.adoc=)
-sources.mo = $(sources.po:.po=.mo)
-
-sources = \
- src/$(NAME).go \
- src/meta.go \
- src/main.go \
-
-
-derived-assets = \
- src/meta.go \
- $(libs.a) \
- $(mains.a) \
- $(mains.bin) \
- $(NAME).bin \
- $(manpages.XX.N.adoc) \
- $(manpages.N) \
- $(sources.mo) \
-
-side-assets = \
- $(NAME).db* \
- tests/functional/*/*.go.db* \
- tests/fuzz/corpus/ \
- tests/benchmarks/*/main.txt \
-
-
-
-## Default target. Builds all artifacts required for testing
-## and installation.
-all: $(derived-assets)
-
-
-$(libs.a): Makefile deps.mk
-$(libs.a): src/$(NAME).go src/meta.go
-
-
-$(fuzz/lib.a):
- go tool compile $(GOCFLAGS) -o $@ -p $(NAME) -d=libfuzzer \
- $*.go src/$(NAME).go src/meta.go
-
-src/meta.go: Makefile
- echo 'package $(NAME)' > $@
- echo 'const Version = "$(VERSION)"' >> $@
- echo 'const Name = "$(NAME)"' >> $@
- echo 'const LOCALEDIR = "$(LOCALEDIR)"' >> $@
-
-$(NAME).bin: src/main.bin
- ln -fs src/main.bin $@
-
-$(manpages.XX.N.adoc): po/doc/po4a.cfg
- po4a --no-update --translate-only $@ po/doc/po4a.cfg
-
-
-.PRECIOUS: tests/queries.sql
-tests/queries.sql: tests/main.bin ALWAYS
- env TESTING_DUMP_SQL_QUERIES=1 $(EXEC)tests/main.bin | diff -U10 $@ -
-
-tests.bin-check = \
- tests/main.bin-check \
- $(functional/main.go:.go=.bin-check) \
-
-$(tests.bin-check):
- $(EXEC)$*.bin
-
-check-unit: $(tests.bin-check)
-check-unit: tests/queries.sql
-
-
-integration-tests = \
- tests/cli-opts.sh \
- tests/integration.sh \
-
-.PRECIOUS: $(integration-tests)
-$(integration-tests): $(NAME).bin
-$(integration-tests): ALWAYS
- sh $@
-
-check-integration: $(integration-tests)
-check-integration: fuzz
-
-
-## Run all tests. Each test suite is isolated, so that a parallel
-## build can run tests at the same time. The required artifacts
-## are created if missing.
-check: check-unit check-integration
-
-
-
-FUZZSEC=1
-fuzz/main.bin-check = $(fuzz/main.go:.go=.bin-check)
-$(fuzz/main.bin-check):
- $(EXEC)$*.bin --test.fuzztime=$(FUZZSEC)s --test.parallel=$N \
- --test.fuzz='.*' --test.fuzzcachedir=tests/fuzz/corpus
-
-fuzz: $(fuzz/main.bin-check)
-
-
-
-benchmarks/main.bin-check = $(benchmarks/main.go:.go=.bin-check)
-$(benchmarks/main.bin-check):
- printf '%s\n' '$(EXEC)$*.bin' > $*.txt
- LANG=POSIX.UTF-8 time -p $(EXEC)$*.bin 2>> $*.txt
- printf '%s\n' '$*.txt'
-
-bench: $(benchmarks/main.bin-check)
-
-
-
-i18n-doc:
- po4a po/doc/po4a.cfg
-
-i18n-code:
- gotext src/$(NAME).go > po/$(NAME)/$(NAME).pot
- po4a po/$(NAME)/po4a.cfg
-
-i18n: i18n-doc i18n-code
-
-
-
-## Remove *all* derived artifacts produced during the build.
-## A dedicated test asserts that this is always true.
-clean:
- rm -rf $(derived-assets) $(side-assets)
-
-
-## Installs into $(DESTDIR)$(PREFIX). Its dependency target
-## ensures that all installable artifacts are crafted beforehand.
-install: all
- mkdir -p \
- '$(DESTDIR)$(BINDIR)' \
- '$(DESTDIR)$(GOLIBDIR)' \
- '$(DESTDIR)$(SRCDIR)' \
-
- cp $(NAME).bin '$(DESTDIR)$(BINDIR)'/$(NAME)
- cp src/$(NAME).a '$(DESTDIR)$(GOLIBDIR)'
- cp $(sources) '$(DESTDIR)$(SRCDIR)'
- instool '$(DESTDIR)$(MANDIR)' install man $(manpages.N)
- instool '$(DESTDIR)$(LOCALEDIR)' install mo $(sources.mo)
-
-## Uninstalls from $(DESTDIR)$(PREFIX). This is a perfect mirror
-## of the "install" target, and removes *all* that was installed.
-## A dedicated test asserts that this is always true.
-uninstall:
- rm -rf \
- '$(DESTDIR)$(BINDIR)'/$(NAME) \
- '$(DESTDIR)$(GOLIBDIR)'/$(NAME).a \
- '$(DESTDIR)$(SRCDIR)' \
-
- instool '$(DESTDIR)$(MANDIR)' uninstall man $(manpages.N)
- instool '$(DESTDIR)$(LOCALEDIR)' uninstall mo $(sources.mo)
-
-
-
-ALWAYS:
diff --git a/deps.mk b/deps.mk
deleted file mode 100644
index 12012ff..0000000
--- a/deps.mk
+++ /dev/null
@@ -1,281 +0,0 @@
-libs.go = \
- src/fiinha.go \
- tests/benchmarks/deadletters/fiinha.go \
- tests/benchmarks/lookup/fiinha.go \
- tests/benchmarks/multiple-consumers/fiinha.go \
- tests/benchmarks/multiple-produces/fiinha.go \
- tests/benchmarks/reaper/fiinha.go \
- tests/benchmarks/replay/fiinha.go \
- tests/benchmarks/single-consumer/fiinha.go \
- tests/benchmarks/single-producer/fiinha.go \
- tests/benchmarks/subscribe/fiinha.go \
- tests/benchmarks/unsubscribe/fiinha.go \
- tests/benchmarks/waiter/fiinha.go \
- tests/fiinha.go \
- tests/functional/consume-one-produce-many/fiinha.go \
- tests/functional/consumer-with-deadletter/fiinha.go \
- tests/functional/custom-prefix/fiinha.go \
- tests/functional/distinct-consumers-separate-instances/fiinha.go \
- tests/functional/flow-id/fiinha.go \
- tests/functional/idempotency/fiinha.go \
- tests/functional/new-instance-takeover/fiinha.go \
- tests/functional/wait-after-publish/fiinha.go \
- tests/functional/waiter/fiinha.go \
- tests/fuzz/api-check/fiinha.go \
- tests/fuzz/cli-check/fiinha.go \
- tests/fuzz/equal-produced-consumed-order-check/fiinha.go \
- tests/fuzz/exactly-once-check/fiinha.go \
- tests/fuzz/queries-check/fiinha.go \
- tests/fuzz/total-order-check/fiinha.go \
-
-mains.go = \
- src/main.go \
- tests/benchmarks/deadletters/main.go \
- tests/benchmarks/lookup/main.go \
- tests/benchmarks/multiple-consumers/main.go \
- tests/benchmarks/multiple-produces/main.go \
- tests/benchmarks/reaper/main.go \
- tests/benchmarks/replay/main.go \
- tests/benchmarks/single-consumer/main.go \
- tests/benchmarks/single-producer/main.go \
- tests/benchmarks/subscribe/main.go \
- tests/benchmarks/unsubscribe/main.go \
- tests/benchmarks/waiter/main.go \
- tests/functional/consume-one-produce-many/main.go \
- tests/functional/consumer-with-deadletter/main.go \
- tests/functional/custom-prefix/main.go \
- tests/functional/distinct-consumers-separate-instances/main.go \
- tests/functional/flow-id/main.go \
- tests/functional/idempotency/main.go \
- tests/functional/new-instance-takeover/main.go \
- tests/functional/wait-after-publish/main.go \
- tests/functional/waiter/main.go \
- tests/fuzz/api-check/main.go \
- tests/fuzz/cli-check/main.go \
- tests/fuzz/equal-produced-consumed-order-check/main.go \
- tests/fuzz/exactly-once-check/main.go \
- tests/fuzz/queries-check/main.go \
- tests/fuzz/total-order-check/main.go \
- tests/main.go \
-
-manpages.en.N.adoc = \
- doc/fiinha.en.0.adoc \
-
-manpages.XX.N.adoc = \
- doc/fiinha.de.0.adoc \
- doc/fiinha.eo.0.adoc \
- doc/fiinha.es.0.adoc \
- doc/fiinha.fr.0.adoc \
- doc/fiinha.pt.0.adoc \
-
-sources.po = \
- po/fiinha/de.po \
- po/fiinha/eo.po \
- po/fiinha/es.po \
- po/fiinha/fr.po \
- po/fiinha/pt.po \
-
-functional/lib.go = \
- tests/functional/consume-one-produce-many/fiinha.go \
- tests/functional/consumer-with-deadletter/fiinha.go \
- tests/functional/custom-prefix/fiinha.go \
- tests/functional/distinct-consumers-separate-instances/fiinha.go \
- tests/functional/flow-id/fiinha.go \
- tests/functional/idempotency/fiinha.go \
- tests/functional/new-instance-takeover/fiinha.go \
- tests/functional/wait-after-publish/fiinha.go \
- tests/functional/waiter/fiinha.go \
-
-functional/main.go = \
- tests/functional/consume-one-produce-many/main.go \
- tests/functional/consumer-with-deadletter/main.go \
- tests/functional/custom-prefix/main.go \
- tests/functional/distinct-consumers-separate-instances/main.go \
- tests/functional/flow-id/main.go \
- tests/functional/idempotency/main.go \
- tests/functional/new-instance-takeover/main.go \
- tests/functional/wait-after-publish/main.go \
- tests/functional/waiter/main.go \
-
-fuzz/lib.go = \
- tests/fuzz/api-check/fiinha.go \
- tests/fuzz/cli-check/fiinha.go \
- tests/fuzz/equal-produced-consumed-order-check/fiinha.go \
- tests/fuzz/exactly-once-check/fiinha.go \
- tests/fuzz/queries-check/fiinha.go \
- tests/fuzz/total-order-check/fiinha.go \
-
-fuzz/main.go = \
- tests/fuzz/api-check/main.go \
- tests/fuzz/cli-check/main.go \
- tests/fuzz/equal-produced-consumed-order-check/main.go \
- tests/fuzz/exactly-once-check/main.go \
- tests/fuzz/queries-check/main.go \
- tests/fuzz/total-order-check/main.go \
-
-benchmarks/lib.go = \
- tests/benchmarks/deadletters/fiinha.go \
- tests/benchmarks/lookup/fiinha.go \
- tests/benchmarks/multiple-consumers/fiinha.go \
- tests/benchmarks/multiple-produces/fiinha.go \
- tests/benchmarks/reaper/fiinha.go \
- tests/benchmarks/replay/fiinha.go \
- tests/benchmarks/single-consumer/fiinha.go \
- tests/benchmarks/single-producer/fiinha.go \
- tests/benchmarks/subscribe/fiinha.go \
- tests/benchmarks/unsubscribe/fiinha.go \
- tests/benchmarks/waiter/fiinha.go \
-
-benchmarks/main.go = \
- tests/benchmarks/deadletters/main.go \
- tests/benchmarks/lookup/main.go \
- tests/benchmarks/multiple-consumers/main.go \
- tests/benchmarks/multiple-produces/main.go \
- tests/benchmarks/reaper/main.go \
- tests/benchmarks/replay/main.go \
- tests/benchmarks/single-consumer/main.go \
- tests/benchmarks/single-producer/main.go \
- tests/benchmarks/subscribe/main.go \
- tests/benchmarks/unsubscribe/main.go \
- tests/benchmarks/waiter/main.go \
-
-src/fiinha.a: src/fiinha.go
-src/main.a: src/main.go
-tests/benchmarks/deadletters/fiinha.a: tests/benchmarks/deadletters/fiinha.go
-tests/benchmarks/deadletters/main.a: tests/benchmarks/deadletters/main.go
-tests/benchmarks/lookup/fiinha.a: tests/benchmarks/lookup/fiinha.go
-tests/benchmarks/lookup/main.a: tests/benchmarks/lookup/main.go
-tests/benchmarks/multiple-consumers/fiinha.a: tests/benchmarks/multiple-consumers/fiinha.go
-tests/benchmarks/multiple-consumers/main.a: tests/benchmarks/multiple-consumers/main.go
-tests/benchmarks/multiple-produces/fiinha.a: tests/benchmarks/multiple-produces/fiinha.go
-tests/benchmarks/multiple-produces/main.a: tests/benchmarks/multiple-produces/main.go
-tests/benchmarks/reaper/fiinha.a: tests/benchmarks/reaper/fiinha.go
-tests/benchmarks/reaper/main.a: tests/benchmarks/reaper/main.go
-tests/benchmarks/replay/fiinha.a: tests/benchmarks/replay/fiinha.go
-tests/benchmarks/replay/main.a: tests/benchmarks/replay/main.go
-tests/benchmarks/single-consumer/fiinha.a: tests/benchmarks/single-consumer/fiinha.go
-tests/benchmarks/single-consumer/main.a: tests/benchmarks/single-consumer/main.go
-tests/benchmarks/single-producer/fiinha.a: tests/benchmarks/single-producer/fiinha.go
-tests/benchmarks/single-producer/main.a: tests/benchmarks/single-producer/main.go
-tests/benchmarks/subscribe/fiinha.a: tests/benchmarks/subscribe/fiinha.go
-tests/benchmarks/subscribe/main.a: tests/benchmarks/subscribe/main.go
-tests/benchmarks/unsubscribe/fiinha.a: tests/benchmarks/unsubscribe/fiinha.go
-tests/benchmarks/unsubscribe/main.a: tests/benchmarks/unsubscribe/main.go
-tests/benchmarks/waiter/fiinha.a: tests/benchmarks/waiter/fiinha.go
-tests/benchmarks/waiter/main.a: tests/benchmarks/waiter/main.go
-tests/fiinha.a: tests/fiinha.go
-tests/functional/consume-one-produce-many/fiinha.a: tests/functional/consume-one-produce-many/fiinha.go
-tests/functional/consume-one-produce-many/main.a: tests/functional/consume-one-produce-many/main.go
-tests/functional/consumer-with-deadletter/fiinha.a: tests/functional/consumer-with-deadletter/fiinha.go
-tests/functional/consumer-with-deadletter/main.a: tests/functional/consumer-with-deadletter/main.go
-tests/functional/custom-prefix/fiinha.a: tests/functional/custom-prefix/fiinha.go
-tests/functional/custom-prefix/main.a: tests/functional/custom-prefix/main.go
-tests/functional/distinct-consumers-separate-instances/fiinha.a: tests/functional/distinct-consumers-separate-instances/fiinha.go
-tests/functional/distinct-consumers-separate-instances/main.a: tests/functional/distinct-consumers-separate-instances/main.go
-tests/functional/flow-id/fiinha.a: tests/functional/flow-id/fiinha.go
-tests/functional/flow-id/main.a: tests/functional/flow-id/main.go
-tests/functional/idempotency/fiinha.a: tests/functional/idempotency/fiinha.go
-tests/functional/idempotency/main.a: tests/functional/idempotency/main.go
-tests/functional/new-instance-takeover/fiinha.a: tests/functional/new-instance-takeover/fiinha.go
-tests/functional/new-instance-takeover/main.a: tests/functional/new-instance-takeover/main.go
-tests/functional/wait-after-publish/fiinha.a: tests/functional/wait-after-publish/fiinha.go
-tests/functional/wait-after-publish/main.a: tests/functional/wait-after-publish/main.go
-tests/functional/waiter/fiinha.a: tests/functional/waiter/fiinha.go
-tests/functional/waiter/main.a: tests/functional/waiter/main.go
-tests/fuzz/api-check/fiinha.a: tests/fuzz/api-check/fiinha.go
-tests/fuzz/api-check/main.a: tests/fuzz/api-check/main.go
-tests/fuzz/cli-check/fiinha.a: tests/fuzz/cli-check/fiinha.go
-tests/fuzz/cli-check/main.a: tests/fuzz/cli-check/main.go
-tests/fuzz/equal-produced-consumed-order-check/fiinha.a: tests/fuzz/equal-produced-consumed-order-check/fiinha.go
-tests/fuzz/equal-produced-consumed-order-check/main.a: tests/fuzz/equal-produced-consumed-order-check/main.go
-tests/fuzz/exactly-once-check/fiinha.a: tests/fuzz/exactly-once-check/fiinha.go
-tests/fuzz/exactly-once-check/main.a: tests/fuzz/exactly-once-check/main.go
-tests/fuzz/queries-check/fiinha.a: tests/fuzz/queries-check/fiinha.go
-tests/fuzz/queries-check/main.a: tests/fuzz/queries-check/main.go
-tests/fuzz/total-order-check/fiinha.a: tests/fuzz/total-order-check/fiinha.go
-tests/fuzz/total-order-check/main.a: tests/fuzz/total-order-check/main.go
-tests/main.a: tests/main.go
-src/main.bin: src/main.a
-tests/benchmarks/deadletters/main.bin: tests/benchmarks/deadletters/main.a
-tests/benchmarks/lookup/main.bin: tests/benchmarks/lookup/main.a
-tests/benchmarks/multiple-consumers/main.bin: tests/benchmarks/multiple-consumers/main.a
-tests/benchmarks/multiple-produces/main.bin: tests/benchmarks/multiple-produces/main.a
-tests/benchmarks/reaper/main.bin: tests/benchmarks/reaper/main.a
-tests/benchmarks/replay/main.bin: tests/benchmarks/replay/main.a
-tests/benchmarks/single-consumer/main.bin: tests/benchmarks/single-consumer/main.a
-tests/benchmarks/single-producer/main.bin: tests/benchmarks/single-producer/main.a
-tests/benchmarks/subscribe/main.bin: tests/benchmarks/subscribe/main.a
-tests/benchmarks/unsubscribe/main.bin: tests/benchmarks/unsubscribe/main.a
-tests/benchmarks/waiter/main.bin: tests/benchmarks/waiter/main.a
-tests/functional/consume-one-produce-many/main.bin: tests/functional/consume-one-produce-many/main.a
-tests/functional/consumer-with-deadletter/main.bin: tests/functional/consumer-with-deadletter/main.a
-tests/functional/custom-prefix/main.bin: tests/functional/custom-prefix/main.a
-tests/functional/distinct-consumers-separate-instances/main.bin: tests/functional/distinct-consumers-separate-instances/main.a
-tests/functional/flow-id/main.bin: tests/functional/flow-id/main.a
-tests/functional/idempotency/main.bin: tests/functional/idempotency/main.a
-tests/functional/new-instance-takeover/main.bin: tests/functional/new-instance-takeover/main.a
-tests/functional/wait-after-publish/main.bin: tests/functional/wait-after-publish/main.a
-tests/functional/waiter/main.bin: tests/functional/waiter/main.a
-tests/fuzz/api-check/main.bin: tests/fuzz/api-check/main.a
-tests/fuzz/cli-check/main.bin: tests/fuzz/cli-check/main.a
-tests/fuzz/equal-produced-consumed-order-check/main.bin: tests/fuzz/equal-produced-consumed-order-check/main.a
-tests/fuzz/exactly-once-check/main.bin: tests/fuzz/exactly-once-check/main.a
-tests/fuzz/queries-check/main.bin: tests/fuzz/queries-check/main.a
-tests/fuzz/total-order-check/main.bin: tests/fuzz/total-order-check/main.a
-tests/main.bin: tests/main.a
-src/main.bin-check: src/main.bin
-tests/benchmarks/deadletters/main.bin-check: tests/benchmarks/deadletters/main.bin
-tests/benchmarks/lookup/main.bin-check: tests/benchmarks/lookup/main.bin
-tests/benchmarks/multiple-consumers/main.bin-check: tests/benchmarks/multiple-consumers/main.bin
-tests/benchmarks/multiple-produces/main.bin-check: tests/benchmarks/multiple-produces/main.bin
-tests/benchmarks/reaper/main.bin-check: tests/benchmarks/reaper/main.bin
-tests/benchmarks/replay/main.bin-check: tests/benchmarks/replay/main.bin
-tests/benchmarks/single-consumer/main.bin-check: tests/benchmarks/single-consumer/main.bin
-tests/benchmarks/single-producer/main.bin-check: tests/benchmarks/single-producer/main.bin
-tests/benchmarks/subscribe/main.bin-check: tests/benchmarks/subscribe/main.bin
-tests/benchmarks/unsubscribe/main.bin-check: tests/benchmarks/unsubscribe/main.bin
-tests/benchmarks/waiter/main.bin-check: tests/benchmarks/waiter/main.bin
-tests/functional/consume-one-produce-many/main.bin-check: tests/functional/consume-one-produce-many/main.bin
-tests/functional/consumer-with-deadletter/main.bin-check: tests/functional/consumer-with-deadletter/main.bin
-tests/functional/custom-prefix/main.bin-check: tests/functional/custom-prefix/main.bin
-tests/functional/distinct-consumers-separate-instances/main.bin-check: tests/functional/distinct-consumers-separate-instances/main.bin
-tests/functional/flow-id/main.bin-check: tests/functional/flow-id/main.bin
-tests/functional/idempotency/main.bin-check: tests/functional/idempotency/main.bin
-tests/functional/new-instance-takeover/main.bin-check: tests/functional/new-instance-takeover/main.bin
-tests/functional/wait-after-publish/main.bin-check: tests/functional/wait-after-publish/main.bin
-tests/functional/waiter/main.bin-check: tests/functional/waiter/main.bin
-tests/fuzz/api-check/main.bin-check: tests/fuzz/api-check/main.bin
-tests/fuzz/cli-check/main.bin-check: tests/fuzz/cli-check/main.bin
-tests/fuzz/equal-produced-consumed-order-check/main.bin-check: tests/fuzz/equal-produced-consumed-order-check/main.bin
-tests/fuzz/exactly-once-check/main.bin-check: tests/fuzz/exactly-once-check/main.bin
-tests/fuzz/queries-check/main.bin-check: tests/fuzz/queries-check/main.bin
-tests/fuzz/total-order-check/main.bin-check: tests/fuzz/total-order-check/main.bin
-tests/main.bin-check: tests/main.bin
-src/main.a: src/$(NAME).a
-tests/benchmarks/deadletters/main.a: tests/benchmarks/deadletters/$(NAME).a
-tests/benchmarks/lookup/main.a: tests/benchmarks/lookup/$(NAME).a
-tests/benchmarks/multiple-consumers/main.a: tests/benchmarks/multiple-consumers/$(NAME).a
-tests/benchmarks/multiple-produces/main.a: tests/benchmarks/multiple-produces/$(NAME).a
-tests/benchmarks/reaper/main.a: tests/benchmarks/reaper/$(NAME).a
-tests/benchmarks/replay/main.a: tests/benchmarks/replay/$(NAME).a
-tests/benchmarks/single-consumer/main.a: tests/benchmarks/single-consumer/$(NAME).a
-tests/benchmarks/single-producer/main.a: tests/benchmarks/single-producer/$(NAME).a
-tests/benchmarks/subscribe/main.a: tests/benchmarks/subscribe/$(NAME).a
-tests/benchmarks/unsubscribe/main.a: tests/benchmarks/unsubscribe/$(NAME).a
-tests/benchmarks/waiter/main.a: tests/benchmarks/waiter/$(NAME).a
-tests/functional/consume-one-produce-many/main.a: tests/functional/consume-one-produce-many/$(NAME).a
-tests/functional/consumer-with-deadletter/main.a: tests/functional/consumer-with-deadletter/$(NAME).a
-tests/functional/custom-prefix/main.a: tests/functional/custom-prefix/$(NAME).a
-tests/functional/distinct-consumers-separate-instances/main.a: tests/functional/distinct-consumers-separate-instances/$(NAME).a
-tests/functional/flow-id/main.a: tests/functional/flow-id/$(NAME).a
-tests/functional/idempotency/main.a: tests/functional/idempotency/$(NAME).a
-tests/functional/new-instance-takeover/main.a: tests/functional/new-instance-takeover/$(NAME).a
-tests/functional/wait-after-publish/main.a: tests/functional/wait-after-publish/$(NAME).a
-tests/functional/waiter/main.a: tests/functional/waiter/$(NAME).a
-tests/fuzz/api-check/main.a: tests/fuzz/api-check/$(NAME).a
-tests/fuzz/cli-check/main.a: tests/fuzz/cli-check/$(NAME).a
-tests/fuzz/equal-produced-consumed-order-check/main.a: tests/fuzz/equal-produced-consumed-order-check/$(NAME).a
-tests/fuzz/exactly-once-check/main.a: tests/fuzz/exactly-once-check/$(NAME).a
-tests/fuzz/queries-check/main.a: tests/fuzz/queries-check/$(NAME).a
-tests/fuzz/total-order-check/main.a: tests/fuzz/total-order-check/$(NAME).a
-tests/main.a: tests/$(NAME).a
diff --git a/doc/fiinha.en.0.adoc b/doc/fiinha.en.0.adoc
deleted file mode 100644
index 2c4d896..0000000
--- a/doc/fiinha.en.0.adoc
+++ /dev/null
@@ -1,5 +0,0 @@
-= fiinha(0)
-
-== NAME
-
-fiinha - .
diff --git a/meta.capim b/meta.capim
deleted file mode 100644
index 0288ec1..0000000
--- a/meta.capim
+++ /dev/null
@@ -1,9 +0,0 @@
-{
- :dependencies {
- :build #{
- gobang
- golite
- gotext
- }
- }
-}
diff --git a/mkdeps.sh b/mkdeps.sh
deleted file mode 100755
index ae6fffc..0000000
--- a/mkdeps.sh
+++ /dev/null
@@ -1,49 +0,0 @@
-#!/bin/sh
-set -eu
-
-export LANG=POSIX.UTF-8
-
-
-libs() {
- find src tests -name '*.go' |
- grep -Ev '/(main|meta)\.go$' |
- grep -Ev '/_cgo_(import|gotypes)\.go$' |
- grep -Ev '\.cgo1\.go$'
-}
-
-mains() {
- find src tests -name '*.go' | grep '/main\.go$'
-}
-
-docs() {
- find doc/*.en.*.adoc
-}
-
-xdocs() {
- for l in `find po/doc/*.po | xargs -I% basename % .po`; do
- docs | sed 's|/\(.*\)\.en\.\(.*\)$|/\1.'"$l"'.\2|'
- done
-}
-
-pos() {
- find po/ -name '*.po' | grep -Ev '^po/(doc|tests)/'
-}
-
-
-libs | varlist 'libs.go'
-mains | varlist 'mains.go'
-docs | varlist 'manpages.en.N.adoc'
-xdocs | varlist 'manpages.XX.N.adoc'
-pos | varlist 'sources.po'
-
-find tests/functional/*/*.go -not -name main.go | varlist 'functional/lib.go'
-find tests/functional/*/main.go | varlist 'functional/main.go'
-find tests/fuzz/*/*.go -not -name main.go | varlist 'fuzz/lib.go'
-find tests/fuzz/*/main.go | varlist 'fuzz/main.go'
-find tests/benchmarks/*/*.go -not -name main.go | varlist 'benchmarks/lib.go'
-find tests/benchmarks/*/main.go | varlist 'benchmarks/main.go'
-
-{ libs; mains; } | sort | sed 's/^\(.*\)\.go$/\1.a:\t\1.go/'
-mains | sort | sed 's/^\(.*\)\.go$/\1.bin:\t\1.a/'
-mains | sort | sed 's/^\(.*\)\.go$/\1.bin-check:\t\1.bin/'
-mains | sort | sed 's|^\(.*\)/main\.go$|\1/main.a:\t\1/$(NAME).a|'
diff --git a/po/doc/de.po b/po/doc/de.po
deleted file mode 100644
index fd6472f..0000000
--- a/po/doc/de.po
+++ /dev/null
@@ -1,22 +0,0 @@
-msgid ""
-msgstr ""
-"Language: de\n"
-"Content-Type: text/plain; charset=UTF-8\n"
-"Plural-Forms: nplurals=2; plural=(n != 1);\n"
-
-#. type: Title =
-#: doc/fiinha.en.0.adoc:1
-#, no-wrap
-msgid "fiinha(0)"
-msgstr ""
-
-#. type: Title ==
-#: doc/fiinha.en.0.adoc:3
-#, no-wrap
-msgid "NAME"
-msgstr ""
-
-#. type: Plain text
-#: doc/fiinha.en.0.adoc:5
-msgid "fiinha - ."
-msgstr ""
diff --git a/po/doc/doc.pot b/po/doc/doc.pot
deleted file mode 100644
index 5bc8d37..0000000
--- a/po/doc/doc.pot
+++ /dev/null
@@ -1,22 +0,0 @@
-#, fuzzy
-msgid ""
-msgstr ""
-"Language: \n"
-"Content-Type: text/plain; charset=UTF-8\n"
-
-#. type: Title =
-#: doc/fiinha.en.0.adoc:1
-#, no-wrap
-msgid "fiinha(0)"
-msgstr ""
-
-#. type: Title ==
-#: doc/fiinha.en.0.adoc:3
-#, no-wrap
-msgid "NAME"
-msgstr ""
-
-#. type: Plain text
-#: doc/fiinha.en.0.adoc:5
-msgid "fiinha - ."
-msgstr ""
diff --git a/po/doc/eo.po b/po/doc/eo.po
deleted file mode 100644
index e9e1f2f..0000000
--- a/po/doc/eo.po
+++ /dev/null
@@ -1,22 +0,0 @@
-msgid ""
-msgstr ""
-"Language: eo\n"
-"Content-Type: text/plain; charset=UTF-8\n"
-"Plural-Forms: nplurals=2; plural=(n != 1);\n"
-
-#. type: Title =
-#: doc/fiinha.en.0.adoc:1
-#, no-wrap
-msgid "fiinha(0)"
-msgstr ""
-
-#. type: Title ==
-#: doc/fiinha.en.0.adoc:3
-#, no-wrap
-msgid "NAME"
-msgstr ""
-
-#. type: Plain text
-#: doc/fiinha.en.0.adoc:5
-msgid "fiinha - ."
-msgstr ""
diff --git a/po/doc/es.po b/po/doc/es.po
deleted file mode 100644
index cf72172..0000000
--- a/po/doc/es.po
+++ /dev/null
@@ -1,22 +0,0 @@
-msgid ""
-msgstr ""
-"Language: es\n"
-"Content-Type: text/plain; charset=UTF-8\n"
-"Plural-Forms: nplurals=2; plural=(n != 1);\n"
-
-#. type: Title =
-#: doc/fiinha.en.0.adoc:1
-#, no-wrap
-msgid "fiinha(0)"
-msgstr ""
-
-#. type: Title ==
-#: doc/fiinha.en.0.adoc:3
-#, no-wrap
-msgid "NAME"
-msgstr ""
-
-#. type: Plain text
-#: doc/fiinha.en.0.adoc:5
-msgid "fiinha - ."
-msgstr ""
diff --git a/po/doc/fr.po b/po/doc/fr.po
deleted file mode 100644
index 0a2ca5a..0000000
--- a/po/doc/fr.po
+++ /dev/null
@@ -1,22 +0,0 @@
-msgid ""
-msgstr ""
-"Language: fr\n"
-"Content-Type: text/plain; charset=UTF-8\n"
-"Plural-Forms: nplurals=2; plural=(n > 1);\n"
-
-#. type: Title =
-#: doc/fiinha.en.0.adoc:1
-#, no-wrap
-msgid "fiinha(0)"
-msgstr ""
-
-#. type: Title ==
-#: doc/fiinha.en.0.adoc:3
-#, no-wrap
-msgid "NAME"
-msgstr ""
-
-#. type: Plain text
-#: doc/fiinha.en.0.adoc:5
-msgid "fiinha - ."
-msgstr ""
diff --git a/po/doc/note.txt b/po/doc/note.txt
deleted file mode 100644
index 45279a4..0000000
--- a/po/doc/note.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-PO4A-HEADER: mode=eof
-
-
-
-// Generated from po4a(1).
diff --git a/po/doc/po4a.cfg b/po/doc/po4a.cfg
deleted file mode 100644
index aaf1cb1..0000000
--- a/po/doc/po4a.cfg
+++ /dev/null
@@ -1,5 +0,0 @@
-[options] --keep 0 --master-charset UTF-8 --localized-charset UTF-8 --addendum-charset UTF-8
-
-[po_directory] po/doc
-
-[type: asciidoc] doc/fiinha.en.0.adoc $lang:doc/fiinha.$lang.0.adoc
diff --git a/po/doc/pt.po b/po/doc/pt.po
deleted file mode 100644
index 65a0b0e..0000000
--- a/po/doc/pt.po
+++ /dev/null
@@ -1,22 +0,0 @@
-msgid ""
-msgstr ""
-"Language: pt\n"
-"Content-Type: text/plain; charset=UTF-8\n"
-"Plural-Forms: nplurals=2; plural=(n != 1);\n"
-
-#. type: Title =
-#: doc/fiinha.en.0.adoc:1
-#, no-wrap
-msgid "fiinha(0)"
-msgstr ""
-
-#. type: Title ==
-#: doc/fiinha.en.0.adoc:3
-#, no-wrap
-msgid "NAME"
-msgstr ""
-
-#. type: Plain text
-#: doc/fiinha.en.0.adoc:5
-msgid "fiinha - ."
-msgstr ""
diff --git a/po/fiinha/de.po b/po/fiinha/de.po
deleted file mode 100644
index 947f71d..0000000
--- a/po/fiinha/de.po
+++ /dev/null
@@ -1,5 +0,0 @@
-msgid ""
-msgstr ""
-"Language: de\n"
-"Content-Type: text/plain; charset=UTF-8\n"
-"Plural-Forms: nplurals=2; plural=(n != 1);\n"
diff --git a/po/fiinha/eo.po b/po/fiinha/eo.po
deleted file mode 100644
index 444f1ed..0000000
--- a/po/fiinha/eo.po
+++ /dev/null
@@ -1,5 +0,0 @@
-msgid ""
-msgstr ""
-"Language: eo\n"
-"Content-Type: text/plain; charset=UTF-8\n"
-"Plural-Forms: nplurals=2; plural=(n != 1);\n"
diff --git a/po/fiinha/es.po b/po/fiinha/es.po
deleted file mode 100644
index 9288fad..0000000
--- a/po/fiinha/es.po
+++ /dev/null
@@ -1,5 +0,0 @@
-msgid ""
-msgstr ""
-"Language: es\n"
-"Content-Type: text/plain; charset=UTF-8\n"
-"Plural-Forms: nplurals=2; plural=(n != 1);\n"
diff --git a/po/fiinha/fiinha.pot b/po/fiinha/fiinha.pot
deleted file mode 100644
index c9591aa..0000000
--- a/po/fiinha/fiinha.pot
+++ /dev/null
@@ -1,5 +0,0 @@
-msgid ""
-msgstr ""
-"Language: \n"
-"Content-Type: text/plain; charset=UTF-8\n"
-
diff --git a/po/fiinha/fr.po b/po/fiinha/fr.po
deleted file mode 100644
index 6a18324..0000000
--- a/po/fiinha/fr.po
+++ /dev/null
@@ -1,5 +0,0 @@
-msgid ""
-msgstr ""
-"Language: fr\n"
-"Content-Type: text/plain; charset=UTF-8\n"
-"Plural-Forms: nplurals=2; plural=(n > 1);\n"
diff --git a/po/fiinha/po4a.cfg b/po/fiinha/po4a.cfg
deleted file mode 100644
index a9de01a..0000000
--- a/po/fiinha/po4a.cfg
+++ /dev/null
@@ -1,3 +0,0 @@
-[options] --keep 0 --master-charset UTF-8 --localized-charset UTF-8 --addendum-charset UTF-8
-
-[po_directory] po/fiinha
diff --git a/po/fiinha/pt.po b/po/fiinha/pt.po
deleted file mode 100644
index 22c6066..0000000
--- a/po/fiinha/pt.po
+++ /dev/null
@@ -1,5 +0,0 @@
-msgid ""
-msgstr ""
-"Language: pt\n"
-"Content-Type: text/plain; charset=UTF-8\n"
-"Plural-Forms: nplurals=2; plural=(n != 1);\n"
diff --git a/src/fiinha.go b/src/fiinha.go
deleted file mode 100644
index a819557..0000000
--- a/src/fiinha.go
+++ /dev/null
@@ -1,2517 +0,0 @@
-package fiinha
-
-import (
- "database/sql"
- "flag"
- "fmt"
- "io"
- "log/slog"
- "os"
- "sync"
- "time"
-
- "golite"
- "uuid"
- g "gobang"
-)
-
-
-
-const (
- defaultPrefix = "fiinha"
- reaperSkipCount = 1000
- notOwnerErrorFmt = "%v owns %#v as %#v, not us (%v)"
- rollbackErrorFmt = "rollback error: %w; while executing: %w"
-)
-
-
-
-type dbconfigT struct{
- shared *sql.DB
- dbpath string
- prefix string
- instanceID int
-}
-
-type queryT struct{
- write string
- read string
- owner string
-}
-
-type queriesT struct{
- take func(string, string) error
- publish func(UnsentMessage, uuid.UUID) (messageT, error)
- find func(string, uuid.UUID) (messageT, error)
- next func(string, string) (messageT, error)
- pending func(string, string, func(messageT) error) error
- commit func(string, uuid.UUID) error
- toDead func(string, uuid.UUID, uuid.UUID) error
- replay func(uuid.UUID, uuid.UUID) (messageT, error)
- oneDead func(string, string) (deadletterT, error)
- allDead func(string, string, func(deadletterT, messageT) error) error
- size func(string) (int, error)
- count func(string, string) (int, error)
- hasData func(string, string) (bool, error)
- close func() error
-}
-
-type messageT struct{
- id int64
- timestamp time.Time
- uuid uuid.UUID
- topic string
- flowID uuid.UUID
- payload []byte
-}
-
-type UnsentMessage struct{
- Topic string
- FlowID uuid.UUID
- Payload []byte
-}
-
-type Message struct{
- ID uuid.UUID
- Timestamp time.Time
- Topic string
- FlowID uuid.UUID
- Payload []byte
-}
-
-type deadletterT struct{
- uuid uuid.UUID
- timestamp time.Time
- consumer string
- messageID uuid.UUID
-}
-
-type pingerT[T any] struct{
- tryPing func(T)
- onPing func(func(T))
- closed func() bool
- close func()
-}
-
-type consumerDataT struct{
- topic string
- name string
-}
-
-type waiterDataT struct{
- topic string
- flowID uuid.UUID
- name string
-
-}
-
-type consumerT struct{
- data consumerDataT
- callback func(Message) error
- pinger pingerT[struct{}]
- close *func()
-}
-
-type waiterT struct{
- data waiterDataT
- pinger pingerT[[]byte]
- closed *func() bool
- close *func()
-}
-
-type topicSubscriptionT struct{
- consumers map[string]consumerT
- waiters map[uuid.UUID]map[string]waiterT
-}
-
-type subscriptionsSetM map[string]topicSubscriptionT
-
-type subscriptionsT struct {
- read func(func(subscriptionsSetM) error) error
- write func(func(subscriptionsSetM) error) error
-}
-
-type queueT struct{
- queries queriesT
- subscriptions subscriptionsT
- pinger pingerT[struct{}]
-}
-
-type argsT struct{
- databasePath string
- prefix string
- command string
- allArgs []string
- args []string
- topic string
- consumer string
-}
-
-type commandT struct{
- name string
- getopt func(argsT, io.Writer) (argsT, bool)
- exec func(argsT, queriesT, io.Reader, io.Writer) (int, error)
-}
-
-type IQueue interface{
- Publish(UnsentMessage) (Message, error)
- Subscribe( string, string, func(Message) error) error
- Unsubscribe(string, string)
- WaitFor(string, uuid.UUID, string) Waiter
- Close() error
-}
-
-
-
-func tryRollback(tx *sql.Tx, err error) error {
- rollbackErr := tx.Rollback()
- if rollbackErr != nil {
- return fmt.Errorf(
- rollbackErrorFmt,
- rollbackErr,
- err,
- )
- }
-
- return err
-}
-
-func inTx(db *sql.DB, fn func(*sql.Tx) error) error {
- tx, err := db.Begin()
- if err != nil {
- return err
- }
-
- err = fn(tx)
- if err != nil {
- return tryRollback(tx, err)
- }
-
- err = tx.Commit()
- if err != nil {
- return tryRollback(tx, err)
- }
-
- return nil
-}
-
-func serialized[A any, B any](callback func(...A) B) (func(...A) B, func()) {
- in := make(chan []A)
- out := make(chan B)
-
- closed := false
- var (
- closeWg sync.WaitGroup
- closeMutex sync.Mutex
- )
- closeWg.Add(1)
-
- go func() {
- for input := range in {
- out <- callback(input...)
- }
- close(out)
- closeWg.Done()
- }()
-
- fn := func(input ...A) B {
- in <- input
- return (<- out)
- }
-
- closeFn := func() {
- closeMutex.Lock()
- defer closeMutex.Unlock()
- if closed {
- return
- }
- close(in)
- closed = true
- closeWg.Wait()
- }
-
- return fn, closeFn
-}
-
-func execSerialized(query string, db *sql.DB) (func(...any) error, func()) {
- return serialized(func(args ...any) error {
- return inTx(db, func(tx *sql.Tx) error {
- _, err := tx.Exec(query, args...)
- return err
- })
- })
-}
-
-func createTablesSQL(prefix string) queryT {
- const tmpl_write = `
- CREATE TABLE IF NOT EXISTS "%s_payloads" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (%s),
- topic TEXT NOT NULL,
- payload BLOB NOT NULL
- ) STRICT;
- CREATE INDEX IF NOT EXISTS "%s_payloads_topic"
- ON "%s_payloads"(topic);
-
- CREATE TABLE IF NOT EXISTS "%s_messages" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (%s),
- uuid BLOB NOT NULL UNIQUE,
- flow_id BLOB NOT NULL,
- payload_id INTEGER NOT NULL
- REFERENCES "%s_payloads"(id)
- ) STRICT;
- CREATE INDEX IF NOT EXISTS "%s_messages_flow_id"
- ON "%s_messages"(flow_id);
-
- CREATE TABLE IF NOT EXISTS "%s_offsets" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (%s),
- consumer TEXT NOT NULL,
- message_id INTEGER NOT NULL
- REFERENCES "%s_messages"(id),
- instance_id INTEGER NOT NULL,
- UNIQUE (consumer, message_id)
- ) STRICT;
- CREATE INDEX IF NOT EXISTS "%s_offsets_consumer"
- ON "%s_offsets"(consumer);
-
- CREATE TABLE IF NOT EXISTS "%s_deadletters" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- uuid BLOB NOT NULL UNIQUE,
- consumer TEXT NOT NULL,
- message_id INTEGER NOT NULL
- REFERENCES "%s_messages"(id),
- instance_id INTEGER NOT NULL,
- UNIQUE (consumer, message_id)
- ) STRICT;
- CREATE INDEX IF NOT EXISTS "%s_deadletters_consumer"
- ON "%s_deadletters"(consumer);
-
- CREATE TABLE IF NOT EXISTS "%s_replays" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- deadletter_id INTEGER NOT NULL UNIQUE
- REFERENCES "%s_deadletters"(id) ,
- message_id INTEGER NOT NULL UNIQUE
- REFERENCES "%s_messages"(id)
- ) STRICT;
-
- CREATE TABLE IF NOT EXISTS "%s_owners" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- topic TEXT NOT NULL,
- consumer TEXT NOT NULL,
- owner_id INTEGER NOT NULL,
- UNIQUE (topic, consumer)
- ) STRICT;
-
- CREATE TRIGGER IF NOT EXISTS "%s_check_instance_owns_topic"
- BEFORE INSERT ON "%s_offsets"
- WHEN NEW.instance_id != (
- SELECT owner_id FROM "%s_owners"
- WHERE topic = (
- SELECT "%s_payloads".topic
- FROM "%s_payloads"
- JOIN "%s_messages" ON "%s_payloads".id =
- "%s_messages".payload_id
- WHERE "%s_messages".id = NEW.message_id
- ) AND consumer = NEW.consumer
- )
- BEGIN
- SELECT RAISE(
- ABORT,
- 'instance does not own topic/consumer combo'
- );
- END;
-
- CREATE TRIGGER IF NOT EXISTS "%s_check_can_publish_deadletter"
- BEFORE INSERT ON "%s_deadletters"
- WHEN NEW.instance_id != (
- SELECT owner_id FROM "%s_owners"
- WHERE topic = (
- SELECT "%s_payloads".topic
- FROM "%s_payloads"
- JOIN "%s_messages" ON "%s_payloads".id =
- "%s_messages".payload_id
- WHERE "%s_messages".id = NEW.message_id
- ) AND consumer = NEW.consumer
- )
- BEGIN
- SELECT RAISE(
- ABORT,
- 'Instance does not own topic/consumer combo'
- );
- END;
- `
- return queryT{
- write: fmt.Sprintf(
- tmpl_write,
- prefix,
- g.SQLiteNow,
- prefix,
- prefix,
- prefix,
- g.SQLiteNow,
- prefix,
- prefix,
- prefix,
- prefix,
- g.SQLiteNow,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- }
-}
-
-func createTables(db *sql.DB, prefix string) error {
- q := createTablesSQL(prefix)
-
- return inTx(db, func(tx *sql.Tx) error {
- _, err := tx.Exec(q.write)
- return err
- })
-}
-
-func takeSQL(prefix string) queryT {
- const tmpl_write = `
- INSERT INTO "%s_owners" (topic, consumer, owner_id)
- VALUES (?, ?, ?)
- ON CONFLICT (topic, consumer) DO
- UPDATE SET owner_id=excluded.owner_id;
- `
- return queryT{
- write: fmt.Sprintf(tmpl_write, prefix),
- }
-}
-
-func takeStmt(
- cfg dbconfigT,
-) (func(string, string) error, func() error, error) {
- q := takeSQL(cfg.prefix)
-
- writeStmt, err := cfg.shared.Prepare(q.write)
- if err != nil {
- return nil, nil, err
- }
-
- fn := func(topic string, consumer string) error {
- _, err := writeStmt.Exec(
- topic,
- consumer,
- cfg.instanceID,
- )
- return err
- }
-
- return fn, writeStmt.Close, nil
-}
-
-func publishSQL(prefix string) queryT {
- const tmpl_write = `
- INSERT INTO "%s_payloads" (topic, payload)
- VALUES (?, ?);
-
- INSERT INTO "%s_messages" (uuid, flow_id, payload_id)
- VALUES (?, ?, last_insert_rowid());
- `
- const tmpl_read = `
- SELECT id, timestamp FROM "%s_messages"
- WHERE uuid = ?;
- `
- return queryT{
- write: fmt.Sprintf(tmpl_write, prefix, prefix),
- read: fmt.Sprintf(tmpl_read, prefix),
- }
-}
-
-func publishStmt(
- cfg dbconfigT,
-) (func(UnsentMessage, uuid.UUID) (messageT, error), func() error, error) {
- q := publishSQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- privateDB, err := sql.Open(golite.DriverName, cfg.dbpath)
- if err != nil {
- readStmt.Close()
- return nil, nil, err
- }
-
- writeFn, writeFnClose := execSerialized(q.write, privateDB)
-
- fn := func(
- unsentMessage UnsentMessage,
- messageID uuid.UUID,
- ) (messageT, error) {
- message := messageT{
- uuid: messageID,
- topic: unsentMessage.Topic,
- flowID: unsentMessage.FlowID,
- payload: unsentMessage.Payload,
- }
-
- message_id_bytes := messageID[:]
- flow_id_bytes := unsentMessage.FlowID[:]
- err := writeFn(
- unsentMessage.Topic,
- unsentMessage.Payload,
- message_id_bytes,
- flow_id_bytes,
- )
- if err != nil {
- return messageT{}, err
- }
-
- var timestr string
- err = readStmt.QueryRow(message_id_bytes).Scan(
- &message.id,
- &timestr,
- )
- if err != nil {
- return messageT{}, err
- }
-
- message.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
- if err != nil {
- return messageT{}, err
- }
-
- return message, nil
- }
-
- closeFn := func() error {
- writeFnClose()
- return g.SomeError(privateDB.Close(), readStmt.Close())
- }
-
- return fn, closeFn, nil
-}
-
-func findSQL(prefix string) queryT {
- const tmpl_read = `
- SELECT
- "%s_messages".id,
- "%s_messages".timestamp,
- "%s_messages".uuid,
- "%s_payloads".payload
- FROM "%s_messages"
- JOIN "%s_payloads" ON
- "%s_payloads".id = "%s_messages".payload_id
- WHERE
- "%s_payloads".topic = ? AND
- "%s_messages".flow_id = ?
- ORDER BY "%s_messages".id DESC
- LIMIT 1;
- `
- return queryT{
- read: fmt.Sprintf(
- tmpl_read,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- }
-}
-
-func findStmt(
- cfg dbconfigT,
-) (func(string, uuid.UUID) (messageT, error), func() error, error) {
- q := findSQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- fn := func(topic string, flowID uuid.UUID) (messageT, error) {
- message := messageT{
- topic: topic,
- flowID: flowID,
- }
-
- var (
- timestr string
- message_id_bytes []byte
- )
- flow_id_bytes := flowID[:]
- err = readStmt.QueryRow(topic, flow_id_bytes).Scan(
- &message.id,
- &timestr,
- &message_id_bytes,
- &message.payload,
- )
- if err != nil {
- return messageT{}, err
- }
- message.uuid = uuid.UUID(message_id_bytes)
-
- message.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
- if err != nil {
- return messageT{}, err
- }
-
- return message, nil
- }
-
- return fn, readStmt.Close, nil
-}
-
-func nextSQL(prefix string) queryT {
- const tmpl_read = `
- SELECT
- (
- SELECT owner_id FROM "%s_owners"
- WHERE
- topic = ? AND
- consumer = ?
- LIMIT 1
- ) AS owner_id,
- "%s_messages".id,
- "%s_messages".timestamp,
- "%s_messages".uuid,
- "%s_messages".flow_id,
- "%s_payloads".payload
- FROM "%s_messages"
- JOIN "%s_payloads" ON
- "%s_payloads".id = "%s_messages".payload_id
- WHERE
- "%s_payloads".topic = ? AND
- "%s_messages".id NOT IN (
- SELECT message_id FROM "%s_offsets"
- WHERE consumer = ?
- )
- ORDER BY "%s_messages".id ASC
- LIMIT 1;
- `
- return queryT{
- read: fmt.Sprintf(
- tmpl_read,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- }
-}
-
-func nextStmt(
- cfg dbconfigT,
-) (func(string, string) (messageT, error), func() error, error) {
- q := nextSQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- fn := func(topic string, consumer string) (messageT, error) {
- message := messageT{
- topic: topic,
- }
-
- var (
- ownerID int
- timestr string
- message_id_bytes []byte
- flow_id_bytes []byte
- )
-
- err = readStmt.QueryRow(topic, consumer, topic, consumer).Scan(
- &ownerID,
- &message.id,
- &timestr,
- &message_id_bytes,
- &flow_id_bytes,
- &message.payload,
- )
- if err != nil {
- return messageT{}, err
- }
-
- if ownerID != cfg.instanceID {
- err := fmt.Errorf(
- notOwnerErrorFmt,
- ownerID,
- topic,
- consumer,
- cfg.instanceID,
- )
- return messageT{}, err
- }
- message.uuid = uuid.UUID(message_id_bytes)
- message.flowID = uuid.UUID(flow_id_bytes)
-
- message.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
- if err != nil {
- return messageT{}, err
- }
-
- return message, nil
- }
-
- return fn, readStmt.Close, nil
-}
-
-func messageEach(rows *sql.Rows, callback func(messageT) error) error {
- if rows == nil {
- return nil
- }
-
- for rows.Next() {
- var (
- message messageT
- timestr string
- message_id_bytes []byte
- flow_id_bytes []byte
- )
- err := rows.Scan(
- &message.id,
- &timestr,
- &message_id_bytes,
- &flow_id_bytes,
- &message.topic,
- &message.payload,
- )
- if err != nil {
- rows.Close()
- return err
- }
- message.uuid = uuid.UUID(message_id_bytes)
- message.flowID = uuid.UUID(flow_id_bytes)
-
- message.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
- if err != nil {
- rows.Close()
- return err
- }
-
- err = callback(message)
- if err != nil {
- rows.Close()
- return err
- }
- }
-
- return g.WrapErrors(rows.Err(), rows.Close())
-}
-
-func pendingSQL(prefix string) queryT {
- const tmpl_read = `
- SELECT
- "%s_messages".id,
- "%s_messages".timestamp,
- "%s_messages".uuid,
- "%s_messages".flow_id,
- "%s_payloads".topic,
- "%s_payloads".payload
- FROM "%s_messages"
- JOIN "%s_payloads" ON
- "%s_payloads".id = "%s_messages".payload_id
- WHERE
- "%s_payloads".topic = ? AND
- "%s_messages".id NOT IN (
- SELECT message_id FROM "%s_offsets"
- WHERE consumer = ?
- )
- ORDER BY "%s_messages".id ASC;
- `
- const tmpl_owner = `
- SELECT owner_id FROM "%s_owners"
- WHERE
- topic = ? AND
- consumer = ?;
- `
- return queryT{
- read: fmt.Sprintf(
- tmpl_read,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- owner: fmt.Sprintf(tmpl_owner, prefix),
- }
-}
-
-func pendingStmt(
- cfg dbconfigT,
-) (func(string, string) (*sql.Rows, error), func() error, error) {
- q := pendingSQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- ownerStmt, err := cfg.shared.Prepare(q.owner)
- if err != nil {
- readStmt.Close()
- return nil, nil, err
- }
-
- fn := func(topic string, consumer string) (*sql.Rows, error) {
- var ownerID int
- err := ownerStmt.QueryRow(topic, consumer).Scan(&ownerID)
- if err != nil {
- return nil, err
- }
-
- // best effort check, the final one is done during
- // commit within a transaction
- if ownerID != cfg.instanceID {
- return nil, nil
- }
-
- return readStmt.Query(topic, consumer)
- }
-
- closeFn := func() error {
- return g.SomeFnError(readStmt.Close, ownerStmt.Close)
- }
-
- return fn, closeFn, nil
-}
-
-func commitSQL(prefix string) queryT {
- const tmpl_write = `
- INSERT INTO "%s_offsets" (consumer, message_id, instance_id)
- VALUES (?, (SELECT id FROM "%s_messages" WHERE uuid = ?), ?);
- `
- return queryT{
- write: fmt.Sprintf(tmpl_write, prefix, prefix),
- }
-}
-
-func commitStmt(
- cfg dbconfigT,
-) (func(string, uuid.UUID) error, func() error, error) {
- q := commitSQL(cfg.prefix)
-
- writeStmt, err := cfg.shared.Prepare(q.write)
- if err != nil {
- return nil, nil, err
- }
-
- fn := func(consumer string, messageID uuid.UUID) error {
- message_id_bytes := messageID[:]
- _, err = writeStmt.Exec(
- consumer,
- message_id_bytes,
- cfg.instanceID,
- )
- return err
- }
-
- return fn, writeStmt.Close, nil
-}
-
-func toDeadSQL(prefix string) queryT {
- const tmpl_write = `
- INSERT INTO "%s_offsets"
- ( consumer, message_id, instance_id)
- VALUES ( ?, (SELECT id FROM "%s_messages" WHERE uuid = ?), ?);
-
- INSERT INTO "%s_deadletters"
- (uuid, consumer, message_id, instance_id)
- VALUES (?, ?, (SELECT id FROM "%s_messages" WHERE uuid = ?), ?);
- `
- return queryT{
- write: fmt.Sprintf(tmpl_write, prefix, prefix, prefix, prefix),
- }
-}
-
-func toDeadStmt(
- cfg dbconfigT,
-) (
- func(string, uuid.UUID, uuid.UUID) error,
- func() error,
- error,
-) {
- q := toDeadSQL(cfg.prefix)
-
- privateDB, err := sql.Open(golite.DriverName, cfg.dbpath)
- if err != nil {
- return nil, nil, err
- }
-
- writeFn, writeFnClose := execSerialized(q.write, privateDB)
-
- fn := func(
- consumer string,
- messageID uuid.UUID,
- deadletterID uuid.UUID,
- ) error {
- message_id_bytes := messageID[:]
- deadletter_id_bytes := deadletterID[:]
- return writeFn(
- consumer,
- message_id_bytes,
- cfg.instanceID,
- deadletter_id_bytes,
- consumer,
- message_id_bytes,
- cfg.instanceID,
- )
- }
-
- closeFn := func() error {
- writeFnClose()
- return privateDB.Close()
- }
-
-
- return fn, closeFn, nil
-}
-
-func replaySQL(prefix string) queryT {
- const tmpl_write = `
- INSERT INTO "%s_messages" (uuid, flow_id, payload_id)
- SELECT
- ?,
- "%s_messages".flow_id,
- "%s_messages".payload_id
- FROM "%s_messages"
- JOIN "%s_deadletters" ON
- "%s_messages".id = "%s_deadletters".message_id
- WHERE "%s_deadletters".uuid = ?;
-
- INSERT INTO "%s_replays" (deadletter_id, message_id)
- VALUES (
- (SELECT id FROM "%s_deadletters" WHERE uuid = ?),
- last_insert_rowid()
- );
- `
- const tmpl_read = `
- SELECT
- "%s_messages".id,
- "%s_messages".timestamp,
- "%s_messages".flow_id,
- "%s_payloads".topic,
- "%s_payloads".payload
- FROM "%s_messages"
- JOIN "%s_payloads" ON
- "%s_payloads".id = "%s_messages".payload_id
- WHERE "%s_messages".uuid = ?;
- `
- return queryT{
- write: fmt.Sprintf(
- tmpl_write,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- read: fmt.Sprintf(
- tmpl_read,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- }
-}
-
-func replayStmt(
- cfg dbconfigT,
-) (func(uuid.UUID, uuid.UUID) (messageT, error), func() error, error) {
- q := replaySQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- privateDB, err := sql.Open(golite.DriverName, cfg.dbpath)
- if err != nil {
- readStmt.Close()
- return nil, nil, err
- }
-
- writeFn, writeFnClose := execSerialized(q.write, privateDB)
-
- fn := func(
- deadletterID uuid.UUID,
- messageID uuid.UUID,
- ) (messageT, error) {
- deadletter_id_bytes := deadletterID[:]
- message_id_bytes := messageID[:]
- err := writeFn(
- message_id_bytes,
- deadletter_id_bytes,
- deadletter_id_bytes,
- )
- if err != nil {
- return messageT{}, err
- }
-
- message := messageT{
- uuid: messageID,
- }
-
- var (
- timestr string
- flow_id_bytes []byte
- )
- err = readStmt.QueryRow(message_id_bytes).Scan(
- &message.id,
- &timestr,
- &flow_id_bytes,
- &message.topic,
- &message.payload,
- )
- if err != nil {
- return messageT{}, err
- }
- message.flowID = uuid.UUID(flow_id_bytes)
-
- message.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
- if err != nil {
- return messageT{}, err
- }
-
- return message, nil
- }
-
- closeFn := func() error {
- writeFnClose()
- return g.SomeError(privateDB.Close(), readStmt.Close())
- }
-
- return fn, closeFn, nil
-}
-
-func oneDeadSQL(prefix string) queryT {
- const tmpl_read = `
- SELECT
- "%s_deadletters".uuid,
- "%s_offsets".timestamp,
- "%s_messages".uuid
- FROM "%s_deadletters"
- JOIN "%s_offsets" ON
- "%s_deadletters".message_id = "%s_offsets".message_id
- JOIN "%s_messages" ON
- "%s_deadletters".message_id = "%s_messages".id
- JOIN "%s_payloads" ON
- "%s_messages".payload_id = "%s_payloads".id
- WHERE
- "%s_payloads".topic = ? AND
- "%s_deadletters".consumer = ? AND
- "%s_offsets".consumer = ? AND
- "%s_deadletters".id NOT IN (
- SELECT deadletter_id FROM "%s_replays"
- )
- ORDER BY "%s_deadletters".id ASC
- LIMIT 1;
- `
- return queryT{
- read: fmt.Sprintf(
- tmpl_read,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- }
-}
-
-func oneDeadStmt(
- cfg dbconfigT,
-) (func(string, string) (deadletterT, error), func() error, error) {
- q := oneDeadSQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- fn := func(topic string, consumer string) (deadletterT, error) {
- deadletter := deadletterT{
- consumer: consumer,
- }
-
- var (
- deadletter_id_bytes []byte
- timestr string
- message_id_bytes []byte
- )
- err := readStmt.QueryRow(topic, consumer, consumer).Scan(
- &deadletter_id_bytes,
- &timestr,
- &message_id_bytes,
- )
- if err != nil {
- return deadletterT{}, err
- }
- deadletter.uuid = uuid.UUID(deadletter_id_bytes)
- deadletter.messageID = uuid.UUID(message_id_bytes)
-
- deadletter.timestamp, err = time.Parse(
- time.RFC3339Nano,
- timestr,
- )
- if err != nil {
- return deadletterT{}, err
- }
-
- return deadletter, nil
- }
-
- return fn, readStmt.Close, nil
-}
-
-func deadletterEach(
- rows *sql.Rows,
- callback func(deadletterT, messageT) error,
-) error {
- for rows.Next() {
- var (
- deadletter deadletterT
- deadletter_id_bytes []byte
- deadletterTimestr string
- message messageT
- messageTimestr string
- message_id_bytes []byte
- flow_id_bytes []byte
- )
- err := rows.Scan(
- &deadletter_id_bytes,
- &message.id,
- &deadletterTimestr,
- &deadletter.consumer,
- &messageTimestr,
- &message_id_bytes,
- &flow_id_bytes,
- &message.topic,
- &message.payload,
- )
- if err != nil {
- rows.Close()
- return err
- }
-
- deadletter.uuid = uuid.UUID(deadletter_id_bytes)
- deadletter.messageID = uuid.UUID(message_id_bytes)
- message.uuid = uuid.UUID(message_id_bytes)
- message.flowID = uuid.UUID(flow_id_bytes)
-
- message.timestamp, err = time.Parse(
- time.RFC3339Nano,
- messageTimestr,
- )
- if err != nil {
- rows.Close()
- return err
- }
-
- deadletter.timestamp, err = time.Parse(
- time.RFC3339Nano,
- deadletterTimestr,
- )
- if err != nil {
- rows.Close()
- return err
- }
-
- err = callback(deadletter, message)
- if err != nil {
- rows.Close()
- return err
- }
- }
-
- return g.WrapErrors(rows.Err(), rows.Close())
-}
-
-func allDeadSQL(prefix string) queryT {
- const tmpl_read = `
- SELECT
- "%s_deadletters".uuid,
- "%s_deadletters".message_id,
- "%s_offsets".timestamp,
- "%s_offsets".consumer,
- "%s_messages".timestamp,
- "%s_messages".uuid,
- "%s_messages".flow_id,
- "%s_payloads".topic,
- "%s_payloads".payload
- FROM "%s_deadletters"
- JOIN "%s_offsets" ON
- "%s_deadletters".message_id = "%s_offsets".message_id
- JOIN "%s_messages" ON
- "%s_deadletters".message_id = "%s_messages".id
- JOIN "%s_payloads" ON
- "%s_messages".payload_id = "%s_payloads".id
- WHERE
- "%s_payloads".topic = ? AND
- "%s_deadletters".consumer = ? AND
- "%s_offsets".consumer = ? AND
- "%s_deadletters".id NOT IN (
- SELECT deadletter_id FROM "%s_replays"
- )
- ORDER BY "%s_deadletters".id ASC;
- `
- return queryT{
- read: fmt.Sprintf(
- tmpl_read,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- }
-}
-
-func allDeadStmt(
- cfg dbconfigT,
-) (func(string, string) (*sql.Rows, error), func() error, error) {
- q := allDeadSQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- fn := func(topic string, consumer string) (*sql.Rows, error) {
- return readStmt.Query(topic, consumer, consumer)
- }
-
- return fn, readStmt.Close, nil
-}
-
-func sizeSQL(prefix string) queryT {
- const tmpl_read = `
- SELECT
- COUNT(1) as size
- FROM "%s_messages"
- JOIN "%s_payloads" ON
- "%s_messages".payload_id = "%s_payloads".id
- WHERE "%s_payloads".topic = ?;
- `
- return queryT{
- read: fmt.Sprintf(
- tmpl_read,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- }
-}
-
-
-func sizeStmt(
- cfg dbconfigT,
-) (func(string) (int, error), func() error, error) {
- q := sizeSQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- fn := func(topic string) (int, error) {
- var size int
- err := readStmt.QueryRow(topic).Scan(&size)
- if err != nil {
- return -1, err
- }
-
- return size, nil
- }
-
- return fn, readStmt.Close, nil
-}
-
-func countSQL(prefix string) queryT {
- const tmpl_read = `
- SELECT
- COUNT(1) as count
- FROM "%s_messages"
- JOIN "%s_offsets" ON
- "%s_messages".id = "%s_offsets".message_id
- JOIN "%s_payloads" ON
- "%s_messages".payload_id = "%s_payloads".id
- WHERE
- "%s_payloads".topic = ? AND
- "%s_offsets".consumer = ?;
- `
- return queryT{
- read: fmt.Sprintf(
- tmpl_read,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- }
-}
-
-func countStmt(
- cfg dbconfigT,
-) (func(string, string) (int, error), func() error, error) {
- q := countSQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- fn := func(topic string, consumer string) (int, error) {
- var count int
- err := readStmt.QueryRow(topic, consumer).Scan(&count)
- if err != nil {
- return -1, err
- }
-
- return count, nil
- }
-
- return fn, readStmt.Close, nil
-}
-
-func hasDataSQL(prefix string) queryT {
- const tmpl_read = `
- SELECT 1 as data
- FROM "%s_messages"
- JOIN "%s_payloads" ON
- "%s_payloads".id = "%s_messages".payload_id
- WHERE
- "%s_payloads".topic = ? AND
- "%s_messages".id NOT IN (
- SELECT message_id FROM "%s_offsets"
- WHERE consumer = ?
- )
- LIMIT 1;
- `
- return queryT{
- read: fmt.Sprintf(
- tmpl_read,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- }
-}
-
-func hasDataStmt(
- cfg dbconfigT,
-) (func(string, string) (bool, error), func() error, error) {
- q := hasDataSQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- fn := func(topic string, consumer string) (bool, error) {
- var _x int
- err := readStmt.QueryRow(topic, consumer).Scan(&_x)
- if err == sql.ErrNoRows {
- return false, nil
- }
-
- if err != nil {
- return false, err
- }
-
- return true, nil
- }
-
- return fn, readStmt.Close, nil
-}
-
-func initDB(
- dbpath string,
- prefix string,
- notifyFn func(messageT),
- instanceID int,
-) (queriesT, error) {
- err := g.ValidateSQLTablePrefix(prefix)
- if err != nil {
- return queriesT{}, err
- }
-
- shared, err := sql.Open(golite.DriverName, dbpath)
- if err != nil {
- return queriesT{}, err
- }
-
- cfg := dbconfigT{
- shared: shared,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
-
- createTablesErr := createTables(shared, prefix)
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- find, findClose, findErr := findStmt(cfg)
- next, nextClose, nextErr := nextStmt(cfg)
- pending, pendingClose, pendingErr := pendingStmt(cfg)
- commit, commitClose, commitErr := commitStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- replay, replayClose, replayErr := replayStmt(cfg)
- oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg)
- allDead, allDeadClose, allDeadErr := allDeadStmt(cfg)
- size, sizeClose, sizeErr := sizeStmt(cfg)
- count, countClose, countErr := countStmt(cfg)
- hasData, hasDataClose, hasDataErr := hasDataStmt(cfg)
-
- err = g.SomeError(
- createTablesErr,
- takeErr,
- publishErr,
- findErr,
- nextErr,
- pendingErr,
- commitErr,
- toDeadErr,
- replayErr,
- oneDeadErr,
- allDeadErr,
- sizeErr,
- countErr,
- hasDataErr,
- )
- if err != nil {
- return queriesT{}, err
- }
-
- closeFn := func() error {
- return g.SomeFnError(
- takeClose,
- publishClose,
- findClose,
- nextClose,
- pendingClose,
- commitClose,
- toDeadClose,
- replayClose,
- oneDeadClose,
- allDeadClose,
- sizeClose,
- countClose,
- hasDataClose,
- shared.Close,
- )
- }
-
- var connMutex sync.RWMutex
- return queriesT{
- take: func(a string, b string) error {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return take(a, b)
- },
- publish: func(a UnsentMessage, b uuid.UUID) (messageT, error) {
- var (
- err error
- message messageT
- )
- {
- connMutex.RLock()
- defer connMutex.RUnlock()
- message, err = publish(a, b)
- }
- if err != nil {
- return messageT{}, err
- }
-
- go notifyFn(message)
- return message, nil
- },
- find: func(a string, b uuid.UUID) (messageT, error) {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return find(a, b)
- },
- next: func(a string, b string) (messageT, error) {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return next(a, b)
- },
- pending: func(
- a string,
- b string,
- callback func(messageT) error,
- ) error {
- var (
- err error
- rows *sql.Rows
- )
- {
- connMutex.RLock()
- defer connMutex.RUnlock()
- rows, err = pending(a, b)
- }
- if err != nil {
- return err
- }
-
- return messageEach(rows, callback)
- },
- commit: func(a string, b uuid.UUID) error {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return commit(a, b)
- },
- toDead: func(
- a string,
- b uuid.UUID,
- c uuid.UUID,
- ) error {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return toDead(a, b, c)
- },
- replay: func(a uuid.UUID, b uuid.UUID) (messageT, error) {
- var (
- err error
- message messageT
- )
- {
- connMutex.RLock()
- defer connMutex.RUnlock()
- message, err = replay(a, b)
- }
- if err != nil {
- return messageT{}, err
- }
-
- go notifyFn(message)
- return message, nil
- },
- oneDead: func(a string, b string) (deadletterT, error) {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return oneDead(a, b)
- },
- allDead: func(
- a string,
- b string,
- callback func(deadletterT, messageT) error,
- ) error {
- var (
- err error
- rows *sql.Rows
- )
- {
- connMutex.RLock()
- defer connMutex.RUnlock()
- rows, err = allDead(a, b)
- }
- if err != nil {
- return err
- }
-
- return deadletterEach(rows, callback)
- },
- size: func(a string) (int, error) {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return size(a)
- },
- count: func(a string, b string) (int, error) {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return count(a, b)
- },
- hasData: func(a string, b string) (bool, error) {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return hasData(a, b)
- },
- close: func() error {
- connMutex.Lock()
- defer connMutex.Unlock()
- return closeFn()
- },
- }, nil
-}
-
-
-func newPinger[T any]() pingerT[T] {
- channel := make(chan T, 1)
- closed := false
- var rwmutex sync.RWMutex
- return pingerT[T]{
- tryPing: func(x T) {
- rwmutex.RLock()
- defer rwmutex.RUnlock()
- if closed {
- return
- }
- select {
- case channel <- x:
- default:
- }
- },
- onPing: func(cb func(T)) {
- for x := range channel {
- cb(x)
- }
- },
- closed: func() bool {
- rwmutex.RLock()
- defer rwmutex.RUnlock()
- return closed
- },
- close: func() {
- rwmutex.Lock()
- defer rwmutex.Unlock()
- if closed {
- return
- }
- close(channel)
- closed = true
- },
- }
-}
-
-func makeSubscriptionsFuncs() subscriptionsT {
- var rwmutex sync.RWMutex
- subscriptions := subscriptionsSetM{}
- return subscriptionsT{
- read: func(callback func(subscriptionsSetM) error) error {
- rwmutex.RLock()
- defer rwmutex.RUnlock()
- return callback(subscriptions)
- },
- write: func(callback func(subscriptionsSetM) error) error {
- rwmutex.Lock()
- defer rwmutex.Unlock()
- return callback(subscriptions)
- },
- }
-}
-
-// Try notifying the consumer that they have data to work with. If they're
-// already full, we simply drop the notification, as on each they'll look for
-// all pending messages and process them all. So dropping the event here
-// doesn't mean not notifying the consumer, but simply acknoledging that the
-// existing notifications are enough for them to work with, without letting any
-// message slip through.
-func makeNotifyFn(
- readFn func(func(subscriptionsSetM) error) error,
- pinger pingerT[struct{}],
-) func(messageT) {
- return func(message messageT) {
- readFn(func(set subscriptionsSetM) error {
- topicSub, ok := set[message.topic]
- if !ok {
- return nil
- }
-
- for _, consumer := range topicSub.consumers {
- consumer.pinger.tryPing(struct{}{})
- }
- waiters := topicSub.waiters[message.flowID]
- for _, waiter := range waiters {
- waiter.pinger.tryPing(message.payload)
- }
- return nil
- })
- pinger.tryPing(struct{}{})
- }
-}
-
-func collectClosedWaiters(
- set subscriptionsSetM,
-) map[string]map[uuid.UUID][]string {
- waiters := map[string]map[uuid.UUID][]string{}
- for topic, topicSub := range set {
- waiters[topic] = map[uuid.UUID][]string{}
- for flowID, waitersByName := range topicSub.waiters {
- names := []string{}
- for name, waiter := range waitersByName {
- if (*waiter.closed)() {
- names = append(names, name)
- }
- }
- waiters[topic][flowID] = names
- }
- }
-
- return waiters
-}
-
-func trimEmptyLeaves(closedWaiters map[string]map[uuid.UUID][]string) {
- for topic, waiters := range closedWaiters {
- for flowID, names := range waiters {
- if len(names) == 0 {
- delete(closedWaiters[topic], flowID)
- }
- }
- if len(waiters) == 0 {
- delete(closedWaiters, topic)
- }
- }
-}
-
-func deleteIfEmpty(set subscriptionsSetM, topic string) {
- topicSub, ok := set[topic]
- if !ok {
- return
- }
-
- emptyConsumers := len(topicSub.consumers) == 0
- emptyWaiters := len(topicSub.waiters) == 0
- if emptyConsumers && emptyWaiters {
- delete(set, topic)
- }
-}
-
-func deleteEmptyTopics(set subscriptionsSetM) {
- for topic, _ := range set {
- deleteIfEmpty(set, topic)
- }
-}
-
-func removeClosedWaiters(
- set subscriptionsSetM,
- closedWaiters map[string]map[uuid.UUID][]string,
-) {
- for topic, waiters := range closedWaiters {
- _, ok := set[topic]
- if !ok {
- continue
- }
-
- for flowID, names := range waiters {
- if set[topic].waiters[flowID] == nil {
- continue
- }
- for _, name := range names {
- delete(set[topic].waiters[flowID], name)
- }
- if len(set[topic].waiters[flowID]) == 0 {
- delete(set[topic].waiters, flowID)
- }
- }
- }
-
- deleteEmptyTopics(set)
-}
-
-func reapClosedWaiters(
- readFn func(func(subscriptionsSetM) error) error,
- writeFn func(func(subscriptionsSetM) error) error,
-) {
- var closedWaiters map[string]map[uuid.UUID][]string
- readFn(func(set subscriptionsSetM) error {
- closedWaiters = collectClosedWaiters(set)
- return nil
- })
-
- trimEmptyLeaves(closedWaiters)
- if len(closedWaiters) == 0 {
- return
- }
-
- writeFn(func(set subscriptionsSetM) error {
- removeClosedWaiters(set, closedWaiters)
- return nil
- })
-}
-
-func everyNthCall[T any](n int, fn func(T)) func(T) {
- i := 0
- return func(x T) {
- i++
- if i == n {
- i = 0
- fn(x)
- }
- }
-}
-
-func runReaper(
- onPing func(func(struct{})),
- readFn func(func(subscriptionsSetM) error) error,
- writeFn func(func(subscriptionsSetM) error) error,
-) {
- onPing(everyNthCall(reaperSkipCount, func(struct{}) {
- reapClosedWaiters(readFn, writeFn)
- }))
-}
-
-func NewWithPrefix(databasePath string, prefix string) (IQueue, error) {
- subscriptions := makeSubscriptionsFuncs()
- pinger := newPinger[struct{}]()
- notifyFn := makeNotifyFn(subscriptions.read, pinger)
- queries, err := initDB(databasePath, prefix, notifyFn, os.Getpid())
- if err != nil {
- return queueT{}, err
- }
-
- go runReaper(pinger.onPing, subscriptions.read, subscriptions.write)
-
- return queueT{
- queries: queries,
- subscriptions: subscriptions,
- pinger: pinger,
- }, nil
-}
-
-func New(databasePath string) (IQueue, error) {
- return NewWithPrefix(databasePath, defaultPrefix)
-}
-
-func asPublicMessage(message messageT) Message {
- return Message{
- ID: message.uuid,
- Timestamp: message.timestamp,
- Topic: message.topic,
- FlowID: message.flowID,
- Payload: message.payload,
- }
-}
-
-func (queue queueT) Publish(unsent UnsentMessage) (Message, error) {
- message, err := queue.queries.publish(unsent, uuid.New())
- if err != nil {
- return Message{}, err
- }
-
- return asPublicMessage(message), nil
-}
-
-func registerConsumerFn(consumer consumerT) func(subscriptionsSetM) error {
- topicSub := topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{},
- }
-
- return func(set subscriptionsSetM) error {
- topic := consumer.data.topic
- _, ok := set[topic]
- if !ok {
- set[topic] = topicSub
- }
- set[topic].consumers[consumer.data.name] = consumer
-
- return nil
- }
-}
-
-func registerWaiterFn(waiter waiterT) func(subscriptionsSetM) error {
- topicSub := topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{},
- }
- waiters := map[string]waiterT{}
-
- return func(set subscriptionsSetM) error {
- var (
- topic = waiter.data.topic
- flowID = waiter.data.flowID
- )
- _, ok := set[topic]
- if !ok {
- set[topic] = topicSub
- }
- if set[topic].waiters[flowID] == nil {
- set[topic].waiters[flowID] = waiters
- }
- set[topic].waiters[flowID][waiter.data.name] = waiter
- return nil
- }
-}
-
-func makeConsumeOneFn(
- data consumerDataT,
- callback func(Message) error,
- successFn func(string, uuid.UUID) error,
- errorFn func(string, uuid.UUID, uuid.UUID) error,
-) func(messageT) error {
- return func(message messageT) error {
- err := callback(asPublicMessage(message))
- if err != nil {
- g.Info(
- "consumer failed", "fiinha-consumer",
- "topic", data.topic,
- "consumer", data.name,
- "error", err,
- slog.Group(
- "message",
- "id", message.id,
- "flow-id", message.flowID.String(),
- ),
- )
-
- return errorFn(data.name, message.uuid, uuid.New())
- }
-
- return successFn(data.name, message.uuid)
- }
-}
-
-func makeConsumeAllFn(
- data consumerDataT,
- consumeOneFn func(messageT) error,
- eachFn func(string, string, func(messageT) error) error,
-) func(struct{}) {
- return func(struct{}) {
- err := eachFn(data.topic, data.name, consumeOneFn)
- if err != nil {
- g.Warning(
- "eachFn failed", "fiinha-consume-all",
- "topic", data.topic,
- "consumer", data.name,
- "error", err,
- "circuit-breaker-enabled?", false,
- )
- }
- }
-}
-
-func makeWaitFn(channel chan []byte, closeFn func()) func([]byte) {
- closed := false
- var mutex sync.Mutex
- return func(payload []byte) {
- mutex.Lock()
- defer mutex.Unlock()
- if closed {
- return
- }
-
- closeFn()
- channel <- payload
- close(channel)
- closed = true
- }
-}
-
-func runConsumer(onPing func(func(struct{})), consumeAllFn func(struct{})) {
- consumeAllFn(struct{}{})
- onPing(consumeAllFn)
-}
-
-func tryFinding(
- findFn func(string, uuid.UUID) (messageT, error),
- topic string,
- flowID uuid.UUID,
- waitFn func([]byte),
-) {
- message, err := findFn(topic, flowID)
- if err != nil {
- return
- }
-
- waitFn(message.payload)
-}
-
-func (queue queueT) Subscribe(
- topic string,
- name string,
- callback func(Message) error,
-) error {
- data := consumerDataT{
- topic: topic,
- name: name,
- }
- pinger := newPinger[struct{}]()
- consumer := consumerT{
- data: data,
- callback: callback,
- pinger: pinger,
- close: &pinger.close,
- }
- consumeOneFn := makeConsumeOneFn(
- consumer.data,
- consumer.callback,
- queue.queries.commit,
- queue.queries.toDead,
- )
- consumeAllFn := makeConsumeAllFn(
- consumer.data,
- consumeOneFn,
- queue.queries.pending,
- )
-
- err := queue.queries.take(topic, name)
- if err != nil {
- return err
- }
-
- queue.subscriptions.write(registerConsumerFn(consumer))
- go runConsumer(pinger.onPing, consumeAllFn)
- return nil
-}
-
-type Waiter struct{
- Channel <-chan []byte
- Close func()
-}
-
-func (queue queueT) WaitFor(
- topic string,
- flowID uuid.UUID,
- name string,
-) Waiter {
- data := waiterDataT{
- topic: topic,
- flowID: flowID,
- name: name,
- }
- pinger := newPinger[[]byte]()
- waiter := waiterT{
- data: data,
- pinger: pinger,
- closed: &pinger.closed,
- close: &pinger.close,
- }
- channel := make(chan []byte, 1)
- waitFn := makeWaitFn(channel, (*waiter.close))
- closeFn := func() {
- queue.subscriptions.read(func(set subscriptionsSetM) error {
- (*set[topic].waiters[flowID][name].close)()
- return nil
- })
- }
-
- queue.subscriptions.write(registerWaiterFn(waiter))
- tryFinding(queue.queries.find, topic, flowID, waitFn)
- go pinger.onPing(waitFn)
- return Waiter{channel, closeFn}
-}
-
-func unsubscribeIfExistsFn(
- topic string,
- name string,
-) func(subscriptionsSetM) error {
- return func(set subscriptionsSetM) error {
- topicSub, ok := set[topic]
- if !ok {
- return nil
- }
-
- consumer, ok := topicSub.consumers[name]
- if !ok {
- return nil
- }
-
- (*consumer.close)()
- delete(set[topic].consumers, name)
- deleteIfEmpty(set, topic)
- return nil
- }
-}
-
-func (queue queueT) Unsubscribe(topic string, name string) {
- queue.subscriptions.write(unsubscribeIfExistsFn(topic, name))
-}
-
-func cleanSubscriptions(set subscriptionsSetM) error {
- for _, topicSub := range set {
- for _, consumer := range topicSub.consumers {
- (*consumer.close)()
- }
- for _, waiters := range topicSub.waiters {
- for _, waiter := range waiters {
- (*waiter.close)()
- }
- }
- }
- return nil
-}
-
-func (queue queueT) Close() error {
- queue.pinger.close()
- return g.WrapErrors(
- queue.subscriptions.write(cleanSubscriptions),
- queue.queries.close(),
- )
-}
-
-
-func topicGetopt(args argsT, w io.Writer) (argsT, bool) {
- if len(args.args) == 0 {
- fmt.Fprintf(w, "Missing TOPIC.\n")
- return args, false
- }
-
- args.topic = args.args[0]
- return args, true
-}
-
-func topicConsumerGetopt(args argsT, w io.Writer) (argsT, bool) {
- fs := flag.NewFlagSet("", flag.ContinueOnError)
- fs.Usage = func() {}
- fs.SetOutput(w)
-
- consumer := fs.String(
- "C",
- "default-consumer",
- "The name of the consumer to be used",
- )
-
- if fs.Parse(args.args) != nil {
- return args, false
- }
-
- subArgs := fs.Args()
- if len(subArgs) == 0 {
- fmt.Fprintf(w, "Missing TOPIC.\n")
- return args, false
- }
-
- args.consumer = *consumer
- args.topic = subArgs[0]
- return args, true
-}
-
-func inExec(
- args argsT,
- queries queriesT,
- r io.Reader,
- w io.Writer,
-) (int, error) {
- payload, err := io.ReadAll(r)
- if err != nil {
- return 1, err
- }
-
- unsent := UnsentMessage{
- Topic: args.topic,
- FlowID: uuid.New(),
- Payload: payload,
- }
- message, err := queries.publish(unsent, uuid.New())
- if err != nil {
- return 1, err
- }
-
- fmt.Fprintf(w, "%s\n", message.uuid.String())
-
- return 0, nil
-}
-
-func outExec(
- args argsT,
- queries queriesT,
- _ io.Reader,
- w io.Writer,
-) (int, error) {
- err := queries.take(args.topic, args.consumer)
- if err != nil {
- return 1, err
- }
-
- message, err := queries.next(args.topic, args.consumer)
-
- if err == sql.ErrNoRows {
- return 3, nil
- }
-
- if err != nil {
- return 1, err
- }
-
- fmt.Fprintln(w, string(message.payload))
-
- return 0, nil
-}
-
-func commitExec(
- args argsT,
- queries queriesT,
- r io.Reader,
- w io.Writer,
-) (int, error) {
- err := queries.take(args.topic, args.consumer)
- if err != nil {
- return 1, err
- }
-
- message, err := queries.next(args.topic, args.consumer)
- if err != nil {
- return 1, err
- }
-
- err = queries.commit(args.consumer, message.uuid)
- if err != nil {
- return 1, err
- }
-
- return 0, nil
-}
-
-func deadExec(
- args argsT,
- queries queriesT,
- r io.Reader,
- w io.Writer,
-) (int, error) {
- err := queries.take(args.topic, args.consumer)
- if err != nil {
- return 1, err
- }
-
- message, err := queries.next(args.topic, args.consumer)
- if err != nil {
- return 1, err
- }
-
- err = queries.toDead(args.consumer, message.uuid, uuid.New())
- if err != nil {
- return 1, err
- }
-
- return 0, nil
-}
-
-func listDeadExec(
- args argsT,
- queries queriesT,
- r io.Reader,
- w io.Writer,
-) (int, error) {
- eachFn := func(deadletter deadletterT, _ messageT) error {
- fmt.Fprintf(
- w,
- "%s\t%s\t%s\n",
- deadletter.uuid.String(),
- deadletter.timestamp.Format(time.RFC3339),
- deadletter.consumer,
- )
- return nil
- }
-
- err := queries.allDead(args.topic, args.consumer, eachFn)
- if err != nil {
- return 1, err
- }
-
- return 0, nil
-}
-
-func replayExec(
- args argsT,
- queries queriesT,
- r io.Reader,
- w io.Writer,
-) (int, error) {
- deadletter, err := queries.oneDead(args.topic, args.consumer)
- if err != nil {
- return 1, err
- }
-
- _, err = queries.replay(deadletter.uuid, uuid.New())
- if err != nil {
- return 1, err
- }
-
- return 0, nil
-}
-
-func sizeExec(
- args argsT,
- queries queriesT,
- r io.Reader,
- w io.Writer,
-) (int, error) {
- size, err := queries.size(args.topic)
- if err != nil {
- return 1, err
- }
-
- fmt.Fprintln(w, size)
-
- return 0, nil
-}
-
-func countExec(
- args argsT,
- queries queriesT,
- r io.Reader,
- w io.Writer,
-) (int, error) {
- count, err := queries.count(args.topic, args.consumer)
- if err != nil {
- return 1, err
- }
-
- fmt.Fprintln(w, count)
-
- return 0, nil
-}
-
-func hasDataExec(
- args argsT,
- queries queriesT,
- r io.Reader,
- w io.Writer,
-) (int, error) {
- hasData, err := queries.hasData(args.topic, args.consumer)
- if err != nil {
- return 1, err
- }
-
- if hasData {
- return 0, nil
- } else {
- return 1, nil
- }
-}
-
-func usage(argv0 string, w io.Writer) {
- fmt.Fprintf(
- w,
- "Usage: %s [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n",
- argv0,
- )
-}
-
-func getopt(
- allArgs []string,
- commandsMap map[string]commandT,
- w io.Writer,
-) (argsT, commandT, int) {
- argv0 := allArgs[0]
- argv := allArgs[1:]
- fs := flag.NewFlagSet("", flag.ContinueOnError)
- fs.Usage = func() {}
- fs.SetOutput(w)
- databasePath := fs.String(
- "f",
- "fiinha.db",
- "The path to the file where the queue is kept",
- )
- prefix := fs.String(
- "p",
- defaultPrefix,
- "The fiinha prefix of the table names",
- )
- if fs.Parse(argv) != nil {
- usage(argv0, w)
- return argsT{}, commandT{}, 2
- }
-
- subArgs := fs.Args()
- if len(subArgs) == 0 {
- fmt.Fprintf(w, "Missing COMMAND.\n")
- usage(argv0, w)
- return argsT{}, commandT{}, 2
- }
-
- args := argsT{
- databasePath: *databasePath,
- prefix: *prefix,
- command: subArgs[0],
- allArgs: allArgs,
- args: subArgs[1:],
- }
-
- command := commandsMap[args.command]
- if command.name == "" {
- fmt.Fprintf(w, "Bad COMMAND: \"%s\".\n", args.command)
- usage(argv0, w)
- return argsT{}, commandT{}, 2
- }
-
- args, ok := command.getopt(args, w)
- if !ok {
- usage(argv0, w)
- return argsT{}, commandT{}, 2
- }
-
- return args, command, 0
-}
-
-func runCommand(
- args argsT,
- command commandT,
- stdin io.Reader,
- stdout io.Writer,
- stderr io.Writer,
-) int {
- iqueue, err := NewWithPrefix(args.databasePath, args.prefix)
- if err != nil {
- fmt.Fprintln(stderr, err)
- return 1
- }
- defer iqueue.Close()
-
- rc, err := command.exec(args, iqueue.(queueT).queries, stdin, stdout)
- if err != nil {
- fmt.Fprintln(stderr, err)
- }
-
- return rc
-}
-
-var commands = map[string]commandT{
- "in": commandT{
- name: "in",
- getopt: topicGetopt,
- exec: inExec,
- },
- "out": commandT{
- name: "out",
- getopt: topicConsumerGetopt,
- exec: outExec,
- },
- "commit": commandT{
- name: "commit",
- getopt: topicConsumerGetopt,
- exec: commitExec,
- },
- "dead": commandT{
- name: "dead",
- getopt: topicConsumerGetopt,
- exec: deadExec,
- },
- "ls-dead": commandT{
- name: "ls-dead",
- getopt: topicConsumerGetopt,
- exec: listDeadExec,
- },
- "replay": commandT{
- name: "replay",
- getopt: topicConsumerGetopt,
- exec: replayExec,
- },
- "size": commandT{
- name: "size",
- getopt: topicGetopt,
- exec: sizeExec,
- },
- "count": commandT{
- name: "count",
- getopt: topicConsumerGetopt,
- exec: countExec,
- },
- "has-data": commandT{
- name: "has-data",
- getopt: topicConsumerGetopt,
- exec: hasDataExec,
- },
-}
-
-
-
-func Main() {
- g.Init()
- args, command, rc := getopt(os.Args, commands, os.Stderr)
- if rc != 0 {
- os.Exit(rc)
- }
- os.Exit(runCommand(args, command, os.Stdin, os.Stdout, os.Stderr))
-}
diff --git a/src/main.go b/src/main.go
deleted file mode 100644
index af67e07..0000000
--- a/src/main.go
+++ /dev/null
@@ -1,7 +0,0 @@
-package main
-
-import "fiinha"
-
-func main() {
- fiinha.Main()
-}
diff --git a/tests/benchmarks/deadletters/fiinha.go b/tests/benchmarks/deadletters/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/deadletters/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/deadletters/main.go b/tests/benchmarks/deadletters/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/deadletters/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/lookup/fiinha.go b/tests/benchmarks/lookup/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/lookup/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/lookup/main.go b/tests/benchmarks/lookup/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/lookup/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/multiple-consumers/fiinha.go b/tests/benchmarks/multiple-consumers/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/multiple-consumers/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/multiple-consumers/main.go b/tests/benchmarks/multiple-consumers/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/multiple-consumers/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/multiple-produces/fiinha.go b/tests/benchmarks/multiple-produces/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/multiple-produces/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/multiple-produces/main.go b/tests/benchmarks/multiple-produces/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/multiple-produces/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/reaper/fiinha.go b/tests/benchmarks/reaper/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/reaper/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/reaper/main.go b/tests/benchmarks/reaper/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/reaper/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/replay/fiinha.go b/tests/benchmarks/replay/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/replay/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/replay/main.go b/tests/benchmarks/replay/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/replay/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/single-consumer/fiinha.go b/tests/benchmarks/single-consumer/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/single-consumer/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/single-consumer/main.go b/tests/benchmarks/single-consumer/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/single-consumer/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/single-producer/fiinha.go b/tests/benchmarks/single-producer/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/single-producer/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/single-producer/main.go b/tests/benchmarks/single-producer/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/single-producer/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/subscribe/fiinha.go b/tests/benchmarks/subscribe/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/subscribe/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/subscribe/main.go b/tests/benchmarks/subscribe/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/subscribe/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/unsubscribe/fiinha.go b/tests/benchmarks/unsubscribe/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/unsubscribe/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/unsubscribe/main.go b/tests/benchmarks/unsubscribe/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/unsubscribe/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/waiter/fiinha.go b/tests/benchmarks/waiter/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/waiter/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/waiter/main.go b/tests/benchmarks/waiter/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/waiter/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/cli-opts.sh b/tests/cli-opts.sh
deleted file mode 100755
index fcb62ca..0000000
--- a/tests/cli-opts.sh
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/bin/sh
-set -eu
-
-exit
diff --git a/tests/fiinha.go b/tests/fiinha.go
deleted file mode 100644
index 0901190..0000000
--- a/tests/fiinha.go
+++ /dev/null
@@ -1,5889 +0,0 @@
-package fiinha
-
-import (
- "bytes"
- "database/sql"
- "errors"
- "fmt"
- "io"
- "log/slog"
- "os"
- "reflect"
- "sort"
- "strings"
- "sync"
- "time"
-
- "golite"
- "uuid"
- g "gobang"
-)
-
-
-
-var instanceID = os.Getpid()
-
-
-
-func test_defaultPrefix() {
- g.TestStart("defaultPrefix")
-
- g.Testing("the defaultPrefix is valid", func() {
- g.TErrorIf(g.ValidateSQLTablePrefix(defaultPrefix))
- })
-}
-
-func test_tryRollback() {
- g.TestStart("tryRollback()")
-
- myErr := errors.New("bottom error")
-
- db, err := sql.Open(golite.DriverName, golite.InMemory)
- g.TErrorIf(err)
- defer db.Close()
-
-
- g.Testing("the error is propagated if rollback doesn't fail", func() {
- tx, err := db.Begin()
- g.TErrorIf(err)
-
- err = tryRollback(tx, myErr)
- g.TAssertEqual(err, myErr)
- })
-
- g.Testing("a wrapped error when rollback fails", func() {
- tx, err := db.Begin()
- g.TErrorIf(err)
-
- err = tx.Commit()
- g.TErrorIf(err)
-
- err = tryRollback(tx, myErr)
- g.TAssertEqual(reflect.DeepEqual(err, myErr), false)
- g.TAssertEqual(errors.Is(err, myErr), true)
- })
-}
-
-func test_inTx() {
- g.TestStart("inTx()")
-
- db, err := sql.Open(golite.DriverName, golite.InMemory)
- g.TErrorIf(err)
- defer db.Close()
-
-
- g.Testing("when fn() errors, we propagate it", func() {
- myErr := errors.New("to be propagated")
- err := inTx(db, func(tx *sql.Tx) error {
- return myErr
- })
- g.TAssertEqual(err, myErr)
- })
-
- g.Testing("on nil error we get nil", func() {
- err := inTx(db, func(tx *sql.Tx) error {
- return nil
- })
- g.TErrorIf(err)
- })
-}
-
-func test_serialized() {
- // FIXME
-}
-
-func test_execSerialized() {
- // FIXME
-}
-
-func test_createTables() {
- g.TestStart("createTables()")
-
- const (
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- defer db.Close()
-
-
- g.Testing("tables exist afterwards", func() {
- const tmpl_read = `
- SELECT id FROM "%s_messages" LIMIT 1;
- `
- qRead := fmt.Sprintf(tmpl_read, prefix)
-
- _, err := db.Exec(qRead)
- g.TErrorNil(err)
-
- err = createTables(db, prefix)
- g.TErrorIf(err)
-
- _, err = db.Exec(qRead)
- g.TErrorIf(err)
- })
-
- g.Testing("we can do it multiple times", func() {
- g.TErrorIf(g.SomeError(
- createTables(db, prefix),
- createTables(db, prefix),
- createTables(db, prefix),
- ))
- })
-}
-
-func test_takeStmt() {
- g.TestStart("takeStmt()")
-
- const (
- topic = "take() topic"
- consumer = "take() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- g.TErrorIf(takeErr)
- defer g.SomeFnError(
- takeClose,
- db.Close,
- )
-
- const tmpl = `
- SELECT owner_id from "%s_owners"
- WHERE
- topic = ? AND
- consumer = ?;
- `
- sqlOwner := fmt.Sprintf(tmpl, prefix)
-
-
- g.Testing("when there is no owner, we become it", func() {
- var ownerID int
- err := db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
- g.TAssertEqual(err, sql.ErrNoRows)
-
- err = take(topic, consumer)
- g.TErrorIf(err)
-
- err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
- g.TErrorIf(err)
- g.TAssertEqual(ownerID, instanceID)
- })
-
- g.Testing("if there is already an owner, we overtake it", func() {
- otherCfg := cfg
- otherCfg.instanceID = instanceID + 1
-
- take, takeClose, takeErr := takeStmt(otherCfg)
- g.TErrorIf(takeErr)
- defer takeClose()
-
- var ownerID int
- err := db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
- g.TErrorIf(err)
- g.TAssertEqual(ownerID, instanceID)
-
- err = take(topic, consumer)
- g.TErrorIf(err)
-
- err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
- g.TErrorIf(err)
- g.TAssertEqual(ownerID, otherCfg.instanceID)
- })
- g.Testing("no error if closed more than once", func() {
- g.TErrorIf(g.SomeError(
- takeClose(),
- takeClose(),
- takeClose(),
- ))
- })
-}
-
-func test_publishStmt() {
- g.TestStart("publishStmt()")
-
- const (
- topic = "publish() topic"
- payloadStr = "publish() payload"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- publish, publishClose, publishErr := publishStmt(cfg)
- g.TErrorIf(publishErr)
- defer g.SomeFnError(
- publishClose,
- db.Close,
- )
-
-
- g.Testing("we can publish a message", func() {
- messageID := uuid.New()
- message, err := publish(unsent, messageID)
- g.TErrorIf(err)
-
- g.TAssertEqual(message.id, int64(1))
- g.TAssertEqual(message.uuid, messageID)
- g.TAssertEqual(message.topic, topic)
- g.TAssertEqual(message.flowID, flowID)
- g.TAssertEqual(message.payload, payload)
- })
-
- g.Testing("we can publish the same message repeatedly", func() {
- messageID1 := uuid.New()
- messageID2 := uuid.New()
- message1, err1 := publish(unsent, messageID1)
- message2, err2 := publish(unsent, messageID2)
- g.TErrorIf(g.SomeError(err1, err2))
-
- g.TAssertEqual(message1.id, message2.id - 1)
- g.TAssertEqual(message1.topic, message2.topic)
- g.TAssertEqual(message1.flowID, message2.flowID)
- g.TAssertEqual(message1.payload, message2.payload)
-
- g.TAssertEqual(message1.uuid, messageID1)
- g.TAssertEqual(message2.uuid, messageID2)
- })
-
- g.Testing("publishing a message with the same UUID errors", func() {
- messageID := uuid.New()
- message1, err1 := publish(unsent, messageID)
- _, err2 := publish(unsent, messageID)
- g.TErrorIf(err1)
-
- g.TAssertEqual(message1.uuid, messageID)
- g.TAssertEqual(message1.topic, topic)
- g.TAssertEqual(message1.flowID, flowID)
- g.TAssertEqual(message1.payload, payload)
-
- g.TAssertEqual(
- err2.(golite.Error).ExtendedCode,
- golite.ErrConstraintUnique,
- )
- })
-
- g.Testing("no actual closing occurs", func() {
- g.TErrorIf(g.SomeError(
- publishClose(),
- publishClose(),
- publishClose(),
- ))
- })
-}
-
-func test_findStmt() {
- g.TestStart("findStmt()")
-
- const (
- topic = "find() topic"
- payloadStr = "find() payload"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- publish, publishClose, publishErr := publishStmt(cfg)
- find, findClose, findErr := findStmt(cfg)
- g.TErrorIf(g.SomeError(
- publishErr,
- findErr,
- ))
- defer g.SomeFnError(
- publishClose,
- findClose,
- db.Close,
- )
-
- pub := func(flowID uuid.UUID) uuid.UUID {
- unsentWithFlowID := unsent
- unsentWithFlowID.FlowID = flowID
- messageID := uuid.New()
- _, err := publish(unsentWithFlowID, messageID)
- g.TErrorIf(err)
- return messageID
- }
-
-
- g.Testing("we can find a message by topic and flowID", func() {
- flowID := uuid.New()
- messageID := pub(flowID)
- message, err := find(topic, flowID)
- g.TErrorIf(err)
-
- g.TAssertEqual(message.uuid, messageID)
- g.TAssertEqual(message.topic, topic)
- g.TAssertEqual(message.flowID, flowID)
- g.TAssertEqual(message.payload, payload)
- })
-
- g.Testing("a non-existent message gives us an error", func() {
- message, err := find(topic, uuid.New())
- g.TAssertEqual(message, messageT{})
- g.TAssertEqual(err, sql.ErrNoRows)
- })
-
- g.Testing("findig twice yields the exact same message", func() {
- flowID := uuid.New()
- messageID := pub(flowID)
- message1, err1 := find(topic, flowID)
- message2, err2 := find(topic, flowID)
- g.TErrorIf(g.SomeError(err1, err2))
-
- g.TAssertEqual(message1.uuid, messageID)
- g.TAssertEqual(message1, message2)
- })
-
- g.Testing("returns the latest entry if multiple are available", func() {
- flowID := uuid.New()
-
- _ , err0 := find(topic, flowID)
- pub(flowID)
- message1, err1 := find(topic, flowID)
- pub(flowID)
- message2, err2 := find(topic, flowID)
-
- g.TAssertEqual(err0, sql.ErrNoRows)
- g.TErrorIf(g.SomeError(err1, err2))
- g.TAssertEqual(message1.uuid == message2.uuid, false)
- g.TAssertEqual(message1.id < message2.id, true)
- })
-
- g.Testing("no error if closed more than once", func() {
- g.TErrorIf(g.SomeError(
- findClose(),
- findClose(),
- findClose(),
- ))
- })
-}
-
-func test_nextStmt() {
- g.TestStart("nextStmt()")
-
- const (
- topic = "next() topic"
- payloadStr = "next() payload"
- consumer = "next() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- next, nextClose, nextErr := nextStmt(cfg)
- commit, commitClose, commitErr := commitStmt(cfg)
- g.TErrorIf(takeErr)
- g.TErrorIf(publishErr)
- g.TErrorIf(nextErr)
- g.TErrorIf(commitErr)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- nextErr,
- commitErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- nextClose,
- commitClose,
- db.Close,
- )
-
- pub := func(topic string) messageT {
- g.TErrorIf(take(topic, consumer))
-
- unsentWithTopic := unsent
- unsentWithTopic.Topic = topic
- message, err := publish(unsentWithTopic, uuid.New())
- g.TErrorIf(err)
- return message
- }
-
-
- g.Testing("we get an error on empty topic", func() {
- _, err := next(topic, consumer)
- g.TAssertEqual(err, sql.ErrNoRows)
- })
-
- g.Testing("we don't get messages from other topics", func() {
- pub("other topic")
- _, err := next(topic, consumer)
- g.TAssertEqual(err, sql.ErrNoRows)
- })
-
- g.Testing("we can get the next message", func() {
- expectedMessage := pub(topic)
- pub(topic)
- pub(topic)
- message, err := next(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(message, expectedMessage)
- })
-
- g.Testing("we keep getting the next until we commit", func() {
- message1, err1 := next(topic, consumer)
- message2, err2 := next(topic, consumer)
- g.TErrorIf(commit(consumer, message1.uuid))
- message3, err3 := next(topic, consumer)
- g.TErrorIf(g.SomeError(err1, err2, err3))
-
- g.TAssertEqual(message1, message2)
- g.TAssertEqual(message2.uuid != message3.uuid, true)
- })
-
- g.Testing("each consumer has its own next message", func() {
- g.TErrorIf(take(topic, "other consumer"))
- message1, err1 := next(topic, consumer)
- message2, err2 := next(topic, "other consumer")
- g.TErrorIf(g.SomeError(err1, err2))
- g.TAssertEqual(message1.uuid != message2.uuid, true)
- })
-
- g.Testing("error when we're not the owner", func() {
- otherCfg := cfg
- otherCfg.instanceID = instanceID + 1
-
- take, takeClose, takeErr := takeStmt(otherCfg)
- g.TErrorIf(takeErr)
- defer takeClose()
-
- _, err := next(topic, consumer)
- g.TErrorIf(err)
-
- err = take(topic, consumer)
- g.TErrorIf(err)
-
- _, err = next(topic, consumer)
- g.TAssertEqual(err, fmt.Errorf(
- notOwnerErrorFmt,
- otherCfg.instanceID,
- topic,
- consumer,
- instanceID,
- ))
- })
-
- g.Testing("we can close more than once", func() {
- g.TErrorIf(g.SomeError(
- nextClose(),
- nextClose(),
- nextClose(),
- ))
- })
-}
-
-func test_messageEach() {
- g.TestStart("messageEach()")
-
- const (
- topic = "messageEach() topic"
- payloadStr = "messageEach() payload"
- consumer = "messageEach() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- pending, pendingClose, pendingErr := pendingStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- pendingErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- pendingClose,
- db.Close,
- )
-
- pub := func() uuid.UUID {
- message, err := publish(unsent, uuid.New())
- g.TErrorIf(err)
- return message.uuid
- }
- g.TErrorIf(take(topic, consumer))
-
-
- g.Testing("not called on empty set", func() {
- rows, err := pending(topic, consumer)
- g.TErrorIf(err)
-
- messageEach(rows, func(messageT) error {
- g.Unreachable()
- return nil
- })
- })
-
- g.Testing("the callback is called once for each entry", func() {
- messageIDs := []uuid.UUID{
- pub(),
- pub(),
- pub(),
- }
-
- rows, err := pending(topic, consumer)
- g.TErrorIf(err)
-
- var collectedIDs []uuid.UUID
- err = messageEach(rows, func(message messageT) error {
- collectedIDs = append(collectedIDs, message.uuid)
- return nil
- })
- g.TErrorIf(err)
-
- g.TAssertEqual(collectedIDs, messageIDs)
- })
-
- g.Testing("we halt if the timestamp is ill-formatted", func() {
- messageID := pub()
- message_id_bytes := messageID[:]
- pub()
- pub()
- pub()
-
- const tmplUpdate = `
- UPDATE "%s_messages"
- SET timestamp = '01/01/1970'
- WHERE uuid = ?;
- `
- sqlUpdate := fmt.Sprintf(tmplUpdate, prefix)
- _, err := db.Exec(sqlUpdate, message_id_bytes)
- g.TErrorIf(err)
-
- rows, err := pending(topic, consumer)
- g.TErrorIf(err)
-
- n := 0
- err = messageEach(rows, func(messageT) error {
- n++
- return nil
- })
-
- g.TAssertEqual(
- err,
- &time.ParseError{
- Layout: time.RFC3339Nano,
- Value: "01/01/1970",
- LayoutElem: "2006",
- ValueElem: "01/01/1970",
- Message: "",
- },
- )
- g.TAssertEqual(n, 3)
-
- const tmplDelete = `
- DELETE FROM "%s_messages"
- WHERE uuid = ?;
- `
- sqlDelete := fmt.Sprintf(tmplDelete, prefix)
- _, err = db.Exec(sqlDelete, message_id_bytes)
- g.TErrorIf(err)
- })
-
- g.Testing("we halt if the callback returns an error", func() {
- myErr := errors.New("callback error early return")
-
- rows1, err1 := pending(topic, consumer)
- g.TErrorIf(err1)
-
- n1 := 0
- err1 = messageEach(rows1, func(messageT) error {
- n1++
- if n1 == 4 {
- return myErr
- }
- return nil
- })
-
- rows2, err2 := pending(topic, consumer)
- g.TErrorIf(err2)
-
- n2 := 0
- err2 = messageEach(rows2, func(messageT) error {
- n2++
- return nil
- })
-
- g.TAssertEqual(err1, myErr)
- g.TErrorIf(err2)
- g.TAssertEqual(n1, 4)
- g.TAssertEqual(n2, 6)
- })
-
- g.Testing("noop when given nil for *sql.Rows", func() {
- err := messageEach(nil, func(messageT) error {
- g.Unreachable()
- return nil
- })
- g.TErrorIf(err)
- })
-}
-
-func test_pendingStmt() {
- g.TestStart("pendingStmt()")
-
- const (
- topic = "pending() topic"
- payloadStr = "pending() payload"
- consumer = "pending() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- pending, pendingClose, pendingErr := pendingStmt(cfg)
- commit, commitClose, commitErr := commitStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- pendingErr,
- commitErr,
- toDeadErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- pendingClose,
- commitClose,
- toDeadClose,
- db.Close,
- )
-
- pub := func(topic string) messageT {
- g.TErrorIf(take(topic, consumer))
- unsentWithTopic := unsent
- unsentWithTopic.Topic = topic
- message, err := publish(unsentWithTopic, uuid.New())
- g.TErrorIf(err)
- return message
- }
- g.TErrorIf(take(topic, consumer))
-
- collectPending := func(topic string, consumer string) []messageT {
- rows, err := pending(topic, consumer)
- g.TErrorIf(err)
-
- var messages []messageT
- err = messageEach(rows, func(message messageT) error {
- messages = append(messages, message)
- return nil
- })
- g.TErrorIf(err)
- return messages
- }
-
-
- g.Testing("an empty database has 0 pending items", func() {
- g.TAssertEqual(len(collectPending(topic, consumer)), 0)
- })
-
- g.Testing("after publishing we get all messages", func() {
- expected := []messageT{
- pub(topic),
- pub(topic),
- }
-
- g.TAssertEqualI(collectPending(topic, consumer), expected)
- })
-
- g.Testing("we get the same messages when calling again", func() {
- messages1 := collectPending(topic, consumer)
- messages2 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages1), 2)
- g.TAssertEqualI(messages1, messages2)
- })
-
- g.Testing("we don't get messages from other topics", func() {
- pub("other topic")
-
- g.TAssertEqual(len(collectPending(topic, consumer)), 2)
- g.TAssertEqual(len(collectPending("other topic", consumer)), 1)
- })
-
- g.Testing("after others commit, pending still returns them", func() {
- g.TErrorIf(take(topic, "other consumer"))
- messages1 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages1), 2)
- g.TErrorIf(
- commit("other consumer", messages1[0].uuid),
- )
-
- messages2 := collectPending(topic, consumer)
- g.TAssertEqualI(messages1, messages2)
- })
-
- g.Testing("committing other topic doesn't change current", func() {
- messages1 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages1), 2)
-
- message := pub("other topic")
-
- g.TErrorIf(commit(consumer, message.uuid))
-
- messages2 := collectPending(topic, consumer)
- g.TAssertEqualI(messages1, messages2)
- })
-
- g.Testing("after commiting, pending doesn't return them again", func() {
- messages1 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages1), 2)
-
- g.TErrorIf(commit(consumer, messages1[0].uuid))
-
- messages2 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages2), 1)
- g.TAssertEqual(messages2[0], messages1[1])
-
- g.TErrorIf(commit(consumer, messages1[1].uuid))
-
- messages3 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages3), 0)
- })
-
- g.Testing("on deadletter, pending also doesn't return them", func() {
- messages0 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages0), 0)
-
- message1 := pub(topic)
- message2 := pub(topic)
-
- messages1 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages1), 2)
-
- err = toDead(consumer, message1.uuid, uuid.New())
- g.TErrorIf(err)
-
- messages2 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages2), 1)
- g.TAssertEqual(messages2[0], message2)
-
- err = toDead(consumer, message2.uuid, uuid.New())
- g.TErrorIf(err)
-
- messages3 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages3), 0)
- })
-
- g.Testing("if commits are unordered, pending is still sorted", func() {
- message1 := pub(topic)
- message2 := pub(topic)
- message3 := pub(topic)
-
- g.TAssertEqual(collectPending(topic, consumer), []messageT{
- message1,
- message2,
- message3,
- })
-
- g.TErrorIf(commit(consumer, message2.uuid))
- g.TAssertEqual(collectPending(topic, consumer), []messageT{
- message1,
- message3,
- })
-
- g.TErrorIf(commit(consumer, message1.uuid))
- g.TAssertEqual(collectPending(topic, consumer), []messageT{
- message3,
- })
-
- g.TErrorIf(commit(consumer, message3.uuid))
- g.TAssertEqual(len(collectPending(topic, consumer)), 0)
- })
-
- g.Testing("when we're not the owners we get nothing", func() {
- otherCfg := cfg
- otherCfg.instanceID = instanceID + 1
-
- take, takeClose, takeErr := takeStmt(otherCfg)
- g.TErrorIf(takeErr)
- defer takeClose()
-
- message1 := pub(topic)
- message2 := pub(topic)
- message3 := pub(topic)
- message4 := pub(topic)
- message5 := pub(topic)
-
- expected := []messageT{
- message1,
- message2,
- message3,
- message4,
- message5,
- }
-
- g.TAssertEqual(collectPending(topic, consumer), expected)
-
- err := take(topic, consumer)
- g.TErrorIf(err)
-
- rows, err := pending(topic, consumer)
- g.TErrorIf(err)
-
- err = messageEach(rows, func(messageT) error {
- g.Unreachable()
- return nil
- })
- g.TErrorIf(err)
- })
-
- g.Testing("no actual closing occurs", func() {
- g.TErrorIf(g.SomeError(
- pendingClose(),
- pendingClose(),
- pendingClose(),
- ))
- })
-}
-
-func test_commitStmt() {
- g.TestStart("commitStmt()")
-
- const (
- topic = "commit() topic"
- payloadStr = "commit() payload"
- consumer = "commit() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- commit, commitClose, commitErr := commitStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- commitErr,
- toDeadErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- commitClose,
- toDeadClose,
- db.Close,
- )
-
- pub := func(topic string) uuid.UUID {
- g.TErrorIf(take(topic, consumer))
-
- unsentWithTopic := unsent
- unsentWithTopic.Topic = topic
- message, err := publish(unsentWithTopic, uuid.New())
- g.TErrorIf(err)
- return message.uuid
- }
-
- cmt := func(consumer string, messageID uuid.UUID) error {
- g.TErrorIf(take(topic, consumer))
-
- return commit(consumer, messageID)
- }
-
-
- g.Testing("we can't commit twice", func() {
- messageID := pub(topic)
-
- err1 := cmt(consumer, messageID)
- err2 := cmt(consumer, messageID)
- g.TErrorIf(err1)
- g.TAssertEqual(
- err2.(golite.Error).ExtendedCode,
- golite.ErrConstraintUnique,
- )
- })
-
- g.Testing("we can't commit non-existent messages", func() {
- err := cmt(consumer, uuid.New())
- g.TAssertEqual(
- err.(golite.Error).ExtendedCode,
- golite.ErrConstraintNotNull,
- )
- })
-
- g.Testing("multiple consumers may commit a message", func() {
- messageID := pub(topic)
-
- g.TErrorIf(g.SomeError(
- cmt(consumer, messageID),
- cmt("other consumer", messageID),
- cmt("yet another consumer", messageID),
- ))
- })
-
- g.Testing("a consumer can commit to multiple topics", func() {
- messageID1 := pub(topic)
- messageID2 := pub("other topic")
- messageID3 := pub("yet another topic")
-
- g.TErrorIf(g.SomeError(
- cmt(consumer, messageID1),
- cmt(consumer, messageID2),
- cmt(consumer, messageID3),
- ))
- })
-
- g.Testing("a consumer can consume many messages from a topic", func() {
- messageID1 := pub(topic)
- messageID2 := pub(topic)
- messageID3 := pub(topic)
-
- g.TErrorIf(g.SomeError(
- cmt(consumer, messageID1),
- cmt(consumer, messageID2),
- cmt(consumer, messageID3),
- ))
- })
-
- g.Testing("we can't commit a dead message", func() {
- messageID := pub(topic)
-
- err1 := toDead(consumer, messageID, uuid.New())
- err2 := cmt(consumer, messageID)
- g.TErrorIf(err1)
- g.TAssertEqual(
- err2.(golite.Error).ExtendedCode,
- golite.ErrConstraintUnique,
- )
- })
-
- g.Testing("error if we don't own the topic/consumer", func() {
- otherCfg := cfg
- otherCfg.instanceID = instanceID + 1
-
- take, takeClose, takeErr := takeStmt(otherCfg)
- g.TErrorIf(takeErr)
- defer takeClose()
-
- messageID := pub(topic)
-
- err := take(topic, consumer)
- g.TErrorIf(err)
-
- err = commit(consumer, messageID)
- g.TAssertEqual(
- err.(golite.Error).ExtendedCode,
- golite.ErrConstraintTrigger,
- )
- })
-
- g.Testing("no actual closing occurs", func() {
- g.TErrorIf(g.SomeError(
- commitClose(),
- commitClose(),
- commitClose(),
- ))
- })
-}
-
-func test_toDeadStmt() {
- g.TestStart("toDeadStmt()")
-
- const (
- topic = "toDead() topic"
- payloadStr = "toDead() payload"
- consumer = "toDead() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- commit, commitClose, commitErr := commitStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- commitErr,
- toDeadErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- commitClose,
- toDeadClose,
- db.Close,
- )
-
- pub := func(topic string) uuid.UUID {
- g.TErrorIf(take(topic, consumer))
-
- unsentWithTopic := unsent
- unsentWithTopic.Topic = topic
- message, err := publish(unsentWithTopic, uuid.New())
- g.TErrorIf(err)
- return message.uuid
- }
-
- asDead := func(
- consumer string,
- messageID uuid.UUID,
- deadletterID uuid.UUID,
- ) error {
- g.TErrorIf(take(topic, consumer))
- return toDead(consumer, messageID, deadletterID)
- }
-
-
- g.Testing("we can't mark as dead twice", func() {
- messageID := pub(topic)
-
- err1 := asDead(consumer, messageID, uuid.New())
- err2 := asDead(consumer, messageID, uuid.New())
- g.TErrorIf(err1)
- g.TAssertEqual(
- err2.(golite.Error).ExtendedCode,
- golite.ErrConstraintUnique,
- )
- })
-
- g.Testing("we can't reuse a deadletter id", func() {
- messageID1 := pub(topic)
- messageID2 := pub(topic)
- deadletterID := uuid.New()
-
- err1 := asDead(consumer, messageID1, deadletterID)
- err2 := asDead(consumer, messageID2, deadletterID)
- g.TErrorIf(err1)
- g.TAssertEqual(
- err2.(golite.Error).ExtendedCode,
- golite.ErrConstraintUnique,
- )
-
- })
-
- g.Testing("we can't mark as dead non-existent messages", func() {
- err := asDead(consumer, uuid.New(), uuid.New())
- g.TAssertEqual(
- err.(golite.Error).ExtendedCode,
- golite.ErrConstraintNotNull,
- )
- })
-
- g.Testing("multiple consumers may mark a message as dead", func() {
- messageID := pub(topic)
-
- g.TErrorIf(g.SomeError(
- asDead(consumer, messageID, uuid.New()),
- asDead("another consumer", messageID, uuid.New()),
- asDead("yet another consumer", messageID, uuid.New()),
- ))
- })
-
- g.Testing("a consumer can mark as dead in multiple topics", func() {
- messageID1 := pub(topic)
- messageID2 := pub("other topic")
- messageID3 := pub("yet other topic")
-
- g.TErrorIf(g.SomeError(
- asDead(consumer, messageID1, uuid.New()),
- asDead(consumer, messageID2, uuid.New()),
- asDead(consumer, messageID3, uuid.New()),
- ))
- })
-
- g.Testing("a consumer can produce many deadletters in a topic", func() {
- messageID1 := pub(topic)
- messageID2 := pub(topic)
- messageID3 := pub(topic)
-
- g.TErrorIf(g.SomeError(
- asDead(consumer, messageID1, uuid.New()),
- asDead(consumer, messageID2, uuid.New()),
- asDead(consumer, messageID3, uuid.New()),
- ))
- })
-
- g.Testing("a consumer can intercalate commits and deadletters", func() {
- messageID1 := pub(topic)
- messageID2 := pub(topic)
- messageID3 := pub(topic)
- messageID4 := pub(topic)
- messageID5 := pub(topic)
-
- g.TErrorIf(g.SomeError(
- asDead(consumer, messageID1, uuid.New()),
- commit(consumer, messageID2),
- commit(consumer, messageID3),
- asDead(consumer, messageID4, uuid.New()),
- commit(consumer, messageID5),
- ))
- })
-
- g.Testing("we can't mark a committed message as dead", func() {
- messageID := pub(topic)
-
- err1 := commit(consumer, messageID)
- err2 := asDead(consumer, messageID, uuid.New())
- g.TErrorIf(err1)
- g.TAssertEqual(
- err2.(golite.Error).ExtendedCode,
- golite.ErrConstraintUnique,
- )
- })
-
- g.Testing("error if we don't own the message's consumer/topic", func() {
- otherCfg := cfg
- otherCfg.instanceID = instanceID + 1
-
- messageID1 := pub(topic)
- messageID2 := pub(topic)
-
- take, takeClose, takeErr := takeStmt(otherCfg)
- g.TErrorIf(takeErr)
- defer takeClose()
-
- err := toDead(consumer, messageID1, uuid.New())
- g.TErrorIf(err)
-
- err = take(topic, consumer)
- g.TErrorIf(err)
-
- err = toDead(consumer, messageID2, uuid.New())
- g.TAssertEqual(
- err.(golite.Error).ExtendedCode,
- golite.ErrConstraintTrigger,
- )
- })
-
- g.Testing("no actual closing occurs", func() {
- g.TErrorIf(g.SomeError(
- toDeadClose(),
- toDeadClose(),
- toDeadClose(),
- ))
- })
-}
-
-func test_replayStmt() {
- g.TestStart("replayStmt()")
-
- const (
- topic = "replay() topic"
- payloadStr = "replay() payload"
- consumer = "replay() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- replay, replayClose, replayErr := replayStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- toDeadErr,
- replayErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- toDeadClose,
- replayClose,
- db.Close,
- )
-
- pub := func() messageT {
- message, err := publish(unsent, uuid.New())
- g.TErrorIf(err)
- return message
- }
- g.TErrorIf(take(topic, consumer))
-
-
- g.Testing("we can replay a message", func() {
- message := pub()
- deadletterID := uuid.New()
- replayedID := uuid.New()
-
- err1 := toDead(consumer, message.uuid, deadletterID)
- replayed, err2 := replay(deadletterID, replayedID)
- g.TErrorIf(g.SomeError(err1, err2))
-
- g.TAssertEqual(replayed.uuid, replayedID)
- g.TAssertEqual(replayed.id == message.id, false)
- g.TAssertEqual(replayed.uuid == message.uuid, false)
- })
-
- g.Testing("a replayed message keeps its payload", func() {
- message := pub()
- deadletterID := uuid.New()
- err := toDead(consumer, message.uuid, deadletterID)
- g.TErrorIf(err)
-
- replayed, err := replay(deadletterID, uuid.New())
- g.TErrorIf(err)
- g.TAssertEqual(message.flowID, replayed.flowID)
- g.TAssertEqual(message.payload, replayed.payload)
- })
-
- g.Testing("we can't replay a dead message twice", func() {
- message := pub()
- deadletterID := uuid.New()
-
- err := toDead(consumer, message.uuid, deadletterID)
- g.TErrorIf(err)
-
- _, err1 := replay(deadletterID, uuid.New())
- _, err2 := replay(deadletterID, uuid.New())
- g.TErrorIf(err1)
- g.TAssertEqual(
- err2.(golite.Error).ExtendedCode,
- golite.ErrConstraintUnique,
- )
- })
-
- g.Testing("we cant replay non-existent messages", func() {
- _, err := replay(uuid.New(), uuid.New())
- g.TAssertEqual(
- err.(golite.Error).ExtendedCode,
- golite.ErrConstraintNotNull,
- )
- })
-
- g.Testing("messages can die and then be replayed many times", func() {
- message := pub()
- deadletterID1 := uuid.New()
- deadletterID2 := uuid.New()
-
- err := toDead(consumer, message.uuid, deadletterID1)
- g.TErrorIf(err)
-
- replayed1, err := replay(deadletterID1, uuid.New())
- g.TErrorIf(err)
-
- err = toDead(consumer, replayed1.uuid, deadletterID2)
- g.TErrorIf(err)
-
- replayed2, err := replay(deadletterID2, uuid.New())
- g.TErrorIf(err)
-
- g.TAssertEqual(message.flowID, replayed1.flowID)
- g.TAssertEqual(replayed1.flowID, replayed2.flowID)
- })
-
- g.Testing("no actual closing occurs", func() {
- g.TErrorIf(g.SomeError(
- replayClose(),
- replayClose(),
- replayClose(),
- ))
- })
-}
-
-func test_oneDeadStmt() {
- g.TestStart("oneDeadStmt()")
-
- const (
- topic = "oneDead() topic"
- payloadStr = "oneDead() payload"
- consumer = "oneDead() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- replay, replayClose, replayErr := replayStmt(cfg)
- oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- toDeadErr,
- replayErr,
- oneDeadErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- toDeadClose,
- replayClose,
- oneDeadClose,
- db.Close,
- )
-
- pub := func(topic string) uuid.UUID {
- g.TErrorIf(take(topic, consumer))
-
- unsentWithTopic := unsent
- unsentWithTopic.Topic = topic
- message, err := publish(unsentWithTopic, uuid.New())
- g.TErrorIf(err)
- return message.uuid
- }
-
-
- g.Testing("error on missing deadletters", func() {
- _, err := oneDead(topic, consumer)
- g.TAssertEqual(err, sql.ErrNoRows)
- })
-
- g.Testing("deadletters on other topics don't show for us", func() {
- err := toDead(consumer, pub("other topic"), uuid.New())
- g.TErrorIf(err)
-
- _, err = oneDead(topic, consumer)
- g.TAssertEqual(err, sql.ErrNoRows)
- })
-
- g.Testing("deadletters for other consumers don't show for use", func() {
- g.TErrorIf(take(topic, "other consumer"))
- err := toDead("other consumer", pub(topic), uuid.New())
- g.TErrorIf(err)
-
- _, err = oneDead(topic, consumer)
- g.TAssertEqual(err, sql.ErrNoRows)
- })
-
- g.Testing("after being replayed deadletters aren't returned", func() {
- messageID1 := uuid.New()
- messageID2 := uuid.New()
- messageID3 := uuid.New()
-
- err1 := toDead(consumer, pub(topic), messageID1)
- err2 := toDead(consumer, pub(topic), messageID2)
- err3 := toDead(consumer, pub(topic), messageID3)
- g.TErrorIf(g.SomeError(err1, err2, err3))
-
- deadletter, err := oneDead(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(deadletter.uuid, messageID1)
-
- _, err = replay(messageID2, uuid.New())
- g.TErrorIf(err)
-
- deadletter, err = oneDead(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(deadletter.uuid, messageID1)
-
- _, err = replay(messageID1, uuid.New())
- g.TErrorIf(err)
-
- deadletter, err = oneDead(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(deadletter.uuid, messageID3)
-
- _, err = replay(messageID3, uuid.New())
- g.TErrorIf(err)
-
- _, err = oneDead(topic, consumer)
- g.TAssertEqual(err, sql.ErrNoRows)
- })
-}
-
-func test_deadletterEach() {
- g.TestStart("deadletterEach")
-
- const (
- topic = "deadletterEach() topic"
- payloadStr = "deadletterEach() payload"
- consumer = "deadletterEach() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- allDead, allDeadClose, allDeadErr := allDeadStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- toDeadErr,
- allDeadErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- toDeadClose,
- allDeadClose,
- db.Close,
- )
-
- pub := func() uuid.UUID {
- message, err := publish(unsent, uuid.New())
- g.TErrorIf(err)
- return message.uuid
- }
-
- dead := func(messageID uuid.UUID) uuid.UUID {
- deadletterID := uuid.New()
- err := toDead(consumer, messageID, deadletterID)
- g.TErrorIf(err)
-
- return deadletterID
- }
- g.TErrorIf(take(topic, consumer))
-
-
- g.Testing("not called on empty set", func() {
- rows, err := allDead(topic, consumer)
- g.TErrorIf(err)
-
- n := 0
- deadletterEach(rows, func(deadletterT, messageT) error {
- n++
- return nil
- })
- })
-
- g.Testing("the callback is called once for each entry", func() {
- expected := []uuid.UUID{
- dead(pub()),
- dead(pub()),
- dead(pub()),
- }
-
- rows, err := allDead(topic, consumer)
- g.TErrorIf(err)
-
- var deadletterIDs []uuid.UUID
- deadletterEach(rows, func(
- deadletter deadletterT,
- _ messageT,
- ) error {
- deadletterIDs = append(deadletterIDs, deadletter.uuid)
- return nil
- })
-
- g.TAssertEqual(deadletterIDs, expected)
- })
-
- g.Testing("we halt if the timestamp is ill-formatted", func() {
- messageID := pub()
- message_id_bytes := messageID[:]
- dead(messageID)
- dead(pub())
- dead(pub())
- dead(pub())
- dead(pub())
-
- const tmplUpdate = `
- UPDATE "%s_offsets"
- SET timestamp = '01-01-1970'
- WHERE message_id IN (
- SELECT id FROM "%s_messages" WHERE uuid = ?
- );
- `
- sqlUpdate := fmt.Sprintf(tmplUpdate, prefix, prefix)
- _, err := db.Exec(sqlUpdate, message_id_bytes)
- g.TErrorIf(err)
-
- rows, err := allDead(topic, consumer)
- g.TErrorIf(err)
-
- n := 0
- err = deadletterEach(rows, func(deadletterT, messageT) error {
- n++
- return nil
- })
-
- g.TAssertEqual(
- err,
- &time.ParseError{
- Layout: time.RFC3339Nano,
- Value: "01-01-1970",
- LayoutElem: "2006",
- ValueElem: "01-01-1970",
- Message: "",
- },
- )
- g.TAssertEqual(n, 3)
-
- const tmplDelete = `
- DELETE FROM "%s_offsets"
- WHERE message_id IN (
- SELECT id FROM "%s_messages" WHERE uuid = ?
- );
- `
- sqlDelete := fmt.Sprintf(tmplDelete, prefix, prefix)
- _, err = db.Exec(sqlDelete, message_id_bytes)
- g.TErrorIf(err)
- })
-
- g.Testing("we halt if the callback returns an error", func() {
- myErr := errors.New("early return error")
-
- rows1, err1 := allDead(topic, consumer)
- g.TErrorIf(err1)
-
- n1 := 0
- err1 = deadletterEach(rows1, func(deadletterT, messageT) error {
- n1++
- if n1 == 1 {
- return myErr
- }
- return nil
- })
-
- rows2, err2 := allDead(topic, consumer)
- g.TErrorIf(err2)
-
- n2 := 0
- err = deadletterEach(rows2, func(deadletterT, messageT) error {
- n2++
- return nil
- })
-
- g.TAssertEqual(err1, myErr)
- g.TErrorIf(err2)
- g.TAssertEqual(n1, 1)
- g.TAssertEqual(n2, 7)
- })
-}
-
-func test_allDeadStmt() {
- g.TestStart("allDeadStmt()")
-
- const (
- topic = "allDead() topic"
- payloadStr = "allDead() payload"
- consumer = "allDead() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- replay, replayClose, replayErr := replayStmt(cfg)
- allDead, allDeadClose, allDeadErr := allDeadStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- toDeadErr,
- replayErr,
- allDeadErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- toDeadClose,
- replayClose,
- allDeadClose,
- db.Close,
- )
-
- pub := func(topic string) uuid.UUID {
- g.TErrorIf(take(topic, consumer))
-
- unsentWithTopic := unsent
- unsentWithTopic.Topic = topic
- message, err := publish(unsentWithTopic, uuid.New())
- g.TErrorIf(err)
- return message.uuid
- }
-
- collectAll := func(
- topic string,
- consumer string,
- ) ([]deadletterT, []messageT) {
- var (
- deadletters []deadletterT
- messages []messageT
- )
- eachFn := func(
- deadletter deadletterT,
- message messageT,
- ) error {
- deadletters = append(deadletters, deadletter)
- messages = append(messages, message)
- return nil
- }
-
- rows, err := allDead(topic, consumer)
- g.TErrorIf(err)
-
- err = deadletterEach(rows, eachFn)
- g.TErrorIf(err)
-
- return deadletters, messages
- }
-
-
- g.Testing("no entry on empty deadletters", func() {
- deadletterIDs, _ := collectAll(topic, consumer)
- g.TAssertEqual(len(deadletterIDs), 0)
- })
-
- g.Testing("deadletters on other topics don't show up", func() {
- err := toDead(consumer, pub("other topic"), uuid.New())
- g.TErrorIf(err)
-
- deadletters, _ := collectAll(topic, consumer)
- g.TAssertEqual(len(deadletters), 0)
- })
-
- g.Testing("deadletters of other consumers don't show up", func() {
- g.TErrorIf(take(topic, "other consumer"))
- err := toDead("other consumer", pub(topic), uuid.New())
- g.TErrorIf(err)
-
- deadletterIDs, _ := collectAll(topic, consumer)
- g.TAssertEqual(len(deadletterIDs), 0)
- })
-
- g.Testing("deadletters are given in order", func() {
- deadletterIDs := []uuid.UUID{
- uuid.New(),
- uuid.New(),
- uuid.New(),
- }
- messageIDs := []uuid.UUID{
- pub(topic),
- pub(topic),
- pub(topic),
- }
-
- err1 := toDead(consumer, messageIDs[0], deadletterIDs[0])
- err2 := toDead(consumer, messageIDs[1], deadletterIDs[1])
- err3 := toDead(consumer, messageIDs[2], deadletterIDs[2])
- g.TErrorIf(g.SomeError(err1, err2, err3))
-
- deadletters, messages := collectAll(topic, consumer)
- g.TAssertEqual(deadletters[0].uuid, deadletterIDs[0])
- g.TAssertEqual(deadletters[1].uuid, deadletterIDs[1])
- g.TAssertEqual(deadletters[2].uuid, deadletterIDs[2])
- g.TAssertEqual( messages[0].uuid, messageIDs[0])
- g.TAssertEqual( messages[1].uuid, messageIDs[1])
- g.TAssertEqual( messages[2].uuid, messageIDs[2])
- g.TAssertEqual(len(deadletters), 3)
- g.TAssertEqual(len(messages), 3)
- })
-
- g.Testing("after being replayed, they stop appearing", func() {
- deadletters, _ := collectAll(topic, consumer)
- g.TAssertEqual(len(deadletters), 3)
-
- _, err := replay(deadletters[0].uuid, uuid.New())
- g.TErrorIf(err)
- collecteds, _ := collectAll(topic, consumer)
- g.TAssertEqual(len(collecteds), 2)
-
- _, err = replay(deadletters[1].uuid, uuid.New())
- g.TErrorIf(err)
- collecteds, _ = collectAll(topic, consumer)
- g.TAssertEqual(len(collecteds), 1)
-
- _, err = replay(deadletters[2].uuid, uuid.New())
- g.TErrorIf(err)
- collecteds, _ = collectAll(topic, consumer)
- g.TAssertEqual(len(collecteds), 0)
- })
-}
-
-func test_sizeStmt() {
- g.TestStart("sizeStmt()")
-
- const (
- topic = "size() topic"
- payloadStr = "size() payload"
- consumer = "size() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- replay, replayClose, replayErr := replayStmt(cfg)
- oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg)
- size, sizeClose, sizeErr := sizeStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- toDeadErr,
- replayErr,
- oneDeadErr,
- sizeErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- toDeadClose,
- replayClose,
- sizeClose,
- oneDeadClose,
- db.Close,
- )
-
- pub := func(topic string) uuid.UUID {
- g.TErrorIf(take(topic, consumer))
-
- unsentWithTopic := unsent
- unsentWithTopic.Topic = topic
- message, err := publish(unsentWithTopic, uuid.New())
- g.TErrorIf(err)
- return message.uuid
- }
-
-
- g.Testing("0 on empty topic", func() {
- n, err := size(topic)
- g.TErrorIf(err)
- g.TAssertEqual(n, 0)
- })
-
- g.Testing("other topics don't fall into our count", func() {
- pub("other topic")
-
- n, err := size(topic)
- g.TErrorIf(err)
- g.TAssertEqual(n, 0)
- })
-
- g.Testing("otherwise we just get the sum", func() {
- pub(topic)
- pub(topic)
- pub(topic)
- pub(topic)
- pub(topic)
-
- n, err := size(topic)
- g.TErrorIf(err)
- g.TAssertEqual(n, 5)
- })
-
- g.Testing("deadletters aren't taken into account", func() {
- sixthMessageID := pub(topic)
- err := toDead(consumer, sixthMessageID, uuid.New())
- g.TErrorIf(err)
-
- n, err := size(topic)
- g.TErrorIf(err)
- g.TAssertEqual(n, 6)
- })
-
- g.Testing("after replay, deadletters increases the size", func() {
- deadletter, err := oneDead(topic, consumer)
- g.TErrorIf(err)
-
- _, err = replay(deadletter.uuid, uuid.New())
- g.TErrorIf(err)
-
- n, err := size(topic)
- g.TErrorIf(err)
- g.TAssertEqual(n, 7)
- })
-}
-
-func test_countStmt() {
- g.TestStart("countStmt()")
-
- const (
- topic = "count() topic"
- payloadStr = "count() payload"
- consumer = "count() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- next, nextClose, nextErr := nextStmt(cfg)
- commit, commitClose, commitErr := commitStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- count, countClose, countErr := countStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- nextErr,
- commitErr,
- toDeadErr,
- countErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- nextClose,
- commitClose,
- toDeadClose,
- countClose,
- db.Close,
- )
-
- pub := func(topic string) uuid.UUID {
- g.TErrorIf(take(topic, consumer))
-
- unsentWithTopic := unsent
- unsentWithTopic.Topic = topic
- message, err := publish(unsentWithTopic, uuid.New())
- g.TErrorIf(err)
- return message.uuid
- }
-
-
- g.Testing("0 on empty topic", func() {
- n, err := count(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(n, 0)
- })
-
- g.Testing("other topics don't add to our count", func() {
- err := commit(consumer, pub("other topic"))
- g.TErrorIf(err)
-
- n, err := count(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(n, 0)
- })
-
- g.Testing("other consumers don't influence our count", func() {
- g.TErrorIf(take(topic, "other consumer"))
- err := commit("other consumer", pub(topic))
- g.TErrorIf(err)
-
- n, err := count(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(n, 0)
- })
-
- g.Testing("unconsumed messages don't count", func() {
- pub(topic)
- pub(topic)
- pub(topic)
-
- n, err := count(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(n, 0)
- })
-
- g.Testing("consumed messages do count", func() {
- message, err := next(topic, consumer)
- g.TErrorIf(err)
- err = commit(consumer, message.uuid)
- g.TErrorIf(err)
-
- message, err = next(topic, consumer)
- g.TErrorIf(err)
- err = commit(consumer, message.uuid)
- g.TErrorIf(err)
-
- message, err = next(topic, consumer)
- g.TErrorIf(err)
- err = commit(consumer, message.uuid)
- g.TErrorIf(err)
-
- n, err := count(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(n, 3)
- })
-
- g.Testing("deadletters count as consumed", func() {
- message, err := next(topic, consumer)
- g.TErrorIf(err)
-
- err = toDead(consumer, message.uuid, uuid.New())
- g.TErrorIf(err)
-
- n, err := count(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(n, 4)
- })
-}
-
-func test_hasDataStmt() {
- g.TestStart("hasDataStmt()")
-
- const (
- topic = "hasData() topic"
- payloadStr = "hasData() payload"
- consumer = "hasData() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- next, nextClose, nextErr := nextStmt(cfg)
- commit, commitClose, commitErr := commitStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- hasData, hasDataClose, hasDataErr := hasDataStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- nextErr,
- commitErr,
- toDeadErr,
- hasDataErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- nextClose,
- commitClose,
- toDeadClose,
- hasDataClose,
- db.Close,
- )
-
- pub := func(topic string) uuid.UUID {
- g.TErrorIf(take(topic, consumer))
-
- unsentWithTopic := unsent
- unsentWithTopic.Topic = topic
- message, err := publish(unsentWithTopic, uuid.New())
- g.TErrorIf(err)
- return message.uuid
- }
-
-
- g.Testing("false on empty topic", func() {
- has, err := hasData(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(has, false)
- })
-
- g.Testing("other topics don't change the response", func() {
- pub("other topic")
-
- has, err := hasData(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(has, false)
- })
-
- g.Testing("published messages flip the flag", func() {
- pub(topic)
- has, err := hasData(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(has, true)
- })
-
- g.Testing("other consumers don't influence us", func() {
- g.TErrorIf(take(topic, "other consumer"))
- message, err := next(topic, "other consumer")
- g.TErrorIf(err)
-
- err = commit("other consumer", message.uuid)
- g.TErrorIf(err)
-
- has, err := hasData(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(has, true)
- })
-
- g.Testing("consuming messages unflips the result", func() {
- message, err := next(topic, consumer)
- g.TErrorIf(err)
-
- err = commit(consumer, message.uuid)
- g.TErrorIf(err)
-
- has, err := hasData(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(has, false)
- })
-
- g.Testing("same for deadletters", func() {
- has0, err := hasData(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(has0, false)
-
- messageID1 := pub(topic)
- messageID2 := pub(topic)
-
- has1, err := hasData(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(has1, true)
-
- err = toDead(consumer, messageID1, uuid.New())
- g.TErrorIf(err)
- err = commit(consumer, messageID2)
- g.TErrorIf(err)
-
- has2, err := hasData(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(has2, false)
- })
-}
-
-func test_initDB() {
- g.TestStart("initDB()")
-
- const (
- topic = "initDB() topic"
- payloadStr = "initDB() payload"
- consumer = "initDB() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- var messages []messageT
- notifyFn := func(message messageT) {
- messages = append(messages, message)
- }
-
- queries, err := initDB(dbpath, prefix, notifyFn, instanceID)
- g.TErrorIf(err)
- defer queries.close()
-
- g.TErrorIf(queries.take(topic, consumer))
-
-
- g.Testing("we can perform all the wrapped operations", func() {
- messageID := uuid.New()
- newMessageID := uuid.New()
- deadletterID := uuid.New()
-
- messageV1, err := queries.publish(unsent, messageID)
- g.TErrorIf(err)
-
- messageV2, err := queries.next(topic, consumer)
- g.TErrorIf(err)
-
- var messagesV3 []messageT
- pendingFn := func(message messageT) error {
- messagesV3 = append(messagesV3, message)
- return nil
- }
- err = queries.pending(topic, consumer, pendingFn)
- g.TErrorIf(err)
- g.TAssertEqual(len(messagesV3), 1)
- messageV3 := messagesV3[0]
-
- err = queries.toDead(consumer, messageID, deadletterID)
- g.TErrorIf(err)
-
- deadletterV1, err := queries.oneDead(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(deadletterV1.uuid, deadletterID)
-
- var (
- deadlettersV2 []deadletterT
- messagesV4 []messageT
- )
- deadletterFn := func(
- deadletter deadletterT,
- message messageT,
- ) error {
- deadlettersV2 = append(deadlettersV2, deadletter)
- messagesV4 = append(messagesV4, message)
- return nil
- }
- err = queries.allDead(topic, consumer, deadletterFn)
- g.TErrorIf(err)
- g.TAssertEqual(len(deadlettersV2), 1)
- g.TAssertEqual(deadlettersV2[0].uuid, deadletterID)
- g.TAssertEqual(len(messagesV4), 1)
- messageV4 := messagesV4[0]
-
- g.TAssertEqual(messageV1, messageV2)
- g.TAssertEqual(messageV1, messageV3)
- g.TAssertEqual(messageV1, messageV4)
-
- newMessageV1, err := queries.replay(deadletterID, newMessageID)
- g.TErrorIf(err)
-
- err = queries.commit(consumer, newMessageID)
- g.TErrorIf(err)
-
- newMessageV0 := messageV1
- newMessageV0.id = newMessageV1.id
- newMessageV0.uuid = newMessageID
- newMessageV0.timestamp = newMessageV1.timestamp
- g.TAssertEqual(newMessageV1, newMessageV0)
-
- size, err := queries.size(topic)
- g.TErrorIf(err)
- g.TAssertEqual(size, 2)
-
- count, err := queries.count(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(count, 2)
-
- hasData, err := queries.hasData(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(hasData, false)
- })
-}
-
-func test_queriesTclose() {
- g.TestStart("queriesT.close()")
-
- const (
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
-
- notifyFn := func(messageT) {}
- queries, err := initDB(dbpath, prefix, notifyFn, instanceID)
- g.TErrorIf(err)
-
-
- g.Testing("closing more than once does not error", func() {
- g.TErrorIf(g.SomeError(
- queries.close(),
- queries.close(),
- ))
- })
-}
-
-func test_newPinger() {
- g.TestStart("newPinger()")
-
- g.Testing("onPing() on a closed pinger is a noop", func() {
- pinger := newPinger[string]()
- pinger.close()
- pinger.onPing(func(s string) {
- panic(s)
- })
- pinger.tryPing("ignored")
- })
-
- g.Testing("onPing() on a closed pinger with data gets that", func() {
- pinger := newPinger[string]()
- pinger.tryPing("received")
- pinger.tryPing("ignored")
- g.TAssertEqual(pinger.closed(), false)
-
- pinger.close()
- g.TAssertEqual(pinger.closed(), true)
-
- c := make(chan string)
- count := 0
- go pinger.onPing(func(s string) {
- if count > 0 {
- panic(s)
- }
- count++
-
- c <- s
- })
-
- given := <- c
- g.TAssertEqual(given, "received")
- })
-
- g.Testing("when onPing is late, we loose messages", func() {
- pinger := newPinger[string]()
- pinger.tryPing("first seen")
- pinger.tryPing("second dropped")
- pinger.tryPing("third dropped")
-
- c := make(chan string)
- go pinger.onPing(func(s string) {
- c <- s
- })
- s := <- c
-
- close(c)
- pinger.close()
- g.TAssertEqual(s, "first seen")
- })
-
- g.Testing("if onPing is on time, it may not loose", func() {
- pinger := newPinger[string]()
- pinger.tryPing("first seen")
-
- c := make(chan string)
- go pinger.onPing(func(s string) {
- c <- s
- })
-
- s1 := <- c
- pinger.tryPing("second seen")
- s2 := <- c
-
- close(c)
- pinger.close()
- g.TAssertEqual(s1, "first seen")
- g.TAssertEqual(s2, "second seen")
- })
-
- g.Testing("if onPing takes too long, it still looses messages", func() {
- pinger := newPinger[string]()
- pinger.tryPing("first seen")
-
- c1 := make(chan string)
- c2 := make(chan struct{})
- go pinger.onPing(func(s string) {
- c1 <- s
- c2 <- struct{}{}
- })
-
- s1 := <- c1
- pinger.tryPing("second seen")
- pinger.tryPing("third dropped")
- <- c2
- s2 := <- c1
- <- c2
-
- close(c2)
- close(c1)
- pinger.close()
- g.TAssertEqual(s1, "first seen")
- g.TAssertEqual(s2, "second seen")
- })
-}
-
-func test_makeSubscriptionsFunc() {
- g.TestStart("makeSubscriptionsFunc()")
-
- g.Testing("we can have multiple readers", func() {
- subscriptions := makeSubscriptionsFuncs()
-
- var (
- readStarted sync.WaitGroup
- readFinished sync.WaitGroup
- )
- c := make(chan struct{})
- readFn := func(subscriptionsSetM) error {
- readStarted.Done()
- <- c
- return nil
- }
- addReader := func() {
- readStarted.Add(1)
- readFinished.Add(1)
- go func() {
- subscriptions.read(readFn)
- readFinished.Done()
- }()
- }
-
- addReader()
- addReader()
- addReader()
- readStarted.Wait()
- c <- struct{}{}
- c <- struct{}{}
- c <- struct{}{}
- readFinished.Wait()
- })
-
- g.Testing("writers are exclusive", func() {
- subscriptions := makeSubscriptionsFuncs()
-
- var (
- readStarted sync.WaitGroup
- readFinished sync.WaitGroup
- writeWillStart sync.WaitGroup
- writeFinished sync.WaitGroup
- )
- c := make(chan string)
- readFn := func(subscriptionsSetM) error {
- readStarted.Done()
- c <- "reader"
- return nil
- }
- addReader := func() {
- readStarted.Add(1)
- readFinished.Add(1)
- go func() {
- subscriptions.read(readFn)
- readFinished.Done()
- }()
- }
-
- writeFn := func(subscriptionsSetM) error {
- c <- "writer"
- return nil
- }
- addWriter := func() {
- writeWillStart.Add(1)
- writeFinished.Add(1)
- go func() {
- writeWillStart.Done()
- subscriptions.write(writeFn)
- writeFinished.Done()
- }()
- }
-
- addReader()
- addReader()
- addReader()
- readStarted.Wait()
- addWriter()
- writeWillStart.Wait()
-
- g.TAssertEqual(<-c, "reader")
- g.TAssertEqual(<-c, "reader")
- g.TAssertEqual(<-c, "reader")
-
- readFinished.Wait()
- g.TAssertEqual(<-c, "writer")
- writeFinished.Wait()
- })
-}
-
-func test_makeNotifyFn() {
- g.TestStart("makeNotifyFn()")
-
- g.Testing("when topic is nil only top pinger gets pinged", func() {
- pinger1 := newPinger[struct{}]()
- pinger2 := newPinger[[]byte]()
- defer pinger1.close()
- defer pinger2.close()
-
- go pinger1.onPing(func(struct{}) {
- panic("consumer pinger")
- })
- go pinger2.onPing(func(payload []byte) {
- panic("waiter pinger")
- })
-
- flowID := uuid.New()
-
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer-1": consumerT{
- pinger: pinger1,
- },
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{
- "waiter-1": waiterT{
- pinger: pinger2,
- },
- },
- },
- },
- }
- subsFn := func(fn func(subscriptionsSetM) error) error {
- return fn(set)
- }
- topPinger := newPinger[struct{}]()
- defer topPinger.close()
-
- var wg sync.WaitGroup
- wg.Add(1)
- go topPinger.onPing(func(struct{}) {
- wg.Done()
- })
-
- notifyFn := makeNotifyFn(subsFn, topPinger)
-
- message := messageT{
- uuid: uuid.New(),
- topic: "nobody is subscribed to this one",
- payload: []byte("nobody with get this payload"),
- }
- notifyFn(message)
- wg.Wait()
- })
-
- g.Testing("otherwise all interested subscribers get pinged", func() {
- const topic = "the topic name"
-
- pinger1 := newPinger[struct{}]()
- pinger2 := newPinger[[]byte]()
- defer pinger1.close()
- defer pinger2.close()
-
- var wg sync.WaitGroup
- go pinger1.onPing(func(struct{}) {
- wg.Done()
- })
- go pinger2.onPing(func([]byte) {
- wg.Done()
- })
- wg.Add(2)
-
- flowID := uuid.New()
-
- set := subscriptionsSetM{
- topic: topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer-1": consumerT{
- pinger: pinger1,
- },
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{
- "waiter-1": waiterT{
- pinger: pinger2,
- },
- },
- },
- },
- }
-
- subsFn := func(fn func(subscriptionsSetM) error) error {
- return fn(set)
- }
-
- topPinger := newPinger[struct{}]()
- defer topPinger.close()
- go topPinger.onPing(func(struct{}) {
- wg.Done()
- })
- wg.Add(1)
-
- notifyFn := makeNotifyFn(subsFn, topPinger)
-
- message := messageT{
- uuid: uuid.New(),
- topic: topic,
- flowID: flowID,
- payload: []byte("ignored in this test"),
- }
- notifyFn(message)
- wg.Wait()
- })
-}
-
-func test_collectClosedWaiters() {
- g.TestStart("collectClosedWaiter()")
-
- g.Testing("collects all the reports to be closed", func() {
- flowID1 := uuid.New()
- flowID2 := uuid.New()
- flowID3 := uuid.New()
- flowID4 := uuid.New()
- flowID5 := uuid.New()
-
- mkwaiter := func(closed bool) waiterT {
- fn := func() bool {
- return closed
- }
- return waiterT{
- closed: &fn,
- }
- }
-
- set := subscriptionsSetM{
- "topic-1": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{
- "waiter-1": mkwaiter(false),
- "waiter-2": mkwaiter(true),
- "waiter-3": mkwaiter(true),
- },
- flowID2: map[string]waiterT{
- "waiter-4": mkwaiter(true),
- "waiter-5": mkwaiter(false),
- },
- },
- },
- "topic-2": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID3: map[string]waiterT{
- "waiter-1": mkwaiter(false),
- "waiter-2": mkwaiter(false),
- },
- flowID4: map[string]waiterT{
- "waiter-3": mkwaiter(true),
- "waiter-4": mkwaiter(true),
- "waiter-5": mkwaiter(true),
- "waiter-6": mkwaiter(true),
- },
- },
- },
- "topic-3": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID5: map[string]waiterT{},
- },
- },
- "topic-4": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- expected := map[string]map[uuid.UUID][]string{
- "topic-1": map[uuid.UUID][]string{
- flowID1: []string{
- "waiter-2",
- "waiter-3",
- },
- flowID2: []string{
- "waiter-4",
- },
- },
- "topic-2": map[uuid.UUID][]string{
- flowID3: []string{},
- flowID4: []string{
- "waiter-3",
- "waiter-4",
- "waiter-5",
- "waiter-6",
- },
- },
- "topic-3": map[uuid.UUID][]string{
- flowID5: []string{},
- },
- "topic-4": map[uuid.UUID][]string{},
- }
-
- given := collectClosedWaiters(set)
-
- // sort names for equality
- for _, waitersIndex := range given {
- for _, names := range waitersIndex {
- sort.Strings(names)
- }
- }
- g.TAssertEqual(given, expected)
- })
-}
-
-func test_trimEmptyLeaves() {
- g.TestStart("trimEmptyLeaves()")
-
- g.Testing("noop on an empty index", func() {
- input := map[string]map[uuid.UUID][]string{}
- expected := map[string]map[uuid.UUID][]string{}
-
- trimEmptyLeaves(input)
- g.TAssertEqual(input, expected)
- })
-
- g.Testing("simplifies tree when it can", func() {
- flowID1 := uuid.New()
- flowID2 := uuid.New()
- flowID3 := uuid.New()
- flowID4 := uuid.New()
-
- input := map[string]map[uuid.UUID][]string{
- "topic-1": map[uuid.UUID][]string{
- flowID1: []string{
- "waiter-1",
- },
- flowID2: []string{},
- },
- "topic-2": map[uuid.UUID][]string{
- flowID3: []string{},
- flowID4: []string{},
- },
- "topic-3": map[uuid.UUID][]string{},
- }
-
- expected := map[string]map[uuid.UUID][]string{
- "topic-1": map[uuid.UUID][]string{
- flowID1: []string{
- "waiter-1",
- },
- },
- }
-
- trimEmptyLeaves(input)
- g.TAssertEqual(input, expected)
- })
-
- g.Testing("fully prune tree if possible", func() {
- flowID1 := uuid.New()
- flowID2 := uuid.New()
- flowID3 := uuid.New()
- flowID4 := uuid.New()
- flowID5 := uuid.New()
-
- input := map[string]map[uuid.UUID][]string{
- "topic-1": map[uuid.UUID][]string{},
- "topic-2": map[uuid.UUID][]string{},
- "topic-3": map[uuid.UUID][]string{},
- "topic-4": map[uuid.UUID][]string{
- flowID1: []string{},
- },
- "topic-5": map[uuid.UUID][]string{
- flowID2: []string{},
- flowID3: []string{},
- flowID4: []string{},
- flowID5: []string{},
- },
- }
-
- expected := map[string]map[uuid.UUID][]string{}
-
- trimEmptyLeaves(input)
- g.TAssertEqual(input, expected)
- })
-}
-
-func test_deleteIfEmpty() {
- g.TestStart("deleteIfEmpty()")
-
- g.Testing("noop if there are consumers", func() {
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- },
- }
-
- expected1 := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- },
- }
- expected2 := subscriptionsSetM{}
-
- deleteIfEmpty(set, "topic")
- g.TAssertEqual(set, expected1)
-
- delete(set["topic"].consumers, "consumer")
- deleteIfEmpty(set, "topic")
- g.TAssertEqual(set, expected2)
- })
-
- g.Testing("noop if there are waiters", func() {
- flowID := uuid.New()
-
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: nil,
- },
- },
- }
-
- expected1 := subscriptionsSetM{
- "topic": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: nil,
- },
- },
- }
- expected2 := subscriptionsSetM{}
-
- deleteIfEmpty(set, "topic")
- g.TAssertEqual(set, expected1)
-
- delete(set["topic"].waiters, flowID)
- deleteIfEmpty(set, "topic")
- g.TAssertEqual(set, expected2)
- })
-
- g.Testing("otherwise deletes when empty", func() {
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- expected := subscriptionsSetM{}
-
- deleteIfEmpty(set, "topic")
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("unrelated topics are left untouched", func() {
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- expected := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- deleteIfEmpty(set, "another-topic")
- g.TAssertEqual(set, expected)
- })
-}
-
-func test_deleteEmptyTopics() {
- g.TestStart("deleteEmptyTopics()")
-
- g.Testing("cleans up all empty topics from the set", func() {
- flowID1 := uuid.New()
- flowID2 := uuid.New()
-
- set := subscriptionsSetM{
- "empty": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- "has-consumers": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- "has-waiters": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: nil,
- },
- },
- "has-both": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID2: nil,
- },
- },
- "has-neither": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- expected := subscriptionsSetM{
- "has-consumers": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- "has-waiters": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: nil,
- },
- },
- "has-both": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID2: nil,
- },
- },
- }
-
- deleteEmptyTopics(set)
- g.TAssertEqual(set, expected)
- })
-}
-
-func test_removeClosedWaiter() {
- g.TestStart("removeClosedWaiter()")
-
- g.Testing("removes from set all that we request", func() {
- flowID0 := uuid.New()
- flowID1 := uuid.New()
- flowID2 := uuid.New()
- flowID3 := uuid.New()
-
- set := subscriptionsSetM{
- "topic-1": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{
- "waiter-1": waiterT{},
- "waiter-2": waiterT{},
- },
- flowID2: map[string]waiterT{
- "waiter-3": waiterT{},
- "waiter-4": waiterT{},
- "waiter-5": waiterT{},
- },
- },
- },
- "topic-2": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID3: map[string]waiterT{
- "waiter-6": waiterT{},
- "waiter-7": waiterT{},
- "waiter-8": waiterT{},
- },
- },
- },
- "topic-3": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- input := map[string]map[uuid.UUID][]string{
- "topic-0": map[uuid.UUID][]string{
- flowID0: []string{
- "waiter-0",
- },
- },
- "topic-1": map[uuid.UUID][]string{
- flowID1: []string{
- "waiter-2",
- },
- flowID2: []string{
- "waiter-3",
- "waiter-4",
- "waiter-5",
- },
- },
- "topic-2": map[uuid.UUID][]string{
- flowID3: []string{
- "waiter-6",
- "waiter-7",
- "waiter-8",
- },
- },
- }
-
- expected := subscriptionsSetM{
- "topic-1": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{
- "waiter-1": waiterT{},
- },
- },
- },
- }
-
- removeClosedWaiters(set, input)
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("empty flowIDs from input GET LEAKED", func() {
- flowID1 := uuid.New()
- flowID2 := uuid.New()
-
- input := map[string]map[uuid.UUID][]string{
- "topic-2": map[uuid.UUID][]string{
- flowID2: []string{
- "waiter",
- },
- },
- }
-
- set := subscriptionsSetM{
- "topic-1": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{},
- },
- },
- "topic-2": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID2: map[string]waiterT{
- "waiter": waiterT{},
- },
- },
- },
- }
-
- expected := subscriptionsSetM{
- "topic-1": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{},
- },
- },
- }
-
- removeClosedWaiters(set, input)
- g.TAssertEqual(set, expected)
- })
-}
-
-func test_reapClosedWaiters() {
- g.TestStart("reapClosedWaiters()")
-
- g.Testing("trimEmptyLeaves() prevents removal of empty flows", func() {
- var (
- set subscriptionsSetM
- readCount = 0
- writeCount = 0
- )
- readFn := func(fn func(subscriptionsSetM) error) error {
- readCount++
- return fn(set)
- }
- writeFn := func(fn func(subscriptionsSetM) error) error {
- writeCount++
- return fn(set)
- }
-
- openFn := func() bool {
- return false
- }
- closedFn := func() bool {
- return true
- }
- open := waiterT{ closed: &openFn }
- closed := waiterT{ closed: &closedFn }
- flowID1 := uuid.New()
- flowID2 := uuid.New()
-
- set = subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{
- "waiter-1": open,
- "waiter-2": open,
- "waiter-3": open,
- },
- flowID2: map[string]waiterT{
- "waiter-4": open,
- },
- },
- },
- }
-
- expected1 := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{
- "waiter-1": open,
- "waiter-2": open,
- "waiter-3": open,
- },
- flowID2: map[string]waiterT{
- "waiter-4": open,
- },
- },
- },
- }
-
- expected2 := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{
- "waiter-2": open,
- "waiter-3": open,
- },
- flowID2: map[string]waiterT{
- "waiter-4": open,
- },
- },
- },
- }
-
- expected3 := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{
- "waiter-2": open,
- "waiter-3": open,
- },
- },
- },
- }
-
- reapClosedWaiters(readFn, writeFn)
- g.TAssertEqual(readCount, 1)
- g.TAssertEqual(writeCount, 0)
- g.TAssertEqual(set, expected1)
-
- set["topic"].waiters[flowID1]["waiter-1"] = closed
- reapClosedWaiters(readFn, writeFn)
- g.TAssertEqual(readCount, 2)
- g.TAssertEqual(writeCount, 1)
- g.TAssertEqual(set, expected2)
-
- set["topic"].waiters[flowID2]["waiter-4"] = closed
- reapClosedWaiters(readFn, writeFn)
- g.TAssertEqual(readCount, 3)
- g.TAssertEqual(writeCount, 2)
- g.TAssertEqual(set, expected3)
- })
-}
-
-func test_everyNthCall() {
- g.TestStart("everyNthCall()")
-
- g.Testing("0 (incorrect) would make fn never be called", func() {
- never := everyNthCall[int](0, func(int) {
- panic("unreachable")
- })
- for i := 0; i < 100; i++ {
- never(i)
- }
- })
-
- g.Testing("the first call is delayed to be the nth", func() {
- count := 0
- values := []string{}
- fn := everyNthCall[string](3, func(s string) {
- count++
- values = append(values, s)
- })
-
- fn("1 ignored")
- g.TAssertEqual(count, 0)
- fn("2 ignored")
- g.TAssertEqual(count, 0)
- fn("3 not ignored")
- g.TAssertEqual(count, 1)
-
- fn("4 ignored")
- fn("5 ignored")
- g.TAssertEqual(count, 1)
-
- fn("6 not ignored")
- fn("7 ignored")
- fn("8 ignored")
- g.TAssertEqual(count, 2)
-
- fn("9 not ignored")
- g.TAssertEqual(count, 3)
-
- expected := []string{
- "3 not ignored",
- "6 not ignored",
- "9 not ignored",
- }
- g.TAssertEqual(values, expected)
- })
-}
-
-func test_runReaper() {
- g.TestStart("runReaper()")
-
- g.Testing("debounce reapClosedWaiters `reaperSkipCount` times", func() {
- set := subscriptionsSetM{}
-
- var (
- readCount = 0
- writeCount = 0
- )
- readFn := func(fn func(subscriptionsSetM) error) error {
- readCount++
- return fn(set)
- }
- writeFn := func(fn func(subscriptionsSetM) error) error {
- writeCount++
- return fn(set)
- }
-
- var iterCount int
- onPing := func(fn func(struct{})) {
- for i := 0; i < iterCount; i++ {
- fn(struct{}{})
- }
- }
-
- iterCount = reaperSkipCount - 1
- runReaper(onPing, readFn, writeFn)
- g.TAssertEqual(readCount, 0)
- g.TAssertEqual(writeCount, 0)
-
- iterCount = reaperSkipCount
- runReaper(onPing, readFn, writeFn)
- g.TAssertEqual(readCount, 1)
- g.TAssertEqual(writeCount, 0)
- })
-}
-
-func test_NewWithPrefix() {
- g.TestStart("NewWithPrefix()")
-
- g.Testing("we get an error with a bad prefix", func() {
- _, err := NewWithPrefix(golite.InMemory, "a bad prefix")
- g.TAssertEqual(err, g.ErrBadSQLTablePrefix)
- })
-
- g.Testing("otherwise we have a queueT and no errors", func() {
- queue, err := NewWithPrefix(golite.InMemory, "good")
- g.TErrorIf(err)
- queue.Close()
-
- g.TAssertEqual(queue.(queueT).pinger.closed(), true)
- })
-}
-
-func test_New() {
- g.TestStart("New()")
-
- g.Testing("smoke test that we get a queueT", func() {
- queue, err := New(golite.InMemory)
- g.TErrorIf(err)
- queue.Close()
-
- g.TAssertEqual(queue.(queueT).pinger.closed(), true)
- })
-}
-
-func test_asPublicMessage() {
- g.TestStart("asPublicMessage()")
-
- g.Testing("it picks the correct fields 🤷", func() {
- input := messageT{
- uuid: uuid.New(),
- timestamp: time.Now(),
- topic: "topic",
- flowID: uuid.New(),
- payload: []byte("payload"),
- }
-
- expected := Message{
- ID: input.uuid,
- Timestamp: input.timestamp,
- Topic: input.topic,
- FlowID: input.flowID,
- Payload: input.payload,
- }
-
- given := asPublicMessage(input)
- g.TAssertEqual(given, expected)
- })
-}
-
-func test_queueT_Publish() {
- g.TestStart("queueT.Publish()")
-
- const (
- topic = "queueT.Publish() topic"
- payloadStr = "queueT.Publish() payload"
- dbpath = golite.InMemory
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- queue, err := New(dbpath)
- g.TErrorIf(err)
- defer queue.Close()
-
-
- g.Testing("it just publishes and returns", func() {
- message, err := queue.Publish(unsent)
- g.TErrorIf(err)
-
- g.TAssertEqual(message.Timestamp == time.Time{}, false)
- g.TAssertEqual(message.Topic, topic)
- g.TAssertEqual(message.FlowID, flowID)
- g.TAssertEqual(message.Payload, payload)
- })
-
- queue.Close()
- g.TAssertEqual(queue.(queueT).pinger.closed(), true)
-}
-
-func test_registerConsumerFn() {
- g.TestStart("registerConsumerFn()")
-
- g.Testing("adds a new topicSubscriptionT{} if needed", func() {
- consumer := consumerT{
- data: consumerDataT{
- topic: "topic",
- name: "consumer",
- },
- }
-
- set := subscriptionsSetM{}
-
- expected := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumer,
- },
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- registerConsumerFn(consumer)(set)
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("otherwise it just uses what exists", func() {
- flowID := uuid.New()
-
- consumer := consumerT{
- data: consumerDataT{
- topic: "topic",
- name: "consumer",
- },
- }
-
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "other-consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{},
- },
- },
- }
-
- expected := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumer,
- "other-consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{},
- },
- },
- }
-
- registerConsumerFn(consumer)(set)
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("overwrites existing consumer if desired", func() {
- close1 := func() {}
- consumer1 := consumerT{
- data: consumerDataT{
- topic: "topic",
- name: "consumer",
- },
- close: &close1,
- }
-
- close2 := func() {}
- consumer2 := consumerT{
- data: consumerDataT{
- topic: "topic",
- name: "consumer",
- },
- close: &close2,
- }
-
- set := subscriptionsSetM{}
-
- expected1 := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumer1,
- },
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- expected2 := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumer2,
- },
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- registerConsumerFn(consumer1)(set)
- g.TAssertEqual(set, expected1)
- g.TAssertEqual(reflect.DeepEqual(set, expected2), false)
-
- registerConsumerFn(consumer2)(set)
- g.TAssertEqual(set, expected2)
- g.TAssertEqual(reflect.DeepEqual(set, expected1), false)
- })
-}
-
-func test_registerWaiterFn() {
- g.TestStart("registerWaiterFn()")
-
- g.Testing("adds a new topicSubscriptionT{} if needed", func() {
- flowID := uuid.New()
-
- waiter := waiterT{
- data: waiterDataT{
- topic: "topic",
- flowID: flowID,
- name: "waiter",
- },
- }
-
- set := subscriptionsSetM{}
-
- expected := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{
- "waiter": waiter,
- },
- },
- },
- }
-
- registerWaiterFn(waiter)(set)
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("adds a new waiters map if needed", func() {
- flowID := uuid.New()
-
- waiter := waiterT{
- data: waiterDataT{
- topic: "topic",
- flowID: flowID,
- name: "waiter",
- },
- }
-
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- expected := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{
- "waiter": waiter,
- },
- },
- },
- }
-
- registerWaiterFn(waiter)(set)
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("otherwise it just uses what exists", func() {
- flowID := uuid.New()
-
- waiter := waiterT{
- data: waiterDataT{
- topic: "topic",
- flowID: flowID,
- name: "waiter",
- },
- }
-
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{
- "other-waiter": waiterT{},
- },
- },
- },
- }
-
- expected := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{
- "waiter": waiter,
- "other-waiter": waiterT{},
- },
- },
- },
- }
-
- registerWaiterFn(waiter)(set)
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("overwrites existing waiter if desired", func() {
- flowID := uuid.New()
-
- close1 := func() {}
- waiter1 := waiterT{
- data: waiterDataT{
- topic: "topic",
- flowID: flowID,
- name: "waiter",
- },
- close: &close1,
- }
-
- close2 := func() {}
- waiter2 := waiterT{
- data: waiterDataT{
- topic: "topic",
- flowID: flowID,
- name: "waiter",
- },
- close: &close2,
- }
-
- set := subscriptionsSetM{}
-
- expected1 := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{
- "waiter": waiter1,
- },
- },
- },
- }
-
- expected2 := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{
- "waiter": waiter2,
- },
- },
- },
- }
-
- registerWaiterFn(waiter1)(set)
- g.TAssertEqual(set, expected1)
- g.TAssertEqual(reflect.DeepEqual(set, expected2), false)
-
- registerWaiterFn(waiter2)(set)
- g.TAssertEqual(set, expected2)
- g.TAssertEqual(reflect.DeepEqual(set, expected1), false)
- })
-}
-
-func test_makeConsumeOneFn() {
- g.TestStart("makeConsumeOneFn()")
-
- savedLogger := slog.Default()
-
- s := new(strings.Builder)
- g.SetLoggerOutput(s)
-
- var (
- successCount int
- errorCount int
- callbackErr error
- successFnErr error
- errorFnErr error
- messages []Message
- successNames []string
- successIDs []uuid.UUID
- errorNames []string
- errorIDs []uuid.UUID
- deadletterIDs []uuid.UUID
- )
-
- data := consumerDataT{
- topic: "topic",
- name: "name",
- }
-
- callback := func(message Message) error {
- messages = append(messages, message)
- return callbackErr
- }
-
- successFn := func(name string, messageID uuid.UUID) error {
- successCount++
- successNames = append(successNames, name)
- successIDs = append(successIDs, messageID)
- return successFnErr
- }
-
- errorFn := func(
- name string,
- messageID uuid.UUID,
- deadletterID uuid.UUID,
- ) error {
- errorCount++
- errorNames = append(errorNames, name)
- errorIDs = append(errorIDs, messageID)
- deadletterIDs = append(deadletterIDs, deadletterID)
-
- return errorFnErr
- }
-
- consumeOneFn := makeConsumeOneFn(
- data,
- callback,
- successFn,
- errorFn,
- )
-
- message1 := messageT{ uuid: uuid.New() }
- message2 := messageT{ uuid: uuid.New() }
- message3 := messageT{ uuid: uuid.New() }
- message4 := messageT{ uuid: uuid.New() }
-
-
- g.Testing("error from successFn() is propagated", func() {
- err := consumeOneFn(message1)
- g.TErrorIf(err)
- g.TAssertEqual(successCount, 1)
- g.TAssertEqual(errorCount, 0)
-
- successFnErr = errors.New("successFn() error")
- err = consumeOneFn(message2)
- g.TAssertEqual(err, successFnErr)
- g.TAssertEqual(successCount, 2)
- g.TAssertEqual(errorCount, 0)
-
- g.TAssertEqual(s.String(), "")
- })
-
- g.Testing("error from callback() is logged and dropped", func() {
- *s = strings.Builder{}
-
- callbackErr = errors.New("callback() error")
- err := consumeOneFn(message3)
- g.TErrorIf(err)
- g.TAssertEqual(successCount, 2)
- g.TAssertEqual(errorCount, 1)
- g.TAssertEqual(s.String() == "", false)
- })
-
- g.Testing("error from errorFn() is propagated", func() {
- *s = strings.Builder{}
-
- errorFnErr = errors.New("errorFn() error")
- err := consumeOneFn(message4)
- g.TAssertEqual(err, errorFnErr)
- g.TAssertEqual(successCount, 2)
- g.TAssertEqual(errorCount, 2)
- g.TAssertEqual(s.String() == "", false)
- })
-
- g.Testing("calls happened with the expected arguments", func() {
- expectedMessages := []Message{
- asPublicMessage(message1),
- asPublicMessage(message2),
- asPublicMessage(message3),
- asPublicMessage(message4),
- }
-
- g.TAssertEqual(messages, expectedMessages)
- g.TAssertEqual(successNames, []string{ "name", "name" })
- g.TAssertEqual(errorNames, []string{ "name", "name" })
- g.TAssertEqual(successIDs, []uuid.UUID{
- message1.uuid,
- message2.uuid,
- })
- g.TAssertEqual(errorIDs, []uuid.UUID{
- message3.uuid,
- message4.uuid,
- })
- g.TAssertEqual(deadletterIDs[0] == message3.uuid, false)
- g.TAssertEqual(deadletterIDs[1] == message4.uuid, false)
- })
-
- slog.SetDefault(savedLogger)
-}
-
-func test_makeConsumeAllFn() {
- g.TestStart("makeConsumeAllFn()")
-
- savedLogger := slog.Default()
-
- s := new(strings.Builder)
- g.SetLoggerOutput(s)
-
- data := consumerDataT{}
-
- consumeOneFn := func(messageT) error {
- return nil
- }
-
- var eachFnErr error
- eachFn := func(string, string, func(messageT) error) error {
- return eachFnErr
- }
-
- consumeAllFn := makeConsumeAllFn(data, consumeOneFn, eachFn)
-
-
- g.Testing("silent when eachFn() returns no error", func() {
- consumeAllFn(struct{}{})
- g.TAssertEqual(s.String(), "")
- })
-
- g.Testing("logs warning otherwise", func() {
- eachFnErr = errors.New("eachFn() error")
- consumeAllFn(struct{}{})
- g.TAssertEqual(s.String() == "", false)
- })
-
- slog.SetDefault(savedLogger)
-}
-
-func test_makeWaitFn() {
- g.TestStart("makeWaitFn()")
-
- g.Testing("all it does is send the data and close things", func() {
- callCount := 0
- closeFn := func() {
- callCount++
- }
-
- c := make(chan []byte, 1)
- waitFn := makeWaitFn(c, closeFn)
-
- payload := []byte("payload")
- waitFn(payload)
- given := <- c
-
- g.TAssertEqual(given, payload)
- g.TAssertEqual(callCount, 1)
- })
-
- g.Testing("you can call it twice for cases of race condition", func() {
- callCount := 0
- closeFn := func() {
- callCount++
- }
-
- c := make(chan []byte, 1)
- waitFn := makeWaitFn(c, closeFn)
-
- payload := []byte("something")
- waitFn(payload)
- waitFn(payload)
- given1 := <- c
- given2 := <- c
-
- g.TAssertEqual(given1, payload)
- g.TAssertEqual(given2, []byte(nil))
- })
-}
-
-func test_runConsumer() {
- g.TestStart("runConsumer()")
-
- g.Testing("calls consumeAllFn() at least one", func() {
- onPing := func(fn func(struct{})) {}
-
- count := 0
- consumeAllFn := func(struct{}) {
- count++
- }
-
- runConsumer(onPing, consumeAllFn)
- g.TAssertEqual(count, 1)
- })
-
- g.Testing("can call consumeAllFn() (onPing + 1) times", func() {
- onPing := func(fn func(struct{})) {
- fn(struct{}{})
- fn(struct{}{})
- fn(struct{}{})
- }
-
- count := 0
- consumeAllFn := func(struct{}) {
- count++
- }
-
- runConsumer(onPing, consumeAllFn)
- g.TAssertEqual(count, 4)
- })
-}
-
-func test_tryFinding() {
- g.TestStart("tryFinding()")
-
- g.Testing("noop in case of failure", func() {
- myErr := errors.New("find() error")
- findFn := func(string, uuid.UUID) (messageT, error) {
- return messageT{}, myErr
- }
-
- count := 0
- waitFn := func([]byte) {
- count++
- }
-
-
- tryFinding(findFn, "topic", uuid.New(), waitFn)
- g.TAssertEqual(count, 0)
- })
-
- g.Testing("calls waitFn in case of success", func() {
- payload := []byte("find() payload")
-
- findFn := func(string, uuid.UUID) (messageT, error) {
- return messageT{ payload: payload }, nil
- }
-
- payloads := [][]byte{}
- waitFn := func(payload []byte) {
- payloads = append(payloads, payload)
- }
-
-
- tryFinding(findFn, "topic", uuid.New(), waitFn)
- g.TAssertEqual(payloads, [][]byte{ payload })
- })
-}
-
-func test_queueT_Subscribe() {
- g.TestStart("queueT.Subscribe()")
-
- set := subscriptionsSetM{}
- consumed := []uuid.UUID{}
- messages := []messageT{
- messageT{ uuid: uuid.New() },
- messageT{ uuid: uuid.New() },
- }
-
- var takeErr error
- queue := queueT{
- queries: queriesT{
- take: func(string, string) error {
- return takeErr
- },
- pending: func(
- topic string,
- consumer string,
- fn func(messageT) error,
- ) error {
- for _, message := range messages {
- consumed = append(
- consumed,
- message.uuid,
- )
- _ = fn(message)
- }
- return nil
- },
- commit: func(
- consumer string,
- messageID uuid.UUID,
- ) error {
- return nil
- },
- toDead: func(string, uuid.UUID, uuid.UUID) error {
- g.Unreachable()
- return nil
- },
- },
- subscriptions: subscriptionsT{
- write: func(fn func(subscriptionsSetM) error) error {
- return fn(set)
- },
- },
- }
-
-
- g.Testing("registers our callback in the set and calls it", func() {
- var wg sync.WaitGroup
- wg.Add(2)
-
- queue.Subscribe("topic", "consumer-1", func(Message) error {
- wg.Done()
- return nil
- })
- defer queue.Unsubscribe("topic", "consumer-1")
-
- wg.Wait()
- g.TAssertEqual(consumed, []uuid.UUID{
- messages[0].uuid,
- messages[1].uuid,
- })
- })
-
- g.Testing("our callback also gets called when pinged", func() {
- consumed = []uuid.UUID{}
-
- var wg sync.WaitGroup
- wg.Add(4)
-
- queue.Subscribe("topic", "consumer-2", func(m Message) error {
- wg.Done()
- return nil
- })
- defer queue.Unsubscribe("topic", "consumer-2")
-
- consumer := set["topic"].consumers["consumer-2"]
- consumer.pinger.tryPing(struct{}{})
-
- wg.Wait()
-
- g.TAssertEqual(consumed, []uuid.UUID{
- messages[0].uuid,
- messages[1].uuid,
- messages[0].uuid,
- messages[1].uuid,
- })
- })
-
- g.Testing("we try to own the topic", func() {
- takeErr = errors.New("queueT.Subscribe() 1")
-
- err := queue.Subscribe("topic", "name", func(Message) error {
- g.Unreachable()
- return nil
- })
-
- g.TAssertEqual(err, takeErr)
- takeErr = nil
- })
-
- g.Testing("if we can't own the topic, we don't get registered", func() {
- takeErr = errors.New("queueT.Subscribe() 2")
-
- err := queue.Subscribe("topic", "name", func(Message) error {
- g.Unreachable()
- return nil
- })
- g.TErrorNil(err)
-
- expected := subscriptionsSetM{}
- g.TAssertEqual(set, expected)
-
- takeErr = nil
- })
-}
-
-func test_queueT_WaitFor() {
- g.TestStart("queueT.WaitFor()")
-
- findErr := errors.New("find() error")
- message := messageT{
- payload: []byte("queueT.WaitFor() payload"),
- }
-
- set := subscriptionsSetM{}
-
- queue := queueT{
- queries: queriesT{
- find: func(
- topic string,
- flowID uuid.UUID,
- ) (messageT, error) {
- return message, findErr
- },
- },
- subscriptions: subscriptionsT{
- write: func(fn func(subscriptionsSetM) error) error {
- return fn(set)
- },
- read: func(fn func(subscriptionsSetM) error) error {
- return fn(set)
- },
- },
- }
-
-
- g.Testing("registers the waiter in the set", func() {
- flowID := uuid.New()
-
- defer queue.WaitFor("topic", flowID, "waiter-1").Close()
-
- expected := waiterDataT{
- topic: "topic",
- flowID: flowID,
- name: "waiter-1",
- }
-
- g.TAssertEqual(
- set["topic"].waiters[flowID]["waiter-1"].data,
- expected,
- )
- })
-
- g.Testing("the channel gets a message when waiter is pinged", func() {
- flowID := uuid.New()
- payload := []byte("sent payload")
-
- w := queue.WaitFor("topic", flowID, "waiter-2")
- defer w.Close()
-
- waiter := set["topic"].waiters[flowID]["waiter-2"]
- waiter.pinger.tryPing(payload)
-
- given := <- w.Channel
-
- g.TAssertEqual(given, payload)
- g.TAssertEqual((*waiter.closed)(), true)
- })
-
- g.Testing("we can also WaitFor() after publishing the message", func() {
- findErr = nil
- flowID := uuid.New()
-
- w := queue.WaitFor("topic", flowID, "waiter-3")
- defer w.Close()
-
- given := <- w.Channel
-
- waiter := set["topic"].waiters[flowID]["waiter-3"]
- waiter.pinger.tryPing([]byte("ignored"))
-
- g.TAssertEqual(given, message.payload)
- g.TAssertEqual((*waiter.closed)(), true)
- })
-
- g.Testing("if the data already exists we get it immediatelly", func() {
- flowID := uuid.New()
-
- w := queue.WaitFor("topic", flowID, "waiter-4")
- defer w.Close()
-
- waiter := set["topic"].waiters[flowID]["waiter-4"]
- g.TAssertEqual((*waiter.closed)(), true)
-
- given := <- w.Channel
- g.TAssertEqual(given, message.payload)
- })
-}
-
-func test_unsubscribeIfExistsFn() {
- g.TestStart("unsubscribeIfExistsFn()")
-
- g.Testing("noop on empty set", func() {
- set := subscriptionsSetM{}
- expected := subscriptionsSetM{}
- unsubscribeIfExistsFn("topic", "consumer")(set)
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("noop on missing topic", func() {
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{},
- }
-
- expected := subscriptionsSetM{
- "topic": topicSubscriptionT{},
- }
-
- unsubscribeIfExistsFn("other-topic", "consumer")(set)
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("noop on missing consumer", func() {
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- },
- }
-
- expected := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- },
- }
-
- unsubscribeIfExistsFn("topic", "other-consumer")(set)
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("closes consumer and removes it from set", func() {
- flowID := uuid.New()
-
- count := 0
- close := func() {
- count++
- }
-
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{
- close: &close,
- },
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{},
- },
- },
- }
-
- expected := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{},
- },
- },
- }
-
- unsubscribeIfExistsFn("topic", "consumer")(set)
- g.TAssertEqual(set, expected)
- g.TAssertEqual(count, 1)
- })
-
- g.Testing("empty topics are also removed", func() {
- count := 0
- close := func() {
- count++
- }
-
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{
- close: &close,
- },
- },
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- expected := subscriptionsSetM{}
-
- unsubscribeIfExistsFn("topic", "consumer")(set)
- g.TAssertEqual(set, expected)
- g.TAssertEqual(count, 1)
- })
-}
-
-func test_queueT_Unsubscribe() {
- g.TestStart("queueT.Unsubscribe()")
-
- g.Testing("calls unsubscribesIfExists() via writeFn()", func() {
- closed := false
- close := func() {
- closed = true
- }
-
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{
- close: &close,
- },
- },
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- expected := subscriptionsSetM{}
-
- queue := queueT{
- subscriptions: subscriptionsT{
- write: func(
- fn func(subscriptionsSetM) error,
- ) error {
- return fn(set)
- },
- },
- }
-
- queue.Unsubscribe("topic", "consumer")
- g.TAssertEqual(set, expected)
- g.TAssertEqual(closed, true)
- })
-}
-
-func test_cleanSubscriptions() {
- g.TestStart("cleanSubscriptions()")
-
- g.Testing("all consumers and waiters get close()'d", func() {
- flowID1 := uuid.New()
- flowID2 := uuid.New()
- flowID3 := uuid.New()
-
- type pairT struct{
- closed func() bool
- fn func()
- }
-
- mkclose := func() pairT {
- closed := false
- return pairT{
- closed: func() bool {
- return closed
- },
- fn: func() {
- closed = true
- },
- }
- }
-
- c := mkclose()
- g.TAssertEqual(c.closed(), false)
- c.fn()
- var x bool = c.closed()
- g.TAssertEqual(x, true)
-
- close1 := mkclose()
- close2 := mkclose()
- close3 := mkclose()
- close4 := mkclose()
- close5 := mkclose()
- close6 := mkclose()
- close7 := mkclose()
- close8 := mkclose()
-
- set := subscriptionsSetM{
- "topic-1": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer-1": consumerT{
- close: &close1.fn,
- },
- "consumer-2": consumerT{
- close: &close2.fn,
- },
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{
- "waiter-1": waiterT{
- close: &close3.fn,
- },
- },
- flowID2: map[string]waiterT{
- "waiter-2": waiterT{
- close: &close4.fn,
- },
- "waiter-3": waiterT{
- close: &close5.fn,
- },
- },
- },
- },
- "topic-2": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer-3": consumerT{
- close: &close6.fn,
- },
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID3: map[string]waiterT{
- "waiter-4": waiterT{
- close: &close7.fn,
- },
- },
- },
- },
- "topic-3": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- cleanSubscriptions(set)
-
- given := []bool{
- close1.closed(),
- close2.closed(),
- close3.closed(),
- close4.closed(),
- close5.closed(),
- close6.closed(),
- close7.closed(),
- close8.closed(),
- }
-
- expected := []bool{
- true,
- true,
- true,
- true,
- true,
- true,
- true,
- false,
- }
-
- g.TAssertEqualI(given, expected)
- })
-}
-
-func test_queueT_Close() {
- g.TestStart("queueT.Close()")
-
- g.Testing("clean pinger, subscriptions and queries", func() {
- var (
- pingerCount = 0
- subscriptionsCount = 0
- queriesCount = 0
-
- subscriptionsErr = errors.New("subscriptionsT{} error")
- queriesErr = errors.New("queriesT{} error")
- )
-
- queue := queueT{
- queries: queriesT{
- close: func() error{
- queriesCount++
- return queriesErr
- },
- },
- subscriptions: subscriptionsT{
- write: func(
- func(subscriptionsSetM) error,
- ) error {
- subscriptionsCount++
- return subscriptionsErr
- },
- },
- pinger: pingerT[struct{}]{
- close: func() {
- pingerCount++
- },
- },
- }
-
- err := queue.Close()
- g.TAssertEqual(err, g.WrapErrors(subscriptionsErr, queriesErr))
- g.TAssertEqual(pingerCount, 1)
- g.TAssertEqual(subscriptionsCount, 1)
- g.TAssertEqual(queriesCount, 1)
- })
-}
-
-
-func test_topicGetopt() {
- g.TestStart("topicGetopt()")
-
- g.Testing("checks for required positional argument", func() {
- var w strings.Builder
- argsIn := argsT{
- args: []string{},
- }
-
- argsOut, ok := topicGetopt(argsIn, &w)
- g.TAssertEqual(w.String(), "Missing TOPIC.\n")
- g.TAssertEqual(ok, false)
- g.TAssertEqual(argsOut, argsIn)
- })
-
- g.Testing("success otherwise", func() {
- var w strings.Builder
- argsIn := argsT{
- args: []string{"a topic"},
- }
-
- argsOut, ok := topicGetopt(argsIn, &w)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(ok, true)
- argsIn.topic = "a topic"
- g.TAssertEqual(argsOut, argsIn)
- })
-}
-
-func test_topicConsumerGetopt() {
- g.TestStart("topicConsumerGetopt()")
-
- g.Testing("checks for TOPIC argument", func() {
- var w strings.Builder
- argsIn := argsT{
- args: []string{},
- }
-
- argsOut, ok := topicConsumerGetopt(argsIn, &w)
- g.TAssertEqual(w.String(), "Missing TOPIC.\n")
- g.TAssertEqual(ok, false)
- g.TAssertEqual(argsOut, argsIn)
- })
-
- g.Testing("we get an error on unsupported flag", func() {
- var w strings.Builder
- argsIn := argsT{
- args: []string{"-Z"},
- }
-
- const message = "flag provided but not defined: -Z\n"
- argsOut, ok := topicConsumerGetopt(argsIn, &w)
- g.TAssertEqual(w.String(), message)
- g.TAssertEqual(ok, false)
- g.TAssertEqual(argsOut, argsIn)
- })
-
- g.Testing("we also get an error on incorrect usage of flags", func() {
- var w strings.Builder
- argsIn := argsT{
- args: []string{"-C"},
- }
-
- const message = "flag needs an argument: -C\n"
- argsOut, ok := topicConsumerGetopt(argsIn, &w)
- g.TAssertEqual(w.String(), message)
- g.TAssertEqual(ok, false)
- g.TAssertEqual(argsOut, argsIn)
- })
-
- g.Testing("we can customize the CONSUMER", func() {
- var w strings.Builder
- argsIn := argsT{
- args: []string{"-C", "custom consumer", "this topic"},
- }
-
- argsOut, ok := topicConsumerGetopt(argsIn, &w)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(ok, true)
- argsIn.topic = "this topic"
- argsIn.consumer = "custom consumer"
- g.TAssertEqual(argsOut, argsIn)
- })
-
- g.Testing("otherwise we get the default one", func() {
- var w strings.Builder
- argsIn := argsT{
- args: []string{"T"},
- }
-
- argsOut, ok := topicConsumerGetopt(argsIn, &w)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(ok, true)
- argsIn.topic = "T"
- argsIn.consumer = "default-consumer"
- g.TAssertEqual(argsOut, argsIn)
- })
-}
-
-func test_inExec() {
- g.TestStart("inExec()")
-
- const (
- topic = "inExec topic"
- payloadStr = "inExec payload"
- )
- var (
- payload = []byte(payloadStr)
- args = argsT{
- topic: topic,
- }
- )
-
- var (
- publishErr error
- messages []messageT
- id int64 = 0
- )
- queries := queriesT{
- publish: func(
- unsent UnsentMessage,
- messageID uuid.UUID,
- ) (messageT, error) {
- if publishErr != nil {
- return messageT{}, publishErr
- }
-
- id++
- now := time.Now()
- message := messageT{
- id: id,
- timestamp: now,
- uuid: messageID,
- topic: unsent.Topic,
- flowID: unsent.FlowID,
- payload: unsent.Payload,
- }
- messages = append(messages, message)
- return message, nil
- },
- }
-
-
- g.Testing("messageID to output when successful", func() {
- r := bytes.NewReader(payload)
- var w strings.Builder
- rc, err := inExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(messages[0].topic, topic)
- g.TAssertEqual(messages[0].payload, payload)
- g.TAssertEqual(rc, 0)
- g.TAssertEqual(w.String(), messages[0].uuid.String() + "\n")
- })
-
- g.Testing("if reading fails, we return the error", func() {
- var r *os.File
- var w strings.Builder
- rc, err := inExec(args, queries, r, &w)
- g.TAssertEqual(
- err.Error(),
- "invalid argument",
- )
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- })
-
- g.Testing("if publishing fails, we return the error", func() {
- publishErr = errors.New("publish() error")
- r := strings.NewReader("read but not published")
- var w strings.Builder
- rc, err := inExec(args, queries, r, &w)
- g.TAssertEqual(err, publishErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- })
-}
-
-func test_outExec() {
- g.TestStart("outExec()")
-
- const (
- topic = "outExec topic"
- consumer = "outExec consumer"
- )
- var (
- r = strings.NewReader("")
- args = argsT{
- topic: topic,
- consumer: consumer,
- }
- )
-
- var (
- takeErr error
- nextErr error
- messages []messageT
- id int64 = 0
- )
- queries := queriesT{
- take: func(string, string) error {
- return takeErr
- },
- next: func(string, string) (messageT, error) {
- if nextErr != nil {
- return messageT{}, nextErr
- }
-
- if len(messages) == 0 {
- return messageT{}, sql.ErrNoRows
- }
-
- return messages[0], nil
- },
- }
- pub := func(payload []byte) {
- id++
- now := time.Now()
- message := messageT{
- id: id,
- timestamp: now,
- uuid: uuid.New(),
- topic: topic,
- flowID: uuid.New(),
- payload: payload,
- }
- messages = append(messages, message)
- }
-
-
- g.Testing("exit code 1 when we can't take", func() {
- takeErr = errors.New("outExec() take error")
-
- var w strings.Builder
-
- rc, err := outExec(args, queries, r, &w)
- g.TAssertEqual(err, takeErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
-
- takeErr = nil
- })
-
- g.Testing("exit code 3 when no message is available", func() {
- var w strings.Builder
-
- rc, err := outExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 3)
- })
-
- g.Testing("we get the same message until we commit", func() {
- var (
- w1 strings.Builder
- w2 strings.Builder
- w3 strings.Builder
- )
- args := argsT{
- topic: topic,
- consumer: consumer,
- }
-
- pub([]byte("first payload"))
- pub([]byte("second payload"))
-
- rc1, err1 := outExec(args, queries, r, &w1)
- rc2, err2 := outExec(args, queries, r, &w2)
- messages = messages[1:]
- rc3, err3 := outExec(args, queries, r, &w3)
-
- g.TErrorIf(g.SomeError(err1, err2, err3))
- g.TAssertEqual(w1.String(), "first payload\n")
- g.TAssertEqual(w2.String(), "first payload\n")
- g.TAssertEqual(w3.String(), "second payload\n")
- g.TAssertEqual(rc1, 0)
- g.TAssertEqual(rc2, 0)
- g.TAssertEqual(rc3, 0)
- })
-
- g.Testing("we propagate the error when the query fails", func() {
- nextErr = errors.New("next() error")
- var w strings.Builder
- rc, err := outExec(args, queries, r, &w)
- g.TAssertEqual(err, nextErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- })
-}
-
-func test_commitExec() {
- g.TestStart("commitExec()")
-
- const (
- topic = "commitExec topic"
- consumer = "commitExec consumer"
- )
- var (
- r = strings.NewReader("")
- args = argsT{
- topic: topic,
- consumer: consumer,
- }
- )
-
- var (
- takeErr error
- nextErr error
- commitErr error
- messages []messageT
- id int64 = 0
- )
- queries := queriesT{
- take: func(string, string) error {
- return takeErr
- },
- next: func(string, string) (messageT, error) {
- if nextErr != nil {
- return messageT{}, nextErr
- }
-
- if len(messages) == 0 {
- return messageT{}, sql.ErrNoRows
- }
-
- return messages[0], nil
- },
- commit: func(string, uuid.UUID) error {
- if commitErr != nil {
- return commitErr
- }
-
- messages = messages[1:]
- return nil
- },
- }
- pub := func(payload []byte) {
- id++
- now := time.Now()
- message := messageT{
- id: id,
- timestamp: now,
- uuid: uuid.New(),
- topic: topic,
- flowID: uuid.New(),
- payload: payload,
- }
- messages = append(messages, message)
- }
-
-
- g.Testing("error when we can't take", func() {
- takeErr = errors.New("commitExec() take error")
-
- var w strings.Builder
-
- rc, err := commitExec(args, queries, r, &w)
- g.TAssertEqual(err, takeErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
-
- takeErr = nil
- })
-
- g.Testing("error when there is nothing to commit", func() {
- var w strings.Builder
-
- rc, err := commitExec(args, queries, r, &w)
- g.TAssertEqual(err, sql.ErrNoRows)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- })
-
- g.Testing("messages get committed in order", func() {
- var w strings.Builder
-
- pub([]byte("first payload"))
- pub([]byte("second payload"))
- pub([]byte("third payload"))
-
- message1 := messages[0]
- g.TAssertEqual(message1.payload, []byte("first payload"))
-
- rc, err := commitExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(rc, 0)
-
- message2 := messages[0]
- g.TAssertEqual(message2.payload, []byte("second payload"))
-
- rc, err = commitExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(rc, 0)
-
- message3 := messages[0]
- g.TAssertEqual(message3.payload, []byte("third payload"))
-
- rc, err = commitExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(rc, 0)
-
- g.TAssertEqual(len(messages), 0)
- })
-
- g.Testing("when next() query fails, we propagate its result", func() {
- nextErr = errors.New("next() error")
- var w strings.Builder
- rc, err := commitExec(args, queries, r, &w)
- g.TAssertEqual(err, nextErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- nextErr = nil
- })
-
- g.Testing("we also propagate the error on commit() failure", func() {
- commitErr = errors.New("commit() error")
- pub([]byte{})
- var w strings.Builder
- rc, err := commitExec(args, queries, r, &w)
- g.TAssertEqual(err, commitErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- })
-}
-
-func test_deadExec() {
- g.TestStart("deadExec()")
-
- const (
- topic = "deadExec topic"
- consumer = "deadExec consumer"
- )
- var (
- r = strings.NewReader("")
- args = argsT{
- topic: topic,
- consumer: consumer,
- }
- )
-
- var (
- takeErr error
- nextErr error
- toDeadErr error
- messages []messageT
- id int64 = 0
- )
- queries := queriesT{
- take: func(string, string) error {
- return takeErr
- },
- next: func(string, string) (messageT, error) {
- if nextErr != nil {
- return messageT{}, nextErr
- }
-
- if len(messages) == 0 {
- return messageT{}, sql.ErrNoRows
- }
-
- return messages[0], nil
- },
- toDead: func(
- _ string,
- _ uuid.UUID,
- deadletterID uuid.UUID,
- ) error {
- if toDeadErr != nil {
- return toDeadErr
- }
-
- messages = messages[1:]
- return nil
- },
- }
- pub := func(payload []byte) {
- id++
- now := time.Now()
- message := messageT{
- id: id,
- timestamp: now,
- uuid: uuid.New(),
- topic: topic,
- flowID: uuid.New(),
- payload: payload,
- }
- messages = append(messages, message)
- }
-
-
- g.Testing("error when we can't take", func() {
- takeErr = errors.New("deadExec() take error")
-
- var w strings.Builder
-
- rc, err := deadExec(args, queries, r, &w)
- g.TAssertEqual(err, takeErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
-
- takeErr = nil
- })
-
- g.Testing("error when there is nothing to mark as dead", func() {
- var w strings.Builder
-
- rc, err := deadExec(args, queries, r, &w)
- g.TAssertEqual(err, sql.ErrNoRows)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- })
-
- g.Testing("the latest message becomes a deadletter", func() {
- var w strings.Builder
-
- pub([]byte("first payload"))
- pub([]byte("second payload"))
-
- message1 := messages[0]
- g.TAssertEqual(message1.payload, []byte("first payload"))
-
- rc, err := deadExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(rc, 0)
-
- message2 := messages[0]
- g.TAssertEqual(message2.payload, []byte("second payload"))
-
- rc, err = deadExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(rc, 0)
-
- g.TAssertEqual(len(messages), 0)
- })
-
- g.Testing("next() error is propagated", func() {
- nextErr = errors.New("next() error")
- var w strings.Builder
- rc, err := deadExec(args, queries, r, &w)
- g.TAssertEqual(err, nextErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- nextErr = nil
- })
-
- g.Testing("toDead() error is propagated", func() {
- toDeadErr = errors.New("toDead() error")
- pub([]byte{})
- var w strings.Builder
- rc, err := deadExec(args, queries, r, &w)
- g.TAssertEqual(err, toDeadErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- })
-}
-
-func test_listDeadExec() {
- g.TestStart("listDeadExec()")
-
- const (
- topic = "listDeadExec topic"
- consumer = "listDeadExec consumer"
- )
- var (
- r = strings.NewReader("")
- args = argsT{
- topic: topic,
- consumer: consumer,
- }
- )
-
- var (
- messages []messageT
- deadletters []deadletterT
- id int64 = 0
- errorIndex = -1
- allDeadErr = errors.New("allDead() error")
- )
- queries := queriesT{
- allDead: func(
- _ string,
- _ string,
- callback func(deadletterT, messageT) error,
- ) error {
- for i, deadletter := range deadletters {
- if i == errorIndex {
- return allDeadErr
- }
-
- callback(deadletter, messageT{})
- }
-
- return nil
- },
- }
- pub := func() {
- payload := []byte("ignored payload for this test")
- id++
- now := time.Now()
- message := messageT{
- id: id,
- timestamp: now,
- uuid: uuid.New(),
- topic: topic,
- flowID: uuid.New(),
- payload: payload,
- }
- messages = append(messages, message)
- }
- commit := func() {
- messages = messages[1:]
- }
- dead := func() {
- message := messages[0]
- now := time.Now()
- deadletter := deadletterT{
- uuid: uuid.New(),
- timestamp: now,
- consumer: consumer,
- messageID: message.uuid,
- }
-
- messages = messages[1:]
- deadletters = append(deadletters, deadletter)
- }
- replay := func() {
- pub()
- deadletters = deadletters[1:]
- }
-
-
- g.Testing("nothing is shown if topic is empty", func() {
- var w strings.Builder
-
- rc, err := listDeadExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 0)
- })
-
- g.Testing("deadletters are printed in order", func() {
- var w strings.Builder
-
- pub()
- pub()
- pub()
-
- rc, err := listDeadExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 0)
-
- dead()
- commit()
- dead()
-
- expected := fmt.Sprintf(
- "%s\t%s\t%s\n%s\t%s\t%s\n",
- deadletters[0].uuid.String(),
- deadletters[0].timestamp.Format(time.RFC3339),
- deadletters[0].consumer,
- deadletters[1].uuid.String(),
- deadletters[1].timestamp.Format(time.RFC3339),
- deadletters[1].consumer,
- )
-
- rc, err = listDeadExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(w.String(), expected)
- g.TAssertEqual(rc, 0)
- })
-
- g.Testing("deadletters disappear after being replayed", func() {
- var (
- w1 strings.Builder
- w2 strings.Builder
- )
-
- replay()
-
- deadletter := deadletters[0]
- expected := fmt.Sprintf(
- "%s\t%s\t%s\n",
- deadletter.uuid.String(),
- deadletter.timestamp.Format(time.RFC3339),
- deadletter.consumer,
- )
-
- rc, err := listDeadExec(args, queries, r, &w1)
- g.TErrorIf(err)
- g.TAssertEqual(w1.String(), expected)
- g.TAssertEqual(rc, 0)
-
- replay()
-
- rc, err = listDeadExec(args, queries, r, &w2)
- g.TErrorIf(err)
- g.TAssertEqual(w2.String(), "")
- g.TAssertEqual(rc, 0)
- })
-
- g.Testing("a database failure interrupts the output", func() {
- var w strings.Builder
-
- pub()
- pub()
- pub()
- dead()
- dead()
- dead()
-
- deadletter := deadletters[0]
- expected := fmt.Sprintf(
- "%s\t%s\t%s\n",
- deadletter.uuid.String(),
- deadletter.timestamp.Format(time.RFC3339),
- deadletter.consumer,
- )
-
- errorIndex = 1
- rc, err := listDeadExec(args, queries, r, &w)
- g.TAssertEqual(err, allDeadErr)
- g.TAssertEqual(w.String(), expected)
- g.TAssertEqual(rc, 1)
- })
-}
-
-func test_replayExec() {
- g.TestStart("replayExec()")
-
- const (
- topic = "replayExec topic"
- consumer = "replayExec consumer"
- )
- var (
- w strings.Builder
- r = strings.NewReader("")
- args = argsT{
- topic: topic,
- consumer: consumer,
- }
- )
-
- var (
- oneDeadErr error
- replayErr error
- messages []messageT
- deadletters []deadletterT
- deadMessages []messageT
- id int64 = 0
- )
- queries := queriesT{
- oneDead: func(string, string) (deadletterT, error) {
- if oneDeadErr != nil {
- return deadletterT{}, oneDeadErr
- }
-
- if len(deadletters) == 0 {
- return deadletterT{}, sql.ErrNoRows
- }
-
- return deadletters[0], nil
- },
- replay: func(uuid.UUID, uuid.UUID) (messageT, error) {
- if replayErr != nil {
- return messageT{}, replayErr
- }
-
- message := deadMessages[0]
- messages = append(messages, message)
- deadletters = deadletters[1:]
- deadMessages = deadMessages[1:]
- return message, nil
- },
- }
- pub := func(payload []byte) {
- id++
- now := time.Now()
- message := messageT{
- id: id,
- timestamp: now,
- uuid: uuid.New(),
- topic: topic,
- flowID: uuid.New(),
- payload: payload,
- }
- messages = append(messages, message)
- }
- commit := func() {
- messages = messages[1:]
- }
- dead := func() {
- message := messages[0]
- deadletter := deadletterT{ uuid: uuid.New() }
-
- messages = messages[1:]
- deadletters = append(deadletters, deadletter)
- deadMessages = append(deadMessages, message)
- }
- next := func() string {
- return string(messages[0].payload)
- }
-
-
- g.Testing("error when there is nothing to replay", func() {
- rc, err := replayExec(args, queries, r, &w)
- g.TAssertEqual(err, sql.ErrNoRows)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
-
- pub([]byte("first payload"))
- pub([]byte("second payload"))
- pub([]byte("third payload"))
- pub([]byte("fourth payload"))
-
- rc, err = replayExec(args, queries, r, &w)
- g.TAssertEqual(err, sql.ErrNoRows)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- })
-
- g.Testing("deadletters are replayed in order", func() {
- dead()
- commit()
- dead()
- commit()
-
- rc, err := replayExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 0)
- g.TAssertEqual(next(), "first payload")
-
- commit()
-
- rc, err = replayExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 0)
- g.TAssertEqual(next(), "third payload")
- })
-
- g.Testing("oneDead() error is forwarded", func() {
- oneDeadErr = errors.New("oneDead() error")
- rc, err := replayExec(args, queries, r, &w)
- g.TAssertEqual(err, oneDeadErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- oneDeadErr = nil
- })
-
- g.Testing("replay() error is also forwarded", func() {
- pub([]byte{})
- dead()
-
- replayErr = errors.New("replay() error")
- rc, err := replayExec(args, queries, r, &w)
- g.TAssertEqual(err, replayErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- })
-}
-
-func test_sizeExec() {
- g.TestStart("sizeExec()")
-
- const (
- topic = "sizeExec topic"
- consumer = "sizeExec consumer"
- )
- var (
- r = strings.NewReader("")
- args = argsT{
- topic: topic,
- consumer: consumer,
- }
- )
-
- var sizeErr error
- queries := queriesT{
- size: func(string) (int, error) {
- if sizeErr != nil {
- return -1, sizeErr
- }
-
- return 123, nil
- },
- }
-
-
- g.Testing("it propagates the error when the query fails", func() {
- sizeErr = errors.New("size() error")
- var w strings.Builder
- rc, err := sizeExec(args, queries, r, &w)
- g.TAssertEqual(err, sizeErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- sizeErr = nil
- })
-
- g.Testing("otherwise it just prints what is was given", func() {
- var w strings.Builder
- rc, err := sizeExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(w.String(), "123\n")
- g.TAssertEqual(rc, 0)
- })
-}
-
-func test_countExec() {
- g.TestStart("countExec()")
-
- const (
- topic = "countExec topic"
- consumer = "countExec consumer"
- )
- var (
- r = strings.NewReader("")
- args = argsT{
- topic: topic,
- consumer: consumer,
- }
- )
-
- var countErr error
- queries := queriesT{
- count: func(string, string) (int, error) {
- if countErr != nil {
- return -1, countErr
- }
-
- return 2222, nil
- },
- }
-
-
- g.Testing("it propagates the query error", func() {
- countErr = errors.New("count() error")
- var w strings.Builder
- rc, err := countExec(args, queries, r, &w)
- g.TAssertEqual(err, countErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- countErr = nil
- })
-
- g.Testing("otherwise it prints the given count", func() {
- var w strings.Builder
- rc, err := countExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(w.String(), "2222\n")
- g.TAssertEqual(rc, 0)
- })
-}
-
-func test_hasDataExec() {
- g.TestStart("hasData()")
-
- const (
- topic = "hasData topic"
- consumer = "hasData consumer"
- )
- var (
- w strings.Builder
- r = strings.NewReader("")
- args = argsT{
- topic: topic,
- consumer: consumer,
- }
- )
-
- hasData := true
- var hasDataErr error
- queries := queriesT{
- hasData: func(string, string) (bool, error) {
- if hasDataErr != nil {
- return false, hasDataErr
- }
-
- return hasData, nil
- },
- }
-
-
- g.Testing("it propagates the query error", func() {
- hasDataErr = errors.New("hasData() error")
- rc, err := hasDataExec(args, queries, r, &w)
- g.TAssertEqual(err, hasDataErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- hasDataErr = nil
- })
-
- g.Testing("otherwise if just returns (not prints) the flag", func() {
- hasData = true
- rc, err := hasDataExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(rc, 0)
-
- hasData = false
- rc, err = hasDataExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(rc, 1)
-
- g.TAssertEqual(w.String(), "")
- })
-}
-
-func test_usage() {
- g.TestStart("usage()")
-
- g.Testing("it just writes to io.Writer", func() {
- var w strings.Builder
- usage("xxx", &w)
- const message =
- "Usage: xxx [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n"
- g.TAssertEqual(w.String(), message)
- })
-
- g.Testing("noop on io.Discard for io.Writer", func() {
- usage("AN ERROR IF SEEN ANYWHERE!", io.Discard)
- })
-}
-
-func test_getopt() {
- g.TestStart("getopt()")
-
- const warning = "Missing COMMAND.\n"
- const usage = "Usage: $0 [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n"
-
- commandsMap := map[string]commandT{
- "good": commandT{
- name: "good",
- getopt: func(args argsT, _ io.Writer) (argsT, bool) {
- return args, true
- },
- },
- "bad": commandT{
- name: "bad",
- getopt: func(args argsT, w io.Writer) (argsT, bool) {
- if len(args.args) == 0 {
- fmt.Fprintln(w, "no required arg")
- return args, false
- }
-
- if args.args[0] != "required" {
- fmt.Fprintln(w, "not correct one")
- return args, false
- }
-
- args.topic = "a topic"
- args.consumer = "a consumer"
- return args, true
- },
- },
- }
-
-
- g.Testing("we suppress the default error message", func() {
- var w strings.Builder
- argv := []string{"$0", "-h"}
- _, _, rc := getopt(argv, commandsMap, &w)
-
- g.TAssertEqual(w.String(), usage)
- g.TAssertEqual(rc, 2)
- })
-
- g.Testing("we get an error on unsupported flag", func() {
- var w strings.Builder
- argv := []string{"$0", "-X"}
- _, _, rc := getopt(argv, commandsMap, &w)
-
- const message = "flag provided but not defined: -X\n"
- g.TAssertEqual(w.String(), message + usage)
- g.TAssertEqual(rc, 2)
- })
-
- g.Testing("we also get an error on incorrect usage of flags", func() {
- var w strings.Builder
- argv := []string{"$0", "-f"}
- _, _, rc := getopt(argv, commandsMap, &w)
-
- const message = "flag needs an argument: -f\n"
- g.TAssertEqual(w.String(), message + usage)
- g.TAssertEqual(rc, 2)
- })
-
- g.Testing("error when not given a command", func() {
- var w strings.Builder
- argv := []string{"$0"}
- _, _, rc := getopt(argv, commandsMap, &w)
-
- g.TAssertEqual(w.String(), warning + usage)
- g.TAssertEqual(rc, 2)
- })
-
- g.Testing("error on unknown command", func() {
- var w strings.Builder
- argv := []string{"$0", "unknown"}
- _, _, rc := getopt(argv, commandsMap, &w)
-
- const message = `Bad COMMAND: "unknown".` + "\n"
- g.TAssertEqual(w.String(), message + usage)
- g.TAssertEqual(rc, 2)
- })
-
- g.Testing("checks the command usage", func() {
- var (
- w1 strings.Builder
- w2 strings.Builder
- w3 strings.Builder
- )
-
- argv1 := []string{"$0", "bad"}
- argv2 := []string{"$0", "bad", "arg"}
- argv3 := []string{"$0", "bad", "required"}
- _, _, rc1 := getopt(argv1, commandsMap, &w1)
- _, _, rc2 := getopt(argv2, commandsMap, &w2)
- args, command, rc3 := getopt(argv3, commandsMap, &w3)
- expectedArgs := argsT{
- databasePath: "fiinha.db",
- prefix: "fiinha",
- command: "bad",
- allArgs: argv3,
- args: argv3[2:],
- topic: "a topic",
- consumer: "a consumer",
- }
-
- g.TAssertEqual(w1.String(), "no required arg\n" + usage)
- g.TAssertEqual(w2.String(), "not correct one\n" + usage)
- g.TAssertEqual(w3.String(), "")
- g.TAssertEqual(rc1, 2)
- g.TAssertEqual(rc2, 2)
- g.TAssertEqual(rc3, 0)
- g.TAssertEqual(args, expectedArgs)
- g.TAssertEqual(command.name, "bad")
- })
-
- g.Testing("when given a command we the default values", func() {
- var w strings.Builder
- args, command, rc := getopt(
- []string{"$0", "good"},
- commandsMap,
- &w,
- )
- expectedArgs := argsT{
- databasePath: "fiinha.db",
- prefix: "fiinha",
- command: "good",
- allArgs: []string{"$0", "good"},
- args: []string{},
- }
-
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 0)
- g.TAssertEqual(args, expectedArgs)
- g.TAssertEqual(command.name, "good")
- })
-
- g.Testing("we can customize both values", func() {
- var w strings.Builder
- argv := []string{"$0", "-f", "F", "-p", "P", "good"}
- args, command, rc := getopt(argv, commandsMap, &w)
- expectedArgs := argsT{
- databasePath: "F",
- prefix: "P",
- command: "good",
- allArgs: argv,
- args: []string{},
- }
-
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 0)
- g.TAssertEqual(args, expectedArgs)
- g.TAssertEqual(command.name, "good")
- })
-
- g.Testing("a command can have its own commands and options", func() {
- var w strings.Builder
- argv := []string{"$0", "-f", "F", "good", "-f", "-f", "SUB"}
- args, command, rc := getopt(argv, commandsMap, &w)
- expectedArgs := argsT{
- databasePath: "F",
- prefix: "fiinha",
- command: "good",
- allArgs: argv,
- args: []string{"-f", "-f", "SUB"},
- }
-
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 0)
- g.TAssertEqual(args, expectedArgs)
- g.TAssertEqual(command.name, "good")
- })
-}
-
-func test_runCommand() {
- g.TestStart("runCommand()")
-
- g.Testing("returns an error on bad prefix", func() {
- stdin := strings.NewReader("")
- var (
- stdout strings.Builder
- stderr strings.Builder
- )
- args := argsT{
- prefix: "a bad prefix",
- command: "in",
- allArgs: []string{"$0"},
- args: []string{"some topic name"},
- }
- rc := runCommand(args, commands["in"], stdin, &stdout, &stderr)
-
- g.TAssertEqual(rc, 1)
- g.TAssertEqual(stdout.String(), "")
- g.TAssertEqual(stderr.String(), "Invalid table prefix\n")
- })
-
- g.Testing("otherwise it build a queueT and calls command", func() {
- stdin := strings.NewReader("")
- var (
- stdout1 strings.Builder
- stdout2 strings.Builder
- stderr1 strings.Builder
- stderr2 strings.Builder
- )
- args1 := argsT{
- prefix: defaultPrefix,
- command: "good",
- }
- args2 := argsT{
- prefix: defaultPrefix,
- command: "bad",
- }
- myErr := errors.New("an error")
- good := commandT{
- exec: func(
- _ argsT,
- _ queriesT,
- _ io.Reader,
- w io.Writer,
- ) (int, error) {
- fmt.Fprintf(w, "good text\n")
- return 0, nil
- },
- }
- bad := commandT{
- exec: func(
- _ argsT,
- _ queriesT,
- _ io.Reader,
- w io.Writer,
- ) (int, error) {
- fmt.Fprintf(w, "bad text\n")
- return 123, myErr
- },
- }
- rc1 := runCommand(args1, good, stdin, &stdout1, &stderr1)
- rc2 := runCommand(args2, bad, stdin, &stdout2, &stderr2)
-
- g.TAssertEqual(stdout1.String(), "good text\n")
- g.TAssertEqual(stdout2.String(), "bad text\n")
- g.TAssertEqual(stderr1.String(), "")
- g.TAssertEqual(stderr2.String(), "an error\n")
- g.TAssertEqual(rc1, 0)
- g.TAssertEqual(rc2, 123)
- })
-}
-
-func test_commands() {
- g.TestStart("commands")
-
- g.Testing("ensure map key and name are in sync", func() {
- for key, command := range commands {
- g.TAssertEqual(command.name, key)
- }
- })
-}
-
-
-func dumpQueries() {
- queries := []struct{name string; fn func(string) queryT}{
- { "createTables", createTablesSQL },
- { "take", takeSQL },
- { "publish", publishSQL },
- { "find", findSQL },
- { "next", nextSQL },
- { "pending", pendingSQL },
- { "commit", commitSQL },
- { "toDead", toDeadSQL },
- { "replay", replaySQL },
- { "oneDead", oneDeadSQL },
- { "allDead", allDeadSQL },
- { "size", sizeSQL },
- { "count", countSQL },
- { "hasData", hasDataSQL },
- }
- for _, query := range queries {
- q := query.fn(defaultPrefix)
- fmt.Printf("\n-- %s.sql:", query.name)
- fmt.Printf("\n-- write:%s\n", q.write)
- fmt.Printf("\n-- read:%s\n", q.read)
- fmt.Printf("\n-- owner:%s\n", q.owner)
- }
-}
-
-
-
-func MainTest() {
- if os.Getenv("TESTING_DUMP_SQL_QUERIES") != "" {
- dumpQueries()
- return
- }
-
- g.Init()
- test_defaultPrefix()
- test_serialized()
- test_execSerialized()
- test_tryRollback()
- test_inTx()
- test_createTables()
- test_takeStmt()
- test_publishStmt()
- test_findStmt()
- test_nextStmt()
- test_messageEach()
- test_pendingStmt()
- test_commitStmt()
- test_toDeadStmt()
- test_replayStmt()
- test_oneDeadStmt()
- test_deadletterEach()
- test_allDeadStmt()
- test_sizeStmt()
- test_countStmt()
- test_hasDataStmt()
- test_initDB()
- test_queriesTclose()
- test_newPinger()
- test_makeSubscriptionsFunc()
- test_makeNotifyFn()
- test_collectClosedWaiters()
- test_trimEmptyLeaves()
- test_deleteIfEmpty()
- test_deleteEmptyTopics()
- test_removeClosedWaiter()
- test_reapClosedWaiters()
- test_everyNthCall()
- test_runReaper()
- test_NewWithPrefix()
- test_New()
- test_asPublicMessage()
- test_queueT_Publish()
- test_registerConsumerFn()
- test_registerWaiterFn()
- test_makeConsumeOneFn()
- test_makeConsumeAllFn()
- test_makeWaitFn()
- test_runConsumer()
- test_tryFinding()
- test_queueT_Subscribe()
- test_queueT_WaitFor()
- test_unsubscribeIfExistsFn()
- test_queueT_Unsubscribe()
- test_cleanSubscriptions()
- test_queueT_Close()
- test_topicGetopt()
- test_topicConsumerGetopt()
- test_inExec()
- test_outExec()
- test_commitExec()
- test_deadExec()
- test_listDeadExec()
- test_replayExec()
- test_sizeExec()
- test_countExec()
- test_hasDataExec()
- test_usage()
- test_getopt()
- test_runCommand()
- test_commands()
-}
diff --git a/tests/functional/consume-one-produce-many/fiinha.go b/tests/functional/consume-one-produce-many/fiinha.go
deleted file mode 100644
index 6a3ca47..0000000
--- a/tests/functional/consume-one-produce-many/fiinha.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package fiinha
-
-func MainTest() {
- // FIXME
-}
diff --git a/tests/functional/consume-one-produce-many/main.go b/tests/functional/consume-one-produce-many/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/functional/consume-one-produce-many/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/functional/consumer-with-deadletter/fiinha.go b/tests/functional/consumer-with-deadletter/fiinha.go
deleted file mode 100644
index 292d327..0000000
--- a/tests/functional/consumer-with-deadletter/fiinha.go
+++ /dev/null
@@ -1,85 +0,0 @@
-package fiinha
-
-import (
- "errors"
- "runtime"
-
- "uuid"
- g "gobang"
-)
-
-
-
-const (
- topicX = "new-event-x"
- topicY = "new-event-y"
-)
-
-var forbidden3Err = errors.New("we don't like 3")
-
-
-
-func processNewEventXToY(message Message) (UnsentMessage, error) {
- payload := string(message.Payload)
- if payload == "event 3" {
- return UnsentMessage{}, forbidden3Err
- }
-
- newPayload := []byte("processed " + payload)
- unsent := UnsentMessage{
- Topic: topicY,
- FlowID: message.FlowID,
- Payload: newPayload,
- }
- return unsent, nil
-}
-
-
-
-func MainTest() {
- g.SetLevel(g.LogLevel_None)
-
- _, file, _, ok := runtime.Caller(0)
- g.TAssertEqualS(ok, true, "can't get filename")
-
- databasePath := file + ".db"
- queue, err := New(databasePath)
- g.TErrorIf(err)
- defer queue.Close()
-
- pub := func(payload []byte, flowID uuid.UUID) {
- unsent := UnsentMessage{
- Topic: topicX,
- FlowID: flowID,
- Payload: payload,
- }
- _, err := queue.Publish(unsent)
- g.TErrorIf(err)
- }
-
-
- g.Testing("we can WaitFor() a message after a deadletter", func() {
- flowID := uuid.New()
-
- handlerFn := func(message Message) error {
- messageY, err := processNewEventXToY(message)
- if err != nil {
- return err
- }
-
- _, err = queue.Publish(messageY)
- return err
- }
- queue.Subscribe(topicX, "main-worker", handlerFn)
- defer queue.Unsubscribe(topicX, "main-worker")
-
- pub([]byte("event 1"), uuid.New())
- pub([]byte("event 2"), uuid.New())
- pub([]byte("event 3"), uuid.New())
- pub([]byte("event 4"), uuid.New())
- pub([]byte("event 5"), flowID)
-
- given := <- queue.WaitFor(topicY, flowID, "waiter").Channel
- g.TAssertEqual(given, []byte("processed event 5"))
- })
-}
diff --git a/tests/functional/consumer-with-deadletter/main.go b/tests/functional/consumer-with-deadletter/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/functional/consumer-with-deadletter/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/functional/custom-prefix/fiinha.go b/tests/functional/custom-prefix/fiinha.go
deleted file mode 100644
index 6a3ca47..0000000
--- a/tests/functional/custom-prefix/fiinha.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package fiinha
-
-func MainTest() {
- // FIXME
-}
diff --git a/tests/functional/custom-prefix/main.go b/tests/functional/custom-prefix/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/functional/custom-prefix/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/functional/distinct-consumers-separate-instances/fiinha.go b/tests/functional/distinct-consumers-separate-instances/fiinha.go
deleted file mode 100644
index 6a3ca47..0000000
--- a/tests/functional/distinct-consumers-separate-instances/fiinha.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package fiinha
-
-func MainTest() {
- // FIXME
-}
diff --git a/tests/functional/distinct-consumers-separate-instances/main.go b/tests/functional/distinct-consumers-separate-instances/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/functional/distinct-consumers-separate-instances/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/functional/flow-id/fiinha.go b/tests/functional/flow-id/fiinha.go
deleted file mode 100644
index 6a3ca47..0000000
--- a/tests/functional/flow-id/fiinha.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package fiinha
-
-func MainTest() {
- // FIXME
-}
diff --git a/tests/functional/flow-id/main.go b/tests/functional/flow-id/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/functional/flow-id/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/functional/idempotency/fiinha.go b/tests/functional/idempotency/fiinha.go
deleted file mode 100644
index 6a3ca47..0000000
--- a/tests/functional/idempotency/fiinha.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package fiinha
-
-func MainTest() {
- // FIXME
-}
diff --git a/tests/functional/idempotency/main.go b/tests/functional/idempotency/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/functional/idempotency/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/functional/new-instance-takeover/fiinha.go b/tests/functional/new-instance-takeover/fiinha.go
deleted file mode 100644
index 5e6ad4b..0000000
--- a/tests/functional/new-instance-takeover/fiinha.go
+++ /dev/null
@@ -1,109 +0,0 @@
-package fiinha
-
-import (
- "runtime"
- "os"
-
- "uuid"
- g "gobang"
-)
-
-
-
-const topic = "topic"
-
-
-
-func pub(queue IQueue, topic string, flowID uuid.UUID) {
- unsent := UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: []byte{},
- }
- _, err := queue.Publish(unsent)
- g.TErrorIf(err)
-}
-
-func handlerFn(publish func(uuid.UUID)) func(Message) error {
- return func(message Message) error {
- publish(message.FlowID)
- return nil
- }
-}
-
-func startInstance(
- dbpath string,
- instanceID int,
- name string,
-) (IQueue, error) {
- iqueue, err := New(dbpath)
- g.TErrorIf(err)
- queue := iqueue.(queueT)
-
- notifyFn := makeNotifyFn(queue.subscriptions.read, queue.pinger)
- queries, err := initDB(dbpath, defaultPrefix, notifyFn, instanceID)
- g.TErrorIf(err)
-
- err = queue.queries.close()
- g.TErrorIf(err)
-
- queue.queries = queries
-
- pub_ := func(topic string) func(uuid.UUID) {
- return func(flowID uuid.UUID) {
- pub(queue, topic, flowID)
- }
- }
-
- individual := "individual-" + name
- shared := "shared"
-
- queue.Subscribe(topic, individual, handlerFn(pub_(individual)))
- queue.Subscribe(topic, shared, handlerFn(pub_(shared + "-" + name)))
-
- return queue, nil
-}
-
-
-
-func MainTest() {
- g.Init()
-
- _, file, _, ok := runtime.Caller(0)
- g.TAssertEqualS(ok, true, "can't get filename")
-
- dbpath := file + ".db"
- instanceID1 := os.Getpid()
- instanceID2 := instanceID1 + 1
-
- flowID1 := uuid.New()
- flowID2 := uuid.New()
-
- g.Testing("new instances take ownership of topic+name combo", func() {
- q1, err := startInstance(dbpath, instanceID1, "first")
- g.TErrorIf(err)
- defer q1.Close()
-
- pub(q1, topic, uuid.New())
- pub(q1, topic, uuid.New())
- pub(q1, topic, flowID1)
-
- <- q1.WaitFor("individual-first", flowID1, "w").Channel
- <- q1.WaitFor( "shared-first", flowID1, "w").Channel
-
- q2, err := startInstance(dbpath, instanceID2, "second")
- g.TErrorIf(err)
- defer q2.Close()
-
- <- q2.WaitFor("individual-second", flowID1, "w").Channel
-
- pub(q2, topic, uuid.New())
- pub(q2, topic, uuid.New())
- pub(q2, topic, flowID2)
-
- // FIXME: notify multiple instances so we can add this:
- // <- q2.WaitFor("individual-first", flowID2, "w").Channel
- <- q2.WaitFor("individual-second", flowID2, "w").Channel
- <- q2.WaitFor( "shared-second", flowID2, "w").Channel
- })
-}
diff --git a/tests/functional/new-instance-takeover/main.go b/tests/functional/new-instance-takeover/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/functional/new-instance-takeover/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/functional/wait-after-publish/fiinha.go b/tests/functional/wait-after-publish/fiinha.go
deleted file mode 100644
index 71b9b56..0000000
--- a/tests/functional/wait-after-publish/fiinha.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package fiinha
-
-import (
- "runtime"
-
- "uuid"
- g "gobang"
-)
-
-
-
-const topic = "topic"
-
-
-
-func MainTest() {
- _, file, _, ok := runtime.Caller(0)
- g.TAssertEqualS(ok, true, "can't get filename")
-
- databasePath := file + ".db"
- queue, err := New(databasePath)
- g.TErrorIf(err)
- defer queue.Close()
-
- pub := func(flowID uuid.UUID, payload []byte) {
- unsent := UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- _, err := queue.Publish(unsent)
- g.TErrorIf(err)
- }
-
-
- g.Testing("we can WaitFor() a message before its publishing", func() {
- flowID := uuid.New()
- waiter := queue.WaitFor(topic, flowID, "waiter").Channel
-
- pub(flowID, []byte("payload before"))
-
- given := <- waiter
- g.TAssertEqual(given, []byte("payload before"))
- })
-
- g.Testing("we can also do it after its publishing", func() {
- flowID := uuid.New()
-
- pub(flowID, []byte("payload after"))
-
- given := <- queue.WaitFor(topic, flowID, "waiter").Channel
- g.TAssertEqual(given, []byte("payload after"))
- })
-}
diff --git a/tests/functional/wait-after-publish/main.go b/tests/functional/wait-after-publish/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/functional/wait-after-publish/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/functional/waiter/fiinha.go b/tests/functional/waiter/fiinha.go
deleted file mode 100644
index 6a3ca47..0000000
--- a/tests/functional/waiter/fiinha.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package fiinha
-
-func MainTest() {
- // FIXME
-}
diff --git a/tests/functional/waiter/main.go b/tests/functional/waiter/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/functional/waiter/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/fuzz/api-check/fiinha.go b/tests/fuzz/api-check/fiinha.go
deleted file mode 100644
index 86801de..0000000
--- a/tests/fuzz/api-check/fiinha.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package fiinha
-
-import (
- "os"
- "testing"
- "testing/internal/testdeps"
-)
-
-
-
-func api(f *testing.F) {
- f.Fuzz(func(t *testing.T, n int) {
- // FIXME
- if n > 1 {
- if n < 2 {
- t.Errorf("Failed n: %v\n", n)
- }
- }
- })
-}
-
-
-
-func MainTest() {
- fuzzTargets := []testing.InternalFuzzTarget{
- { "api", api },
- }
-
- deps := testdeps.TestDeps{}
- tests := []testing.InternalTest {}
- benchmarks := []testing.InternalBenchmark{}
- examples := []testing.InternalExample {}
- m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples)
- os.Exit(m.Run())
-}
diff --git a/tests/fuzz/api-check/main.go b/tests/fuzz/api-check/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/fuzz/api-check/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/fuzz/cli-check/fiinha.go b/tests/fuzz/cli-check/fiinha.go
deleted file mode 100644
index 1cb6f37..0000000
--- a/tests/fuzz/cli-check/fiinha.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package fiinha
-
-import (
- "os"
- "testing"
- "testing/internal/testdeps"
-)
-
-
-
-func queries(f *testing.F) {
- f.Fuzz(func(t *testing.T, n int) {
- if n > 154 {
- if n < 155 {
- t.Errorf("Failed n: %v\n", n)
- }
- }
- })
-}
-
-
-
-func MainTest() {
- // FIXME
- fuzzTargets := []testing.InternalFuzzTarget{
- { "queries", queries },
- }
-
- deps := testdeps.TestDeps{}
- tests := []testing.InternalTest {}
- benchmarks := []testing.InternalBenchmark{}
- examples := []testing.InternalExample {}
- m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples)
- os.Exit(m.Run())
-}
diff --git a/tests/fuzz/cli-check/main.go b/tests/fuzz/cli-check/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/fuzz/cli-check/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/fuzz/equal-produced-consumed-order-check/fiinha.go b/tests/fuzz/equal-produced-consumed-order-check/fiinha.go
deleted file mode 100644
index ef2e72a..0000000
--- a/tests/fuzz/equal-produced-consumed-order-check/fiinha.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package fiinha
-
-import (
- "os"
- "testing"
- "testing/internal/testdeps"
-)
-
-
-
-func queries(f *testing.F) {
- f.Fuzz(func(t *testing.T, n int) {
- if n > 154 {
- if n < 155 {
- t.Errorf("Failed n: %v\n", n)
- }
- }
- })
-}
-
-
-
-func MainTest() {
- // FIXME: produced order is identical to consumed order
- fuzzTargets := []testing.InternalFuzzTarget{
- { "queries", queries },
- }
-
- deps := testdeps.TestDeps{}
- tests := []testing.InternalTest {}
- benchmarks := []testing.InternalBenchmark{}
- examples := []testing.InternalExample {}
- m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples)
- os.Exit(m.Run())
-}
diff --git a/tests/fuzz/equal-produced-consumed-order-check/main.go b/tests/fuzz/equal-produced-consumed-order-check/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/fuzz/equal-produced-consumed-order-check/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/fuzz/exactly-once-check/fiinha.go b/tests/fuzz/exactly-once-check/fiinha.go
deleted file mode 100644
index 6ac1fb1..0000000
--- a/tests/fuzz/exactly-once-check/fiinha.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package fiinha
-
-import (
- "os"
- "testing"
- "testing/internal/testdeps"
-)
-
-
-
-func queries(f *testing.F) {
- f.Fuzz(func(t *testing.T, n int) {
- if n > 154 {
- if n < 155 {
- t.Errorf("Failed n: %v\n", n)
- }
- }
- })
-}
-
-
-
-func MainTest() {
- // FIXME: a message is consumed exactly once
- fuzzTargets := []testing.InternalFuzzTarget{
- { "queries", queries },
- }
-
- deps := testdeps.TestDeps{}
- tests := []testing.InternalTest {}
- benchmarks := []testing.InternalBenchmark{}
- examples := []testing.InternalExample {}
- m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples)
- os.Exit(m.Run())
-}
diff --git a/tests/fuzz/exactly-once-check/main.go b/tests/fuzz/exactly-once-check/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/fuzz/exactly-once-check/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/fuzz/queries-check/fiinha.go b/tests/fuzz/queries-check/fiinha.go
deleted file mode 100644
index 1cb6f37..0000000
--- a/tests/fuzz/queries-check/fiinha.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package fiinha
-
-import (
- "os"
- "testing"
- "testing/internal/testdeps"
-)
-
-
-
-func queries(f *testing.F) {
- f.Fuzz(func(t *testing.T, n int) {
- if n > 154 {
- if n < 155 {
- t.Errorf("Failed n: %v\n", n)
- }
- }
- })
-}
-
-
-
-func MainTest() {
- // FIXME
- fuzzTargets := []testing.InternalFuzzTarget{
- { "queries", queries },
- }
-
- deps := testdeps.TestDeps{}
- tests := []testing.InternalTest {}
- benchmarks := []testing.InternalBenchmark{}
- examples := []testing.InternalExample {}
- m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples)
- os.Exit(m.Run())
-}
diff --git a/tests/fuzz/queries-check/main.go b/tests/fuzz/queries-check/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/fuzz/queries-check/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/fuzz/total-order-check/fiinha.go b/tests/fuzz/total-order-check/fiinha.go
deleted file mode 100644
index cb5aa61..0000000
--- a/tests/fuzz/total-order-check/fiinha.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package fiinha
-
-import (
- "os"
- "testing"
- "testing/internal/testdeps"
-)
-
-
-
-func queries(f *testing.F) {
- f.Fuzz(func(t *testing.T, n int) {
- if n > 154 {
- if n < 155 {
- t.Errorf("Failed n: %v\n", n)
- }
- }
- })
-}
-
-
-
-func MainTest() {
- // FIXME: a consumer gets the messages in total order
- fuzzTargets := []testing.InternalFuzzTarget{
- { "queries", queries },
- }
-
- deps := testdeps.TestDeps{}
- tests := []testing.InternalTest {}
- benchmarks := []testing.InternalBenchmark{}
- examples := []testing.InternalExample {}
- m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples)
- os.Exit(m.Run())
-}
diff --git a/tests/fuzz/total-order-check/main.go b/tests/fuzz/total-order-check/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/fuzz/total-order-check/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/integration.sh b/tests/integration.sh
deleted file mode 100755
index fcb62ca..0000000
--- a/tests/integration.sh
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/bin/sh
-set -eu
-
-exit
diff --git a/tests/main.go b/tests/main.go
deleted file mode 100644
index 789b267..0000000
--- a/tests/main.go
+++ /dev/null
@@ -1,7 +0,0 @@
-package main
-
-import "fiinha"
-
-func main() {
- fiinha.MainTest()
-}
diff --git a/tests/queries.sql b/tests/queries.sql
deleted file mode 100644
index 241f419..0000000
--- a/tests/queries.sql
+++ /dev/null
@@ -1,387 +0,0 @@
-
--- createTables.sql:
--- write:
- CREATE TABLE IF NOT EXISTS "fiinha_payloads" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')),
- topic TEXT NOT NULL,
- payload BLOB NOT NULL
- ) STRICT;
- CREATE INDEX IF NOT EXISTS "fiinha_payloads_topic"
- ON "fiinha_payloads"(topic);
-
- CREATE TABLE IF NOT EXISTS "fiinha_messages" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')),
- uuid BLOB NOT NULL UNIQUE,
- flow_id BLOB NOT NULL,
- payload_id INTEGER NOT NULL
- REFERENCES "fiinha_payloads"(id)
- ) STRICT;
- CREATE INDEX IF NOT EXISTS "fiinha_messages_flow_id"
- ON "fiinha_messages"(flow_id);
-
- CREATE TABLE IF NOT EXISTS "fiinha_offsets" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')),
- consumer TEXT NOT NULL,
- message_id INTEGER NOT NULL
- REFERENCES "fiinha_messages"(id),
- instance_id INTEGER NOT NULL,
- UNIQUE (consumer, message_id)
- ) STRICT;
- CREATE INDEX IF NOT EXISTS "fiinha_offsets_consumer"
- ON "fiinha_offsets"(consumer);
-
- CREATE TABLE IF NOT EXISTS "fiinha_deadletters" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- uuid BLOB NOT NULL UNIQUE,
- consumer TEXT NOT NULL,
- message_id INTEGER NOT NULL
- REFERENCES "fiinha_messages"(id),
- instance_id INTEGER NOT NULL,
- UNIQUE (consumer, message_id)
- ) STRICT;
- CREATE INDEX IF NOT EXISTS "fiinha_deadletters_consumer"
- ON "fiinha_deadletters"(consumer);
-
- CREATE TABLE IF NOT EXISTS "fiinha_replays" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- deadletter_id INTEGER NOT NULL UNIQUE
- REFERENCES "fiinha_deadletters"(id) ,
- message_id INTEGER NOT NULL UNIQUE
- REFERENCES "fiinha_messages"(id)
- ) STRICT;
-
- CREATE TABLE IF NOT EXISTS "fiinha_owners" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- topic TEXT NOT NULL,
- consumer TEXT NOT NULL,
- owner_id INTEGER NOT NULL,
- UNIQUE (topic, consumer)
- ) STRICT;
-
- CREATE TRIGGER IF NOT EXISTS "fiinha_check_instance_owns_topic"
- BEFORE INSERT ON "fiinha_offsets"
- WHEN NEW.instance_id != (
- SELECT owner_id FROM "fiinha_owners"
- WHERE topic = (
- SELECT "fiinha_payloads".topic
- FROM "fiinha_payloads"
- JOIN "fiinha_messages" ON "fiinha_payloads".id =
- "fiinha_messages".payload_id
- WHERE "fiinha_messages".id = NEW.message_id
- ) AND consumer = NEW.consumer
- )
- BEGIN
- SELECT RAISE(
- ABORT,
- 'instance does not own topic/consumer combo'
- );
- END;
-
- CREATE TRIGGER IF NOT EXISTS "fiinha_check_can_publish_deadletter"
- BEFORE INSERT ON "fiinha_deadletters"
- WHEN NEW.instance_id != (
- SELECT owner_id FROM "fiinha_owners"
- WHERE topic = (
- SELECT "fiinha_payloads".topic
- FROM "fiinha_payloads"
- JOIN "fiinha_messages" ON "fiinha_payloads".id =
- "fiinha_messages".payload_id
- WHERE "fiinha_messages".id = NEW.message_id
- ) AND consumer = NEW.consumer
- )
- BEGIN
- SELECT RAISE(
- ABORT,
- 'Instance does not own topic/consumer combo'
- );
- END;
-
-
--- read:
-
--- owner:
-
--- take.sql:
--- write:
- INSERT INTO "fiinha_owners" (topic, consumer, owner_id)
- VALUES (?, ?, ?)
- ON CONFLICT (topic, consumer) DO
- UPDATE SET owner_id=excluded.owner_id;
-
-
--- read:
-
--- owner:
-
--- publish.sql:
--- write:
- INSERT INTO "fiinha_payloads" (topic, payload)
- VALUES (?, ?);
-
- INSERT INTO "fiinha_messages" (uuid, flow_id, payload_id)
- VALUES (?, ?, last_insert_rowid());
-
-
--- read:
- SELECT id, timestamp FROM "fiinha_messages"
- WHERE uuid = ?;
-
-
--- owner:
-
--- find.sql:
--- write:
-
--- read:
- SELECT
- "fiinha_messages".id,
- "fiinha_messages".timestamp,
- "fiinha_messages".uuid,
- "fiinha_payloads".payload
- FROM "fiinha_messages"
- JOIN "fiinha_payloads" ON
- "fiinha_payloads".id = "fiinha_messages".payload_id
- WHERE
- "fiinha_payloads".topic = ? AND
- "fiinha_messages".flow_id = ?
- ORDER BY "fiinha_messages".id DESC
- LIMIT 1;
-
-
--- owner:
-
--- next.sql:
--- write:
-
--- read:
- SELECT
- (
- SELECT owner_id FROM "fiinha_owners"
- WHERE
- topic = ? AND
- consumer = ?
- LIMIT 1
- ) AS owner_id,
- "fiinha_messages".id,
- "fiinha_messages".timestamp,
- "fiinha_messages".uuid,
- "fiinha_messages".flow_id,
- "fiinha_payloads".payload
- FROM "fiinha_messages"
- JOIN "fiinha_payloads" ON
- "fiinha_payloads".id = "fiinha_messages".payload_id
- WHERE
- "fiinha_payloads".topic = ? AND
- "fiinha_messages".id NOT IN (
- SELECT message_id FROM "fiinha_offsets"
- WHERE consumer = ?
- )
- ORDER BY "fiinha_messages".id ASC
- LIMIT 1;
-
-
--- owner:
-
--- pending.sql:
--- write:
-
--- read:
- SELECT
- "fiinha_messages".id,
- "fiinha_messages".timestamp,
- "fiinha_messages".uuid,
- "fiinha_messages".flow_id,
- "fiinha_payloads".topic,
- "fiinha_payloads".payload
- FROM "fiinha_messages"
- JOIN "fiinha_payloads" ON
- "fiinha_payloads".id = "fiinha_messages".payload_id
- WHERE
- "fiinha_payloads".topic = ? AND
- "fiinha_messages".id NOT IN (
- SELECT message_id FROM "fiinha_offsets"
- WHERE consumer = ?
- )
- ORDER BY "fiinha_messages".id ASC;
-
-
--- owner:
- SELECT owner_id FROM "fiinha_owners"
- WHERE
- topic = ? AND
- consumer = ?;
-
-
--- commit.sql:
--- write:
- INSERT INTO "fiinha_offsets" (consumer, message_id, instance_id)
- VALUES (?, (SELECT id FROM "fiinha_messages" WHERE uuid = ?), ?);
-
-
--- read:
-
--- owner:
-
--- toDead.sql:
--- write:
- INSERT INTO "fiinha_offsets"
- ( consumer, message_id, instance_id)
- VALUES ( ?, (SELECT id FROM "fiinha_messages" WHERE uuid = ?), ?);
-
- INSERT INTO "fiinha_deadletters"
- (uuid, consumer, message_id, instance_id)
- VALUES (?, ?, (SELECT id FROM "fiinha_messages" WHERE uuid = ?), ?);
-
-
--- read:
-
--- owner:
-
--- replay.sql:
--- write:
- INSERT INTO "fiinha_messages" (uuid, flow_id, payload_id)
- SELECT
- ?,
- "fiinha_messages".flow_id,
- "fiinha_messages".payload_id
- FROM "fiinha_messages"
- JOIN "fiinha_deadletters" ON
- "fiinha_messages".id = "fiinha_deadletters".message_id
- WHERE "fiinha_deadletters".uuid = ?;
-
- INSERT INTO "fiinha_replays" (deadletter_id, message_id)
- VALUES (
- (SELECT id FROM "fiinha_deadletters" WHERE uuid = ?),
- last_insert_rowid()
- );
-
-
--- read:
- SELECT
- "fiinha_messages".id,
- "fiinha_messages".timestamp,
- "fiinha_messages".flow_id,
- "fiinha_payloads".topic,
- "fiinha_payloads".payload
- FROM "fiinha_messages"
- JOIN "fiinha_payloads" ON
- "fiinha_payloads".id = "fiinha_messages".payload_id
- WHERE "fiinha_messages".uuid = ?;
-
-
--- owner:
-
--- oneDead.sql:
--- write:
-
--- read:
- SELECT
- "fiinha_deadletters".uuid,
- "fiinha_offsets".timestamp,
- "fiinha_messages".uuid
- FROM "fiinha_deadletters"
- JOIN "fiinha_offsets" ON
- "fiinha_deadletters".message_id = "fiinha_offsets".message_id
- JOIN "fiinha_messages" ON
- "fiinha_deadletters".message_id = "fiinha_messages".id
- JOIN "fiinha_payloads" ON
- "fiinha_messages".payload_id = "fiinha_payloads".id
- WHERE
- "fiinha_payloads".topic = ? AND
- "fiinha_deadletters".consumer = ? AND
- "fiinha_offsets".consumer = ? AND
- "fiinha_deadletters".id NOT IN (
- SELECT deadletter_id FROM "fiinha_replays"
- )
- ORDER BY "fiinha_deadletters".id ASC
- LIMIT 1;
-
-
--- owner:
-
--- allDead.sql:
--- write:
-
--- read:
- SELECT
- "fiinha_deadletters".uuid,
- "fiinha_deadletters".message_id,
- "fiinha_offsets".timestamp,
- "fiinha_offsets".consumer,
- "fiinha_messages".timestamp,
- "fiinha_messages".uuid,
- "fiinha_messages".flow_id,
- "fiinha_payloads".topic,
- "fiinha_payloads".payload
- FROM "fiinha_deadletters"
- JOIN "fiinha_offsets" ON
- "fiinha_deadletters".message_id = "fiinha_offsets".message_id
- JOIN "fiinha_messages" ON
- "fiinha_deadletters".message_id = "fiinha_messages".id
- JOIN "fiinha_payloads" ON
- "fiinha_messages".payload_id = "fiinha_payloads".id
- WHERE
- "fiinha_payloads".topic = ? AND
- "fiinha_deadletters".consumer = ? AND
- "fiinha_offsets".consumer = ? AND
- "fiinha_deadletters".id NOT IN (
- SELECT deadletter_id FROM "fiinha_replays"
- )
- ORDER BY "fiinha_deadletters".id ASC;
-
-
--- owner:
-
--- size.sql:
--- write:
-
--- read:
- SELECT
- COUNT(1) as size
- FROM "fiinha_messages"
- JOIN "fiinha_payloads" ON
- "fiinha_messages".payload_id = "fiinha_payloads".id
- WHERE "fiinha_payloads".topic = ?;
-
-
--- owner:
-
--- count.sql:
--- write:
-
--- read:
- SELECT
- COUNT(1) as count
- FROM "fiinha_messages"
- JOIN "fiinha_offsets" ON
- "fiinha_messages".id = "fiinha_offsets".message_id
- JOIN "fiinha_payloads" ON
- "fiinha_messages".payload_id = "fiinha_payloads".id
- WHERE
- "fiinha_payloads".topic = ? AND
- "fiinha_offsets".consumer = ?;
-
-
--- owner:
-
--- hasData.sql:
--- write:
-
--- read:
- SELECT 1 as data
- FROM "fiinha_messages"
- JOIN "fiinha_payloads" ON
- "fiinha_payloads".id = "fiinha_messages".payload_id
- WHERE
- "fiinha_payloads".topic = ? AND
- "fiinha_messages".id NOT IN (
- SELECT message_id FROM "fiinha_offsets"
- WHERE consumer = ?
- )
- LIMIT 1;
-
-
--- owner: