aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq.clj
diff options
context:
space:
mode:
authorStefan van den Oord <stefan@viduet.eu>2025-09-09 15:10:22 +0200
committerStefan van den Oord <stefan@viduet.eu>2025-09-09 15:10:22 +0200
commit84b22b3ec7e1f132bddb2fa95de8a4ef7d8b43e5 (patch)
tree7f3ad37646403ef6baa3883379e9cd5b44e52ec4 /src/com/github/ivarref/yoltq.clj
parent#3 Add optional batch name to queue jobs (diff)
parentUpdate for release (diff)
downloadfiinha-84b22b3ec7e1f132bddb2fa95de8a4ef7d8b43e5.tar.gz
fiinha-84b22b3ec7e1f132bddb2fa95de8a4ef7d8b43e5.tar.xz
Merge branch 'main' into batches-of-jobs
Diffstat (limited to 'src/com/github/ivarref/yoltq.clj')
-rw-r--r--src/com/github/ivarref/yoltq.clj146
1 files changed, 135 insertions, 11 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index 1ba286e..ccd9062 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -12,8 +12,7 @@
(:import (datomic Connection)
(java.lang.management ManagementFactory)
(java.time Duration Instant ZoneOffset ZonedDateTime)
- (java.util.concurrent ExecutorService Executors TimeUnit)))
-
+ (java.util.concurrent BlockingQueue ExecutorService Executors ScheduledExecutorService TimeUnit)))
(defonce ^:dynamic *config* (atom nil))
(defonce threadpool (atom nil))
@@ -27,7 +26,7 @@
; If you want no limit on the number of retries, specify
; the value `0`. That will set the effective retry limit to
; 9223372036854775807 times.
- :max-retries 10000
+ :max-retries 9223372036854775807
; Minimum amount of time to wait before a failed queue job is retried
:error-backoff-time (Duration/ofSeconds 5)
@@ -85,8 +84,11 @@
u/duration->millis))
-(defn init! [{:keys [conn] :as cfg}]
+(defn init! [{:keys [conn tx-report-queue] :as cfg}]
(assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil")))
+ (when (some? tx-report-queue)
+ (assert (instance? BlockingQueue tx-report-queue)
+ (str "Expected :tx-report-queue to be of type java.util.concurrent.BlockingQueue")))
(locking threadpool
@(d/transact conn i/schema)
(let [new-cfg (swap! *config*
@@ -140,14 +142,39 @@
(let [pool (reset! threadpool (Executors/newScheduledThreadPool (+ 1 pool-size)))
queue-listener-ready (promise)]
(reset! *running?* true)
- (.scheduleAtFixedRate pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS)
- (.scheduleAtFixedRate pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS)
- (.execute pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*)))
+ (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS)
+ (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS)
+ (.execute ^ScheduledExecutorService pool
+ (fn []
+ (try
+ (log/debug "report-queue-listener starting")
+ (rq/report-queue-listener *running?* queue-listener-ready pool *config*)
+ (finally
+ (log/debug "report-queue-listener exiting")
+ (deliver queue-listener-ready :finally)))))
(future (try
(slow-executor/show-slow-threads pool *config*)
(finally
(deliver slow-thread-watcher-done? :done))))
- @queue-listener-ready)))
+ (let [q-listener-retval (deref queue-listener-ready 30000 :timeout)]
+ (cond (= :timeout q-listener-retval)
+ (do
+ (log/error "Timed out waiting for report-queue-listener to start")
+ (throw (IllegalStateException. "Timed out waiting for report-queue-listener to start")))
+
+ (= :finally q-listener-retval)
+ (do
+ (log/error "report-queue-listener did not start")
+ (throw (IllegalStateException. "report-queue-listener did not start")))
+
+ (= :ready q-listener-retval)
+ (do
+ (log/debug "report-queue-listener is ready"))
+
+ :else
+ (do
+ (log/error (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval)))
+ (throw (IllegalStateException. (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval))))))))))
(defn start! []
@@ -348,9 +375,9 @@
:p95 ...
:p99 ...}}"
[{:keys [age-days queue-name now db duration->long]
- :or {age-days 30
- now (ZonedDateTime/now ZoneOffset/UTC)
- duration->long (fn [duration] (.toSeconds duration))}}]
+ :or {age-days 30
+ now (ZonedDateTime/now ZoneOffset/UTC)
+ duration->long (fn [duration] (.toSeconds ^Duration duration))}}]
(let [{:keys [conn]} @*config*
db (or db (d/db conn))
->zdt #(.atZone (Instant/ofEpochMilli %) ZoneOffset/UTC)]
@@ -380,6 +407,66 @@
:min (apply min values))})))
(into (sorted-map)))))
+(defn get-tx-report-queue-multicast!
+ "Multicast the datomic.api/tx-report-queue to different consumers.
+ A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer.
+ Repeated calls using the same `conn` and `id` returns the same queue.
+
+ The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread
+ to send `:end` if the queue is stopped.
+ The default value for `send-end-token?` is `false`.
+
+ A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`.
+
+ Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`."
+ ([conn id]
+ (get-tx-report-queue-multicast! conn id false))
+ ([conn id send-end-token?]
+ (assert (instance? Connection conn))
+ (assert (boolean? send-end-token?))
+ (rq/get-tx-report-queue-multicast! conn id send-end-token?)))
+
+(defn stop-multicast-consumer-id!
+ "Stop forwarding reports from datomic.api/tx-report-queue to the queue identified by `conn` and `id`.
+ If this is the last report destination for the given `conn`, the multicaster thread will exit.
+ Repeated calls are no-op.
+
+ The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!`
+ was called.
+
+ Returns `true` if the queue was stopped.
+ Return `false` if the queue does not exist."
+ [conn id]
+ (assert (instance? Connection conn))
+ (rq/stop-multicast-consumer-id! conn id))
+
+(defn stop-multicaster!
+ "Stop forwarding reports from datomic.api/tx-report-queue to any queues belonging to `conn`.
+ The multicaster thread will exit.
+ Repeated calls are no-op.
+
+ The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!`
+ was called.
+
+ Returns `true` if any queue belonging to `conn` was stopped.
+ Returns `false` is `conn` did not have any associated queues."
+ [conn]
+ (assert (instance? Connection conn))
+ (rq/stop-multicaster! conn))
+
+(defn stop-all-multicasters!
+ "Stop forwarding all reports from datomic.api/tx-report-queue for any `conn`.
+ All multicaster threads will exit.
+ Repeated calls are no-op.
+
+ The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!`
+ was called.
+
+ Returns `true` if any queue was stopped.
+ Returns `false` if no queues existed."
+ []
+ (rq/stop-all-multicasters!))
+
(comment
(do
(require 'com.github.ivarref.yoltq.log-init)
@@ -413,3 +500,40 @@
@started-consuming?
(stop!)
nil)))))
+
+(comment
+ (do
+ (require 'com.github.ivarref.yoltq.log-init)
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq.migrate"} :warn]
+ [#{"com.github.ivarref.yoltq"} :debug]
+ ;[#{"ivarref.yoltq*"} :info]
+ [#{"*"} :info]])
+ (stop!)
+ (let [received (atom [])
+ uri (str "datomic:mem://demo")]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (let [conn (d/connect uri)
+ started-consuming? (promise)
+ n 1]
+ (init! {:conn conn
+ :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq true)
+ :slow-thread-show-stacktrace? false})
+ (add-consumer! :q (fn [_]
+ (deliver started-consuming? true)))
+ (log/info "begin start! ...")
+ (start!)
+ (log/info "begin start! ... Done")
+ (Thread/sleep 2000)
+ (log/info "*******************************************")
+ @(d/transact conn [(put :q {:work 123})])
+ @started-consuming?
+ (stop-multicaster! conn)
+ (log/info "*******************************************")
+ (stop!)
+ (log/info "stop! done")
+ nil)))) \ No newline at end of file