diff options
Diffstat (limited to 'src/com/github/ivarref/yoltq.clj')
| -rw-r--r-- | src/com/github/ivarref/yoltq.clj | 51 |
1 files changed, 45 insertions, 6 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 32298b7..80c9491 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -352,8 +352,8 @@ :p95 ... :p99 ...}}" [{:keys [age-days queue-name now db duration->long] - :or {age-days 30 - now (ZonedDateTime/now ZoneOffset/UTC) + :or {age-days 30 + now (ZonedDateTime/now ZoneOffset/UTC) duration->long (fn [duration] (.toSeconds ^Duration duration))}}] (let [{:keys [conn]} @*config* db (or db (d/db conn)) @@ -386,11 +386,50 @@ (defn get-tx-report-queue-multicast! "Multicast the datomic.api/tx-report-queue to different consumers. - The multicaster is started on demand. `conn` and `id` identifies the consumer. + A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer. + Repeated calls using the same `conn` and `id` returns the same queue. + + The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread + to send `:end` if the queue is stopped. The default value is `false`. + + A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`. + + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + ([conn id] + (get-tx-report-queue-multicast! conn id false)) + ([conn id send-end-token?] + (assert (instance? Connection conn)) + (assert (boolean? send-end-token?)) + (rq/get-tx-report-queue-multicast! conn id send-end-token?))) + +(defn stop-multicaster-id! + "Stop forwarding reports from datomic.api/tx-report-queue to the queue identified by `conn` and `id`. + If this is the last report destination for the given `conn`, the multicaster thread will exit. + Repeated calls are no-op. - Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + Returns nil." [conn id] - (rq/get-tx-report-queue-multicast! conn id)) + (assert (instance? Connection conn)) + (rq/stop-multicaster-id! conn id)) + +(defn stop-multicaster! + "Stop forwarding reports from datomic.api/tx-report-queue to any queues belonging to `conn`. + The multicaster thread will exit. + Repeated calls are no-op. + + Returns nil." + [conn] + (assert (instance? Connection conn)) + (rq/stop-multicaster! conn)) + +(defn stop-all-multicasters! + "Stop forwarding all reports from datomic.api/tx-report-queue for any `conn`. + All multicaster threads will exit. + Repeated calls are no-op. + + Returns nil." + [] + (rq/stop-all-multicasters!)) (comment (do @@ -446,7 +485,7 @@ started-consuming? (promise) n 1] (init! {:conn conn - :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq) + :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq) :slow-thread-show-stacktrace? false}) (add-consumer! :q (fn [_] (deliver started-consuming? true))) |
