diff options
| -rw-r--r-- | .github/workflows/release.yml | 52 | ||||
| -rw-r--r-- | .gitignore | 1 | ||||
| -rw-r--r-- | README.md | 130 | ||||
| -rw-r--r-- | build.edn | 34 | ||||
| -rw-r--r-- | deps.edn | 63 | ||||
| -rw-r--r-- | pom.xml | 41 | ||||
| -rwxr-xr-x | release.sh | 37 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq.clj | 146 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/report_queue.clj | 438 | ||||
| -rw-r--r-- | test/com/github/ivarref/yoltq/log_init.clj | 4 |
10 files changed, 791 insertions, 155 deletions
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..1350854 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,52 @@ +name: Tag and Release +on: workflow_dispatch + +jobs: + tag-and-release: + runs-on: ubuntu-22.04 + steps: + - uses: actions/checkout@v3 + with: + # NOTE: Fetch all for counting commits + fetch-depth: 0 + - uses: actions/setup-java@v3 + with: + distribution: 'adopt' + java-version: 21 + - uses: DeLaGuardo/setup-clojure@13.4 + with: + cli: 1.12.0.1530 + + - name: Show versions + run: | + java -version + clojure --version + + - name: deploy to clojars + # NOTE: Specify ID to refer outputs from other steps + id: deploy + run: | + clojure -T:build deploy + env: + CLOJARS_PASSWORD: ${{secrets.CLOJARS_PASSWORD}} + CLOJARS_USERNAME: ${{secrets.CLOJARS_USERNAME}} + + - uses: actions/create-release@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + # NOTE: Refer outputs + tag_name: ${{ steps.deploy.outputs.version }} + release_name: ${{ steps.deploy.outputs.version }} + body: released + draft: false + prerelease: false + + - run: | + clojure -T:build update-documents + git diff + git config --global user.email "github-actions@example.com" + git config --global user.name "github-actions" + git add -A + git commit -m "Update for release" + git push
\ No newline at end of file @@ -11,3 +11,4 @@ tree.txt *.pom temp/ .clj-kondo/ +.rebel_readline_history
\ No newline at end of file @@ -1,7 +1,8 @@ # yoltq -An opinionated Datomic queue for building (more) reliable systems. -Implements the [transactional outbox](https://microservices.io/patterns/data/transactional-outbox.html) +An opinionated Datomic queue for building (more) reliable systems. +Implements the +[transactional outbox](https://microservices.io/patterns/data/transactional-outbox.html) pattern. Supports retries, backoff, ordering and more. On-prem only. @@ -62,8 +63,8 @@ Imagine the following code: ```clojure (defn post-handler [user-input] (let [db-item (process user-input) - ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; timeout in milliseconds - :socket-timeout 10000 ; timeout in milliseconds + ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; milliseconds + :socket-timeout 10000 ; milliseconds ...})] ; may throw exception @(d/transact conn [(assoc db-item :some/ext-ref ext-ref)]))) ``` @@ -78,8 +79,8 @@ The queue way to solve this would be: ```clojure (defn get-ext-ref [{:keys [id]}] - (let [ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; timeout in milliseconds - :socket-timeout 10000 ; timeout in milliseconds + (let [ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; milliseconds + :socket-timeout 10000 ; milliseconds ...})] ; may throw exception @(d/transact conn [[:db/cas [:some/id id] :some/ext-ref @@ -105,7 +106,8 @@ Thus `get-ext-ref` and any queue consumer should tolerate to be executed successfully several times. For `get-ext-ref` this is solved by using -the database function [:db/cas (compare-and-swap)](https://docs.datomic.com/on-prem/transactions/transaction-functions.html#dbfn-cas) +the database function +[:db/cas (compare-and-swap)](https://docs.datomic.com/on-prem/transactions/transaction-functions.html#dbfn-cas) to achieve a write-once behaviour. The yoltq system treats cas failures as job successes when a consumer has `:allow-cas-failure?` set to `true` in its options. @@ -154,18 +156,19 @@ the payload. It can be added like this: ; An optional map of queue opts {:allow-cas-failure? true ; Treat [:db.cas ...] failures as success. This is one way for the ; consumer function to ensure idempotence. - :valid-payload? (fn [payload] (some? (:id payload))) ; Function that verifies payload. Should return truthy for valid payloads. + :valid-payload? (fn [payload] (some? (:id payload))) ; Function that verifies payload. + ; Should return truthy for valid payloads. ; The default function always returns true. :max-retries 10}) ; Specify maximum number of times an item will be retried. Default: 10000. ; If :max-retries is given as 0, the job will ~always be retried, i.e. ; 9223372036854775807 times (Long/MAX_VALUE). ``` -The `payload` will be deserialized from the database using `clojure.edn/read-string` before invocation, i.e. -you will get back what you put into `yq/put`. +The `payload` will be deserialized from the database using `clojure.edn/read-string` before +invocation, i.e. you will get back what you put into `yq/put`. -The yoltq system treats a queue consumer function invocation as successful if it does not throw an exception. -Any return value, be it `nil`, `false`, `true`, etc. is considered a success. +The yoltq system treats a queue consumer function invocation as successful if it does not throw +an exception. Any return value, be it `nil`, `false`, `true`, etc. is considered a success. ### Listening for queue jobs @@ -251,7 +254,8 @@ For example if you want to use [nippy](https://github.com/ptaoussanis/nippy): ### Partitions -Yoltq supports specifying which [partition](https://docs.datomic.com/on-prem/schema/schema.html#partitions) +Yoltq supports specifying which +[partition](https://docs.datomic.com/on-prem/schema/schema.html#partitions) queue entities should belong to. The default function is: ```clojure @@ -270,7 +274,8 @@ E.g.: ### All configuration options For an exhaustive list of all configuration options, -see [yq/default-opts](https://github.com/ivarref/yoltq/blob/main/src/com/github/ivarref/yoltq.clj#L21). +see +[yq/default-opts](https://github.com/ivarref/yoltq/blob/main/src/com/github/ivarref/yoltq.clj#L21). # Regular and REPL usage @@ -426,15 +431,94 @@ Note: I have not tried these libraries myself. If you liked this library, you may also like: -* [conformity](https://github.com/avescodes/conformity): A Clojure/Datomic library for idempotently transacting norms into your database – be they schema, data, or otherwise. -* [datomic-schema](https://github.com/ivarref/datomic-schema): Simplified writing of Datomic schemas (works with conformity). -* [double-trouble](https://github.com/ivarref/double-trouble): Handle duplicate Datomic transactions with ease. -* [gen-fn](https://github.com/ivarref/gen-fn): Generate Datomic function literals from regular Clojure namespaces. -* [rewriting-history](https://github.com/ivarref/rewriting-history): A library to rewrite Datomic history. +* [conformity](https://github.com/avescodes/conformity): + A Clojure/Datomic library for idempotently transacting norms into your database – be they schema, + data, or otherwise. +* [datomic-schema](https://github.com/ivarref/datomic-schema): + Simplified writing of Datomic schemas (works with conformity). +* [double-trouble](https://github.com/ivarref/double-trouble): + Handle duplicate Datomic transactions with ease. +* [gen-fn](https://github.com/ivarref/gen-fn): + Generate Datomic function literals from regular Clojure namespaces. +* [rewriting-history](https://github.com/ivarref/rewriting-history): + A library to rewrite Datomic history. ## Change log +#### [Unreleased] + +#### [0.2.85] - 2025-07-29 + +#### [v0.2.82] - 2025-06-18 +Added support for specifying `tx-report-queue` as a keyword in `init!`. Yoltq will +then not grab the datomic report queue, but use the one provided: + +```clojure +(require '[com.github.ivarref.yoltq :as yq]) +(yq/init! {:conn conn + :tx-report-queue (yq/get-tx-report-queue-multicast! conn :yoltq) + ; ^^ can be any `java.util.concurrent.BlockingQueue` value + }) + +(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! conn :another-consumer-id)) + +``` + +Added multicast support for `datomic.api/tx-report-queue`: +```clojure +(require '[com.github.ivarref.yoltq :as yq]) +(def my-q1 (yq/get-tx-report-queue-multicast! conn :q-id-1)) +; ^^ consume my-q1 just like you would do `datomic.api/tx-report-queue` + +(def my-q2 (yq/get-tx-report-queue-multicast! conn :q-id-2)) +; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue` +; for the given `conn` + +(def my-q3 (yq/get-tx-report-queue-multicast! conn :q-id-3 true)) +; my-q3 sets the optional third argument, `send-end-token?`, to true. +; The queue will then receive `:end` if the queue is stopped. +; This can enable simpler consuming of queues: +(future + (loop [] + (let [q-item (.take ^java.util.concurrent.BlockingQueue my-q3)] + (if (= q-item :end) + (println "Time to exit. Goodbye!") + (do + (println "Processing q-item" q-item) + (recur)))))) + +; The default value for `send-end-token?` is `false`, i.e. the behaviour will be +; identical to that of datomic.api/tx-report-queue. + +@(d/transact conn [{:db/doc "new-data"}]) + +; Stop the queue: +(yq/stop-multicast-consumer-id! conn :q-id-3) +=> true +; The multicaster thread will send `:end`. +; The consumer thread will then print "Time to exit. Goodbye!". + +; if the queue is already stopped (or never was started), the `stop-multicaster...` +; functions will return false: +(yq/stop-multicast-consumer-id! conn :already-stopped-queue-or-typo) +=> false + +; Stop all queues for all connections: +(yq/stop-all-multicasters!) +``` + +`yq/get-tx-report-queue-multicast!` returns, like +`datomic.api/tx-report-queue`, +`java.util.concurrent.BlockingQueue` and starts a background thread that does +the multicasting as needed. Identical calls to `yq/get-tx-report-queue-multicast!` +returns the same `BlockingQueue`. + +Changed the default for `max-retries` from `10000` to `9223372036854775807`. + +Fixed reflection warnings. + #### 2023-03-20 v0.2.64 [diff](https://github.com/ivarref/yoltq/compare/v0.2.63...v0.2.64) + Added support for `max-retries` being `0`, meaning the job should be retried forever (or at least 9223372036854775807 times). @@ -443,7 +527,8 @@ Changed the default for `max-retries` from `100` to `10000`. #### 2022-11-18 v0.2.63 [diff](https://github.com/ivarref/yoltq/compare/v0.2.62...v0.2.63) Added custom `:encode` and `:decode` support. -Added support for specifying `:partifion-fn` to specify which partition a queue item should belong to. +Added support for specifying `:partifion-fn` to specify +which partition a queue item should belong to. It defaults to: ```clojure (defn default-partition-fn [_queue-name] @@ -619,8 +704,13 @@ Added `:valid-payload?` option for queue consumers. Improved error reporting. #### 2021-09-24 v0.2.33 + First publicly announced release. +## Making a new release + +Go to https://github.com/ivarref/yoltq/actions/workflows/release.yml and press `Run workflow`. + ## License Copyright © 2021-2022 Ivar Refsdal diff --git a/build.edn b/build.edn new file mode 100644 index 0000000..3e1f016 --- /dev/null +++ b/build.edn @@ -0,0 +1,34 @@ +{:lib + com.github.ivarref/yoltq + + :version + "0.2.{{git/commit-count}}" + + :github-actions? + true + + :scm + {:connection "scm:git:git://github.com/ivarref/yoltq.git" + :developerConnection "scm:git:ssh://git@github.com/ivarref/yoltq.git" + :url "https://github.com/ivarref/yoltq"} + + :documents + [{:file "README.md" + :match-exactly "#### [Unreleased]" + :action :append-after + :text "\n#### [{{version}}] - {{now/yyyy}}-{{now/mm}}-{{now/dd}}"} + {:file "README.md" + :match-exactly "com.github.ivarref/yoltq {:git/tag" + :action :replace + :keep-indent? true + :text + "com.github.ivarref/yoltq {:git/tag \"{{version}}\" :git/sha \"{{git/head-long-sha}}\"}"} + {:file "README.md" + :match-exactly "com.github.ivarref/yoltq {:mvn/version" + :action :replace + :keep-indent? true + :text "com.github.ivarref/yoltq {:mvn/version \"{{version}}\"}"}] + + :licenses + [{:name "Eclipse Public License - v 2.0" + :url "https://www.eclipse.org/legal/epl-2.0/"}]}
\ No newline at end of file @@ -1,31 +1,40 @@ -{:deps {com.github.ivarref/double-trouble {:mvn/version "0.1.102"} - org.clojure/tools.logging {:mvn/version "1.2.4"} - org.clojure/clojure {:mvn/version "1.11.1"}} +{:deps + {com.github.ivarref/double-trouble {:mvn/version "0.1.102"} + org.clojure/tools.logging {:mvn/version "1.2.4"} + org.clojure/clojure {:mvn/version "1.11.1"} + com.datomic/peer {:mvn/version "1.0.7364"}} - :paths ["src"] + :paths + ["src"] - :aliases {:datomic {:extra-deps {com.datomic/peer {:mvn/version "1.0.7075" :exclusions [org.slf4j/slf4j-nop]}}} - :test {:extra-paths ["test"] - :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} - com.taoensso/timbre {:mvn/version "5.2.1"} - com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} - clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} - com.datomic/peer {:mvn/version "1.0.7075" :exclusions [org.slf4j/slf4j-nop]} - org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} - com.taoensso/nippy {:mvn/version "3.2.0"} - io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} - :jvm-opts ["-DDISABLE_SPY=true" - "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] - :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]} + :aliases + {:test + {:extra-paths ["test"] + :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} + com.taoensso/timbre {:mvn/version "5.2.1"} + com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} + clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} + org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"} + io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} + :exec-fn cognitect.test-runner.api/test + :jvm-opts ["-DDISABLE_SPY=true" + "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] + :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]} - :jar {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git" - :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}} - :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]} + :repl + {:extra-paths ["test"] + :extra-deps {com.bhauman/rebel-readline {:mvn/version "0.1.5"} + ivarref/datomic-schema {:mvn/version "0.2.0"} + com.taoensso/timbre {:mvn/version "5.2.1"} + com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} + clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} + org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"}} + :exec-fn rebel-readline.tool/repl + :exec-args {} + :main-opts ["-m" "rebel-readline.main"]} - :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} - - :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}} - :exec-fn deps-deploy.deps-deploy/deploy - :exec-args {:installer :remote - :sign-releases? false - :artifact "target/out.jar"}}}} + :build + {:deps {com.github.liquidz/build.edn {:mvn/version "0.11.241"}} + :ns-default build-edn.main}}} diff --git a/pom.xml b/pom.xml deleted file mode 100644 index 466f47a..0000000 --- a/pom.xml +++ /dev/null @@ -1,41 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <packaging>jar</packaging> - <groupId>com.github.ivarref</groupId> - <artifactId>yoltq</artifactId> - <version>0.2.64</version> - <name>yoltq</name> - <dependencies> - <dependency> - <groupId>org.clojure</groupId> - <artifactId>clojure</artifactId> - <version>1.11.1</version> - </dependency> - <dependency> - <groupId>com.github.ivarref</groupId> - <artifactId>double-trouble</artifactId> - <version>0.1.102</version> - </dependency> - <dependency> - <groupId>org.clojure</groupId> - <artifactId>tools.logging</artifactId> - <version>1.2.4</version> - </dependency> - </dependencies> - <build> - <sourceDirectory>src</sourceDirectory> - </build> - <repositories> - <repository> - <id>clojars</id> - <url>https://repo.clojars.org/</url> - </repository> - </repositories> - <scm> - <connection>scm:git:git://github.com/ivarref/yoltq.git</connection> - <developerConnection>scm:git:ssh://git@github.com/ivarref/yoltq.git</developerConnection> - <tag>v0.2.64</tag> - <url>https://github.com/ivarref/yoltq</url> - </scm> -</project>
\ No newline at end of file diff --git a/release.sh b/release.sh deleted file mode 100755 index 3d06135..0000000 --- a/release.sh +++ /dev/null @@ -1,37 +0,0 @@ -#!/bin/bash - -if [[ $# -ne 1 ]]; then - echo "Illegal number of parameters" >&2 - exit 2 -fi - -set -ex - -git update-index --refresh -git diff-index --quiet HEAD -- - -clojure -Spom -clojure -M:test -clojure -M:jar -clojure -X:release ivarref.pom-patch/clojars-repo-only! - -LAST_TAG="$(git rev-list --tags --no-walk --max-count=1)" -COMMITS_SINCE_LAST_TAG="$(git rev-list "$LAST_TAG"..HEAD --count)" -echo "Squashing $COMMITS_SINCE_LAST_TAG commits ..." -git reset --soft HEAD~"$COMMITS_SINCE_LAST_TAG" -MSG="$(git log --format=%B --reverse HEAD..HEAD@{1})" -git commit -m"$MSG" - -VERSION="$(clojure -X:release ivarref.pom-patch/set-patch-version! :patch :commit-count)" -echo "Releasing $VERSION" -sed -i "s/HEAD/v$VERSION/g" ./README.md -git add pom.xml README.md -git commit -m "Release $VERSION" -git reset --soft HEAD~2 -git commit -m"Release $VERSION: $1" - -git tag -a v"$VERSION" -m "Release v$VERSION: $1" -git push --follow-tags --force - -clojure -X:deploy -echo "Released $VERSION" diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 1ba286e..ccd9062 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -12,8 +12,7 @@ (:import (datomic Connection) (java.lang.management ManagementFactory) (java.time Duration Instant ZoneOffset ZonedDateTime) - (java.util.concurrent ExecutorService Executors TimeUnit))) - + (java.util.concurrent BlockingQueue ExecutorService Executors ScheduledExecutorService TimeUnit))) (defonce ^:dynamic *config* (atom nil)) (defonce threadpool (atom nil)) @@ -27,7 +26,7 @@ ; If you want no limit on the number of retries, specify ; the value `0`. That will set the effective retry limit to ; 9223372036854775807 times. - :max-retries 10000 + :max-retries 9223372036854775807 ; Minimum amount of time to wait before a failed queue job is retried :error-backoff-time (Duration/ofSeconds 5) @@ -85,8 +84,11 @@ u/duration->millis)) -(defn init! [{:keys [conn] :as cfg}] +(defn init! [{:keys [conn tx-report-queue] :as cfg}] (assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil"))) + (when (some? tx-report-queue) + (assert (instance? BlockingQueue tx-report-queue) + (str "Expected :tx-report-queue to be of type java.util.concurrent.BlockingQueue"))) (locking threadpool @(d/transact conn i/schema) (let [new-cfg (swap! *config* @@ -140,14 +142,39 @@ (let [pool (reset! threadpool (Executors/newScheduledThreadPool (+ 1 pool-size))) queue-listener-ready (promise)] (reset! *running?* true) - (.scheduleAtFixedRate pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS) - (.scheduleAtFixedRate pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS) - (.execute pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*))) + (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS) + (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS) + (.execute ^ScheduledExecutorService pool + (fn [] + (try + (log/debug "report-queue-listener starting") + (rq/report-queue-listener *running?* queue-listener-ready pool *config*) + (finally + (log/debug "report-queue-listener exiting") + (deliver queue-listener-ready :finally))))) (future (try (slow-executor/show-slow-threads pool *config*) (finally (deliver slow-thread-watcher-done? :done)))) - @queue-listener-ready))) + (let [q-listener-retval (deref queue-listener-ready 30000 :timeout)] + (cond (= :timeout q-listener-retval) + (do + (log/error "Timed out waiting for report-queue-listener to start") + (throw (IllegalStateException. "Timed out waiting for report-queue-listener to start"))) + + (= :finally q-listener-retval) + (do + (log/error "report-queue-listener did not start") + (throw (IllegalStateException. "report-queue-listener did not start"))) + + (= :ready q-listener-retval) + (do + (log/debug "report-queue-listener is ready")) + + :else + (do + (log/error (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval))) + (throw (IllegalStateException. (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval)))))))))) (defn start! [] @@ -348,9 +375,9 @@ :p95 ... :p99 ...}}" [{:keys [age-days queue-name now db duration->long] - :or {age-days 30 - now (ZonedDateTime/now ZoneOffset/UTC) - duration->long (fn [duration] (.toSeconds duration))}}] + :or {age-days 30 + now (ZonedDateTime/now ZoneOffset/UTC) + duration->long (fn [duration] (.toSeconds ^Duration duration))}}] (let [{:keys [conn]} @*config* db (or db (d/db conn)) ->zdt #(.atZone (Instant/ofEpochMilli %) ZoneOffset/UTC)] @@ -380,6 +407,66 @@ :min (apply min values))}))) (into (sorted-map))))) +(defn get-tx-report-queue-multicast! + "Multicast the datomic.api/tx-report-queue to different consumers. + A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer. + Repeated calls using the same `conn` and `id` returns the same queue. + + The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread + to send `:end` if the queue is stopped. + The default value for `send-end-token?` is `false`. + + A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`. + + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + ([conn id] + (get-tx-report-queue-multicast! conn id false)) + ([conn id send-end-token?] + (assert (instance? Connection conn)) + (assert (boolean? send-end-token?)) + (rq/get-tx-report-queue-multicast! conn id send-end-token?))) + +(defn stop-multicast-consumer-id! + "Stop forwarding reports from datomic.api/tx-report-queue to the queue identified by `conn` and `id`. + If this is the last report destination for the given `conn`, the multicaster thread will exit. + Repeated calls are no-op. + + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if the queue was stopped. + Return `false` if the queue does not exist." + [conn id] + (assert (instance? Connection conn)) + (rq/stop-multicast-consumer-id! conn id)) + +(defn stop-multicaster! + "Stop forwarding reports from datomic.api/tx-report-queue to any queues belonging to `conn`. + The multicaster thread will exit. + Repeated calls are no-op. + + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if any queue belonging to `conn` was stopped. + Returns `false` is `conn` did not have any associated queues." + [conn] + (assert (instance? Connection conn)) + (rq/stop-multicaster! conn)) + +(defn stop-all-multicasters! + "Stop forwarding all reports from datomic.api/tx-report-queue for any `conn`. + All multicaster threads will exit. + Repeated calls are no-op. + + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if any queue was stopped. + Returns `false` if no queues existed." + [] + (rq/stop-all-multicasters!)) + (comment (do (require 'com.github.ivarref.yoltq.log-init) @@ -413,3 +500,40 @@ @started-consuming? (stop!) nil))))) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq.migrate"} :warn] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (stop!) + (let [received (atom []) + uri (str "datomic:mem://demo")] + (d/delete-database uri) + (d/create-database uri) + (let [conn (d/connect uri) + started-consuming? (promise) + n 1] + (init! {:conn conn + :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq true) + :slow-thread-show-stacktrace? false}) + (add-consumer! :q (fn [_] + (deliver started-consuming? true))) + (log/info "begin start! ...") + (start!) + (log/info "begin start! ... Done") + (Thread/sleep 2000) + (log/info "*******************************************") + @(d/transact conn [(put :q {:work 123})]) + @started-consuming? + (stop-multicaster! conn) + (log/info "*******************************************") + (stop!) + (log/info "stop! done") + nil))))
\ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index 20e0a93..f83e3ba 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -3,9 +3,10 @@ [com.github.ivarref.yoltq.impl :as i] [datomic.api :as d] [clojure.tools.logging :as log]) - (:import (datomic Datom) - (java.util.concurrent ScheduledExecutorService BlockingQueue TimeUnit))) + (:import (datomic Connection Datom) + (java.util.concurrent LinkedBlockingQueue ScheduledExecutorService BlockingQueue TimeUnit))) +; Private API, subject to change! (defn process-poll-result! [cfg id-ident poll-result consumer] (let [{:keys [tx-data db-after]} poll-result] @@ -28,30 +29,431 @@ (i/take! cfg) (i/execute! cfg))))) (catch Throwable t - (log/error t "unexpected error in process-poll-result!"))))))))) - + (log/error t "Unexpected error in process-poll-result!"))))))))) (defn report-queue-listener [running? ready? ^ScheduledExecutorService pool config-atom] - (let [conn (:conn @config-atom) - ^BlockingQueue q (d/tx-report-queue conn) + (let [cfg @config-atom + conn (:conn cfg) + tx-report-queue-given (contains? cfg :tx-report-queue) + ^BlockingQueue q (if tx-report-queue-given + (get cfg :tx-report-queue) + (d/tx-report-queue conn)) id-ident (d/q '[:find ?e . :where [?e :db/ident :com.github.ivarref.yoltq/id]] (d/db conn))] + (assert (instance? BlockingQueue q)) + (log/debug "tx-report-queue-given:" tx-report-queue-given) (try - (while @running? - (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] - (process-poll-result! @config-atom - id-ident - poll-result - (fn [f] - (when @running? - (.execute ^ScheduledExecutorService pool f))))) - (deliver ready? true)) + (let [running-local? (atom true)] + (while (and @running? @running-local?) + (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] + (if (= poll-result :end) + (do + (log/debug "report-queue-listener received :end token. Exiting") + (reset! running-local? false)) + ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system.")) + (process-poll-result! @config-atom + id-ident + poll-result + (fn [f] + (when @running? + (.execute ^ScheduledExecutorService pool f)))))) + (deliver ready? :ready))) (catch Throwable t - (log/error t "unexpected error in report-queue-listener")) + (log/error t "Unexpected error in report-queue-listener:" (.getMessage t))) (finally - (log/debug "remove tx-report-queue") - (d/remove-tx-report-queue conn)))))
\ No newline at end of file + (if tx-report-queue-given + (log/debug "Remove tx-report-queue handled elsewhere") + (do + (log/debug "Remove tx-report-queue") + (d/remove-tx-report-queue conn))))))) + +; https://stackoverflow.com/a/14488425 +(defn- dissoc-in + "Dissociates an entry from a nested associative structure returning a new + nested structure. keys is a sequence of keys. Any empty maps that result + will not be present in the new structure." + [m [k & ks :as keys]] + (if ks + (if-let [nextmap (get m k)] + (let [newmap (dissoc-in nextmap ks)] + (if (seq newmap) + (assoc m k newmap) + (dissoc m k))) + m) + (dissoc m k))) + +(defn- queues-to-shutdown [old-state new-state] + (assert (map? old-state)) + (assert (map? new-state)) + (doseq [x (vals new-state)] + (assert (vector? x))) + (doseq [x (vals old-state)] + (assert (vector? x))) + (let [new-qs (into #{} (mapv second (vals new-state)))] + (reduce + (fn [o [send-end-token? old-q]] + ;(assert (boolean? send-end-token?)) + ;(assert (instance? BlockingQueue old-q)) + (if (contains? new-qs old-q) + o + (conj o [send-end-token? old-q]))) + [] + (vals old-state)))) + +(comment + (queues-to-shutdown {:a [true 999] :b [false 777]} + {:a [true 123] :b [true 777]})) + +(defn- multicast-once [conn work-item old-state new-state] + (assert (map? old-state)) + (assert (map? new-state)) + (doseq [[send-end-token? q-to-shutdown] (queues-to-shutdown old-state new-state)] + (if send-end-token? + (do + #_(log/debug "offering :end token") + (if (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS) + (log/debug "Multicaster sent :end token") + (log/debug "Multicaster failed to send :end token"))) + (do + (log/debug "Multicaster not sending :end token")))) + (when (seq new-state) + (if (some? work-item) + (reduce-kv + (fn [m id [send-end-token? q]] + (let [ok-offer (.offer ^BlockingQueue q work-item 1 TimeUnit/MICROSECONDS)] + (if (true? ok-offer) + (assoc m id [send-end-token? q]) + (log/error "Multicaster failed to offer item for connection" conn "and queue id" id)))) + {} + new-state) + new-state))) + +(defonce ^:private multicast-state-lock (Object.)) +(defonce ^:private consumer-state-lock (Object.)) +(defonce ^:private multicast-state (atom {})) +(defonce ^:private thread-count (atom 0)) + +(defn- multicaster-loop [init-state conn ready?] + (assert (instance? Connection conn)) + (let [input-queue (d/tx-report-queue conn)] + (deliver ready? true) + (loop [old-state init-state] + (let [work-item (.poll ^BlockingQueue input-queue 16 TimeUnit/MILLISECONDS) + new-state (locking multicast-state-lock + ; writer to `multicast-state` must be protected by `multicast-state-lock` + ; it should block minimally / spend minimum amount of time + (swap! multicast-state (fn [old-state] (update-in old-state [:iter-count conn] (fnil inc 0)))) + (if-let [new-state (multicast-once conn work-item old-state (get-in @multicast-state [:queues conn] {}))] + new-state + (do (swap! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn]))) + (swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] dec))) + (d/remove-tx-report-queue conn) + (log/debug "Multicaster removed tx-report-queue for conn" conn) + nil)))] + (if new-state + (recur new-state) + nil))))) + +(defn- start-multicaster! [conn] + (assert (instance? Connection conn)) + (let [ready? (promise)] + (future + (log/debug "Multicaster starting for conn" conn) + (try + (swap! thread-count inc) + (let [new-state (swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] (fnil inc 0))))] + (assert (= 1 (get-in new-state [:thread-count conn]))) + ; "parent" thread holds `multicast-state-lock` and + ; waits for `ready?` promise, so effectively this new thread also holds + ; the lock until `ready?` is delivered. That is: it is safe + ; for this thread to modify multicast-state regardless of what other threads are doing + (multicaster-loop (get-in new-state [:queues conn]) conn ready?)) + (catch Throwable t + (log/error t "Unexpected error in multicaster:" (.getMessage t)) + (log/error "Multicaster exiting for conn")) + (finally + (swap! thread-count dec) + (log/debug "Multicaster exiting for conn" conn)))) + (when (= :timeout (deref ready? 30000 :timeout)) + (throw (RuntimeException. "Timed out waiting for multicaster to start"))))) + +(defn- wait-multicast-thread-step + [conn state] + ; `get-tx-report-queue-multicast!` should return only when the multicaster thread + ; has picked up the new queue. + ; + ; Otherwise the following could happen: + ; 1. multicast thread is sleeping + ; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true` + ; 3: user-thread (or somebody else) calls `stop-multicaster`. + ; The multicast-state atom is now identical as it was in step 1. + ; , Step 2 and 3 happened while the multicast thread was sleeping. + ; 4: The multicast thread is scheduled and does _not_ detect any state change. + ; Therefore the multicast thread does _not_ send out an :end token as one would expect. + ; + ; The new queue is written to memory at this point. No other thread can remove it because + ; we are still, and have been during the modification of multicast-state, holding consumer-state-lock. + ; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock, + ; we can be sure that no other thread changes or has changed the state. + ; + ; Once [:iter-count conn] has changed, we know that the multicaster thread + ; has seen the new queue. This means that we can be sure that the queue + ; will receive the `:end` token if the queue is stopped. + (let [start-ms (System/currentTimeMillis) + iter-count (get-in state [:iter-count conn] -1)] + (loop [spin-count 0] + (if (not= iter-count (locking multicast-state-lock + (get-in @multicast-state [:iter-count conn] -1))) + nil + (let [spent-ms (- (System/currentTimeMillis) start-ms)] + (if (> spent-ms 30000) + (throw (RuntimeException. "Timed out waiting for multicaster thread")) + (do + (Thread/sleep 16) + (recur (inc spin-count))))))))) + +(defn get-tx-report-queue-multicast! + "Multicast the datomic.api/tx-report-queue to different consumers. + A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer. + Repeated calls using the same `conn` and `id` returns the same queue. + + The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread + to send `:end` if the queue is stopped. The default value is `false`. + + A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`. + + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + ([conn id] + (get-tx-report-queue-multicast! conn id false)) + ([conn id send-end-token?] + (assert (instance? Connection conn)) + (locking consumer-state-lock + (let [[new-state the-q] + (locking multicast-state-lock + (assert (map? @multicast-state)) + (if-let [existing-q (get-in @multicast-state [:queues conn id])] + (do + (let [new-state (swap! multicast-state + (fn [old-state] + (update-in old-state [:queues conn id] (fn [[end-token? q]] + (if (not= end-token? send-end-token?) + (log/debug "flipped `send-end-token?`") + (log/debug "identical `send-end-token?`")) + [send-end-token? q]))))] + (log/debug "Returning existing queue for id" id) + (assert (instance? BlockingQueue (second existing-q))) + [new-state (second existing-q)])) + (let [needs-multicaster? (nil? (get-in @multicast-state [:queues conn])) + new-q (LinkedBlockingQueue.) + new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [:queues conn id] [send-end-token? new-q])))] + (if needs-multicaster? + (do + (start-multicaster! conn) + (log/debug "Returning new queue for id" id "(multicaster thread started)") + [new-state new-q]) + (do + (log/debug "Returning new queue for id" id "(multicaster thread already running)") + [new-state new-q])))))] + ; wait for multicaster thread to pick up current Queue + (wait-multicast-thread-step conn new-state) + the-q)))) + +(defn- wait-multicast-threads-exit [[old-state new-state]] + (assert (map? old-state)) + (assert (map? new-state)) + (assert (map? (get old-state :queues {}))) + (assert (map? (get new-state :queues {}))) + (assert (map? (get old-state :thread-count {}))) + (assert (map? (get new-state :thread-count {}))) + (locking consumer-state-lock + ; No new multicast threads will be launched inside this block. + ; The lock is already held by parent function. + ; + ; Why do we need to _wait_ for multicaster thread(s) to exit after + ; removing all queue ids for a given connection? + ; Otherwise the following could happen: + ; 1. multicaster thread is sleeping + ; 2. user calls stop-multicaster! + ; One would expect that multicaster thread would exit, but it is still sleeping + ; 3. user calls get-tx-report-queue-multicast! with the same conn + ; The state is now empty, so a new multicaster thread is spawned. + ; 4. Now there is two multicaster threads for the same connection! + ; ... and since the datomic report queue can be shared between threads + ; it will seemingly work, but when the end event is sent, it will be + ; sent by multiple threads. + (let [old-conns (into #{} (keys (get old-state :queues {}))) + new-conns (into #{} (keys (get new-state :queues {})))] + (assert (every? + (fn [x] (instance? Connection x)) + old-conns)) + (assert (every? + (fn [x] (instance? Connection x)) + new-conns)) + (doseq [old-conn old-conns] + (when-not (contains? new-conns old-conn) + (let [old-threadcount (get-in old-state [:thread-count old-conn] nil)] + (assert (= 1 old-threadcount)) + (let [start-ms (System/currentTimeMillis)] + (loop [] + (if (= 0 (get-in @multicast-state [:thread-count old-conn])) + :ok + (do + (let [spent-ms (- (System/currentTimeMillis) start-ms)] + (if (> spent-ms 30000) + (throw (RuntimeException. "Timed out waiting for multicaster thread to exit")) + (do + (Thread/sleep 16) + (recur)))))))))))))) + +(defn- all-queues [state] + (->> (mapcat (fn [[conn qmap]] + (mapv (fn [q-id] [conn q-id]) + (keys qmap))) + (seq (get state :queues {}))) + (into #{}))) + +(comment + (do + (assert (= #{} + (all-queues {}))) + (assert (= #{} + (all-queues {:queues {}}))) + (assert (= #{[:conn-a :q-id]} + (all-queues {:queues {:conn-a {:q-id 1}}}))) + (assert (= #{[:conn-a :q-id] [:conn-a :q-id-2]} + (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2}}}))) + (assert (= #{[:conn-a :q-id-2] [:conn-b :q-id-3] [:conn-a :q-id]} + (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2} + :conn-b {:q-id-3 3}}}))))) + +(defn- removed-queues? [old new] + (not= (all-queues old) + (all-queues new))) + +(defn stop-multicast-consumer-id! [conn id] + (assert (instance? Connection conn)) + (let [did-remove? (atom nil)] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (let [[old new] (swap-vals! multicast-state (fn [old-state] + (let [new-state (dissoc-in old-state [:queues conn id])] + (if (= {} (get-in new-state [:queues conn])) + (dissoc-in old-state [:queues conn]) + new-state))))] + (reset! did-remove? (removed-queues? old new)) + [old new])))) + @did-remove?)) + +(defn stop-multicaster! [conn] + (assert (instance? Connection conn)) + (let [did-remove? (atom nil)] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (let [[old new] (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))] + (reset! did-remove? (removed-queues? old new)) + [old new])))) + @did-remove?)) + +(defn stop-all-multicasters! [] + (let [did-remove? (atom nil)] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (let [[old new] (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))] + (reset! did-remove? (removed-queues? old new)) + [old new])))) + @did-remove?)) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (require '[datomic.api :as d]) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (defonce conn (let [uri (str "datomic:mem://demo") + _ (d/delete-database uri) + _ (d/create-database uri) + conn (d/connect uri)] + conn)))) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (defn drain! [^BlockingQueue q] + (loop [items []] + (if-let [elem (.poll q 100 TimeUnit/MILLISECONDS)] + (recur (conj items elem)) + items))) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (log/info "********************************") + (defonce conn (let [uri (str "datomic:mem://demo") + _ (d/delete-database uri) + _ (d/create-database uri) + conn (d/connect uri)] + conn)) + (log/info "stop-all!") + (stop-all-multicasters!) + (assert (= 0 @thread-count)) + (let [q1 (get-tx-report-queue-multicast! conn :q1 false) + q2 (get-tx-report-queue-multicast! conn :q2 false) + _ (get-tx-report-queue-multicast! conn :q1 true)] + @(d/transact conn [{:db/doc "demo"}]) + @(d/transact conn [{:db/doc "demo"}]) + @(d/transact conn [{:db/doc "demo"}]) + (log/info "begin drain q1") + (stop-multicast-consumer-id! conn :q1) + (stop-multicast-consumer-id! conn :q1) + (println "thread count" @thread-count) + (let [qitems-2 (drain! q2) + qitems-1 (drain! q1)] + (assert (= :end (last qitems-1))) + (println "drain count q1:" (count qitems-1)) + (println "drain count q2:" (count qitems-2)))))) + +(comment + (do + (let [q (get-tx-report-queue-multicast! conn :q1 true)] + (log/debug "stopping id :q1") + (stop-multicaster-id! conn :q1) + (let [drained (drain! q)] + (println "drained:" drained) + (assert (= [:end] drained))) + @multicast-state))) + +(comment + (stop-all-multicasters!)) + +(comment + (do + (let [q (get-tx-report-queue-multicast! conn :q2 false)] + (println "drain count:" (count (drain! q))) + @multicast-state + nil))) + +(comment + (get-tx-report-queue-multicast! conn :q1 false) + (get-tx-report-queue-multicast! conn :q1 true)) + +(comment + (do + @(d/transact conn [{:db/doc "demo"}]) + @(d/transact conn [{:db/doc "demo"}]) + @(d/transact conn [{:db/doc "demo"}]) + :yay))
\ No newline at end of file diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj index 1aa6c02..7eae557 100644 --- a/test/com/github/ivarref/yoltq/log_init.clj +++ b/test/com/github/ivarref/yoltq/log_init.clj @@ -3,6 +3,8 @@ [taoensso.timbre :as timbre] [clojure.string :as str])) +(set! *warn-on-reflection* true) + (def level-colors {;:warn colors/red :error colors/red}) @@ -46,7 +48,7 @@ (color-f (force msg_)) - #_maybe-stacktrace)))) + maybe-stacktrace)))) (catch Throwable t |
