diff options
| author | ire <refsdal.ivar@gmail.com> | 2025-05-21 13:38:51 +0200 |
|---|---|---|
| committer | ire <refsdal.ivar@gmail.com> | 2025-05-21 13:38:51 +0200 |
| commit | a1e4e1b96fd254ec7d7e467648dd5e88f1c9530b (patch) | |
| tree | 001cfddbdc7bc4dabe540981afe2a2e4e32bda93 | |
| parent | Small fixes #7 (diff) | |
| download | fiinha-a1e4e1b96fd254ec7d7e467648dd5e88f1c9530b.tar.gz fiinha-a1e4e1b96fd254ec7d7e467648dd5e88f1c9530b.tar.xz | |
Doc. Return true/false if queues were stopped or not #7
| -rw-r--r-- | README.md | 37 | ||||
| -rw-r--r-- | deps.edn | 12 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq.clj | 23 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/report_queue.clj | 146 |
4 files changed, 163 insertions, 55 deletions
@@ -441,21 +441,48 @@ then not grab the datomic report queue, but use the one provided: ```clojure (require '[com.github.ivarref.yoltq :as yq]) (yq/init! {:conn conn - :tx-report-queue (yq/get-tx-report-queue-multicast! my-conn :yoltq) + :tx-report-queue (yq/get-tx-report-queue-multicast! conn :yoltq) ; ^^ can be any `java.util.concurrent.BlockingQueue` value }) -(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! my-conn :another-consumer-id)) +(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! conn :another-consumer-id)) ``` Added multicast support for `datomic.api/tx-report-queue`: ```clojure -(def my-q1 (yq/get-tx-report-queue-multicast! my-conn :q-id-1)) +(require '[com.github.ivarref.yoltq :as yq]) +(def my-q1 (yq/get-tx-report-queue-multicast! conn :q-id-1)) ; ^^ consume my-q1 just like you would do `datomic.api/tx-report-queue` -(def my-q2 (yq/get-tx-report-queue-multicast! my-conn :q-id-2)) -; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue` +(def my-q2 (yq/get-tx-report-queue-multicast! conn :q-id-2)) +; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue` for the given `conn` + +(def my-q3 (yq/get-tx-report-queue-multicast! conn :q-id-3 true)) +; my-q3 specifies the third argument, `send-end-token?`, to true, so it will receive `:end` if the queue is stopped. +; This can enable simpler consuming of queues: +(future + (loop [] + (let [q-item (.take ^java.util.concurrent.BlockingQueue my-q3)] + (if (= q-item :end) + (println "Time to exit. Goodbye!") + (do + (println "Processing q-item" q-item) + (recur)))))) + +@(d/transact conn [{:db/doc "new-data"}]) + +; Stop the queue: +(yq/stop-multicaster-id! conn :q-id-3) +=> true +; The multicaster thread will send `:end` and the consumer thread will then print "Time to exit. Goodbye!". + +; if the queue is already stopped (or never was started), `stop-multicaster...` functions will return false: +(yq/stop-multicaster-id! conn :already-stopped-queue-or-typo) +=> false + +; Stop all queues for all connections: +(yq/stop-all-multicasters!) ``` `yq/get-tx-report-queue-multicast!` returns, like @@ -22,6 +22,18 @@ :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}} :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]} + :repl {:extra-paths ["test"] + :extra-deps {com.bhauman/rebel-readline {:mvn/version "0.1.5"} + ivarref/datomic-schema {:mvn/version "0.2.0"} + com.taoensso/timbre {:mvn/version "5.2.1"} + com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} + clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} + org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"}} + :exec-fn rebel-readline.tool/repl + :exec-args {} + :main-opts ["-m" "rebel-readline.main"]} + :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}} diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 80c9491..298b9d5 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -407,7 +407,11 @@ If this is the last report destination for the given `conn`, the multicaster thread will exit. Repeated calls are no-op. - Returns nil." + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if the queue was stopped. + Return `false` if the queue does not exist." [conn id] (assert (instance? Connection conn)) (rq/stop-multicaster-id! conn id)) @@ -417,7 +421,11 @@ The multicaster thread will exit. Repeated calls are no-op. - Returns nil." + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if any queue belonging to `conn` was stopped. + Returns `false` is `conn` did not have any associated queues." [conn] (assert (instance? Connection conn)) (rq/stop-multicaster! conn)) @@ -427,7 +435,11 @@ All multicaster threads will exit. Repeated calls are no-op. - Returns nil." + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if any queue was stopped. + Returns `false` if no queues existed." [] (rq/stop-all-multicasters!)) @@ -485,7 +497,7 @@ started-consuming? (promise) n 1] (init! {:conn conn - :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq) + :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq true) :slow-thread-show-stacktrace? false}) (add-consumer! :q (fn [_] (deliver started-consuming? true))) @@ -493,8 +505,11 @@ (start!) (log/info "begin start! ... Done") (Thread/sleep 2000) + (log/info "*******************************************") @(d/transact conn [(put :q {:work 123})]) @started-consuming? + (stop-multicaster! conn) + (log/info "*******************************************") (stop!) (log/info "stop! done") nil))))
\ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index a9f7e07..c3fd383 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -52,9 +52,9 @@ (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] (if (= poll-result :end) (do - (log/debug "Report queue listener received :end token. Exiting") + (log/debug "report-queue-listener received :end token. Exiting") (reset! running-local? false)) - ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system.")) + ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system.")) (process-poll-result! @config-atom id-ident poll-result @@ -115,9 +115,11 @@ (if send-end-token? (do #_(log/debug "offering :end token") - (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS)) + (if (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS) + (log/debug "Multicaster sent :end token") + (log/debug "Multicaster failed to send :end token"))) (do - #_(log/debug "not offering :end token")))) + (log/debug "Multicaster not sending :end token")))) (when (seq new-state) (if (some? work-item) (reduce-kv @@ -125,7 +127,7 @@ (let [ok-offer (.offer ^BlockingQueue q work-item 1 TimeUnit/MICROSECONDS)] (if (true? ok-offer) (assoc m id [send-end-token? q]) - (log/warn "Failed to offer item in multicaster for connection" conn "and queue id" id)))) + (log/error "Multicaster failed to offer item for connection" conn "and queue id" id)))) {} new-state) new-state))) @@ -150,6 +152,7 @@ (do (swap! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn]))) (swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] dec))) (d/remove-tx-report-queue conn) + (log/debug "Multicaster removed tx-report-queue for conn" conn) nil)))] (if new-state (recur new-state) @@ -180,26 +183,26 @@ (defn- wait-multicast-thread-step [conn] -; `get-tx-report-queue-multicast!` should return only when the multicaster thread -; has picked up the new queue. -; -; Otherwise the following could happen: -; 1. multicast thread is sleeping -; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true` -; 3: user-thread (or somebody else) calls `stop-multicaster`. -; The multicast-state atom is now identical as it was in step 1. -; , Step 2 and 3 happened while the multicast thread was sleeping. -; 4: The multicast thread is scheduled and does _not_ detect any state change. -; Therefore the multicast thread does _not_ send out an :end token as one would expect. -; -; The new queue is written to memory at this point. No other thread can remove it because -; we are still, and have been during the modification of multicast-state, holding consumer-state-lock. -; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock, -; we can be sure that no other thread changes or has changed the state. -; -; Once [:iter-count conn] has changed, we know that the multicaster thread -; will see the new queue. This means that we can be sure that the queue -; will receive the `:end` token if the queue is stopped. + ; `get-tx-report-queue-multicast!` should return only when the multicaster thread + ; has picked up the new queue. + ; + ; Otherwise the following could happen: + ; 1. multicast thread is sleeping + ; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true` + ; 3: user-thread (or somebody else) calls `stop-multicaster`. + ; The multicast-state atom is now identical as it was in step 1. + ; , Step 2 and 3 happened while the multicast thread was sleeping. + ; 4: The multicast thread is scheduled and does _not_ detect any state change. + ; Therefore the multicast thread does _not_ send out an :end token as one would expect. + ; + ; The new queue is written to memory at this point. No other thread can remove it because + ; we are still, and have been during the modification of multicast-state, holding consumer-state-lock. + ; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock, + ; we can be sure that no other thread changes or has changed the state. + ; + ; Once [:iter-count conn] has changed, we know that the multicaster thread + ; will see the new queue. This means that we can be sure that the queue + ; will receive the `:end` token if the queue is stopped. (let [start-ms (System/currentTimeMillis) iter-count (get-in @multicast-state [:iter-count conn] -1)] (loop [spin-count 0] @@ -250,10 +253,10 @@ (if needs-multicaster? (do (start-multicaster! conn) - (log/debug "Multicaster thread started. Returning new queue for id" id) + (log/debug "Returning new queue for id" id "(multicaster thread started)") new-q) (do - (log/debug "Multicaster thread already exists. Returning new queue for id" id) + (log/debug "Returning new queue for id" id "(multicaster thread already running)") new-q)))))] ; wait for multicaster thread to pick up current Queue (wait-multicast-thread-step conn) @@ -306,32 +309,83 @@ (Thread/sleep 16) (recur)))))))))))))) +(defn- all-queues [state] + (->> (mapcat (fn [[conn qmap]] + (mapv (fn [q-id] [conn q-id]) + (keys qmap))) + (seq (get state :queues {}))) + (into #{}))) + +(comment + (do + (assert (= #{} + (all-queues {}))) + (assert (= #{} + (all-queues {:queues {}}))) + (assert (= #{[:conn-a :q-id]} + (all-queues {:queues {:conn-a {:q-id 1}}}))) + (assert (= #{[:conn-a :q-id] [:conn-a :q-id-2]} + (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2}}}))) + (assert (= #{[:conn-a :q-id-2] [:conn-b :q-id-3] [:conn-a :q-id]} + (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2} + :conn-b {:q-id-3 3}}}))))) + +(defn- removed-queues? [old new] + (not= (all-queues old) + (all-queues new))) + (defn stop-multicaster-id! [conn id] (assert (instance? Connection conn)) - (locking consumer-state-lock - (wait-multicast-threads-exit - (locking multicast-state-lock - (swap-vals! multicast-state (fn [old-state] - (let [new-state (dissoc-in old-state [:queues conn id])] - (if (= {} (get-in new-state [:queues conn])) - (dissoc-in old-state [:queues conn]) - new-state))))))) - nil) + (let [did-remove? (atom nil)] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (let [[old new] (swap-vals! multicast-state (fn [old-state] + (let [new-state (dissoc-in old-state [:queues conn id])] + (if (= {} (get-in new-state [:queues conn])) + (dissoc-in old-state [:queues conn]) + new-state))))] + (reset! did-remove? (removed-queues? old new)) + [old new])))) + @did-remove?)) (defn stop-multicaster! [conn] (assert (instance? Connection conn)) - (locking consumer-state-lock - (wait-multicast-threads-exit - (locking multicast-state-lock - (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))))) - nil) + (let [did-remove? (atom nil)] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (let [[old new] (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))] + (reset! did-remove? (removed-queues? old new)) + [old new])))) + @did-remove?)) (defn stop-all-multicasters! [] - (locking consumer-state-lock - (wait-multicast-threads-exit - (locking multicast-state-lock - (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))))) - nil) + (let [did-remove? (atom nil)] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (let [[old new] (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))] + (reset! did-remove? (removed-queues? old new)) + [old new])))) + @did-remove?)) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (require '[datomic.api :as d]) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (defonce conn (let [uri (str "datomic:mem://demo") + _ (d/delete-database uri) + _ (d/create-database uri) + conn (d/connect uri)] + conn)))) (comment (do |
