From 1df100143cf935cca10f0afa62ef00f2673c655a Mon Sep 17 00:00:00 2001 From: ire Date: Tue, 13 May 2025 19:02:10 +0200 Subject: Fix reflection warnings --- src/com/github/ivarref/yoltq.clj | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 379d701..a7dcddf 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -12,8 +12,7 @@ (:import (datomic Connection) (java.lang.management ManagementFactory) (java.time Duration Instant ZoneOffset ZonedDateTime) - (java.util.concurrent ExecutorService Executors TimeUnit))) - + (java.util.concurrent ExecutorService Executors ScheduledExecutorService TimeUnit))) (defonce ^:dynamic *config* (atom nil)) (defonce threadpool (atom nil)) @@ -85,7 +84,7 @@ u/duration->millis)) -(defn init! [{:keys [conn] :as cfg}] +(defn init! [{:keys [conn tx-report-queue] :as cfg}] (assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil"))) (locking threadpool @(d/transact conn i/schema) @@ -97,6 +96,9 @@ :system-error (atom {}) :healthy? (atom nil) :slow? (atom nil) + :get-tx-report-queue (fn [] + (or tx-report-queue + (d/tx-report-queue conn))) :slow-thread-watcher-done? (promise)} default-opts (if *test-mode* old-conf (select-keys old-conf [:handlers])) @@ -140,9 +142,9 @@ (let [pool (reset! threadpool (Executors/newScheduledThreadPool (+ 1 pool-size))) queue-listener-ready (promise)] (reset! *running?* true) - (.scheduleAtFixedRate pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS) - (.scheduleAtFixedRate pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS) - (.execute pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*))) + (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS) + (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS) + (.execute ^ScheduledExecutorService pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*))) (future (try (slow-executor/show-slow-threads pool *config*) (finally @@ -327,7 +329,7 @@ [{:keys [age-days queue-name now db duration->long] :or {age-days 30 now (ZonedDateTime/now ZoneOffset/UTC) - duration->long (fn [duration] (.toSeconds duration))}}] + duration->long (fn [duration] (.toSeconds ^Duration duration))}}] (let [{:keys [conn]} @*config* db (or db (d/db conn)) ->zdt #(.atZone (Instant/ofEpochMilli %) ZoneOffset/UTC)] @@ -357,6 +359,15 @@ :min (apply min values))}))) (into (sorted-map))))) + + +(defn add-tx-report-queue! + ([conn] + (add-tx-report-queue! conn :default)) + ([conn id] + (if @*config* + :...))) + (comment (do (require 'com.github.ivarref.yoltq.log-init) -- cgit v1.2.3 From ae49a7ec82ecd3988e0f7825b0adead1dc77c911 Mon Sep 17 00:00:00 2001 From: ire Date: Tue, 13 May 2025 21:39:07 +0200 Subject: Fix tx-report-queue sharing #7 --- README.md | 34 +++++++ deps.edn | 54 +++++------ src/com/github/ivarref/yoltq.clj | 86 ++++++++++++++--- src/com/github/ivarref/yoltq/report_queue.clj | 133 ++++++++++++++++++++++++-- test/com/github/ivarref/yoltq/log_init.clj | 2 + 5 files changed, 258 insertions(+), 51 deletions(-) (limited to 'src') diff --git a/README.md b/README.md index c5f2bdb..f84a336 100644 --- a/README.md +++ b/README.md @@ -434,6 +434,40 @@ If you liked this library, you may also like: ## Change log +#### 2025-05-13 v0.2.?? [diff](https://github.com/ivarref/yoltq/compare/v0.2.64...HEAD) +Added support for specifying `tx-report-queue` as a keyword in `init!`. Yoltq will +then not grab the datomic report queue, but use the one provided: + +```clojure +(require '[com.github.ivarref.yoltq :as yq]) +(yq/init! {:conn conn + :tx-report-queue (yq/get-tx-report-queue-multicast! my-conn :yoltq) + ; ^^ can be any `java.util.concurrent.BlockingQueue` value + }) + +(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! my-conn :another-consumer-id)) + +``` + +Added multicast support for `datomic.api/tx-report-queue`: +```clojure +(def my-q1 (yq/get-tx-report-queue-multicast! my-conn :q-id-1)) +; ^^ consume my-q1 just like you would do `datomic.api/tx-report-queue` + +(def my-q2 (yq/get-tx-report-queue-multicast! my-conn :q-id-2)) +; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue` +``` + +`yq/get-tx-report-queue-multicast!` returns, like +`datomic.api/tx-report-queue`, +`java.util.concurrent.BlockingQueue` and starts a background thread that does +the multicasting as needed. Identical calls to `yq/get-tx-report-queue-multicast!` +returns the same `BlockingQueue`. + +Changed the default for `max-retries` from `10000` to `9223372036854775807`. + +Fixed reflection warnings. + #### 2023-03-20 v0.2.64 [diff](https://github.com/ivarref/yoltq/compare/v0.2.63...v0.2.64) Added support for `max-retries` being `0`, meaning the job should be retried forever (or at least 9223372036854775807 times). diff --git a/deps.edn b/deps.edn index e36885e..1e3fa9d 100644 --- a/deps.edn +++ b/deps.edn @@ -1,33 +1,31 @@ -{:deps {com.github.ivarref/double-trouble {:mvn/version "0.1.102"} - org.clojure/tools.logging {:mvn/version "1.2.4"} - org.clojure/clojure {:mvn/version "1.11.1"}} +{:deps {com.github.ivarref/double-trouble {:mvn/version "0.1.102"} + org.clojure/tools.logging {:mvn/version "1.2.4"} + org.clojure/clojure {:mvn/version "1.11.1"} + com.datomic/peer {:mvn/version "1.0.7364"}} - :paths ["src"] + :paths ["src"] - :aliases {:datomic {:extra-deps {com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]}}} - :test {:extra-paths ["test"] - :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} - com.taoensso/timbre {:mvn/version "5.2.1"} - com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} - clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} - com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]} - org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} - com.taoensso/nippy {:mvn/version "3.2.0"} - io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} - :jvm-opts ["-DDISABLE_SPY=true" - "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] - :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]} + :aliases {:test {:extra-paths ["test"] + :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} + com.taoensso/timbre {:mvn/version "5.2.1"} + com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} + clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} + org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"} + io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} + :exec-fn cognitect.test-runner.api/test + :jvm-opts ["-DDISABLE_SPY=true" + "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] + :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]} - :jar {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git" - :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}} - :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]} + :jar {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git" + :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}} + :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]} - :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} + :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} - :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}} - :exec-fn deps-deploy.deps-deploy/deploy - :exec-args {:installer :remote - :sign-releases? false - :artifact "target/out.jar"}}} - - :mvn/repos {"my.datomic.com" {:url "https://my.datomic.com/repo"}}} + :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}} + :exec-fn deps-deploy.deps-deploy/deploy + :exec-args {:installer :remote + :sign-releases? false + :artifact "target/out.jar"}}}} \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index a7dcddf..32298b7 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -12,7 +12,7 @@ (:import (datomic Connection) (java.lang.management ManagementFactory) (java.time Duration Instant ZoneOffset ZonedDateTime) - (java.util.concurrent ExecutorService Executors ScheduledExecutorService TimeUnit))) + (java.util.concurrent BlockingQueue ExecutorService Executors ScheduledExecutorService TimeUnit))) (defonce ^:dynamic *config* (atom nil)) (defonce threadpool (atom nil)) @@ -26,7 +26,7 @@ ; If you want no limit on the number of retries, specify ; the value `0`. That will set the effective retry limit to ; 9223372036854775807 times. - :max-retries 10000 + :max-retries 9223372036854775807 ; Minimum amount of time to wait before a failed queue job is retried :error-backoff-time (Duration/ofSeconds 5) @@ -86,6 +86,9 @@ (defn init! [{:keys [conn tx-report-queue] :as cfg}] (assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil"))) + (when (some? tx-report-queue) + (assert (instance? BlockingQueue tx-report-queue) + (str "Expected :tx-report-queue to be of type java.util.concurrent.BlockingQueue"))) (locking threadpool @(d/transact conn i/schema) (let [new-cfg (swap! *config* @@ -96,9 +99,6 @@ :system-error (atom {}) :healthy? (atom nil) :slow? (atom nil) - :get-tx-report-queue (fn [] - (or tx-report-queue - (d/tx-report-queue conn))) :slow-thread-watcher-done? (promise)} default-opts (if *test-mode* old-conf (select-keys old-conf [:handlers])) @@ -144,12 +144,37 @@ (reset! *running?* true) (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS) (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS) - (.execute ^ScheduledExecutorService pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*))) + (.execute ^ScheduledExecutorService pool + (fn [] + (try + (log/debug "report-queue-listener starting") + (rq/report-queue-listener *running?* queue-listener-ready pool *config*) + (finally + (log/debug "report-queue-listener exiting") + (deliver queue-listener-ready :finally))))) (future (try (slow-executor/show-slow-threads pool *config*) (finally (deliver slow-thread-watcher-done? :done)))) - @queue-listener-ready))) + (let [q-listener-retval (deref queue-listener-ready 30000 :timeout)] + (cond (= :timeout q-listener-retval) + (do + (log/error "Timed out waiting for report-queue-listener to start") + (throw (IllegalStateException. "Timed out waiting for report-queue-listener to start"))) + + (= :finally q-listener-retval) + (do + (log/error "report-queue-listener did not start") + (throw (IllegalStateException. "report-queue-listener did not start"))) + + (= :ready q-listener-retval) + (do + (log/debug "report-queue-listener is ready")) + + :else + (do + (log/error (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval))) + (throw (IllegalStateException. (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval)))))))))) (defn start! [] @@ -359,14 +384,13 @@ :min (apply min values))}))) (into (sorted-map))))) +(defn get-tx-report-queue-multicast! + "Multicast the datomic.api/tx-report-queue to different consumers. + The multicaster is started on demand. `conn` and `id` identifies the consumer. - -(defn add-tx-report-queue! - ([conn] - (add-tx-report-queue! conn :default)) - ([conn id] - (if @*config* - :...))) + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + [conn id] + (rq/get-tx-report-queue-multicast! conn id)) (comment (do @@ -401,3 +425,37 @@ @started-consuming? (stop!) nil))))) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq.migrate"} :warn] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (stop!) + (let [received (atom []) + uri (str "datomic:mem://demo")] + (d/delete-database uri) + (d/create-database uri) + (let [conn (d/connect uri) + started-consuming? (promise) + n 1] + (init! {:conn conn + :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq) + :slow-thread-show-stacktrace? false}) + (add-consumer! :q (fn [_] + (deliver started-consuming? true))) + (log/info "begin start! ...") + (start!) + (log/info "begin start! ... Done") + (Thread/sleep 2000) + @(d/transact conn [(put :q {:work 123})]) + @started-consuming? + (stop!) + (log/info "stop! done") + nil)))) \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index 20e0a93..9cddc93 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -3,8 +3,8 @@ [com.github.ivarref.yoltq.impl :as i] [datomic.api :as d] [clojure.tools.logging :as log]) - (:import (datomic Datom) - (java.util.concurrent ScheduledExecutorService BlockingQueue TimeUnit))) + (:import (datomic Connection Datom) + (java.util.concurrent LinkedBlockingQueue ScheduledExecutorService BlockingQueue TimeUnit))) (defn process-poll-result! [cfg id-ident poll-result consumer] @@ -28,18 +28,24 @@ (i/take! cfg) (i/execute! cfg))))) (catch Throwable t - (log/error t "unexpected error in process-poll-result!"))))))))) + (log/error t "Unexpected error in process-poll-result!"))))))))) (defn report-queue-listener [running? ready? ^ScheduledExecutorService pool config-atom] - (let [conn (:conn @config-atom) - ^BlockingQueue q (d/tx-report-queue conn) + (let [cfg @config-atom + conn (:conn cfg) + tx-report-queue-given (contains? cfg :tx-report-queue) + ^BlockingQueue q (if tx-report-queue-given + (get cfg :tx-report-queue) + (d/tx-report-queue conn)) id-ident (d/q '[:find ?e . :where [?e :db/ident :com.github.ivarref.yoltq/id]] (d/db conn))] + (assert (instance? BlockingQueue q)) + (log/debug "tx-report-queue-given:" tx-report-queue-given) (try (while @running? (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] @@ -49,9 +55,118 @@ (fn [f] (when @running? (.execute ^ScheduledExecutorService pool f))))) - (deliver ready? true)) + (deliver ready? :ready)) (catch Throwable t - (log/error t "unexpected error in report-queue-listener")) + (log/error t "Unexpected error in report-queue-listener:" (.getMessage t))) (finally - (log/debug "remove tx-report-queue") - (d/remove-tx-report-queue conn))))) \ No newline at end of file + (if tx-report-queue-given + (log/debug "Remove tx-report-queue handled elsewhere") + (do + (log/debug "Remove tx-report-queue") + (d/remove-tx-report-queue conn))))))) + +(defonce ^:private multicast-state-lock (Object.)) + +(defonce ^:private multicast-state (atom {})) + +(defn- start-multicaster! [conn] + (let [multicaster-ready? (promise)] + (future + (log/debug "Multicaster starting for conn" conn) + (try + (let [input-queue (d/tx-report-queue conn)] + (loop [] + (when-let [mcast-state (get @multicast-state conn)] + (when-let [dest-queues (vals mcast-state)] + (let [element (.poll ^BlockingQueue input-queue 1 TimeUnit/SECONDS)] + (deliver multicaster-ready? :ready) + (when (some? element) + (doseq [q dest-queues] + (let [ok-offer (.offer ^BlockingQueue q element 30 TimeUnit/MINUTES)] + (when (false? ok-offer) + (log/error "Failed to offer item in multicaster for connection" conn)))))) + (recur))))) + (catch Throwable t + (deliver multicaster-ready? :error) + (log/error t "Unexpected error in multicaster:" (.getMessage t))) + (finally + (d/remove-tx-report-queue conn) + (log/debug "Multicaster exiting for conn" conn)))) + multicaster-ready?)) + +(defn get-tx-report-queue-multicast! + "Multicast the datomic.api/tx-report-queue to different consumers. + The multicaster is started on demand. `conn` and `id` identifies the consumer. + + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + [conn id] + (assert (instance? Connection conn)) + (assert (keyword? id)) + (locking multicast-state-lock + (assert (map? @multicast-state)) + (if-let [existing-q (get-in @multicast-state [conn id])] + (do + (log/debug "returning existing queue for id" id) + (assert (instance? BlockingQueue existing-q)) + existing-q) + (let [needs-multicaster? (not (contains? @multicast-state conn)) + new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [conn id] (LinkedBlockingQueue.))))] + (when needs-multicaster? + (let [multicaster-promise (start-multicaster! conn) + multicaster-result (deref multicaster-promise (* 30 60000) :timeout)] + (cond (= multicaster-result :timeout) + (do + (log/error "Timeout waiting for multicaster to start") + (throw (RuntimeException. "Timeout waiting for multicaster to start"))) + (= multicaster-result :error) + (do + (log/error "Multicaster failed to start") + (throw (RuntimeException. "Multicaster failed to start"))) + (= multicaster-result :ready) + (log/debug "Multicaster is ready") + + :else + (do + (log/error "Unexpected state from multicaster:" multicaster-result) + (throw (RuntimeException. (str "Unexpected state from multicaster: " multicaster-result))))))) + (let [new-q (get-in new-state [conn id])] + (assert (instance? BlockingQueue new-q)) + new-q))))) + +(defn stop-all-multicasters! [] + (reset! multicast-state {})) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (defonce conn (let [uri (str "datomic:mem://demo") + _ (d/delete-database uri) + _ (d/create-database uri) + conn (d/connect uri)] + conn)))) + +(comment + (defn drain! [^BlockingQueue q] + (loop [cnt 0] + (if (nil? (.poll q 1 TimeUnit/SECONDS)) + cnt + (recur (inc cnt)))))) + +(comment + (let [q-1 (get-tx-report-queue-multicast! conn :q1) + q-2 (get-tx-report-queue-multicast! conn :q2)])) + +(comment + (drain! (get-tx-report-queue-multicast! conn :q1))) + +(comment + (do + @(d/transact conn [{:db/doc "demo"}]) + :yay)) \ No newline at end of file diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj index 1aa6c02..f3fb6dc 100644 --- a/test/com/github/ivarref/yoltq/log_init.clj +++ b/test/com/github/ivarref/yoltq/log_init.clj @@ -3,6 +3,8 @@ [taoensso.timbre :as timbre] [clojure.string :as str])) +(set! *warn-on-reflection* true) + (def level-colors {;:warn colors/red :error colors/red}) -- cgit v1.2.3 From 4797e559410bce644c40b05fa9a321171a781e78 Mon Sep 17 00:00:00 2001 From: ire Date: Tue, 20 May 2025 22:43:39 +0200 Subject: Improve tx-report-queue sharing #7 --- src/com/github/ivarref/yoltq/report_queue.clj | 342 +++++++++++++++++++++----- test/com/github/ivarref/yoltq/log_init.clj | 2 +- 2 files changed, 283 insertions(+), 61 deletions(-) (limited to 'src') diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index 9cddc93..239de12 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -6,7 +6,6 @@ (:import (datomic Connection Datom) (java.util.concurrent LinkedBlockingQueue ScheduledExecutorService BlockingQueue TimeUnit))) - (defn process-poll-result! [cfg id-ident poll-result consumer] (let [{:keys [tx-data db-after]} poll-result] (when-let [new-ids (->> tx-data @@ -30,7 +29,6 @@ (catch Throwable t (log/error t "Unexpected error in process-poll-result!"))))))))) - (defn report-queue-listener [running? ready? ^ScheduledExecutorService pool @@ -65,80 +63,253 @@ (log/debug "Remove tx-report-queue") (d/remove-tx-report-queue conn))))))) -(defonce ^:private multicast-state-lock (Object.)) +; https://stackoverflow.com/a/14488425 +(defn- dissoc-in + "Dissociates an entry from a nested associative structure returning a new + nested structure. keys is a sequence of keys. Any empty maps that result + will not be present in the new structure." + [m [k & ks :as keys]] + (if ks + (if-let [nextmap (get m k)] + (let [newmap (dissoc-in nextmap ks)] + (if (seq newmap) + (assoc m k newmap) + (dissoc m k))) + m) + (dissoc m k))) + +(defn- queues-to-shutdown [old-state new-state] + (assert (map? old-state)) + (assert (map? new-state)) + (doseq [x (vals new-state)] + (assert (vector? x))) + (doseq [x (vals old-state)] + (assert (vector? x))) + (let [new-qs (into #{} (mapv second (vals new-state)))] + (reduce + (fn [o [send-end-token? old-q]] + ;(assert (boolean? send-end-token?)) + ;(assert (instance? BlockingQueue old-q)) + (if (contains? new-qs old-q) + o + (conj o [send-end-token? old-q]))) + [] + (vals old-state)))) + +(comment + (queues-to-shutdown {:a [true 999] :b [false 777]} + {:a [true 123] :b [true 777]})) +(defn- multicast-once [conn work-item old-state new-state] + (assert (map? old-state)) + (assert (map? new-state)) + (doseq [[send-end-token? q-to-shutdown] (queues-to-shutdown old-state new-state)] + (if send-end-token? + (do + #_(log/debug "offering :end token") + (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS)) + (do + #_(log/debug "not offering :end token")))) + (when (seq new-state) + (if (some? work-item) + (reduce-kv + (fn [m id [send-end-token? q]] + (let [ok-offer (.offer ^BlockingQueue q work-item 1 TimeUnit/MICROSECONDS)] + (if (true? ok-offer) + (assoc m id [send-end-token? q]) + (log/warn "Failed to offer item in multicaster for connection" conn "and queue id" id)))) + {} + new-state) + new-state))) + +(defonce ^:private multicast-state-lock (Object.)) +(defonce ^:private consumer-state-lock (Object.)) (defonce ^:private multicast-state (atom {})) +(defonce ^:private thread-count (atom 0)) + +(defn- multicaster-loop [init-state conn ready?] + (let [input-queue (d/tx-report-queue conn)] + (deliver ready? true) + (loop [old-state init-state] + (let [work-item (.poll ^BlockingQueue input-queue 16 TimeUnit/MILLISECONDS) + new-state (locking multicast-state-lock + ; writer to `multicast-state` must be protected by `multicast-state-lock` + ; it should block minimally / spend minimum amount of time + (swap! multicast-state (fn [old-state] (update-in old-state [:iter-count conn] (fnil inc 0)))) + (if-let [new-state (multicast-once conn work-item old-state (get-in @multicast-state [:queues conn] {}))] + new-state + (do (swap! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn]))) + (swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] dec))) + (d/remove-tx-report-queue conn) + nil)))] + (if new-state + (recur new-state) + nil))))) (defn- start-multicaster! [conn] - (let [multicaster-ready? (promise)] + (let [ready? (promise)] (future (log/debug "Multicaster starting for conn" conn) (try - (let [input-queue (d/tx-report-queue conn)] - (loop [] - (when-let [mcast-state (get @multicast-state conn)] - (when-let [dest-queues (vals mcast-state)] - (let [element (.poll ^BlockingQueue input-queue 1 TimeUnit/SECONDS)] - (deliver multicaster-ready? :ready) - (when (some? element) - (doseq [q dest-queues] - (let [ok-offer (.offer ^BlockingQueue q element 30 TimeUnit/MINUTES)] - (when (false? ok-offer) - (log/error "Failed to offer item in multicaster for connection" conn)))))) - (recur))))) + (swap! thread-count inc) + (let [new-state (swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] (fnil inc 0))))] + (assert (= 1 (get-in new-state [:thread-count conn]))) + ; "parent" thread holds `multicast-state-lock` and + ; waits for `ready?` promise, so effectively this new thread also holds + ; the lock until `ready?` is delivered. That is: it is safe + ; for this thread to modify multicast-state regardless of what other threads are doing + (multicaster-loop (get-in new-state [:queues conn]) conn ready?)) (catch Throwable t - (deliver multicaster-ready? :error) - (log/error t "Unexpected error in multicaster:" (.getMessage t))) + (log/error t "Unexpected error in multicaster:" (.getMessage t)) + (log/error "Multicaster exiting for conn")) (finally - (d/remove-tx-report-queue conn) + (swap! thread-count dec) (log/debug "Multicaster exiting for conn" conn)))) - multicaster-ready?)) + @ready?)) + +(defn- wait-multicast-thread-step [conn] + ; `get-tx-report-queue-multicast!` should return only when the multicaster thread + ; has picked up the new queue. + ; + ; Otherwise the following could happen: + ; 1. multicast thread is sleeping + ; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true` + ; 3: user-thread (or somebody else) calls `stop-multicaster`. + ; The multicast-state atom is now identical as it was in 1 + ; 4: multicast thread is scheduled and does _not_ detect any state change. + ; And therefore the multicast thread does _not_ send out an :end token as one would expect. + ; + ; Once [:iter-count conn] has changed, we know that the multicaster thread + ; will see the new queue. + ; We are still holding the consumer-state-lock, so no other thread + ; can do any stop-multicasting that would/could corrupt the state. + ; We can then be sure that the queue will receive the `:end` token when/if + ; the queue is stopped. + (let [start-ms (System/currentTimeMillis) + iter-count (get-in @multicast-state [:iter-count conn] -1)] + (loop [spin-count 0] + (if (not= iter-count (get-in @multicast-state [:iter-count conn])) + nil + (do + (let [spent-ms (- (System/currentTimeMillis) start-ms)] + (if (> spent-ms 30000) + (throw (RuntimeException. "Timed out waiting for multicaster thread")) + (do + (Thread/sleep 16) + (recur (inc spin-count)))))))))) (defn get-tx-report-queue-multicast! "Multicast the datomic.api/tx-report-queue to different consumers. The multicaster is started on demand. `conn` and `id` identifies the consumer. Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." - [conn id] - (assert (instance? Connection conn)) - (assert (keyword? id)) - (locking multicast-state-lock - (assert (map? @multicast-state)) - (if-let [existing-q (get-in @multicast-state [conn id])] - (do - (log/debug "returning existing queue for id" id) - (assert (instance? BlockingQueue existing-q)) - existing-q) - (let [needs-multicaster? (not (contains? @multicast-state conn)) - new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [conn id] (LinkedBlockingQueue.))))] - (when needs-multicaster? - (let [multicaster-promise (start-multicaster! conn) - multicaster-result (deref multicaster-promise (* 30 60000) :timeout)] - (cond (= multicaster-result :timeout) - (do - (log/error "Timeout waiting for multicaster to start") - (throw (RuntimeException. "Timeout waiting for multicaster to start"))) - (= multicaster-result :error) - (do - (log/error "Multicaster failed to start") - (throw (RuntimeException. "Multicaster failed to start"))) - (= multicaster-result :ready) - (log/debug "Multicaster is ready") + ([conn id] + (get-tx-report-queue-multicast! conn id false)) + ([conn id send-end-token?] + (assert (instance? Connection conn)) + (assert (keyword? id)) + (locking consumer-state-lock + (let [the-q + (locking multicast-state-lock + (assert (map? @multicast-state)) + (if-let [existing-q (get-in @multicast-state [:queues conn id])] + (do + (swap! multicast-state + (fn [old-state] + (update-in old-state [:queues conn id] (fn [[end-token? q]] + (if (not= end-token? send-end-token?) + (log/debug "flipped `send-end-token?`") + (log/debug "identical `send-end-token?`")) + [send-end-token? q])))) + (log/debug "Returning existing queue for id" id) + (assert (instance? BlockingQueue (second existing-q))) + (second existing-q)) + (let [needs-multicaster? (nil? (get-in @multicast-state [:queues conn])) + new-q (LinkedBlockingQueue.) + new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [:queues conn id] [send-end-token? new-q])))] + (if needs-multicaster? + (do + (start-multicaster! conn) + (log/debug "Multicaster thread started. Returning new queue for id" id) + new-q) + (do + (log/debug "Multicaster thread already exists. Returning new queue for id" id) + new-q)))))] + ; wait for multicaster thread to pick up current Queue + (wait-multicast-thread-step conn) + the-q)))) - :else +(defn wait-multicast-threads-exit [[old-state new-state]] + (assert (map? old-state)) + (assert (map? new-state)) + (assert (map? (get old-state :queues {}))) + (assert (map? (get new-state :queues {}))) + (assert (map? (get old-state :thread-count {}))) + (assert (map? (get new-state :thread-count {}))) + (locking consumer-state-lock + ; No new multicast threads will be launched inside this block. + ; The lock is already held by parent function. + ; + ; Why do we need to _wait_ for multicaster thread(s) to exit after + ; removing all queue ids for a given connection? + ; Otherwise the following could happen: + ; 1. multicaster thread is sleeping + ; 2. user calls stop-multicaster! + ; One would expect that multicaster thread would exit, but it is still sleeping + ; 3. user calls get-tx-report-queue-multicast! with the same conn + ; The state is now empty, so a new multicaster thread is spawned. + ; 4. Now there is two multicaster threads for the same connection! + ; ... and since the datomic report queue can be shared between threads + ; it will seemingly work, but when the end event is sent, it will be + ; sent by multiple threads. + (let [old-conns (into #{} (keys (get old-state :queues {}))) + new-conns (into #{} (keys (get new-state :queues {})))] + (doseq [old-conn old-conns] + (when-not (contains? new-conns old-conn) + (let [old-threadcount (get-in old-state [:thread-count old-conn] nil)] + (assert (= 1 old-threadcount)) + (let [start-ms (System/currentTimeMillis)] + (loop [] + (if (= 0 (get-in @multicast-state [:thread-count old-conn])) + :ok (do - (log/error "Unexpected state from multicaster:" multicaster-result) - (throw (RuntimeException. (str "Unexpected state from multicaster: " multicaster-result))))))) - (let [new-q (get-in new-state [conn id])] - (assert (instance? BlockingQueue new-q)) - new-q))))) + (let [spent-ms (- (System/currentTimeMillis) start-ms)] + (if (> spent-ms 30000) + (throw (RuntimeException. "Timed out waiting for multicaster thread to exit")) + (do + (Thread/sleep 16) + (recur)))))))))))))) + +(defn stop-multicaster-id! [conn id] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (swap-vals! multicast-state (fn [old-state] + (let [new-state (dissoc-in old-state [:queues conn id])] + (if (= {} (get-in new-state [:queues conn])) + (dissoc-in old-state [:queues conn]) + new-state)))))))) + +(defn stop-multicaster! [conn] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn]))))))) (defn stop-all-multicasters! [] - (reset! multicast-state {})) + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {}))))))) (comment (do (require 'com.github.ivarref.yoltq.log-init) + (defn drain! [^BlockingQueue q] + (loop [items []] + (if-let [elem (.poll q 100 TimeUnit/MILLISECONDS)] + (recur (conj items elem)) + items))) (com.github.ivarref.yoltq.log-init/init-logging! [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] [#{"com.github.ivarref.yoltq.report-queue"} :debug] @@ -153,20 +324,71 @@ conn)))) (comment - (defn drain! [^BlockingQueue q] - (loop [cnt 0] - (if (nil? (.poll q 1 TimeUnit/SECONDS)) - cnt - (recur (inc cnt)))))) + (do + (require 'com.github.ivarref.yoltq.log-init) + (defn drain! [^BlockingQueue q] + (loop [items []] + (if-let [elem (.poll q 100 TimeUnit/MILLISECONDS)] + (recur (conj items elem)) + items))) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (log/info "********************************") + (defonce conn (let [uri (str "datomic:mem://demo") + _ (d/delete-database uri) + _ (d/create-database uri) + conn (d/connect uri)] + conn)) + (log/info "stop-all!") + (stop-all-multicasters!) + (assert (= 0 @thread-count)) + (let [q1 (get-tx-report-queue-multicast! conn :q1 false) + q2 (get-tx-report-queue-multicast! conn :q2 false) + _ (get-tx-report-queue-multicast! conn :q1 true)] + @(d/transact conn [{:db/doc "demo"}]) + @(d/transact conn [{:db/doc "demo"}]) + @(d/transact conn [{:db/doc "demo"}]) + (log/info "begin drain q1") + (stop-multicaster-id! conn :q1) + (println "thread count" @thread-count) + (let [qitems-2 (drain! q2) + qitems-1 (drain! q1)] + (assert (= :end (last qitems-1))) + (println "drain count q1:" (count qitems-1)) + (println "drain count q2:" (count qitems-2)))))) + +(comment + (do + (let [q (get-tx-report-queue-multicast! conn :q1 true)] + (log/debug "stopping id :q1") + (stop-multicaster-id! conn :q1) + (let [drained (drain! q)] + (println "drained:" drained) + (assert (= [:end] drained))) + @multicast-state))) (comment - (let [q-1 (get-tx-report-queue-multicast! conn :q1) - q-2 (get-tx-report-queue-multicast! conn :q2)])) + (stop-all-multicasters!)) (comment - (drain! (get-tx-report-queue-multicast! conn :q1))) + (do + (let [q (get-tx-report-queue-multicast! conn :q2 false)] + (println "drain count:" (count (drain! q))) + @multicast-state + nil))) + +(comment + (get-tx-report-queue-multicast! conn :q1 false) + (get-tx-report-queue-multicast! conn :q1 true)) (comment (do + @(d/transact conn [{:db/doc "demo"}]) + @(d/transact conn [{:db/doc "demo"}]) @(d/transact conn [{:db/doc "demo"}]) :yay)) \ No newline at end of file diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj index f3fb6dc..7eae557 100644 --- a/test/com/github/ivarref/yoltq/log_init.clj +++ b/test/com/github/ivarref/yoltq/log_init.clj @@ -48,7 +48,7 @@ (color-f (force msg_)) - #_maybe-stacktrace)))) + maybe-stacktrace)))) (catch Throwable t -- cgit v1.2.3 From aa0b3d0bd9e087c7e1e36e87cd6e10f9e2796449 Mon Sep 17 00:00:00 2001 From: ire Date: Wed, 21 May 2025 09:51:30 +0200 Subject: Doc rationale for waiting for multicaster thread. Handle :end token in report-queue-listener #7 --- src/com/github/ivarref/yoltq/report_queue.clj | 91 +++++++++++++++------------ 1 file changed, 51 insertions(+), 40 deletions(-) (limited to 'src') diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index 239de12..2a2e489 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -45,15 +45,20 @@ (assert (instance? BlockingQueue q)) (log/debug "tx-report-queue-given:" tx-report-queue-given) (try - (while @running? - (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] - (process-poll-result! @config-atom - id-ident - poll-result - (fn [f] - (when @running? - (.execute ^ScheduledExecutorService pool f))))) - (deliver ready? :ready)) + (let [running-local? (atom true)] + (while (and @running? @running-local?) + (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] + (if (= poll-result :end) + (do + (reset! running-local? false) + #_(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system.")) + (process-poll-result! @config-atom + id-ident + poll-result + (fn [f] + (when @running? + (.execute ^ScheduledExecutorService pool f)))))) + (deliver ready? :ready))) (catch Throwable t (log/error t "Unexpected error in report-queue-listener:" (.getMessage t))) (finally @@ -128,6 +133,7 @@ (defonce ^:private thread-count (atom 0)) (defn- multicaster-loop [init-state conn ready?] + (assert (instance? Connection conn)) (let [input-queue (d/tx-report-queue conn)] (deliver ready? true) (loop [old-state init-state] @@ -147,6 +153,7 @@ nil))))) (defn- start-multicaster! [conn] + (assert (instance? Connection conn)) (let [ready? (promise)] (future (log/debug "Multicaster starting for conn" conn) @@ -165,38 +172,42 @@ (finally (swap! thread-count dec) (log/debug "Multicaster exiting for conn" conn)))) - @ready?)) + (when (= :timeout (deref ready? 30000 :timeout)) + (throw (RuntimeException. "Timed out waiting for multicaster to start"))))) -(defn- wait-multicast-thread-step [conn] - ; `get-tx-report-queue-multicast!` should return only when the multicaster thread - ; has picked up the new queue. - ; - ; Otherwise the following could happen: - ; 1. multicast thread is sleeping - ; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true` - ; 3: user-thread (or somebody else) calls `stop-multicaster`. - ; The multicast-state atom is now identical as it was in 1 - ; 4: multicast thread is scheduled and does _not_ detect any state change. - ; And therefore the multicast thread does _not_ send out an :end token as one would expect. - ; - ; Once [:iter-count conn] has changed, we know that the multicaster thread - ; will see the new queue. - ; We are still holding the consumer-state-lock, so no other thread - ; can do any stop-multicasting that would/could corrupt the state. - ; We can then be sure that the queue will receive the `:end` token when/if - ; the queue is stopped. - (let [start-ms (System/currentTimeMillis) - iter-count (get-in @multicast-state [:iter-count conn] -1)] - (loop [spin-count 0] - (if (not= iter-count (get-in @multicast-state [:iter-count conn])) - nil - (do - (let [spent-ms (- (System/currentTimeMillis) start-ms)] - (if (> spent-ms 30000) - (throw (RuntimeException. "Timed out waiting for multicaster thread")) - (do - (Thread/sleep 16) - (recur (inc spin-count)))))))))) +(defn- wait-multicast-thread-step [conn]) +; `get-tx-report-queue-multicast!` should return only when the multicaster thread +; has picked up the new queue. +; +; Otherwise the following could happen: +; 1. multicast thread is sleeping +; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true` +; 3: user-thread (or somebody else) calls `stop-multicaster`. +; The multicast-state atom is now identical as it was in step 1. +; , Step 2 and 3 happened while the multicast thread was sleeping. +; 4: The multicast thread is scheduled and does _not_ detect any state change. +; Therefore the multicast thread does _not_ send out an :end token as one would expect. +; +; The new queue is written to memory at this point. No other thread can remove it because +; we are still, and have been during the modification of multicast-state, holding consumer-state-lock. +; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock, +; we can be sure that no other thread changes or has changed the state. +; +; Once [:iter-count conn] has changed, we know that the multicaster thread +; will see the new queue. This means that we can be sure that the queue +; will receive the `:end` token if the queue is stopped. +(let [start-ms (System/currentTimeMillis) + iter-count (get-in @multicast-state [:iter-count conn] -1)] + (loop [spin-count 0] + (if (not= iter-count (get-in @multicast-state [:iter-count conn])) + nil + (do + (let [spent-ms (- (System/currentTimeMillis) start-ms)] + (if (> spent-ms 30000) + (throw (RuntimeException. "Timed out waiting for multicaster thread")) + (do + (Thread/sleep 16) + (recur (inc spin-count))))))))) (defn get-tx-report-queue-multicast! "Multicast the datomic.api/tx-report-queue to different consumers. -- cgit v1.2.3 From ccfe353aebe8c22429fdaf76a5e0bf34cefca955 Mon Sep 17 00:00:00 2001 From: ire Date: Wed, 21 May 2025 12:04:15 +0200 Subject: Small fixes #7 --- src/com/github/ivarref/yoltq.clj | 51 ++++++++++++++-- src/com/github/ivarref/yoltq/report_queue.clj | 86 +++++++++++++-------------- 2 files changed, 88 insertions(+), 49 deletions(-) (limited to 'src') diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 32298b7..80c9491 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -352,8 +352,8 @@ :p95 ... :p99 ...}}" [{:keys [age-days queue-name now db duration->long] - :or {age-days 30 - now (ZonedDateTime/now ZoneOffset/UTC) + :or {age-days 30 + now (ZonedDateTime/now ZoneOffset/UTC) duration->long (fn [duration] (.toSeconds ^Duration duration))}}] (let [{:keys [conn]} @*config* db (or db (d/db conn)) @@ -386,11 +386,50 @@ (defn get-tx-report-queue-multicast! "Multicast the datomic.api/tx-report-queue to different consumers. - The multicaster is started on demand. `conn` and `id` identifies the consumer. + A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer. + Repeated calls using the same `conn` and `id` returns the same queue. + + The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread + to send `:end` if the queue is stopped. The default value is `false`. + + A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`. + + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + ([conn id] + (get-tx-report-queue-multicast! conn id false)) + ([conn id send-end-token?] + (assert (instance? Connection conn)) + (assert (boolean? send-end-token?)) + (rq/get-tx-report-queue-multicast! conn id send-end-token?))) + +(defn stop-multicaster-id! + "Stop forwarding reports from datomic.api/tx-report-queue to the queue identified by `conn` and `id`. + If this is the last report destination for the given `conn`, the multicaster thread will exit. + Repeated calls are no-op. - Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + Returns nil." [conn id] - (rq/get-tx-report-queue-multicast! conn id)) + (assert (instance? Connection conn)) + (rq/stop-multicaster-id! conn id)) + +(defn stop-multicaster! + "Stop forwarding reports from datomic.api/tx-report-queue to any queues belonging to `conn`. + The multicaster thread will exit. + Repeated calls are no-op. + + Returns nil." + [conn] + (assert (instance? Connection conn)) + (rq/stop-multicaster! conn)) + +(defn stop-all-multicasters! + "Stop forwarding all reports from datomic.api/tx-report-queue for any `conn`. + All multicaster threads will exit. + Repeated calls are no-op. + + Returns nil." + [] + (rq/stop-all-multicasters!)) (comment (do @@ -446,7 +485,7 @@ started-consuming? (promise) n 1] (init! {:conn conn - :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq) + :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq) :slow-thread-show-stacktrace? false}) (add-consumer! :q (fn [_] (deliver started-consuming? true))) diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index 2a2e489..a9f7e07 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -6,6 +6,8 @@ (:import (datomic Connection Datom) (java.util.concurrent LinkedBlockingQueue ScheduledExecutorService BlockingQueue TimeUnit))) +; Private API, subject to change! + (defn process-poll-result! [cfg id-ident poll-result consumer] (let [{:keys [tx-data db-after]} poll-result] (when-let [new-ids (->> tx-data @@ -50,8 +52,9 @@ (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] (if (= poll-result :end) (do - (reset! running-local? false) - #_(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system.")) + (log/debug "Report queue listener received :end token. Exiting") + (reset! running-local? false)) + ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system.")) (process-poll-result! @config-atom id-ident poll-result @@ -175,7 +178,8 @@ (when (= :timeout (deref ready? 30000 :timeout)) (throw (RuntimeException. "Timed out waiting for multicaster to start"))))) -(defn- wait-multicast-thread-step [conn]) +(defn- wait-multicast-thread-step + [conn] ; `get-tx-report-queue-multicast!` should return only when the multicaster thread ; has picked up the new queue. ; @@ -196,29 +200,34 @@ ; Once [:iter-count conn] has changed, we know that the multicaster thread ; will see the new queue. This means that we can be sure that the queue ; will receive the `:end` token if the queue is stopped. -(let [start-ms (System/currentTimeMillis) - iter-count (get-in @multicast-state [:iter-count conn] -1)] - (loop [spin-count 0] - (if (not= iter-count (get-in @multicast-state [:iter-count conn])) - nil - (do - (let [spent-ms (- (System/currentTimeMillis) start-ms)] - (if (> spent-ms 30000) - (throw (RuntimeException. "Timed out waiting for multicaster thread")) - (do - (Thread/sleep 16) - (recur (inc spin-count))))))))) + (let [start-ms (System/currentTimeMillis) + iter-count (get-in @multicast-state [:iter-count conn] -1)] + (loop [spin-count 0] + (if (not= iter-count (get-in @multicast-state [:iter-count conn])) + nil + (do + (let [spent-ms (- (System/currentTimeMillis) start-ms)] + (if (> spent-ms 30000) + (throw (RuntimeException. "Timed out waiting for multicaster thread")) + (do + (Thread/sleep 16) + (recur (inc spin-count)))))))))) (defn get-tx-report-queue-multicast! "Multicast the datomic.api/tx-report-queue to different consumers. - The multicaster is started on demand. `conn` and `id` identifies the consumer. + A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer. + Repeated calls using the same `conn` and `id` returns the same queue. + + The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread + to send `:end` if the queue is stopped. The default value is `false`. - Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`. + + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." ([conn id] (get-tx-report-queue-multicast! conn id false)) ([conn id send-end-token?] (assert (instance? Connection conn)) - (assert (keyword? id)) (locking consumer-state-lock (let [the-q (locking multicast-state-lock @@ -250,7 +259,7 @@ (wait-multicast-thread-step conn) the-q)))) -(defn wait-multicast-threads-exit [[old-state new-state]] +(defn- wait-multicast-threads-exit [[old-state new-state]] (assert (map? old-state)) (assert (map? new-state)) (assert (map? (get old-state :queues {}))) @@ -275,6 +284,12 @@ ; sent by multiple threads. (let [old-conns (into #{} (keys (get old-state :queues {}))) new-conns (into #{} (keys (get new-state :queues {})))] + (assert (every? + (fn [x] (instance? Connection x)) + old-conns)) + (assert (every? + (fn [x] (instance? Connection x)) + new-conns)) (doseq [old-conn old-conns] (when-not (contains? new-conns old-conn) (let [old-threadcount (get-in old-state [:thread-count old-conn] nil)] @@ -292,6 +307,7 @@ (recur)))))))))))))) (defn stop-multicaster-id! [conn id] + (assert (instance? Connection conn)) (locking consumer-state-lock (wait-multicast-threads-exit (locking multicast-state-lock @@ -299,40 +315,23 @@ (let [new-state (dissoc-in old-state [:queues conn id])] (if (= {} (get-in new-state [:queues conn])) (dissoc-in old-state [:queues conn]) - new-state)))))))) + new-state))))))) + nil) (defn stop-multicaster! [conn] + (assert (instance? Connection conn)) (locking consumer-state-lock (wait-multicast-threads-exit (locking multicast-state-lock - (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn]))))))) + (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))))) + nil) (defn stop-all-multicasters! [] (locking consumer-state-lock (wait-multicast-threads-exit (locking multicast-state-lock - (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {}))))))) - -(comment - (do - (require 'com.github.ivarref.yoltq.log-init) - (defn drain! [^BlockingQueue q] - (loop [items []] - (if-let [elem (.poll q 100 TimeUnit/MILLISECONDS)] - (recur (conj items elem)) - items))) - (com.github.ivarref.yoltq.log-init/init-logging! - [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] - [#{"com.github.ivarref.yoltq.report-queue"} :debug] - [#{"com.github.ivarref.yoltq.poller"} :info] - [#{"com.github.ivarref.yoltq"} :debug] - ;[#{"ivarref.yoltq*"} :info] - [#{"*"} :info]]) - (defonce conn (let [uri (str "datomic:mem://demo") - _ (d/delete-database uri) - _ (d/create-database uri) - conn (d/connect uri)] - conn)))) + (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))))) + nil) (comment (do @@ -366,6 +365,7 @@ @(d/transact conn [{:db/doc "demo"}]) (log/info "begin drain q1") (stop-multicaster-id! conn :q1) + (stop-multicaster-id! conn :q1) (println "thread count" @thread-count) (let [qitems-2 (drain! q2) qitems-1 (drain! q1)] -- cgit v1.2.3 From a1e4e1b96fd254ec7d7e467648dd5e88f1c9530b Mon Sep 17 00:00:00 2001 From: ire Date: Wed, 21 May 2025 13:38:51 +0200 Subject: Doc. Return true/false if queues were stopped or not #7 --- README.md | 37 ++++++- deps.edn | 12 +++ src/com/github/ivarref/yoltq.clj | 23 +++- src/com/github/ivarref/yoltq/report_queue.clj | 146 ++++++++++++++++++-------- 4 files changed, 163 insertions(+), 55 deletions(-) (limited to 'src') diff --git a/README.md b/README.md index f84a336..675fa9a 100644 --- a/README.md +++ b/README.md @@ -441,21 +441,48 @@ then not grab the datomic report queue, but use the one provided: ```clojure (require '[com.github.ivarref.yoltq :as yq]) (yq/init! {:conn conn - :tx-report-queue (yq/get-tx-report-queue-multicast! my-conn :yoltq) + :tx-report-queue (yq/get-tx-report-queue-multicast! conn :yoltq) ; ^^ can be any `java.util.concurrent.BlockingQueue` value }) -(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! my-conn :another-consumer-id)) +(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! conn :another-consumer-id)) ``` Added multicast support for `datomic.api/tx-report-queue`: ```clojure -(def my-q1 (yq/get-tx-report-queue-multicast! my-conn :q-id-1)) +(require '[com.github.ivarref.yoltq :as yq]) +(def my-q1 (yq/get-tx-report-queue-multicast! conn :q-id-1)) ; ^^ consume my-q1 just like you would do `datomic.api/tx-report-queue` -(def my-q2 (yq/get-tx-report-queue-multicast! my-conn :q-id-2)) -; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue` +(def my-q2 (yq/get-tx-report-queue-multicast! conn :q-id-2)) +; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue` for the given `conn` + +(def my-q3 (yq/get-tx-report-queue-multicast! conn :q-id-3 true)) +; my-q3 specifies the third argument, `send-end-token?`, to true, so it will receive `:end` if the queue is stopped. +; This can enable simpler consuming of queues: +(future + (loop [] + (let [q-item (.take ^java.util.concurrent.BlockingQueue my-q3)] + (if (= q-item :end) + (println "Time to exit. Goodbye!") + (do + (println "Processing q-item" q-item) + (recur)))))) + +@(d/transact conn [{:db/doc "new-data"}]) + +; Stop the queue: +(yq/stop-multicaster-id! conn :q-id-3) +=> true +; The multicaster thread will send `:end` and the consumer thread will then print "Time to exit. Goodbye!". + +; if the queue is already stopped (or never was started), `stop-multicaster...` functions will return false: +(yq/stop-multicaster-id! conn :already-stopped-queue-or-typo) +=> false + +; Stop all queues for all connections: +(yq/stop-all-multicasters!) ``` `yq/get-tx-report-queue-multicast!` returns, like diff --git a/deps.edn b/deps.edn index 1e3fa9d..a328c86 100644 --- a/deps.edn +++ b/deps.edn @@ -22,6 +22,18 @@ :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}} :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]} + :repl {:extra-paths ["test"] + :extra-deps {com.bhauman/rebel-readline {:mvn/version "0.1.5"} + ivarref/datomic-schema {:mvn/version "0.2.0"} + com.taoensso/timbre {:mvn/version "5.2.1"} + com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} + clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} + org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"}} + :exec-fn rebel-readline.tool/repl + :exec-args {} + :main-opts ["-m" "rebel-readline.main"]} + :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}} diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 80c9491..298b9d5 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -407,7 +407,11 @@ If this is the last report destination for the given `conn`, the multicaster thread will exit. Repeated calls are no-op. - Returns nil." + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if the queue was stopped. + Return `false` if the queue does not exist." [conn id] (assert (instance? Connection conn)) (rq/stop-multicaster-id! conn id)) @@ -417,7 +421,11 @@ The multicaster thread will exit. Repeated calls are no-op. - Returns nil." + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if any queue belonging to `conn` was stopped. + Returns `false` is `conn` did not have any associated queues." [conn] (assert (instance? Connection conn)) (rq/stop-multicaster! conn)) @@ -427,7 +435,11 @@ All multicaster threads will exit. Repeated calls are no-op. - Returns nil." + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if any queue was stopped. + Returns `false` if no queues existed." [] (rq/stop-all-multicasters!)) @@ -485,7 +497,7 @@ started-consuming? (promise) n 1] (init! {:conn conn - :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq) + :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq true) :slow-thread-show-stacktrace? false}) (add-consumer! :q (fn [_] (deliver started-consuming? true))) @@ -493,8 +505,11 @@ (start!) (log/info "begin start! ... Done") (Thread/sleep 2000) + (log/info "*******************************************") @(d/transact conn [(put :q {:work 123})]) @started-consuming? + (stop-multicaster! conn) + (log/info "*******************************************") (stop!) (log/info "stop! done") nil)))) \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index a9f7e07..c3fd383 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -52,9 +52,9 @@ (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] (if (= poll-result :end) (do - (log/debug "Report queue listener received :end token. Exiting") + (log/debug "report-queue-listener received :end token. Exiting") (reset! running-local? false)) - ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system.")) + ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system.")) (process-poll-result! @config-atom id-ident poll-result @@ -115,9 +115,11 @@ (if send-end-token? (do #_(log/debug "offering :end token") - (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS)) + (if (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS) + (log/debug "Multicaster sent :end token") + (log/debug "Multicaster failed to send :end token"))) (do - #_(log/debug "not offering :end token")))) + (log/debug "Multicaster not sending :end token")))) (when (seq new-state) (if (some? work-item) (reduce-kv @@ -125,7 +127,7 @@ (let [ok-offer (.offer ^BlockingQueue q work-item 1 TimeUnit/MICROSECONDS)] (if (true? ok-offer) (assoc m id [send-end-token? q]) - (log/warn "Failed to offer item in multicaster for connection" conn "and queue id" id)))) + (log/error "Multicaster failed to offer item for connection" conn "and queue id" id)))) {} new-state) new-state))) @@ -150,6 +152,7 @@ (do (swap! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn]))) (swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] dec))) (d/remove-tx-report-queue conn) + (log/debug "Multicaster removed tx-report-queue for conn" conn) nil)))] (if new-state (recur new-state) @@ -180,26 +183,26 @@ (defn- wait-multicast-thread-step [conn] -; `get-tx-report-queue-multicast!` should return only when the multicaster thread -; has picked up the new queue. -; -; Otherwise the following could happen: -; 1. multicast thread is sleeping -; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true` -; 3: user-thread (or somebody else) calls `stop-multicaster`. -; The multicast-state atom is now identical as it was in step 1. -; , Step 2 and 3 happened while the multicast thread was sleeping. -; 4: The multicast thread is scheduled and does _not_ detect any state change. -; Therefore the multicast thread does _not_ send out an :end token as one would expect. -; -; The new queue is written to memory at this point. No other thread can remove it because -; we are still, and have been during the modification of multicast-state, holding consumer-state-lock. -; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock, -; we can be sure that no other thread changes or has changed the state. -; -; Once [:iter-count conn] has changed, we know that the multicaster thread -; will see the new queue. This means that we can be sure that the queue -; will receive the `:end` token if the queue is stopped. + ; `get-tx-report-queue-multicast!` should return only when the multicaster thread + ; has picked up the new queue. + ; + ; Otherwise the following could happen: + ; 1. multicast thread is sleeping + ; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true` + ; 3: user-thread (or somebody else) calls `stop-multicaster`. + ; The multicast-state atom is now identical as it was in step 1. + ; , Step 2 and 3 happened while the multicast thread was sleeping. + ; 4: The multicast thread is scheduled and does _not_ detect any state change. + ; Therefore the multicast thread does _not_ send out an :end token as one would expect. + ; + ; The new queue is written to memory at this point. No other thread can remove it because + ; we are still, and have been during the modification of multicast-state, holding consumer-state-lock. + ; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock, + ; we can be sure that no other thread changes or has changed the state. + ; + ; Once [:iter-count conn] has changed, we know that the multicaster thread + ; will see the new queue. This means that we can be sure that the queue + ; will receive the `:end` token if the queue is stopped. (let [start-ms (System/currentTimeMillis) iter-count (get-in @multicast-state [:iter-count conn] -1)] (loop [spin-count 0] @@ -250,10 +253,10 @@ (if needs-multicaster? (do (start-multicaster! conn) - (log/debug "Multicaster thread started. Returning new queue for id" id) + (log/debug "Returning new queue for id" id "(multicaster thread started)") new-q) (do - (log/debug "Multicaster thread already exists. Returning new queue for id" id) + (log/debug "Returning new queue for id" id "(multicaster thread already running)") new-q)))))] ; wait for multicaster thread to pick up current Queue (wait-multicast-thread-step conn) @@ -306,32 +309,83 @@ (Thread/sleep 16) (recur)))))))))))))) +(defn- all-queues [state] + (->> (mapcat (fn [[conn qmap]] + (mapv (fn [q-id] [conn q-id]) + (keys qmap))) + (seq (get state :queues {}))) + (into #{}))) + +(comment + (do + (assert (= #{} + (all-queues {}))) + (assert (= #{} + (all-queues {:queues {}}))) + (assert (= #{[:conn-a :q-id]} + (all-queues {:queues {:conn-a {:q-id 1}}}))) + (assert (= #{[:conn-a :q-id] [:conn-a :q-id-2]} + (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2}}}))) + (assert (= #{[:conn-a :q-id-2] [:conn-b :q-id-3] [:conn-a :q-id]} + (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2} + :conn-b {:q-id-3 3}}}))))) + +(defn- removed-queues? [old new] + (not= (all-queues old) + (all-queues new))) + (defn stop-multicaster-id! [conn id] (assert (instance? Connection conn)) - (locking consumer-state-lock - (wait-multicast-threads-exit - (locking multicast-state-lock - (swap-vals! multicast-state (fn [old-state] - (let [new-state (dissoc-in old-state [:queues conn id])] - (if (= {} (get-in new-state [:queues conn])) - (dissoc-in old-state [:queues conn]) - new-state))))))) - nil) + (let [did-remove? (atom nil)] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (let [[old new] (swap-vals! multicast-state (fn [old-state] + (let [new-state (dissoc-in old-state [:queues conn id])] + (if (= {} (get-in new-state [:queues conn])) + (dissoc-in old-state [:queues conn]) + new-state))))] + (reset! did-remove? (removed-queues? old new)) + [old new])))) + @did-remove?)) (defn stop-multicaster! [conn] (assert (instance? Connection conn)) - (locking consumer-state-lock - (wait-multicast-threads-exit - (locking multicast-state-lock - (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))))) - nil) + (let [did-remove? (atom nil)] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (let [[old new] (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))] + (reset! did-remove? (removed-queues? old new)) + [old new])))) + @did-remove?)) (defn stop-all-multicasters! [] - (locking consumer-state-lock - (wait-multicast-threads-exit - (locking multicast-state-lock - (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))))) - nil) + (let [did-remove? (atom nil)] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (let [[old new] (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))] + (reset! did-remove? (removed-queues? old new)) + [old new])))) + @did-remove?)) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (require '[datomic.api :as d]) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (defonce conn (let [uri (str "datomic:mem://demo") + _ (d/delete-database uri) + _ (d/create-database uri) + conn (d/connect uri)] + conn)))) (comment (do -- cgit v1.2.3 From 4a3a9a5da6e7a8771eb3adb79172aca5ce8f26a6 Mon Sep 17 00:00:00 2001 From: ire Date: Wed, 21 May 2025 13:48:10 +0200 Subject: Misc #7 --- README.md | 4 ++-- src/com/github/ivarref/yoltq.clj | 4 ++-- src/com/github/ivarref/yoltq/report_queue.clj | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/README.md b/README.md index 675fa9a..dda9b47 100644 --- a/README.md +++ b/README.md @@ -473,12 +473,12 @@ Added multicast support for `datomic.api/tx-report-queue`: @(d/transact conn [{:db/doc "new-data"}]) ; Stop the queue: -(yq/stop-multicaster-id! conn :q-id-3) +(yq/stop-multicast-consumer-id! conn :q-id-3) => true ; The multicaster thread will send `:end` and the consumer thread will then print "Time to exit. Goodbye!". ; if the queue is already stopped (or never was started), `stop-multicaster...` functions will return false: -(yq/stop-multicaster-id! conn :already-stopped-queue-or-typo) +(yq/stop-multicast-consumer-id! conn :already-stopped-queue-or-typo) => false ; Stop all queues for all connections: diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 298b9d5..45f2051 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -402,7 +402,7 @@ (assert (boolean? send-end-token?)) (rq/get-tx-report-queue-multicast! conn id send-end-token?))) -(defn stop-multicaster-id! +(defn stop-multicast-consumer-id! "Stop forwarding reports from datomic.api/tx-report-queue to the queue identified by `conn` and `id`. If this is the last report destination for the given `conn`, the multicaster thread will exit. Repeated calls are no-op. @@ -414,7 +414,7 @@ Return `false` if the queue does not exist." [conn id] (assert (instance? Connection conn)) - (rq/stop-multicaster-id! conn id)) + (rq/stop-multicast-consumer-id! conn id)) (defn stop-multicaster! "Stop forwarding reports from datomic.api/tx-report-queue to any queues belonging to `conn`. diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index c3fd383..b3685b9 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -334,7 +334,7 @@ (not= (all-queues old) (all-queues new))) -(defn stop-multicaster-id! [conn id] +(defn stop-multicast-consumer-id! [conn id] (assert (instance? Connection conn)) (let [did-remove? (atom nil)] (locking consumer-state-lock -- cgit v1.2.3 From 22ca1bb29111f9a0246c54e7f81806794198c25f Mon Sep 17 00:00:00 2001 From: ire Date: Wed, 21 May 2025 13:59:55 +0200 Subject: Doc #7 --- README.md | 3 +++ src/com/github/ivarref/yoltq.clj | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/README.md b/README.md index 836ed49..0635d5f 100644 --- a/README.md +++ b/README.md @@ -471,6 +471,9 @@ Added multicast support for `datomic.api/tx-report-queue`: (println "Processing q-item" q-item) (recur)))))) +; The default value for `send-end-token?` is `false`, i.e. the behaviour will be +; identical to that of datomic.api/tx-report-queue. + @(d/transact conn [{:db/doc "new-data"}]) ; Stop the queue: diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 45f2051..0f63e25 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -390,7 +390,8 @@ Repeated calls using the same `conn` and `id` returns the same queue. The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread - to send `:end` if the queue is stopped. The default value is `false`. + to send `:end` if the queue is stopped. + The default value for `send-end-token?` is `false`. A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`. -- cgit v1.2.3 From 28cd44d2bc25dbb3651d33bc80efe4173e0479f5 Mon Sep 17 00:00:00 2001 From: ire Date: Mon, 26 May 2025 12:58:34 +0200 Subject: Be paranoid #7 --- src/com/github/ivarref/yoltq/report_queue.clj | 54 +++++++++++++-------------- 1 file changed, 27 insertions(+), 27 deletions(-) (limited to 'src') diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index b3685b9..f83e3ba 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -182,7 +182,7 @@ (throw (RuntimeException. "Timed out waiting for multicaster to start"))))) (defn- wait-multicast-thread-step - [conn] + [conn state] ; `get-tx-report-queue-multicast!` should return only when the multicaster thread ; has picked up the new queue. ; @@ -201,20 +201,20 @@ ; we can be sure that no other thread changes or has changed the state. ; ; Once [:iter-count conn] has changed, we know that the multicaster thread - ; will see the new queue. This means that we can be sure that the queue + ; has seen the new queue. This means that we can be sure that the queue ; will receive the `:end` token if the queue is stopped. (let [start-ms (System/currentTimeMillis) - iter-count (get-in @multicast-state [:iter-count conn] -1)] + iter-count (get-in state [:iter-count conn] -1)] (loop [spin-count 0] - (if (not= iter-count (get-in @multicast-state [:iter-count conn])) + (if (not= iter-count (locking multicast-state-lock + (get-in @multicast-state [:iter-count conn] -1))) nil - (do - (let [spent-ms (- (System/currentTimeMillis) start-ms)] - (if (> spent-ms 30000) - (throw (RuntimeException. "Timed out waiting for multicaster thread")) - (do - (Thread/sleep 16) - (recur (inc spin-count)))))))))) + (let [spent-ms (- (System/currentTimeMillis) start-ms)] + (if (> spent-ms 30000) + (throw (RuntimeException. "Timed out waiting for multicaster thread")) + (do + (Thread/sleep 16) + (recur (inc spin-count))))))))) (defn get-tx-report-queue-multicast! "Multicast the datomic.api/tx-report-queue to different consumers. @@ -232,21 +232,21 @@ ([conn id send-end-token?] (assert (instance? Connection conn)) (locking consumer-state-lock - (let [the-q + (let [[new-state the-q] (locking multicast-state-lock (assert (map? @multicast-state)) (if-let [existing-q (get-in @multicast-state [:queues conn id])] (do - (swap! multicast-state - (fn [old-state] - (update-in old-state [:queues conn id] (fn [[end-token? q]] - (if (not= end-token? send-end-token?) - (log/debug "flipped `send-end-token?`") - (log/debug "identical `send-end-token?`")) - [send-end-token? q])))) - (log/debug "Returning existing queue for id" id) - (assert (instance? BlockingQueue (second existing-q))) - (second existing-q)) + (let [new-state (swap! multicast-state + (fn [old-state] + (update-in old-state [:queues conn id] (fn [[end-token? q]] + (if (not= end-token? send-end-token?) + (log/debug "flipped `send-end-token?`") + (log/debug "identical `send-end-token?`")) + [send-end-token? q]))))] + (log/debug "Returning existing queue for id" id) + (assert (instance? BlockingQueue (second existing-q))) + [new-state (second existing-q)])) (let [needs-multicaster? (nil? (get-in @multicast-state [:queues conn])) new-q (LinkedBlockingQueue.) new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [:queues conn id] [send-end-token? new-q])))] @@ -254,12 +254,12 @@ (do (start-multicaster! conn) (log/debug "Returning new queue for id" id "(multicaster thread started)") - new-q) + [new-state new-q]) (do (log/debug "Returning new queue for id" id "(multicaster thread already running)") - new-q)))))] + [new-state new-q])))))] ; wait for multicaster thread to pick up current Queue - (wait-multicast-thread-step conn) + (wait-multicast-thread-step conn new-state) the-q)))) (defn- wait-multicast-threads-exit [[old-state new-state]] @@ -418,8 +418,8 @@ @(d/transact conn [{:db/doc "demo"}]) @(d/transact conn [{:db/doc "demo"}]) (log/info "begin drain q1") - (stop-multicaster-id! conn :q1) - (stop-multicaster-id! conn :q1) + (stop-multicast-consumer-id! conn :q1) + (stop-multicast-consumer-id! conn :q1) (println "thread count" @thread-count) (let [qitems-2 (drain! q2) qitems-1 (drain! q1)] -- cgit v1.2.3