diff options
| author | ire <refsdal.ivar@gmail.com> | 2025-05-21 12:04:15 +0200 |
|---|---|---|
| committer | ire <refsdal.ivar@gmail.com> | 2025-05-21 12:04:15 +0200 |
| commit | ccfe353aebe8c22429fdaf76a5e0bf34cefca955 (patch) | |
| tree | 37a28c07942c6a7a48ad8bbb6a549f8579492d85 | |
| parent | Doc rationale for waiting for multicaster thread. Handle :end token in report... (diff) | |
| download | fiinha-ccfe353aebe8c22429fdaf76a5e0bf34cefca955.tar.gz fiinha-ccfe353aebe8c22429fdaf76a5e0bf34cefca955.tar.xz | |
Small fixes #7
| -rw-r--r-- | src/com/github/ivarref/yoltq.clj | 51 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/report_queue.clj | 86 |
2 files changed, 88 insertions, 49 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 32298b7..80c9491 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -352,8 +352,8 @@ :p95 ... :p99 ...}}" [{:keys [age-days queue-name now db duration->long] - :or {age-days 30 - now (ZonedDateTime/now ZoneOffset/UTC) + :or {age-days 30 + now (ZonedDateTime/now ZoneOffset/UTC) duration->long (fn [duration] (.toSeconds ^Duration duration))}}] (let [{:keys [conn]} @*config* db (or db (d/db conn)) @@ -386,11 +386,50 @@ (defn get-tx-report-queue-multicast! "Multicast the datomic.api/tx-report-queue to different consumers. - The multicaster is started on demand. `conn` and `id` identifies the consumer. + A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer. + Repeated calls using the same `conn` and `id` returns the same queue. + + The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread + to send `:end` if the queue is stopped. The default value is `false`. + + A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`. + + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + ([conn id] + (get-tx-report-queue-multicast! conn id false)) + ([conn id send-end-token?] + (assert (instance? Connection conn)) + (assert (boolean? send-end-token?)) + (rq/get-tx-report-queue-multicast! conn id send-end-token?))) + +(defn stop-multicaster-id! + "Stop forwarding reports from datomic.api/tx-report-queue to the queue identified by `conn` and `id`. + If this is the last report destination for the given `conn`, the multicaster thread will exit. + Repeated calls are no-op. - Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + Returns nil." [conn id] - (rq/get-tx-report-queue-multicast! conn id)) + (assert (instance? Connection conn)) + (rq/stop-multicaster-id! conn id)) + +(defn stop-multicaster! + "Stop forwarding reports from datomic.api/tx-report-queue to any queues belonging to `conn`. + The multicaster thread will exit. + Repeated calls are no-op. + + Returns nil." + [conn] + (assert (instance? Connection conn)) + (rq/stop-multicaster! conn)) + +(defn stop-all-multicasters! + "Stop forwarding all reports from datomic.api/tx-report-queue for any `conn`. + All multicaster threads will exit. + Repeated calls are no-op. + + Returns nil." + [] + (rq/stop-all-multicasters!)) (comment (do @@ -446,7 +485,7 @@ started-consuming? (promise) n 1] (init! {:conn conn - :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq) + :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq) :slow-thread-show-stacktrace? false}) (add-consumer! :q (fn [_] (deliver started-consuming? true))) diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index 2a2e489..a9f7e07 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -6,6 +6,8 @@ (:import (datomic Connection Datom) (java.util.concurrent LinkedBlockingQueue ScheduledExecutorService BlockingQueue TimeUnit))) +; Private API, subject to change! + (defn process-poll-result! [cfg id-ident poll-result consumer] (let [{:keys [tx-data db-after]} poll-result] (when-let [new-ids (->> tx-data @@ -50,8 +52,9 @@ (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] (if (= poll-result :end) (do - (reset! running-local? false) - #_(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system.")) + (log/debug "Report queue listener received :end token. Exiting") + (reset! running-local? false)) + ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system.")) (process-poll-result! @config-atom id-ident poll-result @@ -175,7 +178,8 @@ (when (= :timeout (deref ready? 30000 :timeout)) (throw (RuntimeException. "Timed out waiting for multicaster to start"))))) -(defn- wait-multicast-thread-step [conn]) +(defn- wait-multicast-thread-step + [conn] ; `get-tx-report-queue-multicast!` should return only when the multicaster thread ; has picked up the new queue. ; @@ -196,29 +200,34 @@ ; Once [:iter-count conn] has changed, we know that the multicaster thread ; will see the new queue. This means that we can be sure that the queue ; will receive the `:end` token if the queue is stopped. -(let [start-ms (System/currentTimeMillis) - iter-count (get-in @multicast-state [:iter-count conn] -1)] - (loop [spin-count 0] - (if (not= iter-count (get-in @multicast-state [:iter-count conn])) - nil - (do - (let [spent-ms (- (System/currentTimeMillis) start-ms)] - (if (> spent-ms 30000) - (throw (RuntimeException. "Timed out waiting for multicaster thread")) - (do - (Thread/sleep 16) - (recur (inc spin-count))))))))) + (let [start-ms (System/currentTimeMillis) + iter-count (get-in @multicast-state [:iter-count conn] -1)] + (loop [spin-count 0] + (if (not= iter-count (get-in @multicast-state [:iter-count conn])) + nil + (do + (let [spent-ms (- (System/currentTimeMillis) start-ms)] + (if (> spent-ms 30000) + (throw (RuntimeException. "Timed out waiting for multicaster thread")) + (do + (Thread/sleep 16) + (recur (inc spin-count)))))))))) (defn get-tx-report-queue-multicast! "Multicast the datomic.api/tx-report-queue to different consumers. - The multicaster is started on demand. `conn` and `id` identifies the consumer. + A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer. + Repeated calls using the same `conn` and `id` returns the same queue. + + The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread + to send `:end` if the queue is stopped. The default value is `false`. - Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`. + + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." ([conn id] (get-tx-report-queue-multicast! conn id false)) ([conn id send-end-token?] (assert (instance? Connection conn)) - (assert (keyword? id)) (locking consumer-state-lock (let [the-q (locking multicast-state-lock @@ -250,7 +259,7 @@ (wait-multicast-thread-step conn) the-q)))) -(defn wait-multicast-threads-exit [[old-state new-state]] +(defn- wait-multicast-threads-exit [[old-state new-state]] (assert (map? old-state)) (assert (map? new-state)) (assert (map? (get old-state :queues {}))) @@ -275,6 +284,12 @@ ; sent by multiple threads. (let [old-conns (into #{} (keys (get old-state :queues {}))) new-conns (into #{} (keys (get new-state :queues {})))] + (assert (every? + (fn [x] (instance? Connection x)) + old-conns)) + (assert (every? + (fn [x] (instance? Connection x)) + new-conns)) (doseq [old-conn old-conns] (when-not (contains? new-conns old-conn) (let [old-threadcount (get-in old-state [:thread-count old-conn] nil)] @@ -292,6 +307,7 @@ (recur)))))))))))))) (defn stop-multicaster-id! [conn id] + (assert (instance? Connection conn)) (locking consumer-state-lock (wait-multicast-threads-exit (locking multicast-state-lock @@ -299,40 +315,23 @@ (let [new-state (dissoc-in old-state [:queues conn id])] (if (= {} (get-in new-state [:queues conn])) (dissoc-in old-state [:queues conn]) - new-state)))))))) + new-state))))))) + nil) (defn stop-multicaster! [conn] + (assert (instance? Connection conn)) (locking consumer-state-lock (wait-multicast-threads-exit (locking multicast-state-lock - (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn]))))))) + (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))))) + nil) (defn stop-all-multicasters! [] (locking consumer-state-lock (wait-multicast-threads-exit (locking multicast-state-lock - (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {}))))))) - -(comment - (do - (require 'com.github.ivarref.yoltq.log-init) - (defn drain! [^BlockingQueue q] - (loop [items []] - (if-let [elem (.poll q 100 TimeUnit/MILLISECONDS)] - (recur (conj items elem)) - items))) - (com.github.ivarref.yoltq.log-init/init-logging! - [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] - [#{"com.github.ivarref.yoltq.report-queue"} :debug] - [#{"com.github.ivarref.yoltq.poller"} :info] - [#{"com.github.ivarref.yoltq"} :debug] - ;[#{"ivarref.yoltq*"} :info] - [#{"*"} :info]]) - (defonce conn (let [uri (str "datomic:mem://demo") - _ (d/delete-database uri) - _ (d/create-database uri) - conn (d/connect uri)] - conn)))) + (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))))) + nil) (comment (do @@ -366,6 +365,7 @@ @(d/transact conn [{:db/doc "demo"}]) (log/info "begin drain q1") (stop-multicaster-id! conn :q1) + (stop-multicaster-id! conn :q1) (println "thread count" @thread-count) (let [qitems-2 (drain! q2) qitems-1 (drain! q1)] |
