aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq.clj
diff options
context:
space:
mode:
authorire <refsdal.ivar@gmail.com>2025-05-21 12:04:15 +0200
committerire <refsdal.ivar@gmail.com>2025-05-21 12:04:15 +0200
commitccfe353aebe8c22429fdaf76a5e0bf34cefca955 (patch)
tree37a28c07942c6a7a48ad8bbb6a549f8579492d85 /src/com/github/ivarref/yoltq.clj
parentDoc rationale for waiting for multicaster thread. Handle :end token in report... (diff)
downloadfiinha-ccfe353aebe8c22429fdaf76a5e0bf34cefca955.tar.gz
fiinha-ccfe353aebe8c22429fdaf76a5e0bf34cefca955.tar.xz
Small fixes #7
Diffstat (limited to 'src/com/github/ivarref/yoltq.clj')
-rw-r--r--src/com/github/ivarref/yoltq.clj51
1 files changed, 45 insertions, 6 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index 32298b7..80c9491 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -352,8 +352,8 @@
:p95 ...
:p99 ...}}"
[{:keys [age-days queue-name now db duration->long]
- :or {age-days 30
- now (ZonedDateTime/now ZoneOffset/UTC)
+ :or {age-days 30
+ now (ZonedDateTime/now ZoneOffset/UTC)
duration->long (fn [duration] (.toSeconds ^Duration duration))}}]
(let [{:keys [conn]} @*config*
db (or db (d/db conn))
@@ -386,11 +386,50 @@
(defn get-tx-report-queue-multicast!
"Multicast the datomic.api/tx-report-queue to different consumers.
- The multicaster is started on demand. `conn` and `id` identifies the consumer.
+ A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer.
+ Repeated calls using the same `conn` and `id` returns the same queue.
+
+ The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread
+ to send `:end` if the queue is stopped. The default value is `false`.
+
+ A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`.
+
+ Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`."
+ ([conn id]
+ (get-tx-report-queue-multicast! conn id false))
+ ([conn id send-end-token?]
+ (assert (instance? Connection conn))
+ (assert (boolean? send-end-token?))
+ (rq/get-tx-report-queue-multicast! conn id send-end-token?)))
+
+(defn stop-multicaster-id!
+ "Stop forwarding reports from datomic.api/tx-report-queue to the queue identified by `conn` and `id`.
+ If this is the last report destination for the given `conn`, the multicaster thread will exit.
+ Repeated calls are no-op.
- Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`."
+ Returns nil."
[conn id]
- (rq/get-tx-report-queue-multicast! conn id))
+ (assert (instance? Connection conn))
+ (rq/stop-multicaster-id! conn id))
+
+(defn stop-multicaster!
+ "Stop forwarding reports from datomic.api/tx-report-queue to any queues belonging to `conn`.
+ The multicaster thread will exit.
+ Repeated calls are no-op.
+
+ Returns nil."
+ [conn]
+ (assert (instance? Connection conn))
+ (rq/stop-multicaster! conn))
+
+(defn stop-all-multicasters!
+ "Stop forwarding all reports from datomic.api/tx-report-queue for any `conn`.
+ All multicaster threads will exit.
+ Repeated calls are no-op.
+
+ Returns nil."
+ []
+ (rq/stop-all-multicasters!))
(comment
(do
@@ -446,7 +485,7 @@
started-consuming? (promise)
n 1]
(init! {:conn conn
- :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq)
+ :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq)
:slow-thread-show-stacktrace? false})
(add-consumer! :q (fn [_]
(deliver started-consuming? true)))