From 1df100143cf935cca10f0afa62ef00f2673c655a Mon Sep 17 00:00:00 2001 From: ire Date: Tue, 13 May 2025 19:02:10 +0200 Subject: Fix reflection warnings --- src/com/github/ivarref/yoltq.clj | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) (limited to 'src/com/github/ivarref/yoltq.clj') diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 379d701..a7dcddf 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -12,8 +12,7 @@ (:import (datomic Connection) (java.lang.management ManagementFactory) (java.time Duration Instant ZoneOffset ZonedDateTime) - (java.util.concurrent ExecutorService Executors TimeUnit))) - + (java.util.concurrent ExecutorService Executors ScheduledExecutorService TimeUnit))) (defonce ^:dynamic *config* (atom nil)) (defonce threadpool (atom nil)) @@ -85,7 +84,7 @@ u/duration->millis)) -(defn init! [{:keys [conn] :as cfg}] +(defn init! [{:keys [conn tx-report-queue] :as cfg}] (assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil"))) (locking threadpool @(d/transact conn i/schema) @@ -97,6 +96,9 @@ :system-error (atom {}) :healthy? (atom nil) :slow? (atom nil) + :get-tx-report-queue (fn [] + (or tx-report-queue + (d/tx-report-queue conn))) :slow-thread-watcher-done? (promise)} default-opts (if *test-mode* old-conf (select-keys old-conf [:handlers])) @@ -140,9 +142,9 @@ (let [pool (reset! threadpool (Executors/newScheduledThreadPool (+ 1 pool-size))) queue-listener-ready (promise)] (reset! *running?* true) - (.scheduleAtFixedRate pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS) - (.scheduleAtFixedRate pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS) - (.execute pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*))) + (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS) + (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS) + (.execute ^ScheduledExecutorService pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*))) (future (try (slow-executor/show-slow-threads pool *config*) (finally @@ -327,7 +329,7 @@ [{:keys [age-days queue-name now db duration->long] :or {age-days 30 now (ZonedDateTime/now ZoneOffset/UTC) - duration->long (fn [duration] (.toSeconds duration))}}] + duration->long (fn [duration] (.toSeconds ^Duration duration))}}] (let [{:keys [conn]} @*config* db (or db (d/db conn)) ->zdt #(.atZone (Instant/ofEpochMilli %) ZoneOffset/UTC)] @@ -357,6 +359,15 @@ :min (apply min values))}))) (into (sorted-map))))) + + +(defn add-tx-report-queue! + ([conn] + (add-tx-report-queue! conn :default)) + ([conn id] + (if @*config* + :...))) + (comment (do (require 'com.github.ivarref.yoltq.log-init) -- cgit v1.2.3 From ae49a7ec82ecd3988e0f7825b0adead1dc77c911 Mon Sep 17 00:00:00 2001 From: ire Date: Tue, 13 May 2025 21:39:07 +0200 Subject: Fix tx-report-queue sharing #7 --- README.md | 34 +++++++ deps.edn | 54 +++++------ src/com/github/ivarref/yoltq.clj | 86 ++++++++++++++--- src/com/github/ivarref/yoltq/report_queue.clj | 133 ++++++++++++++++++++++++-- test/com/github/ivarref/yoltq/log_init.clj | 2 + 5 files changed, 258 insertions(+), 51 deletions(-) (limited to 'src/com/github/ivarref/yoltq.clj') diff --git a/README.md b/README.md index c5f2bdb..f84a336 100644 --- a/README.md +++ b/README.md @@ -434,6 +434,40 @@ If you liked this library, you may also like: ## Change log +#### 2025-05-13 v0.2.?? [diff](https://github.com/ivarref/yoltq/compare/v0.2.64...HEAD) +Added support for specifying `tx-report-queue` as a keyword in `init!`. Yoltq will +then not grab the datomic report queue, but use the one provided: + +```clojure +(require '[com.github.ivarref.yoltq :as yq]) +(yq/init! {:conn conn + :tx-report-queue (yq/get-tx-report-queue-multicast! my-conn :yoltq) + ; ^^ can be any `java.util.concurrent.BlockingQueue` value + }) + +(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! my-conn :another-consumer-id)) + +``` + +Added multicast support for `datomic.api/tx-report-queue`: +```clojure +(def my-q1 (yq/get-tx-report-queue-multicast! my-conn :q-id-1)) +; ^^ consume my-q1 just like you would do `datomic.api/tx-report-queue` + +(def my-q2 (yq/get-tx-report-queue-multicast! my-conn :q-id-2)) +; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue` +``` + +`yq/get-tx-report-queue-multicast!` returns, like +`datomic.api/tx-report-queue`, +`java.util.concurrent.BlockingQueue` and starts a background thread that does +the multicasting as needed. Identical calls to `yq/get-tx-report-queue-multicast!` +returns the same `BlockingQueue`. + +Changed the default for `max-retries` from `10000` to `9223372036854775807`. + +Fixed reflection warnings. + #### 2023-03-20 v0.2.64 [diff](https://github.com/ivarref/yoltq/compare/v0.2.63...v0.2.64) Added support for `max-retries` being `0`, meaning the job should be retried forever (or at least 9223372036854775807 times). diff --git a/deps.edn b/deps.edn index e36885e..1e3fa9d 100644 --- a/deps.edn +++ b/deps.edn @@ -1,33 +1,31 @@ -{:deps {com.github.ivarref/double-trouble {:mvn/version "0.1.102"} - org.clojure/tools.logging {:mvn/version "1.2.4"} - org.clojure/clojure {:mvn/version "1.11.1"}} +{:deps {com.github.ivarref/double-trouble {:mvn/version "0.1.102"} + org.clojure/tools.logging {:mvn/version "1.2.4"} + org.clojure/clojure {:mvn/version "1.11.1"} + com.datomic/peer {:mvn/version "1.0.7364"}} - :paths ["src"] + :paths ["src"] - :aliases {:datomic {:extra-deps {com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]}}} - :test {:extra-paths ["test"] - :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} - com.taoensso/timbre {:mvn/version "5.2.1"} - com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} - clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} - com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]} - org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} - com.taoensso/nippy {:mvn/version "3.2.0"} - io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} - :jvm-opts ["-DDISABLE_SPY=true" - "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] - :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]} + :aliases {:test {:extra-paths ["test"] + :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} + com.taoensso/timbre {:mvn/version "5.2.1"} + com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} + clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} + org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"} + io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} + :exec-fn cognitect.test-runner.api/test + :jvm-opts ["-DDISABLE_SPY=true" + "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] + :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]} - :jar {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git" - :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}} - :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]} + :jar {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git" + :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}} + :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]} - :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} + :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} - :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}} - :exec-fn deps-deploy.deps-deploy/deploy - :exec-args {:installer :remote - :sign-releases? false - :artifact "target/out.jar"}}} - - :mvn/repos {"my.datomic.com" {:url "https://my.datomic.com/repo"}}} + :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}} + :exec-fn deps-deploy.deps-deploy/deploy + :exec-args {:installer :remote + :sign-releases? false + :artifact "target/out.jar"}}}} \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index a7dcddf..32298b7 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -12,7 +12,7 @@ (:import (datomic Connection) (java.lang.management ManagementFactory) (java.time Duration Instant ZoneOffset ZonedDateTime) - (java.util.concurrent ExecutorService Executors ScheduledExecutorService TimeUnit))) + (java.util.concurrent BlockingQueue ExecutorService Executors ScheduledExecutorService TimeUnit))) (defonce ^:dynamic *config* (atom nil)) (defonce threadpool (atom nil)) @@ -26,7 +26,7 @@ ; If you want no limit on the number of retries, specify ; the value `0`. That will set the effective retry limit to ; 9223372036854775807 times. - :max-retries 10000 + :max-retries 9223372036854775807 ; Minimum amount of time to wait before a failed queue job is retried :error-backoff-time (Duration/ofSeconds 5) @@ -86,6 +86,9 @@ (defn init! [{:keys [conn tx-report-queue] :as cfg}] (assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil"))) + (when (some? tx-report-queue) + (assert (instance? BlockingQueue tx-report-queue) + (str "Expected :tx-report-queue to be of type java.util.concurrent.BlockingQueue"))) (locking threadpool @(d/transact conn i/schema) (let [new-cfg (swap! *config* @@ -96,9 +99,6 @@ :system-error (atom {}) :healthy? (atom nil) :slow? (atom nil) - :get-tx-report-queue (fn [] - (or tx-report-queue - (d/tx-report-queue conn))) :slow-thread-watcher-done? (promise)} default-opts (if *test-mode* old-conf (select-keys old-conf [:handlers])) @@ -144,12 +144,37 @@ (reset! *running?* true) (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS) (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS) - (.execute ^ScheduledExecutorService pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*))) + (.execute ^ScheduledExecutorService pool + (fn [] + (try + (log/debug "report-queue-listener starting") + (rq/report-queue-listener *running?* queue-listener-ready pool *config*) + (finally + (log/debug "report-queue-listener exiting") + (deliver queue-listener-ready :finally))))) (future (try (slow-executor/show-slow-threads pool *config*) (finally (deliver slow-thread-watcher-done? :done)))) - @queue-listener-ready))) + (let [q-listener-retval (deref queue-listener-ready 30000 :timeout)] + (cond (= :timeout q-listener-retval) + (do + (log/error "Timed out waiting for report-queue-listener to start") + (throw (IllegalStateException. "Timed out waiting for report-queue-listener to start"))) + + (= :finally q-listener-retval) + (do + (log/error "report-queue-listener did not start") + (throw (IllegalStateException. "report-queue-listener did not start"))) + + (= :ready q-listener-retval) + (do + (log/debug "report-queue-listener is ready")) + + :else + (do + (log/error (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval))) + (throw (IllegalStateException. (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval)))))))))) (defn start! [] @@ -359,14 +384,13 @@ :min (apply min values))}))) (into (sorted-map))))) +(defn get-tx-report-queue-multicast! + "Multicast the datomic.api/tx-report-queue to different consumers. + The multicaster is started on demand. `conn` and `id` identifies the consumer. - -(defn add-tx-report-queue! - ([conn] - (add-tx-report-queue! conn :default)) - ([conn id] - (if @*config* - :...))) + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + [conn id] + (rq/get-tx-report-queue-multicast! conn id)) (comment (do @@ -401,3 +425,37 @@ @started-consuming? (stop!) nil))))) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq.migrate"} :warn] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (stop!) + (let [received (atom []) + uri (str "datomic:mem://demo")] + (d/delete-database uri) + (d/create-database uri) + (let [conn (d/connect uri) + started-consuming? (promise) + n 1] + (init! {:conn conn + :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq) + :slow-thread-show-stacktrace? false}) + (add-consumer! :q (fn [_] + (deliver started-consuming? true))) + (log/info "begin start! ...") + (start!) + (log/info "begin start! ... Done") + (Thread/sleep 2000) + @(d/transact conn [(put :q {:work 123})]) + @started-consuming? + (stop!) + (log/info "stop! done") + nil)))) \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index 20e0a93..9cddc93 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -3,8 +3,8 @@ [com.github.ivarref.yoltq.impl :as i] [datomic.api :as d] [clojure.tools.logging :as log]) - (:import (datomic Datom) - (java.util.concurrent ScheduledExecutorService BlockingQueue TimeUnit))) + (:import (datomic Connection Datom) + (java.util.concurrent LinkedBlockingQueue ScheduledExecutorService BlockingQueue TimeUnit))) (defn process-poll-result! [cfg id-ident poll-result consumer] @@ -28,18 +28,24 @@ (i/take! cfg) (i/execute! cfg))))) (catch Throwable t - (log/error t "unexpected error in process-poll-result!"))))))))) + (log/error t "Unexpected error in process-poll-result!"))))))))) (defn report-queue-listener [running? ready? ^ScheduledExecutorService pool config-atom] - (let [conn (:conn @config-atom) - ^BlockingQueue q (d/tx-report-queue conn) + (let [cfg @config-atom + conn (:conn cfg) + tx-report-queue-given (contains? cfg :tx-report-queue) + ^BlockingQueue q (if tx-report-queue-given + (get cfg :tx-report-queue) + (d/tx-report-queue conn)) id-ident (d/q '[:find ?e . :where [?e :db/ident :com.github.ivarref.yoltq/id]] (d/db conn))] + (assert (instance? BlockingQueue q)) + (log/debug "tx-report-queue-given:" tx-report-queue-given) (try (while @running? (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] @@ -49,9 +55,118 @@ (fn [f] (when @running? (.execute ^ScheduledExecutorService pool f))))) - (deliver ready? true)) + (deliver ready? :ready)) (catch Throwable t - (log/error t "unexpected error in report-queue-listener")) + (log/error t "Unexpected error in report-queue-listener:" (.getMessage t))) (finally - (log/debug "remove tx-report-queue") - (d/remove-tx-report-queue conn))))) \ No newline at end of file + (if tx-report-queue-given + (log/debug "Remove tx-report-queue handled elsewhere") + (do + (log/debug "Remove tx-report-queue") + (d/remove-tx-report-queue conn))))))) + +(defonce ^:private multicast-state-lock (Object.)) + +(defonce ^:private multicast-state (atom {})) + +(defn- start-multicaster! [conn] + (let [multicaster-ready? (promise)] + (future + (log/debug "Multicaster starting for conn" conn) + (try + (let [input-queue (d/tx-report-queue conn)] + (loop [] + (when-let [mcast-state (get @multicast-state conn)] + (when-let [dest-queues (vals mcast-state)] + (let [element (.poll ^BlockingQueue input-queue 1 TimeUnit/SECONDS)] + (deliver multicaster-ready? :ready) + (when (some? element) + (doseq [q dest-queues] + (let [ok-offer (.offer ^BlockingQueue q element 30 TimeUnit/MINUTES)] + (when (false? ok-offer) + (log/error "Failed to offer item in multicaster for connection" conn)))))) + (recur))))) + (catch Throwable t + (deliver multicaster-ready? :error) + (log/error t "Unexpected error in multicaster:" (.getMessage t))) + (finally + (d/remove-tx-report-queue conn) + (log/debug "Multicaster exiting for conn" conn)))) + multicaster-ready?)) + +(defn get-tx-report-queue-multicast! + "Multicast the datomic.api/tx-report-queue to different consumers. + The multicaster is started on demand. `conn` and `id` identifies the consumer. + + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + [conn id] + (assert (instance? Connection conn)) + (assert (keyword? id)) + (locking multicast-state-lock + (assert (map? @multicast-state)) + (if-let [existing-q (get-in @multicast-state [conn id])] + (do + (log/debug "returning existing queue for id" id) + (assert (instance? BlockingQueue existing-q)) + existing-q) + (let [needs-multicaster? (not (contains? @multicast-state conn)) + new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [conn id] (LinkedBlockingQueue.))))] + (when needs-multicaster? + (let [multicaster-promise (start-multicaster! conn) + multicaster-result (deref multicaster-promise (* 30 60000) :timeout)] + (cond (= multicaster-result :timeout) + (do + (log/error "Timeout waiting for multicaster to start") + (throw (RuntimeException. "Timeout waiting for multicaster to start"))) + (= multicaster-result :error) + (do + (log/error "Multicaster failed to start") + (throw (RuntimeException. "Multicaster failed to start"))) + (= multicaster-result :ready) + (log/debug "Multicaster is ready") + + :else + (do + (log/error "Unexpected state from multicaster:" multicaster-result) + (throw (RuntimeException. (str "Unexpected state from multicaster: " multicaster-result))))))) + (let [new-q (get-in new-state [conn id])] + (assert (instance? BlockingQueue new-q)) + new-q))))) + +(defn stop-all-multicasters! [] + (reset! multicast-state {})) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (defonce conn (let [uri (str "datomic:mem://demo") + _ (d/delete-database uri) + _ (d/create-database uri) + conn (d/connect uri)] + conn)))) + +(comment + (defn drain! [^BlockingQueue q] + (loop [cnt 0] + (if (nil? (.poll q 1 TimeUnit/SECONDS)) + cnt + (recur (inc cnt)))))) + +(comment + (let [q-1 (get-tx-report-queue-multicast! conn :q1) + q-2 (get-tx-report-queue-multicast! conn :q2)])) + +(comment + (drain! (get-tx-report-queue-multicast! conn :q1))) + +(comment + (do + @(d/transact conn [{:db/doc "demo"}]) + :yay)) \ No newline at end of file diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj index 1aa6c02..f3fb6dc 100644 --- a/test/com/github/ivarref/yoltq/log_init.clj +++ b/test/com/github/ivarref/yoltq/log_init.clj @@ -3,6 +3,8 @@ [taoensso.timbre :as timbre] [clojure.string :as str])) +(set! *warn-on-reflection* true) + (def level-colors {;:warn colors/red :error colors/red}) -- cgit v1.2.3 From ccfe353aebe8c22429fdaf76a5e0bf34cefca955 Mon Sep 17 00:00:00 2001 From: ire Date: Wed, 21 May 2025 12:04:15 +0200 Subject: Small fixes #7 --- src/com/github/ivarref/yoltq.clj | 51 ++++++++++++++-- src/com/github/ivarref/yoltq/report_queue.clj | 86 +++++++++++++-------------- 2 files changed, 88 insertions(+), 49 deletions(-) (limited to 'src/com/github/ivarref/yoltq.clj') diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 32298b7..80c9491 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -352,8 +352,8 @@ :p95 ... :p99 ...}}" [{:keys [age-days queue-name now db duration->long] - :or {age-days 30 - now (ZonedDateTime/now ZoneOffset/UTC) + :or {age-days 30 + now (ZonedDateTime/now ZoneOffset/UTC) duration->long (fn [duration] (.toSeconds ^Duration duration))}}] (let [{:keys [conn]} @*config* db (or db (d/db conn)) @@ -386,11 +386,50 @@ (defn get-tx-report-queue-multicast! "Multicast the datomic.api/tx-report-queue to different consumers. - The multicaster is started on demand. `conn` and `id` identifies the consumer. + A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer. + Repeated calls using the same `conn` and `id` returns the same queue. + + The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread + to send `:end` if the queue is stopped. The default value is `false`. + + A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`. + + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + ([conn id] + (get-tx-report-queue-multicast! conn id false)) + ([conn id send-end-token?] + (assert (instance? Connection conn)) + (assert (boolean? send-end-token?)) + (rq/get-tx-report-queue-multicast! conn id send-end-token?))) + +(defn stop-multicaster-id! + "Stop forwarding reports from datomic.api/tx-report-queue to the queue identified by `conn` and `id`. + If this is the last report destination for the given `conn`, the multicaster thread will exit. + Repeated calls are no-op. - Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + Returns nil." [conn id] - (rq/get-tx-report-queue-multicast! conn id)) + (assert (instance? Connection conn)) + (rq/stop-multicaster-id! conn id)) + +(defn stop-multicaster! + "Stop forwarding reports from datomic.api/tx-report-queue to any queues belonging to `conn`. + The multicaster thread will exit. + Repeated calls are no-op. + + Returns nil." + [conn] + (assert (instance? Connection conn)) + (rq/stop-multicaster! conn)) + +(defn stop-all-multicasters! + "Stop forwarding all reports from datomic.api/tx-report-queue for any `conn`. + All multicaster threads will exit. + Repeated calls are no-op. + + Returns nil." + [] + (rq/stop-all-multicasters!)) (comment (do @@ -446,7 +485,7 @@ started-consuming? (promise) n 1] (init! {:conn conn - :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq) + :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq) :slow-thread-show-stacktrace? false}) (add-consumer! :q (fn [_] (deliver started-consuming? true))) diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index 2a2e489..a9f7e07 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -6,6 +6,8 @@ (:import (datomic Connection Datom) (java.util.concurrent LinkedBlockingQueue ScheduledExecutorService BlockingQueue TimeUnit))) +; Private API, subject to change! + (defn process-poll-result! [cfg id-ident poll-result consumer] (let [{:keys [tx-data db-after]} poll-result] (when-let [new-ids (->> tx-data @@ -50,8 +52,9 @@ (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] (if (= poll-result :end) (do - (reset! running-local? false) - #_(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system.")) + (log/debug "Report queue listener received :end token. Exiting") + (reset! running-local? false)) + ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system.")) (process-poll-result! @config-atom id-ident poll-result @@ -175,7 +178,8 @@ (when (= :timeout (deref ready? 30000 :timeout)) (throw (RuntimeException. "Timed out waiting for multicaster to start"))))) -(defn- wait-multicast-thread-step [conn]) +(defn- wait-multicast-thread-step + [conn] ; `get-tx-report-queue-multicast!` should return only when the multicaster thread ; has picked up the new queue. ; @@ -196,29 +200,34 @@ ; Once [:iter-count conn] has changed, we know that the multicaster thread ; will see the new queue. This means that we can be sure that the queue ; will receive the `:end` token if the queue is stopped. -(let [start-ms (System/currentTimeMillis) - iter-count (get-in @multicast-state [:iter-count conn] -1)] - (loop [spin-count 0] - (if (not= iter-count (get-in @multicast-state [:iter-count conn])) - nil - (do - (let [spent-ms (- (System/currentTimeMillis) start-ms)] - (if (> spent-ms 30000) - (throw (RuntimeException. "Timed out waiting for multicaster thread")) - (do - (Thread/sleep 16) - (recur (inc spin-count))))))))) + (let [start-ms (System/currentTimeMillis) + iter-count (get-in @multicast-state [:iter-count conn] -1)] + (loop [spin-count 0] + (if (not= iter-count (get-in @multicast-state [:iter-count conn])) + nil + (do + (let [spent-ms (- (System/currentTimeMillis) start-ms)] + (if (> spent-ms 30000) + (throw (RuntimeException. "Timed out waiting for multicaster thread")) + (do + (Thread/sleep 16) + (recur (inc spin-count)))))))))) (defn get-tx-report-queue-multicast! "Multicast the datomic.api/tx-report-queue to different consumers. - The multicaster is started on demand. `conn` and `id` identifies the consumer. + A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer. + Repeated calls using the same `conn` and `id` returns the same queue. + + The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread + to send `:end` if the queue is stopped. The default value is `false`. - Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`. + + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." ([conn id] (get-tx-report-queue-multicast! conn id false)) ([conn id send-end-token?] (assert (instance? Connection conn)) - (assert (keyword? id)) (locking consumer-state-lock (let [the-q (locking multicast-state-lock @@ -250,7 +259,7 @@ (wait-multicast-thread-step conn) the-q)))) -(defn wait-multicast-threads-exit [[old-state new-state]] +(defn- wait-multicast-threads-exit [[old-state new-state]] (assert (map? old-state)) (assert (map? new-state)) (assert (map? (get old-state :queues {}))) @@ -275,6 +284,12 @@ ; sent by multiple threads. (let [old-conns (into #{} (keys (get old-state :queues {}))) new-conns (into #{} (keys (get new-state :queues {})))] + (assert (every? + (fn [x] (instance? Connection x)) + old-conns)) + (assert (every? + (fn [x] (instance? Connection x)) + new-conns)) (doseq [old-conn old-conns] (when-not (contains? new-conns old-conn) (let [old-threadcount (get-in old-state [:thread-count old-conn] nil)] @@ -292,6 +307,7 @@ (recur)))))))))))))) (defn stop-multicaster-id! [conn id] + (assert (instance? Connection conn)) (locking consumer-state-lock (wait-multicast-threads-exit (locking multicast-state-lock @@ -299,40 +315,23 @@ (let [new-state (dissoc-in old-state [:queues conn id])] (if (= {} (get-in new-state [:queues conn])) (dissoc-in old-state [:queues conn]) - new-state)))))))) + new-state))))))) + nil) (defn stop-multicaster! [conn] + (assert (instance? Connection conn)) (locking consumer-state-lock (wait-multicast-threads-exit (locking multicast-state-lock - (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn]))))))) + (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))))) + nil) (defn stop-all-multicasters! [] (locking consumer-state-lock (wait-multicast-threads-exit (locking multicast-state-lock - (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {}))))))) - -(comment - (do - (require 'com.github.ivarref.yoltq.log-init) - (defn drain! [^BlockingQueue q] - (loop [items []] - (if-let [elem (.poll q 100 TimeUnit/MILLISECONDS)] - (recur (conj items elem)) - items))) - (com.github.ivarref.yoltq.log-init/init-logging! - [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] - [#{"com.github.ivarref.yoltq.report-queue"} :debug] - [#{"com.github.ivarref.yoltq.poller"} :info] - [#{"com.github.ivarref.yoltq"} :debug] - ;[#{"ivarref.yoltq*"} :info] - [#{"*"} :info]]) - (defonce conn (let [uri (str "datomic:mem://demo") - _ (d/delete-database uri) - _ (d/create-database uri) - conn (d/connect uri)] - conn)))) + (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))))) + nil) (comment (do @@ -366,6 +365,7 @@ @(d/transact conn [{:db/doc "demo"}]) (log/info "begin drain q1") (stop-multicaster-id! conn :q1) + (stop-multicaster-id! conn :q1) (println "thread count" @thread-count) (let [qitems-2 (drain! q2) qitems-1 (drain! q1)] -- cgit v1.2.3 From a1e4e1b96fd254ec7d7e467648dd5e88f1c9530b Mon Sep 17 00:00:00 2001 From: ire Date: Wed, 21 May 2025 13:38:51 +0200 Subject: Doc. Return true/false if queues were stopped or not #7 --- README.md | 37 ++++++- deps.edn | 12 +++ src/com/github/ivarref/yoltq.clj | 23 +++- src/com/github/ivarref/yoltq/report_queue.clj | 146 ++++++++++++++++++-------- 4 files changed, 163 insertions(+), 55 deletions(-) (limited to 'src/com/github/ivarref/yoltq.clj') diff --git a/README.md b/README.md index f84a336..675fa9a 100644 --- a/README.md +++ b/README.md @@ -441,21 +441,48 @@ then not grab the datomic report queue, but use the one provided: ```clojure (require '[com.github.ivarref.yoltq :as yq]) (yq/init! {:conn conn - :tx-report-queue (yq/get-tx-report-queue-multicast! my-conn :yoltq) + :tx-report-queue (yq/get-tx-report-queue-multicast! conn :yoltq) ; ^^ can be any `java.util.concurrent.BlockingQueue` value }) -(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! my-conn :another-consumer-id)) +(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! conn :another-consumer-id)) ``` Added multicast support for `datomic.api/tx-report-queue`: ```clojure -(def my-q1 (yq/get-tx-report-queue-multicast! my-conn :q-id-1)) +(require '[com.github.ivarref.yoltq :as yq]) +(def my-q1 (yq/get-tx-report-queue-multicast! conn :q-id-1)) ; ^^ consume my-q1 just like you would do `datomic.api/tx-report-queue` -(def my-q2 (yq/get-tx-report-queue-multicast! my-conn :q-id-2)) -; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue` +(def my-q2 (yq/get-tx-report-queue-multicast! conn :q-id-2)) +; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue` for the given `conn` + +(def my-q3 (yq/get-tx-report-queue-multicast! conn :q-id-3 true)) +; my-q3 specifies the third argument, `send-end-token?`, to true, so it will receive `:end` if the queue is stopped. +; This can enable simpler consuming of queues: +(future + (loop [] + (let [q-item (.take ^java.util.concurrent.BlockingQueue my-q3)] + (if (= q-item :end) + (println "Time to exit. Goodbye!") + (do + (println "Processing q-item" q-item) + (recur)))))) + +@(d/transact conn [{:db/doc "new-data"}]) + +; Stop the queue: +(yq/stop-multicaster-id! conn :q-id-3) +=> true +; The multicaster thread will send `:end` and the consumer thread will then print "Time to exit. Goodbye!". + +; if the queue is already stopped (or never was started), `stop-multicaster...` functions will return false: +(yq/stop-multicaster-id! conn :already-stopped-queue-or-typo) +=> false + +; Stop all queues for all connections: +(yq/stop-all-multicasters!) ``` `yq/get-tx-report-queue-multicast!` returns, like diff --git a/deps.edn b/deps.edn index 1e3fa9d..a328c86 100644 --- a/deps.edn +++ b/deps.edn @@ -22,6 +22,18 @@ :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}} :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]} + :repl {:extra-paths ["test"] + :extra-deps {com.bhauman/rebel-readline {:mvn/version "0.1.5"} + ivarref/datomic-schema {:mvn/version "0.2.0"} + com.taoensso/timbre {:mvn/version "5.2.1"} + com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} + clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} + org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"}} + :exec-fn rebel-readline.tool/repl + :exec-args {} + :main-opts ["-m" "rebel-readline.main"]} + :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}} diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 80c9491..298b9d5 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -407,7 +407,11 @@ If this is the last report destination for the given `conn`, the multicaster thread will exit. Repeated calls are no-op. - Returns nil." + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if the queue was stopped. + Return `false` if the queue does not exist." [conn id] (assert (instance? Connection conn)) (rq/stop-multicaster-id! conn id)) @@ -417,7 +421,11 @@ The multicaster thread will exit. Repeated calls are no-op. - Returns nil." + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if any queue belonging to `conn` was stopped. + Returns `false` is `conn` did not have any associated queues." [conn] (assert (instance? Connection conn)) (rq/stop-multicaster! conn)) @@ -427,7 +435,11 @@ All multicaster threads will exit. Repeated calls are no-op. - Returns nil." + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if any queue was stopped. + Returns `false` if no queues existed." [] (rq/stop-all-multicasters!)) @@ -485,7 +497,7 @@ started-consuming? (promise) n 1] (init! {:conn conn - :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq) + :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq true) :slow-thread-show-stacktrace? false}) (add-consumer! :q (fn [_] (deliver started-consuming? true))) @@ -493,8 +505,11 @@ (start!) (log/info "begin start! ... Done") (Thread/sleep 2000) + (log/info "*******************************************") @(d/transact conn [(put :q {:work 123})]) @started-consuming? + (stop-multicaster! conn) + (log/info "*******************************************") (stop!) (log/info "stop! done") nil)))) \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index a9f7e07..c3fd383 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -52,9 +52,9 @@ (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] (if (= poll-result :end) (do - (log/debug "Report queue listener received :end token. Exiting") + (log/debug "report-queue-listener received :end token. Exiting") (reset! running-local? false)) - ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system.")) + ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system.")) (process-poll-result! @config-atom id-ident poll-result @@ -115,9 +115,11 @@ (if send-end-token? (do #_(log/debug "offering :end token") - (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS)) + (if (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS) + (log/debug "Multicaster sent :end token") + (log/debug "Multicaster failed to send :end token"))) (do - #_(log/debug "not offering :end token")))) + (log/debug "Multicaster not sending :end token")))) (when (seq new-state) (if (some? work-item) (reduce-kv @@ -125,7 +127,7 @@ (let [ok-offer (.offer ^BlockingQueue q work-item 1 TimeUnit/MICROSECONDS)] (if (true? ok-offer) (assoc m id [send-end-token? q]) - (log/warn "Failed to offer item in multicaster for connection" conn "and queue id" id)))) + (log/error "Multicaster failed to offer item for connection" conn "and queue id" id)))) {} new-state) new-state))) @@ -150,6 +152,7 @@ (do (swap! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn]))) (swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] dec))) (d/remove-tx-report-queue conn) + (log/debug "Multicaster removed tx-report-queue for conn" conn) nil)))] (if new-state (recur new-state) @@ -180,26 +183,26 @@ (defn- wait-multicast-thread-step [conn] -; `get-tx-report-queue-multicast!` should return only when the multicaster thread -; has picked up the new queue. -; -; Otherwise the following could happen: -; 1. multicast thread is sleeping -; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true` -; 3: user-thread (or somebody else) calls `stop-multicaster`. -; The multicast-state atom is now identical as it was in step 1. -; , Step 2 and 3 happened while the multicast thread was sleeping. -; 4: The multicast thread is scheduled and does _not_ detect any state change. -; Therefore the multicast thread does _not_ send out an :end token as one would expect. -; -; The new queue is written to memory at this point. No other thread can remove it because -; we are still, and have been during the modification of multicast-state, holding consumer-state-lock. -; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock, -; we can be sure that no other thread changes or has changed the state. -; -; Once [:iter-count conn] has changed, we know that the multicaster thread -; will see the new queue. This means that we can be sure that the queue -; will receive the `:end` token if the queue is stopped. + ; `get-tx-report-queue-multicast!` should return only when the multicaster thread + ; has picked up the new queue. + ; + ; Otherwise the following could happen: + ; 1. multicast thread is sleeping + ; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true` + ; 3: user-thread (or somebody else) calls `stop-multicaster`. + ; The multicast-state atom is now identical as it was in step 1. + ; , Step 2 and 3 happened while the multicast thread was sleeping. + ; 4: The multicast thread is scheduled and does _not_ detect any state change. + ; Therefore the multicast thread does _not_ send out an :end token as one would expect. + ; + ; The new queue is written to memory at this point. No other thread can remove it because + ; we are still, and have been during the modification of multicast-state, holding consumer-state-lock. + ; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock, + ; we can be sure that no other thread changes or has changed the state. + ; + ; Once [:iter-count conn] has changed, we know that the multicaster thread + ; will see the new queue. This means that we can be sure that the queue + ; will receive the `:end` token if the queue is stopped. (let [start-ms (System/currentTimeMillis) iter-count (get-in @multicast-state [:iter-count conn] -1)] (loop [spin-count 0] @@ -250,10 +253,10 @@ (if needs-multicaster? (do (start-multicaster! conn) - (log/debug "Multicaster thread started. Returning new queue for id" id) + (log/debug "Returning new queue for id" id "(multicaster thread started)") new-q) (do - (log/debug "Multicaster thread already exists. Returning new queue for id" id) + (log/debug "Returning new queue for id" id "(multicaster thread already running)") new-q)))))] ; wait for multicaster thread to pick up current Queue (wait-multicast-thread-step conn) @@ -306,32 +309,83 @@ (Thread/sleep 16) (recur)))))))))))))) +(defn- all-queues [state] + (->> (mapcat (fn [[conn qmap]] + (mapv (fn [q-id] [conn q-id]) + (keys qmap))) + (seq (get state :queues {}))) + (into #{}))) + +(comment + (do + (assert (= #{} + (all-queues {}))) + (assert (= #{} + (all-queues {:queues {}}))) + (assert (= #{[:conn-a :q-id]} + (all-queues {:queues {:conn-a {:q-id 1}}}))) + (assert (= #{[:conn-a :q-id] [:conn-a :q-id-2]} + (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2}}}))) + (assert (= #{[:conn-a :q-id-2] [:conn-b :q-id-3] [:conn-a :q-id]} + (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2} + :conn-b {:q-id-3 3}}}))))) + +(defn- removed-queues? [old new] + (not= (all-queues old) + (all-queues new))) + (defn stop-multicaster-id! [conn id] (assert (instance? Connection conn)) - (locking consumer-state-lock - (wait-multicast-threads-exit - (locking multicast-state-lock - (swap-vals! multicast-state (fn [old-state] - (let [new-state (dissoc-in old-state [:queues conn id])] - (if (= {} (get-in new-state [:queues conn])) - (dissoc-in old-state [:queues conn]) - new-state))))))) - nil) + (let [did-remove? (atom nil)] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (let [[old new] (swap-vals! multicast-state (fn [old-state] + (let [new-state (dissoc-in old-state [:queues conn id])] + (if (= {} (get-in new-state [:queues conn])) + (dissoc-in old-state [:queues conn]) + new-state))))] + (reset! did-remove? (removed-queues? old new)) + [old new])))) + @did-remove?)) (defn stop-multicaster! [conn] (assert (instance? Connection conn)) - (locking consumer-state-lock - (wait-multicast-threads-exit - (locking multicast-state-lock - (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))))) - nil) + (let [did-remove? (atom nil)] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (let [[old new] (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))] + (reset! did-remove? (removed-queues? old new)) + [old new])))) + @did-remove?)) (defn stop-all-multicasters! [] - (locking consumer-state-lock - (wait-multicast-threads-exit - (locking multicast-state-lock - (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))))) - nil) + (let [did-remove? (atom nil)] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (let [[old new] (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))] + (reset! did-remove? (removed-queues? old new)) + [old new])))) + @did-remove?)) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (require '[datomic.api :as d]) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (defonce conn (let [uri (str "datomic:mem://demo") + _ (d/delete-database uri) + _ (d/create-database uri) + conn (d/connect uri)] + conn)))) (comment (do -- cgit v1.2.3 From 4a3a9a5da6e7a8771eb3adb79172aca5ce8f26a6 Mon Sep 17 00:00:00 2001 From: ire Date: Wed, 21 May 2025 13:48:10 +0200 Subject: Misc #7 --- README.md | 4 ++-- src/com/github/ivarref/yoltq.clj | 4 ++-- src/com/github/ivarref/yoltq/report_queue.clj | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) (limited to 'src/com/github/ivarref/yoltq.clj') diff --git a/README.md b/README.md index 675fa9a..dda9b47 100644 --- a/README.md +++ b/README.md @@ -473,12 +473,12 @@ Added multicast support for `datomic.api/tx-report-queue`: @(d/transact conn [{:db/doc "new-data"}]) ; Stop the queue: -(yq/stop-multicaster-id! conn :q-id-3) +(yq/stop-multicast-consumer-id! conn :q-id-3) => true ; The multicaster thread will send `:end` and the consumer thread will then print "Time to exit. Goodbye!". ; if the queue is already stopped (or never was started), `stop-multicaster...` functions will return false: -(yq/stop-multicaster-id! conn :already-stopped-queue-or-typo) +(yq/stop-multicast-consumer-id! conn :already-stopped-queue-or-typo) => false ; Stop all queues for all connections: diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 298b9d5..45f2051 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -402,7 +402,7 @@ (assert (boolean? send-end-token?)) (rq/get-tx-report-queue-multicast! conn id send-end-token?))) -(defn stop-multicaster-id! +(defn stop-multicast-consumer-id! "Stop forwarding reports from datomic.api/tx-report-queue to the queue identified by `conn` and `id`. If this is the last report destination for the given `conn`, the multicaster thread will exit. Repeated calls are no-op. @@ -414,7 +414,7 @@ Return `false` if the queue does not exist." [conn id] (assert (instance? Connection conn)) - (rq/stop-multicaster-id! conn id)) + (rq/stop-multicast-consumer-id! conn id)) (defn stop-multicaster! "Stop forwarding reports from datomic.api/tx-report-queue to any queues belonging to `conn`. diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index c3fd383..b3685b9 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -334,7 +334,7 @@ (not= (all-queues old) (all-queues new))) -(defn stop-multicaster-id! [conn id] +(defn stop-multicast-consumer-id! [conn id] (assert (instance? Connection conn)) (let [did-remove? (atom nil)] (locking consumer-state-lock -- cgit v1.2.3 From 22ca1bb29111f9a0246c54e7f81806794198c25f Mon Sep 17 00:00:00 2001 From: ire Date: Wed, 21 May 2025 13:59:55 +0200 Subject: Doc #7 --- README.md | 3 +++ src/com/github/ivarref/yoltq.clj | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) (limited to 'src/com/github/ivarref/yoltq.clj') diff --git a/README.md b/README.md index 836ed49..0635d5f 100644 --- a/README.md +++ b/README.md @@ -471,6 +471,9 @@ Added multicast support for `datomic.api/tx-report-queue`: (println "Processing q-item" q-item) (recur)))))) +; The default value for `send-end-token?` is `false`, i.e. the behaviour will be +; identical to that of datomic.api/tx-report-queue. + @(d/transact conn [{:db/doc "new-data"}]) ; Stop the queue: diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 45f2051..0f63e25 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -390,7 +390,8 @@ Repeated calls using the same `conn` and `id` returns the same queue. The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread - to send `:end` if the queue is stopped. The default value is `false`. + to send `:end` if the queue is stopped. + The default value for `send-end-token?` is `false`. A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`. -- cgit v1.2.3