aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/release.yml52
-rw-r--r--.gitignore1
-rw-r--r--README.md130
-rw-r--r--build.edn34
-rw-r--r--deps.edn63
-rw-r--r--pom.xml41
-rwxr-xr-xrelease.sh37
-rw-r--r--src/com/github/ivarref/yoltq.clj146
-rw-r--r--src/com/github/ivarref/yoltq/report_queue.clj438
-rw-r--r--test/com/github/ivarref/yoltq/log_init.clj4
10 files changed, 791 insertions, 155 deletions
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
new file mode 100644
index 0000000..1350854
--- /dev/null
+++ b/.github/workflows/release.yml
@@ -0,0 +1,52 @@
+name: Tag and Release
+on: workflow_dispatch
+
+jobs:
+ tag-and-release:
+ runs-on: ubuntu-22.04
+ steps:
+ - uses: actions/checkout@v3
+ with:
+ # NOTE: Fetch all for counting commits
+ fetch-depth: 0
+ - uses: actions/setup-java@v3
+ with:
+ distribution: 'adopt'
+ java-version: 21
+ - uses: DeLaGuardo/setup-clojure@13.4
+ with:
+ cli: 1.12.0.1530
+
+ - name: Show versions
+ run: |
+ java -version
+ clojure --version
+
+ - name: deploy to clojars
+ # NOTE: Specify ID to refer outputs from other steps
+ id: deploy
+ run: |
+ clojure -T:build deploy
+ env:
+ CLOJARS_PASSWORD: ${{secrets.CLOJARS_PASSWORD}}
+ CLOJARS_USERNAME: ${{secrets.CLOJARS_USERNAME}}
+
+ - uses: actions/create-release@v1
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ with:
+ # NOTE: Refer outputs
+ tag_name: ${{ steps.deploy.outputs.version }}
+ release_name: ${{ steps.deploy.outputs.version }}
+ body: released
+ draft: false
+ prerelease: false
+
+ - run: |
+ clojure -T:build update-documents
+ git diff
+ git config --global user.email "github-actions@example.com"
+ git config --global user.name "github-actions"
+ git add -A
+ git commit -m "Update for release"
+ git push \ No newline at end of file
diff --git a/.gitignore b/.gitignore
index c82fdd7..707e1be 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,3 +11,4 @@ tree.txt
*.pom
temp/
.clj-kondo/
+.rebel_readline_history \ No newline at end of file
diff --git a/README.md b/README.md
index c5f2bdb..8747162 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,8 @@
# yoltq
-An opinionated Datomic queue for building (more) reliable systems.
-Implements the [transactional outbox](https://microservices.io/patterns/data/transactional-outbox.html)
+An opinionated Datomic queue for building (more) reliable systems.
+Implements the
+[transactional outbox](https://microservices.io/patterns/data/transactional-outbox.html)
pattern.
Supports retries, backoff, ordering and more.
On-prem only.
@@ -62,8 +63,8 @@ Imagine the following code:
```clojure
(defn post-handler [user-input]
(let [db-item (process user-input)
- ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; timeout in milliseconds
- :socket-timeout 10000 ; timeout in milliseconds
+ ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; milliseconds
+ :socket-timeout 10000 ; milliseconds
...})] ; may throw exception
@(d/transact conn [(assoc db-item :some/ext-ref ext-ref)])))
```
@@ -78,8 +79,8 @@ The queue way to solve this would be:
```clojure
(defn get-ext-ref [{:keys [id]}]
- (let [ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; timeout in milliseconds
- :socket-timeout 10000 ; timeout in milliseconds
+ (let [ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; milliseconds
+ :socket-timeout 10000 ; milliseconds
...})] ; may throw exception
@(d/transact conn [[:db/cas [:some/id id]
:some/ext-ref
@@ -105,7 +106,8 @@ Thus `get-ext-ref` and any queue consumer should tolerate to
be executed successfully several times.
For `get-ext-ref` this is solved by using
-the database function [:db/cas (compare-and-swap)](https://docs.datomic.com/on-prem/transactions/transaction-functions.html#dbfn-cas)
+the database function
+[:db/cas (compare-and-swap)](https://docs.datomic.com/on-prem/transactions/transaction-functions.html#dbfn-cas)
to achieve a write-once behaviour.
The yoltq system treats cas failures as job successes
when a consumer has `:allow-cas-failure?` set to `true` in its options.
@@ -154,18 +156,19 @@ the payload. It can be added like this:
; An optional map of queue opts
{:allow-cas-failure? true ; Treat [:db.cas ...] failures as success. This is one way for the
; consumer function to ensure idempotence.
- :valid-payload? (fn [payload] (some? (:id payload))) ; Function that verifies payload. Should return truthy for valid payloads.
+ :valid-payload? (fn [payload] (some? (:id payload))) ; Function that verifies payload.
+ ; Should return truthy for valid payloads.
; The default function always returns true.
:max-retries 10}) ; Specify maximum number of times an item will be retried. Default: 10000.
; If :max-retries is given as 0, the job will ~always be retried, i.e.
; 9223372036854775807 times (Long/MAX_VALUE).
```
-The `payload` will be deserialized from the database using `clojure.edn/read-string` before invocation, i.e.
-you will get back what you put into `yq/put`.
+The `payload` will be deserialized from the database using `clojure.edn/read-string` before
+invocation, i.e. you will get back what you put into `yq/put`.
-The yoltq system treats a queue consumer function invocation as successful if it does not throw an exception.
-Any return value, be it `nil`, `false`, `true`, etc. is considered a success.
+The yoltq system treats a queue consumer function invocation as successful if it does not throw
+an exception. Any return value, be it `nil`, `false`, `true`, etc. is considered a success.
### Listening for queue jobs
@@ -251,7 +254,8 @@ For example if you want to use [nippy](https://github.com/ptaoussanis/nippy):
### Partitions
-Yoltq supports specifying which [partition](https://docs.datomic.com/on-prem/schema/schema.html#partitions)
+Yoltq supports specifying which
+[partition](https://docs.datomic.com/on-prem/schema/schema.html#partitions)
queue entities should belong to.
The default function is:
```clojure
@@ -270,7 +274,8 @@ E.g.:
### All configuration options
For an exhaustive list of all configuration options,
-see [yq/default-opts](https://github.com/ivarref/yoltq/blob/main/src/com/github/ivarref/yoltq.clj#L21).
+see
+[yq/default-opts](https://github.com/ivarref/yoltq/blob/main/src/com/github/ivarref/yoltq.clj#L21).
# Regular and REPL usage
@@ -426,15 +431,94 @@ Note: I have not tried these libraries myself.
If you liked this library, you may also like:
-* [conformity](https://github.com/avescodes/conformity): A Clojure/Datomic library for idempotently transacting norms into your database – be they schema, data, or otherwise.
-* [datomic-schema](https://github.com/ivarref/datomic-schema): Simplified writing of Datomic schemas (works with conformity).
-* [double-trouble](https://github.com/ivarref/double-trouble): Handle duplicate Datomic transactions with ease.
-* [gen-fn](https://github.com/ivarref/gen-fn): Generate Datomic function literals from regular Clojure namespaces.
-* [rewriting-history](https://github.com/ivarref/rewriting-history): A library to rewrite Datomic history.
+* [conformity](https://github.com/avescodes/conformity):
+ A Clojure/Datomic library for idempotently transacting norms into your database – be they schema,
+ data, or otherwise.
+* [datomic-schema](https://github.com/ivarref/datomic-schema):
+ Simplified writing of Datomic schemas (works with conformity).
+* [double-trouble](https://github.com/ivarref/double-trouble):
+ Handle duplicate Datomic transactions with ease.
+* [gen-fn](https://github.com/ivarref/gen-fn):
+ Generate Datomic function literals from regular Clojure namespaces.
+* [rewriting-history](https://github.com/ivarref/rewriting-history):
+ A library to rewrite Datomic history.
## Change log
+#### [Unreleased]
+
+#### [0.2.85] - 2025-07-29
+
+#### [v0.2.82] - 2025-06-18
+Added support for specifying `tx-report-queue` as a keyword in `init!`. Yoltq will
+then not grab the datomic report queue, but use the one provided:
+
+```clojure
+(require '[com.github.ivarref.yoltq :as yq])
+(yq/init! {:conn conn
+ :tx-report-queue (yq/get-tx-report-queue-multicast! conn :yoltq)
+ ; ^^ can be any `java.util.concurrent.BlockingQueue` value
+ })
+
+(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! conn :another-consumer-id))
+
+```
+
+Added multicast support for `datomic.api/tx-report-queue`:
+```clojure
+(require '[com.github.ivarref.yoltq :as yq])
+(def my-q1 (yq/get-tx-report-queue-multicast! conn :q-id-1))
+; ^^ consume my-q1 just like you would do `datomic.api/tx-report-queue`
+
+(def my-q2 (yq/get-tx-report-queue-multicast! conn :q-id-2))
+; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue`
+; for the given `conn`
+
+(def my-q3 (yq/get-tx-report-queue-multicast! conn :q-id-3 true))
+; my-q3 sets the optional third argument, `send-end-token?`, to true.
+; The queue will then receive `:end` if the queue is stopped.
+; This can enable simpler consuming of queues:
+(future
+ (loop []
+ (let [q-item (.take ^java.util.concurrent.BlockingQueue my-q3)]
+ (if (= q-item :end)
+ (println "Time to exit. Goodbye!")
+ (do
+ (println "Processing q-item" q-item)
+ (recur))))))
+
+; The default value for `send-end-token?` is `false`, i.e. the behaviour will be
+; identical to that of datomic.api/tx-report-queue.
+
+@(d/transact conn [{:db/doc "new-data"}])
+
+; Stop the queue:
+(yq/stop-multicast-consumer-id! conn :q-id-3)
+=> true
+; The multicaster thread will send `:end`.
+; The consumer thread will then print "Time to exit. Goodbye!".
+
+; if the queue is already stopped (or never was started), the `stop-multicaster...`
+; functions will return false:
+(yq/stop-multicast-consumer-id! conn :already-stopped-queue-or-typo)
+=> false
+
+; Stop all queues for all connections:
+(yq/stop-all-multicasters!)
+```
+
+`yq/get-tx-report-queue-multicast!` returns, like
+`datomic.api/tx-report-queue`,
+`java.util.concurrent.BlockingQueue` and starts a background thread that does
+the multicasting as needed. Identical calls to `yq/get-tx-report-queue-multicast!`
+returns the same `BlockingQueue`.
+
+Changed the default for `max-retries` from `10000` to `9223372036854775807`.
+
+Fixed reflection warnings.
+
#### 2023-03-20 v0.2.64 [diff](https://github.com/ivarref/yoltq/compare/v0.2.63...v0.2.64)
+
Added support for `max-retries` being `0`, meaning the job should be retried forever
(or at least 9223372036854775807 times).
@@ -443,7 +527,8 @@ Changed the default for `max-retries` from `100` to `10000`.
#### 2022-11-18 v0.2.63 [diff](https://github.com/ivarref/yoltq/compare/v0.2.62...v0.2.63)
Added custom `:encode` and `:decode` support.
-Added support for specifying `:partifion-fn` to specify which partition a queue item should belong to.
+Added support for specifying `:partifion-fn` to specify
+which partition a queue item should belong to.
It defaults to:
```clojure
(defn default-partition-fn [_queue-name]
@@ -619,8 +704,13 @@ Added `:valid-payload?` option for queue consumers.
Improved error reporting.
#### 2021-09-24 v0.2.33
+
First publicly announced release.
+## Making a new release
+
+Go to https://github.com/ivarref/yoltq/actions/workflows/release.yml and press `Run workflow`.
+
## License
Copyright © 2021-2022 Ivar Refsdal
diff --git a/build.edn b/build.edn
new file mode 100644
index 0000000..3e1f016
--- /dev/null
+++ b/build.edn
@@ -0,0 +1,34 @@
+{:lib
+ com.github.ivarref/yoltq
+
+ :version
+ "0.2.{{git/commit-count}}"
+
+ :github-actions?
+ true
+
+ :scm
+ {:connection "scm:git:git://github.com/ivarref/yoltq.git"
+ :developerConnection "scm:git:ssh://git@github.com/ivarref/yoltq.git"
+ :url "https://github.com/ivarref/yoltq"}
+
+ :documents
+ [{:file "README.md"
+ :match-exactly "#### [Unreleased]"
+ :action :append-after
+ :text "\n#### [{{version}}] - {{now/yyyy}}-{{now/mm}}-{{now/dd}}"}
+ {:file "README.md"
+ :match-exactly "com.github.ivarref/yoltq {:git/tag"
+ :action :replace
+ :keep-indent? true
+ :text
+ "com.github.ivarref/yoltq {:git/tag \"{{version}}\" :git/sha \"{{git/head-long-sha}}\"}"}
+ {:file "README.md"
+ :match-exactly "com.github.ivarref/yoltq {:mvn/version"
+ :action :replace
+ :keep-indent? true
+ :text "com.github.ivarref/yoltq {:mvn/version \"{{version}}\"}"}]
+
+ :licenses
+ [{:name "Eclipse Public License - v 2.0"
+ :url "https://www.eclipse.org/legal/epl-2.0/"}]} \ No newline at end of file
diff --git a/deps.edn b/deps.edn
index 8b742f0..f0488fa 100644
--- a/deps.edn
+++ b/deps.edn
@@ -1,31 +1,40 @@
-{:deps {com.github.ivarref/double-trouble {:mvn/version "0.1.102"}
- org.clojure/tools.logging {:mvn/version "1.2.4"}
- org.clojure/clojure {:mvn/version "1.11.1"}}
+{:deps
+ {com.github.ivarref/double-trouble {:mvn/version "0.1.102"}
+ org.clojure/tools.logging {:mvn/version "1.2.4"}
+ org.clojure/clojure {:mvn/version "1.11.1"}
+ com.datomic/peer {:mvn/version "1.0.7364"}}
- :paths ["src"]
+ :paths
+ ["src"]
- :aliases {:datomic {:extra-deps {com.datomic/peer {:mvn/version "1.0.7075" :exclusions [org.slf4j/slf4j-nop]}}}
- :test {:extra-paths ["test"]
- :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"}
- com.taoensso/timbre {:mvn/version "5.2.1"}
- com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"}
- clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"}
- com.datomic/peer {:mvn/version "1.0.7075" :exclusions [org.slf4j/slf4j-nop]}
- org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"}
- com.taoensso/nippy {:mvn/version "3.2.0"}
- io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}}
- :jvm-opts ["-DDISABLE_SPY=true"
- "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"]
- :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]}
+ :aliases
+ {:test
+ {:extra-paths ["test"]
+ :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"}
+ com.taoensso/timbre {:mvn/version "5.2.1"}
+ com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"}
+ clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"}
+ org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"}
+ com.taoensso/nippy {:mvn/version "3.2.0"}
+ io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}}
+ :exec-fn cognitect.test-runner.api/test
+ :jvm-opts ["-DDISABLE_SPY=true"
+ "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"]
+ :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]}
- :jar {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git"
- :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}}
- :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]}
+ :repl
+ {:extra-paths ["test"]
+ :extra-deps {com.bhauman/rebel-readline {:mvn/version "0.1.5"}
+ ivarref/datomic-schema {:mvn/version "0.2.0"}
+ com.taoensso/timbre {:mvn/version "5.2.1"}
+ com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"}
+ clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"}
+ org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"}
+ com.taoensso/nippy {:mvn/version "3.2.0"}}
+ :exec-fn rebel-readline.tool/repl
+ :exec-args {}
+ :main-opts ["-m" "rebel-readline.main"]}
- :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}}
-
- :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}}
- :exec-fn deps-deploy.deps-deploy/deploy
- :exec-args {:installer :remote
- :sign-releases? false
- :artifact "target/out.jar"}}}}
+ :build
+ {:deps {com.github.liquidz/build.edn {:mvn/version "0.11.241"}}
+ :ns-default build-edn.main}}}
diff --git a/pom.xml b/pom.xml
deleted file mode 100644
index 466f47a..0000000
--- a/pom.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <packaging>jar</packaging>
- <groupId>com.github.ivarref</groupId>
- <artifactId>yoltq</artifactId>
- <version>0.2.64</version>
- <name>yoltq</name>
- <dependencies>
- <dependency>
- <groupId>org.clojure</groupId>
- <artifactId>clojure</artifactId>
- <version>1.11.1</version>
- </dependency>
- <dependency>
- <groupId>com.github.ivarref</groupId>
- <artifactId>double-trouble</artifactId>
- <version>0.1.102</version>
- </dependency>
- <dependency>
- <groupId>org.clojure</groupId>
- <artifactId>tools.logging</artifactId>
- <version>1.2.4</version>
- </dependency>
- </dependencies>
- <build>
- <sourceDirectory>src</sourceDirectory>
- </build>
- <repositories>
- <repository>
- <id>clojars</id>
- <url>https://repo.clojars.org/</url>
- </repository>
- </repositories>
- <scm>
- <connection>scm:git:git://github.com/ivarref/yoltq.git</connection>
- <developerConnection>scm:git:ssh://git@github.com/ivarref/yoltq.git</developerConnection>
- <tag>v0.2.64</tag>
- <url>https://github.com/ivarref/yoltq</url>
- </scm>
-</project> \ No newline at end of file
diff --git a/release.sh b/release.sh
deleted file mode 100755
index 3d06135..0000000
--- a/release.sh
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/bin/bash
-
-if [[ $# -ne 1 ]]; then
- echo "Illegal number of parameters" >&2
- exit 2
-fi
-
-set -ex
-
-git update-index --refresh
-git diff-index --quiet HEAD --
-
-clojure -Spom
-clojure -M:test
-clojure -M:jar
-clojure -X:release ivarref.pom-patch/clojars-repo-only!
-
-LAST_TAG="$(git rev-list --tags --no-walk --max-count=1)"
-COMMITS_SINCE_LAST_TAG="$(git rev-list "$LAST_TAG"..HEAD --count)"
-echo "Squashing $COMMITS_SINCE_LAST_TAG commits ..."
-git reset --soft HEAD~"$COMMITS_SINCE_LAST_TAG"
-MSG="$(git log --format=%B --reverse HEAD..HEAD@{1})"
-git commit -m"$MSG"
-
-VERSION="$(clojure -X:release ivarref.pom-patch/set-patch-version! :patch :commit-count)"
-echo "Releasing $VERSION"
-sed -i "s/HEAD/v$VERSION/g" ./README.md
-git add pom.xml README.md
-git commit -m "Release $VERSION"
-git reset --soft HEAD~2
-git commit -m"Release $VERSION: $1"
-
-git tag -a v"$VERSION" -m "Release v$VERSION: $1"
-git push --follow-tags --force
-
-clojure -X:deploy
-echo "Released $VERSION"
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index 1ba286e..ccd9062 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -12,8 +12,7 @@
(:import (datomic Connection)
(java.lang.management ManagementFactory)
(java.time Duration Instant ZoneOffset ZonedDateTime)
- (java.util.concurrent ExecutorService Executors TimeUnit)))
-
+ (java.util.concurrent BlockingQueue ExecutorService Executors ScheduledExecutorService TimeUnit)))
(defonce ^:dynamic *config* (atom nil))
(defonce threadpool (atom nil))
@@ -27,7 +26,7 @@
; If you want no limit on the number of retries, specify
; the value `0`. That will set the effective retry limit to
; 9223372036854775807 times.
- :max-retries 10000
+ :max-retries 9223372036854775807
; Minimum amount of time to wait before a failed queue job is retried
:error-backoff-time (Duration/ofSeconds 5)
@@ -85,8 +84,11 @@
u/duration->millis))
-(defn init! [{:keys [conn] :as cfg}]
+(defn init! [{:keys [conn tx-report-queue] :as cfg}]
(assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil")))
+ (when (some? tx-report-queue)
+ (assert (instance? BlockingQueue tx-report-queue)
+ (str "Expected :tx-report-queue to be of type java.util.concurrent.BlockingQueue")))
(locking threadpool
@(d/transact conn i/schema)
(let [new-cfg (swap! *config*
@@ -140,14 +142,39 @@
(let [pool (reset! threadpool (Executors/newScheduledThreadPool (+ 1 pool-size)))
queue-listener-ready (promise)]
(reset! *running?* true)
- (.scheduleAtFixedRate pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS)
- (.scheduleAtFixedRate pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS)
- (.execute pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*)))
+ (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS)
+ (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS)
+ (.execute ^ScheduledExecutorService pool
+ (fn []
+ (try
+ (log/debug "report-queue-listener starting")
+ (rq/report-queue-listener *running?* queue-listener-ready pool *config*)
+ (finally
+ (log/debug "report-queue-listener exiting")
+ (deliver queue-listener-ready :finally)))))
(future (try
(slow-executor/show-slow-threads pool *config*)
(finally
(deliver slow-thread-watcher-done? :done))))
- @queue-listener-ready)))
+ (let [q-listener-retval (deref queue-listener-ready 30000 :timeout)]
+ (cond (= :timeout q-listener-retval)
+ (do
+ (log/error "Timed out waiting for report-queue-listener to start")
+ (throw (IllegalStateException. "Timed out waiting for report-queue-listener to start")))
+
+ (= :finally q-listener-retval)
+ (do
+ (log/error "report-queue-listener did not start")
+ (throw (IllegalStateException. "report-queue-listener did not start")))
+
+ (= :ready q-listener-retval)
+ (do
+ (log/debug "report-queue-listener is ready"))
+
+ :else
+ (do
+ (log/error (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval)))
+ (throw (IllegalStateException. (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval))))))))))
(defn start! []
@@ -348,9 +375,9 @@
:p95 ...
:p99 ...}}"
[{:keys [age-days queue-name now db duration->long]
- :or {age-days 30
- now (ZonedDateTime/now ZoneOffset/UTC)
- duration->long (fn [duration] (.toSeconds duration))}}]
+ :or {age-days 30
+ now (ZonedDateTime/now ZoneOffset/UTC)
+ duration->long (fn [duration] (.toSeconds ^Duration duration))}}]
(let [{:keys [conn]} @*config*
db (or db (d/db conn))
->zdt #(.atZone (Instant/ofEpochMilli %) ZoneOffset/UTC)]
@@ -380,6 +407,66 @@
:min (apply min values))})))
(into (sorted-map)))))
+(defn get-tx-report-queue-multicast!
+ "Multicast the datomic.api/tx-report-queue to different consumers.
+ A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer.
+ Repeated calls using the same `conn` and `id` returns the same queue.
+
+ The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread
+ to send `:end` if the queue is stopped.
+ The default value for `send-end-token?` is `false`.
+
+ A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`.
+
+ Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`."
+ ([conn id]
+ (get-tx-report-queue-multicast! conn id false))
+ ([conn id send-end-token?]
+ (assert (instance? Connection conn))
+ (assert (boolean? send-end-token?))
+ (rq/get-tx-report-queue-multicast! conn id send-end-token?)))
+
+(defn stop-multicast-consumer-id!
+ "Stop forwarding reports from datomic.api/tx-report-queue to the queue identified by `conn` and `id`.
+ If this is the last report destination for the given `conn`, the multicaster thread will exit.
+ Repeated calls are no-op.
+
+ The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!`
+ was called.
+
+ Returns `true` if the queue was stopped.
+ Return `false` if the queue does not exist."
+ [conn id]
+ (assert (instance? Connection conn))
+ (rq/stop-multicast-consumer-id! conn id))
+
+(defn stop-multicaster!
+ "Stop forwarding reports from datomic.api/tx-report-queue to any queues belonging to `conn`.
+ The multicaster thread will exit.
+ Repeated calls are no-op.
+
+ The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!`
+ was called.
+
+ Returns `true` if any queue belonging to `conn` was stopped.
+ Returns `false` is `conn` did not have any associated queues."
+ [conn]
+ (assert (instance? Connection conn))
+ (rq/stop-multicaster! conn))
+
+(defn stop-all-multicasters!
+ "Stop forwarding all reports from datomic.api/tx-report-queue for any `conn`.
+ All multicaster threads will exit.
+ Repeated calls are no-op.
+
+ The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!`
+ was called.
+
+ Returns `true` if any queue was stopped.
+ Returns `false` if no queues existed."
+ []
+ (rq/stop-all-multicasters!))
+
(comment
(do
(require 'com.github.ivarref.yoltq.log-init)
@@ -413,3 +500,40 @@
@started-consuming?
(stop!)
nil)))))
+
+(comment
+ (do
+ (require 'com.github.ivarref.yoltq.log-init)
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq.migrate"} :warn]
+ [#{"com.github.ivarref.yoltq"} :debug]
+ ;[#{"ivarref.yoltq*"} :info]
+ [#{"*"} :info]])
+ (stop!)
+ (let [received (atom [])
+ uri (str "datomic:mem://demo")]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (let [conn (d/connect uri)
+ started-consuming? (promise)
+ n 1]
+ (init! {:conn conn
+ :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq true)
+ :slow-thread-show-stacktrace? false})
+ (add-consumer! :q (fn [_]
+ (deliver started-consuming? true)))
+ (log/info "begin start! ...")
+ (start!)
+ (log/info "begin start! ... Done")
+ (Thread/sleep 2000)
+ (log/info "*******************************************")
+ @(d/transact conn [(put :q {:work 123})])
+ @started-consuming?
+ (stop-multicaster! conn)
+ (log/info "*******************************************")
+ (stop!)
+ (log/info "stop! done")
+ nil)))) \ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj
index 20e0a93..f83e3ba 100644
--- a/src/com/github/ivarref/yoltq/report_queue.clj
+++ b/src/com/github/ivarref/yoltq/report_queue.clj
@@ -3,9 +3,10 @@
[com.github.ivarref.yoltq.impl :as i]
[datomic.api :as d]
[clojure.tools.logging :as log])
- (:import (datomic Datom)
- (java.util.concurrent ScheduledExecutorService BlockingQueue TimeUnit)))
+ (:import (datomic Connection Datom)
+ (java.util.concurrent LinkedBlockingQueue ScheduledExecutorService BlockingQueue TimeUnit)))
+; Private API, subject to change!
(defn process-poll-result! [cfg id-ident poll-result consumer]
(let [{:keys [tx-data db-after]} poll-result]
@@ -28,30 +29,431 @@
(i/take! cfg)
(i/execute! cfg)))))
(catch Throwable t
- (log/error t "unexpected error in process-poll-result!")))))))))
-
+ (log/error t "Unexpected error in process-poll-result!")))))))))
(defn report-queue-listener [running?
ready?
^ScheduledExecutorService pool
config-atom]
- (let [conn (:conn @config-atom)
- ^BlockingQueue q (d/tx-report-queue conn)
+ (let [cfg @config-atom
+ conn (:conn cfg)
+ tx-report-queue-given (contains? cfg :tx-report-queue)
+ ^BlockingQueue q (if tx-report-queue-given
+ (get cfg :tx-report-queue)
+ (d/tx-report-queue conn))
id-ident (d/q '[:find ?e .
:where [?e :db/ident :com.github.ivarref.yoltq/id]]
(d/db conn))]
+ (assert (instance? BlockingQueue q))
+ (log/debug "tx-report-queue-given:" tx-report-queue-given)
(try
- (while @running?
- (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)]
- (process-poll-result! @config-atom
- id-ident
- poll-result
- (fn [f]
- (when @running?
- (.execute ^ScheduledExecutorService pool f)))))
- (deliver ready? true))
+ (let [running-local? (atom true)]
+ (while (and @running? @running-local?)
+ (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)]
+ (if (= poll-result :end)
+ (do
+ (log/debug "report-queue-listener received :end token. Exiting")
+ (reset! running-local? false))
+ ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system."))
+ (process-poll-result! @config-atom
+ id-ident
+ poll-result
+ (fn [f]
+ (when @running?
+ (.execute ^ScheduledExecutorService pool f))))))
+ (deliver ready? :ready)))
(catch Throwable t
- (log/error t "unexpected error in report-queue-listener"))
+ (log/error t "Unexpected error in report-queue-listener:" (.getMessage t)))
(finally
- (log/debug "remove tx-report-queue")
- (d/remove-tx-report-queue conn))))) \ No newline at end of file
+ (if tx-report-queue-given
+ (log/debug "Remove tx-report-queue handled elsewhere")
+ (do
+ (log/debug "Remove tx-report-queue")
+ (d/remove-tx-report-queue conn)))))))
+
+; https://stackoverflow.com/a/14488425
+(defn- dissoc-in
+ "Dissociates an entry from a nested associative structure returning a new
+ nested structure. keys is a sequence of keys. Any empty maps that result
+ will not be present in the new structure."
+ [m [k & ks :as keys]]
+ (if ks
+ (if-let [nextmap (get m k)]
+ (let [newmap (dissoc-in nextmap ks)]
+ (if (seq newmap)
+ (assoc m k newmap)
+ (dissoc m k)))
+ m)
+ (dissoc m k)))
+
+(defn- queues-to-shutdown [old-state new-state]
+ (assert (map? old-state))
+ (assert (map? new-state))
+ (doseq [x (vals new-state)]
+ (assert (vector? x)))
+ (doseq [x (vals old-state)]
+ (assert (vector? x)))
+ (let [new-qs (into #{} (mapv second (vals new-state)))]
+ (reduce
+ (fn [o [send-end-token? old-q]]
+ ;(assert (boolean? send-end-token?))
+ ;(assert (instance? BlockingQueue old-q))
+ (if (contains? new-qs old-q)
+ o
+ (conj o [send-end-token? old-q])))
+ []
+ (vals old-state))))
+
+(comment
+ (queues-to-shutdown {:a [true 999] :b [false 777]}
+ {:a [true 123] :b [true 777]}))
+
+(defn- multicast-once [conn work-item old-state new-state]
+ (assert (map? old-state))
+ (assert (map? new-state))
+ (doseq [[send-end-token? q-to-shutdown] (queues-to-shutdown old-state new-state)]
+ (if send-end-token?
+ (do
+ #_(log/debug "offering :end token")
+ (if (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS)
+ (log/debug "Multicaster sent :end token")
+ (log/debug "Multicaster failed to send :end token")))
+ (do
+ (log/debug "Multicaster not sending :end token"))))
+ (when (seq new-state)
+ (if (some? work-item)
+ (reduce-kv
+ (fn [m id [send-end-token? q]]
+ (let [ok-offer (.offer ^BlockingQueue q work-item 1 TimeUnit/MICROSECONDS)]
+ (if (true? ok-offer)
+ (assoc m id [send-end-token? q])
+ (log/error "Multicaster failed to offer item for connection" conn "and queue id" id))))
+ {}
+ new-state)
+ new-state)))
+
+(defonce ^:private multicast-state-lock (Object.))
+(defonce ^:private consumer-state-lock (Object.))
+(defonce ^:private multicast-state (atom {}))
+(defonce ^:private thread-count (atom 0))
+
+(defn- multicaster-loop [init-state conn ready?]
+ (assert (instance? Connection conn))
+ (let [input-queue (d/tx-report-queue conn)]
+ (deliver ready? true)
+ (loop [old-state init-state]
+ (let [work-item (.poll ^BlockingQueue input-queue 16 TimeUnit/MILLISECONDS)
+ new-state (locking multicast-state-lock
+ ; writer to `multicast-state` must be protected by `multicast-state-lock`
+ ; it should block minimally / spend minimum amount of time
+ (swap! multicast-state (fn [old-state] (update-in old-state [:iter-count conn] (fnil inc 0))))
+ (if-let [new-state (multicast-once conn work-item old-state (get-in @multicast-state [:queues conn] {}))]
+ new-state
+ (do (swap! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))
+ (swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] dec)))
+ (d/remove-tx-report-queue conn)
+ (log/debug "Multicaster removed tx-report-queue for conn" conn)
+ nil)))]
+ (if new-state
+ (recur new-state)
+ nil)))))
+
+(defn- start-multicaster! [conn]
+ (assert (instance? Connection conn))
+ (let [ready? (promise)]
+ (future
+ (log/debug "Multicaster starting for conn" conn)
+ (try
+ (swap! thread-count inc)
+ (let [new-state (swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] (fnil inc 0))))]
+ (assert (= 1 (get-in new-state [:thread-count conn])))
+ ; "parent" thread holds `multicast-state-lock` and
+ ; waits for `ready?` promise, so effectively this new thread also holds
+ ; the lock until `ready?` is delivered. That is: it is safe
+ ; for this thread to modify multicast-state regardless of what other threads are doing
+ (multicaster-loop (get-in new-state [:queues conn]) conn ready?))
+ (catch Throwable t
+ (log/error t "Unexpected error in multicaster:" (.getMessage t))
+ (log/error "Multicaster exiting for conn"))
+ (finally
+ (swap! thread-count dec)
+ (log/debug "Multicaster exiting for conn" conn))))
+ (when (= :timeout (deref ready? 30000 :timeout))
+ (throw (RuntimeException. "Timed out waiting for multicaster to start")))))
+
+(defn- wait-multicast-thread-step
+ [conn state]
+ ; `get-tx-report-queue-multicast!` should return only when the multicaster thread
+ ; has picked up the new queue.
+ ;
+ ; Otherwise the following could happen:
+ ; 1. multicast thread is sleeping
+ ; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true`
+ ; 3: user-thread (or somebody else) calls `stop-multicaster`.
+ ; The multicast-state atom is now identical as it was in step 1.
+ ; , Step 2 and 3 happened while the multicast thread was sleeping.
+ ; 4: The multicast thread is scheduled and does _not_ detect any state change.
+ ; Therefore the multicast thread does _not_ send out an :end token as one would expect.
+ ;
+ ; The new queue is written to memory at this point. No other thread can remove it because
+ ; we are still, and have been during the modification of multicast-state, holding consumer-state-lock.
+ ; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock,
+ ; we can be sure that no other thread changes or has changed the state.
+ ;
+ ; Once [:iter-count conn] has changed, we know that the multicaster thread
+ ; has seen the new queue. This means that we can be sure that the queue
+ ; will receive the `:end` token if the queue is stopped.
+ (let [start-ms (System/currentTimeMillis)
+ iter-count (get-in state [:iter-count conn] -1)]
+ (loop [spin-count 0]
+ (if (not= iter-count (locking multicast-state-lock
+ (get-in @multicast-state [:iter-count conn] -1)))
+ nil
+ (let [spent-ms (- (System/currentTimeMillis) start-ms)]
+ (if (> spent-ms 30000)
+ (throw (RuntimeException. "Timed out waiting for multicaster thread"))
+ (do
+ (Thread/sleep 16)
+ (recur (inc spin-count)))))))))
+
+(defn get-tx-report-queue-multicast!
+ "Multicast the datomic.api/tx-report-queue to different consumers.
+ A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer.
+ Repeated calls using the same `conn` and `id` returns the same queue.
+
+ The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread
+ to send `:end` if the queue is stopped. The default value is `false`.
+
+ A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`.
+
+ Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`."
+ ([conn id]
+ (get-tx-report-queue-multicast! conn id false))
+ ([conn id send-end-token?]
+ (assert (instance? Connection conn))
+ (locking consumer-state-lock
+ (let [[new-state the-q]
+ (locking multicast-state-lock
+ (assert (map? @multicast-state))
+ (if-let [existing-q (get-in @multicast-state [:queues conn id])]
+ (do
+ (let [new-state (swap! multicast-state
+ (fn [old-state]
+ (update-in old-state [:queues conn id] (fn [[end-token? q]]
+ (if (not= end-token? send-end-token?)
+ (log/debug "flipped `send-end-token?`")
+ (log/debug "identical `send-end-token?`"))
+ [send-end-token? q]))))]
+ (log/debug "Returning existing queue for id" id)
+ (assert (instance? BlockingQueue (second existing-q)))
+ [new-state (second existing-q)]))
+ (let [needs-multicaster? (nil? (get-in @multicast-state [:queues conn]))
+ new-q (LinkedBlockingQueue.)
+ new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [:queues conn id] [send-end-token? new-q])))]
+ (if needs-multicaster?
+ (do
+ (start-multicaster! conn)
+ (log/debug "Returning new queue for id" id "(multicaster thread started)")
+ [new-state new-q])
+ (do
+ (log/debug "Returning new queue for id" id "(multicaster thread already running)")
+ [new-state new-q])))))]
+ ; wait for multicaster thread to pick up current Queue
+ (wait-multicast-thread-step conn new-state)
+ the-q))))
+
+(defn- wait-multicast-threads-exit [[old-state new-state]]
+ (assert (map? old-state))
+ (assert (map? new-state))
+ (assert (map? (get old-state :queues {})))
+ (assert (map? (get new-state :queues {})))
+ (assert (map? (get old-state :thread-count {})))
+ (assert (map? (get new-state :thread-count {})))
+ (locking consumer-state-lock
+ ; No new multicast threads will be launched inside this block.
+ ; The lock is already held by parent function.
+ ;
+ ; Why do we need to _wait_ for multicaster thread(s) to exit after
+ ; removing all queue ids for a given connection?
+ ; Otherwise the following could happen:
+ ; 1. multicaster thread is sleeping
+ ; 2. user calls stop-multicaster!
+ ; One would expect that multicaster thread would exit, but it is still sleeping
+ ; 3. user calls get-tx-report-queue-multicast! with the same conn
+ ; The state is now empty, so a new multicaster thread is spawned.
+ ; 4. Now there is two multicaster threads for the same connection!
+ ; ... and since the datomic report queue can be shared between threads
+ ; it will seemingly work, but when the end event is sent, it will be
+ ; sent by multiple threads.
+ (let [old-conns (into #{} (keys (get old-state :queues {})))
+ new-conns (into #{} (keys (get new-state :queues {})))]
+ (assert (every?
+ (fn [x] (instance? Connection x))
+ old-conns))
+ (assert (every?
+ (fn [x] (instance? Connection x))
+ new-conns))
+ (doseq [old-conn old-conns]
+ (when-not (contains? new-conns old-conn)
+ (let [old-threadcount (get-in old-state [:thread-count old-conn] nil)]
+ (assert (= 1 old-threadcount))
+ (let [start-ms (System/currentTimeMillis)]
+ (loop []
+ (if (= 0 (get-in @multicast-state [:thread-count old-conn]))
+ :ok
+ (do
+ (let [spent-ms (- (System/currentTimeMillis) start-ms)]
+ (if (> spent-ms 30000)
+ (throw (RuntimeException. "Timed out waiting for multicaster thread to exit"))
+ (do
+ (Thread/sleep 16)
+ (recur))))))))))))))
+
+(defn- all-queues [state]
+ (->> (mapcat (fn [[conn qmap]]
+ (mapv (fn [q-id] [conn q-id])
+ (keys qmap)))
+ (seq (get state :queues {})))
+ (into #{})))
+
+(comment
+ (do
+ (assert (= #{}
+ (all-queues {})))
+ (assert (= #{}
+ (all-queues {:queues {}})))
+ (assert (= #{[:conn-a :q-id]}
+ (all-queues {:queues {:conn-a {:q-id 1}}})))
+ (assert (= #{[:conn-a :q-id] [:conn-a :q-id-2]}
+ (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2}}})))
+ (assert (= #{[:conn-a :q-id-2] [:conn-b :q-id-3] [:conn-a :q-id]}
+ (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2}
+ :conn-b {:q-id-3 3}}})))))
+
+(defn- removed-queues? [old new]
+ (not= (all-queues old)
+ (all-queues new)))
+
+(defn stop-multicast-consumer-id! [conn id]
+ (assert (instance? Connection conn))
+ (let [did-remove? (atom nil)]
+ (locking consumer-state-lock
+ (wait-multicast-threads-exit
+ (locking multicast-state-lock
+ (let [[old new] (swap-vals! multicast-state (fn [old-state]
+ (let [new-state (dissoc-in old-state [:queues conn id])]
+ (if (= {} (get-in new-state [:queues conn]))
+ (dissoc-in old-state [:queues conn])
+ new-state))))]
+ (reset! did-remove? (removed-queues? old new))
+ [old new]))))
+ @did-remove?))
+
+(defn stop-multicaster! [conn]
+ (assert (instance? Connection conn))
+ (let [did-remove? (atom nil)]
+ (locking consumer-state-lock
+ (wait-multicast-threads-exit
+ (locking multicast-state-lock
+ (let [[old new] (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))]
+ (reset! did-remove? (removed-queues? old new))
+ [old new]))))
+ @did-remove?))
+
+(defn stop-all-multicasters! []
+ (let [did-remove? (atom nil)]
+ (locking consumer-state-lock
+ (wait-multicast-threads-exit
+ (locking multicast-state-lock
+ (let [[old new] (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))]
+ (reset! did-remove? (removed-queues? old new))
+ [old new]))))
+ @did-remove?))
+
+(comment
+ (do
+ (require 'com.github.ivarref.yoltq.log-init)
+ (require '[datomic.api :as d])
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq"} :debug]
+ ;[#{"ivarref.yoltq*"} :info]
+ [#{"*"} :info]])
+ (defonce conn (let [uri (str "datomic:mem://demo")
+ _ (d/delete-database uri)
+ _ (d/create-database uri)
+ conn (d/connect uri)]
+ conn))))
+
+(comment
+ (do
+ (require 'com.github.ivarref.yoltq.log-init)
+ (defn drain! [^BlockingQueue q]
+ (loop [items []]
+ (if-let [elem (.poll q 100 TimeUnit/MILLISECONDS)]
+ (recur (conj items elem))
+ items)))
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq"} :debug]
+ ;[#{"ivarref.yoltq*"} :info]
+ [#{"*"} :info]])
+ (log/info "********************************")
+ (defonce conn (let [uri (str "datomic:mem://demo")
+ _ (d/delete-database uri)
+ _ (d/create-database uri)
+ conn (d/connect uri)]
+ conn))
+ (log/info "stop-all!")
+ (stop-all-multicasters!)
+ (assert (= 0 @thread-count))
+ (let [q1 (get-tx-report-queue-multicast! conn :q1 false)
+ q2 (get-tx-report-queue-multicast! conn :q2 false)
+ _ (get-tx-report-queue-multicast! conn :q1 true)]
+ @(d/transact conn [{:db/doc "demo"}])
+ @(d/transact conn [{:db/doc "demo"}])
+ @(d/transact conn [{:db/doc "demo"}])
+ (log/info "begin drain q1")
+ (stop-multicast-consumer-id! conn :q1)
+ (stop-multicast-consumer-id! conn :q1)
+ (println "thread count" @thread-count)
+ (let [qitems-2 (drain! q2)
+ qitems-1 (drain! q1)]
+ (assert (= :end (last qitems-1)))
+ (println "drain count q1:" (count qitems-1))
+ (println "drain count q2:" (count qitems-2))))))
+
+(comment
+ (do
+ (let [q (get-tx-report-queue-multicast! conn :q1 true)]
+ (log/debug "stopping id :q1")
+ (stop-multicaster-id! conn :q1)
+ (let [drained (drain! q)]
+ (println "drained:" drained)
+ (assert (= [:end] drained)))
+ @multicast-state)))
+
+(comment
+ (stop-all-multicasters!))
+
+(comment
+ (do
+ (let [q (get-tx-report-queue-multicast! conn :q2 false)]
+ (println "drain count:" (count (drain! q)))
+ @multicast-state
+ nil)))
+
+(comment
+ (get-tx-report-queue-multicast! conn :q1 false)
+ (get-tx-report-queue-multicast! conn :q1 true))
+
+(comment
+ (do
+ @(d/transact conn [{:db/doc "demo"}])
+ @(d/transact conn [{:db/doc "demo"}])
+ @(d/transact conn [{:db/doc "demo"}])
+ :yay)) \ No newline at end of file
diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj
index 1aa6c02..7eae557 100644
--- a/test/com/github/ivarref/yoltq/log_init.clj
+++ b/test/com/github/ivarref/yoltq/log_init.clj
@@ -3,6 +3,8 @@
[taoensso.timbre :as timbre]
[clojure.string :as str]))
+(set! *warn-on-reflection* true)
+
(def level-colors
{;:warn colors/red
:error colors/red})
@@ -46,7 +48,7 @@
(color-f (force msg_))
- #_maybe-stacktrace))))
+ maybe-stacktrace))))
(catch Throwable t