aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorire <refsdal.ivar@gmail.com>2025-05-13 21:39:07 +0200
committerire <refsdal.ivar@gmail.com>2025-05-13 21:39:07 +0200
commitae49a7ec82ecd3988e0f7825b0adead1dc77c911 (patch)
treeb484cb08144fd3385db77e3edb4766e7b04d991e
parentFix reflection warnings (diff)
downloadfiinha-ae49a7ec82ecd3988e0f7825b0adead1dc77c911.tar.gz
fiinha-ae49a7ec82ecd3988e0f7825b0adead1dc77c911.tar.xz
Fix tx-report-queue sharing #7
-rw-r--r--README.md34
-rw-r--r--deps.edn54
-rw-r--r--src/com/github/ivarref/yoltq.clj86
-rw-r--r--src/com/github/ivarref/yoltq/report_queue.clj133
-rw-r--r--test/com/github/ivarref/yoltq/log_init.clj2
5 files changed, 258 insertions, 51 deletions
diff --git a/README.md b/README.md
index c5f2bdb..f84a336 100644
--- a/README.md
+++ b/README.md
@@ -434,6 +434,40 @@ If you liked this library, you may also like:
## Change log
+#### 2025-05-13 v0.2.?? [diff](https://github.com/ivarref/yoltq/compare/v0.2.64...HEAD)
+Added support for specifying `tx-report-queue` as a keyword in `init!`. Yoltq will
+then not grab the datomic report queue, but use the one provided:
+
+```clojure
+(require '[com.github.ivarref.yoltq :as yq])
+(yq/init! {:conn conn
+ :tx-report-queue (yq/get-tx-report-queue-multicast! my-conn :yoltq)
+ ; ^^ can be any `java.util.concurrent.BlockingQueue` value
+ })
+
+(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! my-conn :another-consumer-id))
+
+```
+
+Added multicast support for `datomic.api/tx-report-queue`:
+```clojure
+(def my-q1 (yq/get-tx-report-queue-multicast! my-conn :q-id-1))
+; ^^ consume my-q1 just like you would do `datomic.api/tx-report-queue`
+
+(def my-q2 (yq/get-tx-report-queue-multicast! my-conn :q-id-2))
+; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue`
+```
+
+`yq/get-tx-report-queue-multicast!` returns, like
+`datomic.api/tx-report-queue`,
+`java.util.concurrent.BlockingQueue` and starts a background thread that does
+the multicasting as needed. Identical calls to `yq/get-tx-report-queue-multicast!`
+returns the same `BlockingQueue`.
+
+Changed the default for `max-retries` from `10000` to `9223372036854775807`.
+
+Fixed reflection warnings.
+
#### 2023-03-20 v0.2.64 [diff](https://github.com/ivarref/yoltq/compare/v0.2.63...v0.2.64)
Added support for `max-retries` being `0`, meaning the job should be retried forever
(or at least 9223372036854775807 times).
diff --git a/deps.edn b/deps.edn
index e36885e..1e3fa9d 100644
--- a/deps.edn
+++ b/deps.edn
@@ -1,33 +1,31 @@
-{:deps {com.github.ivarref/double-trouble {:mvn/version "0.1.102"}
- org.clojure/tools.logging {:mvn/version "1.2.4"}
- org.clojure/clojure {:mvn/version "1.11.1"}}
+{:deps {com.github.ivarref/double-trouble {:mvn/version "0.1.102"}
+ org.clojure/tools.logging {:mvn/version "1.2.4"}
+ org.clojure/clojure {:mvn/version "1.11.1"}
+ com.datomic/peer {:mvn/version "1.0.7364"}}
- :paths ["src"]
+ :paths ["src"]
- :aliases {:datomic {:extra-deps {com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]}}}
- :test {:extra-paths ["test"]
- :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"}
- com.taoensso/timbre {:mvn/version "5.2.1"}
- com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"}
- clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"}
- com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]}
- org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"}
- com.taoensso/nippy {:mvn/version "3.2.0"}
- io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}}
- :jvm-opts ["-DDISABLE_SPY=true"
- "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"]
- :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]}
+ :aliases {:test {:extra-paths ["test"]
+ :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"}
+ com.taoensso/timbre {:mvn/version "5.2.1"}
+ com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"}
+ clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"}
+ org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"}
+ com.taoensso/nippy {:mvn/version "3.2.0"}
+ io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}}
+ :exec-fn cognitect.test-runner.api/test
+ :jvm-opts ["-DDISABLE_SPY=true"
+ "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"]
+ :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]}
- :jar {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git"
- :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}}
- :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]}
+ :jar {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git"
+ :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}}
+ :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]}
- :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}}
+ :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}}
- :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}}
- :exec-fn deps-deploy.deps-deploy/deploy
- :exec-args {:installer :remote
- :sign-releases? false
- :artifact "target/out.jar"}}}
-
- :mvn/repos {"my.datomic.com" {:url "https://my.datomic.com/repo"}}}
+ :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}}
+ :exec-fn deps-deploy.deps-deploy/deploy
+ :exec-args {:installer :remote
+ :sign-releases? false
+ :artifact "target/out.jar"}}}} \ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index a7dcddf..32298b7 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -12,7 +12,7 @@
(:import (datomic Connection)
(java.lang.management ManagementFactory)
(java.time Duration Instant ZoneOffset ZonedDateTime)
- (java.util.concurrent ExecutorService Executors ScheduledExecutorService TimeUnit)))
+ (java.util.concurrent BlockingQueue ExecutorService Executors ScheduledExecutorService TimeUnit)))
(defonce ^:dynamic *config* (atom nil))
(defonce threadpool (atom nil))
@@ -26,7 +26,7 @@
; If you want no limit on the number of retries, specify
; the value `0`. That will set the effective retry limit to
; 9223372036854775807 times.
- :max-retries 10000
+ :max-retries 9223372036854775807
; Minimum amount of time to wait before a failed queue job is retried
:error-backoff-time (Duration/ofSeconds 5)
@@ -86,6 +86,9 @@
(defn init! [{:keys [conn tx-report-queue] :as cfg}]
(assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil")))
+ (when (some? tx-report-queue)
+ (assert (instance? BlockingQueue tx-report-queue)
+ (str "Expected :tx-report-queue to be of type java.util.concurrent.BlockingQueue")))
(locking threadpool
@(d/transact conn i/schema)
(let [new-cfg (swap! *config*
@@ -96,9 +99,6 @@
:system-error (atom {})
:healthy? (atom nil)
:slow? (atom nil)
- :get-tx-report-queue (fn []
- (or tx-report-queue
- (d/tx-report-queue conn)))
:slow-thread-watcher-done? (promise)}
default-opts
(if *test-mode* old-conf (select-keys old-conf [:handlers]))
@@ -144,12 +144,37 @@
(reset! *running?* true)
(.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS)
(.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS)
- (.execute ^ScheduledExecutorService pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*)))
+ (.execute ^ScheduledExecutorService pool
+ (fn []
+ (try
+ (log/debug "report-queue-listener starting")
+ (rq/report-queue-listener *running?* queue-listener-ready pool *config*)
+ (finally
+ (log/debug "report-queue-listener exiting")
+ (deliver queue-listener-ready :finally)))))
(future (try
(slow-executor/show-slow-threads pool *config*)
(finally
(deliver slow-thread-watcher-done? :done))))
- @queue-listener-ready)))
+ (let [q-listener-retval (deref queue-listener-ready 30000 :timeout)]
+ (cond (= :timeout q-listener-retval)
+ (do
+ (log/error "Timed out waiting for report-queue-listener to start")
+ (throw (IllegalStateException. "Timed out waiting for report-queue-listener to start")))
+
+ (= :finally q-listener-retval)
+ (do
+ (log/error "report-queue-listener did not start")
+ (throw (IllegalStateException. "report-queue-listener did not start")))
+
+ (= :ready q-listener-retval)
+ (do
+ (log/debug "report-queue-listener is ready"))
+
+ :else
+ (do
+ (log/error (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval)))
+ (throw (IllegalStateException. (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval))))))))))
(defn start! []
@@ -359,14 +384,13 @@
:min (apply min values))})))
(into (sorted-map)))))
+(defn get-tx-report-queue-multicast!
+ "Multicast the datomic.api/tx-report-queue to different consumers.
+ The multicaster is started on demand. `conn` and `id` identifies the consumer.
-
-(defn add-tx-report-queue!
- ([conn]
- (add-tx-report-queue! conn :default))
- ([conn id]
- (if @*config*
- :...)))
+ Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`."
+ [conn id]
+ (rq/get-tx-report-queue-multicast! conn id))
(comment
(do
@@ -401,3 +425,37 @@
@started-consuming?
(stop!)
nil)))))
+
+(comment
+ (do
+ (require 'com.github.ivarref.yoltq.log-init)
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq.migrate"} :warn]
+ [#{"com.github.ivarref.yoltq"} :debug]
+ ;[#{"ivarref.yoltq*"} :info]
+ [#{"*"} :info]])
+ (stop!)
+ (let [received (atom [])
+ uri (str "datomic:mem://demo")]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (let [conn (d/connect uri)
+ started-consuming? (promise)
+ n 1]
+ (init! {:conn conn
+ :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq)
+ :slow-thread-show-stacktrace? false})
+ (add-consumer! :q (fn [_]
+ (deliver started-consuming? true)))
+ (log/info "begin start! ...")
+ (start!)
+ (log/info "begin start! ... Done")
+ (Thread/sleep 2000)
+ @(d/transact conn [(put :q {:work 123})])
+ @started-consuming?
+ (stop!)
+ (log/info "stop! done")
+ nil)))) \ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj
index 20e0a93..9cddc93 100644
--- a/src/com/github/ivarref/yoltq/report_queue.clj
+++ b/src/com/github/ivarref/yoltq/report_queue.clj
@@ -3,8 +3,8 @@
[com.github.ivarref.yoltq.impl :as i]
[datomic.api :as d]
[clojure.tools.logging :as log])
- (:import (datomic Datom)
- (java.util.concurrent ScheduledExecutorService BlockingQueue TimeUnit)))
+ (:import (datomic Connection Datom)
+ (java.util.concurrent LinkedBlockingQueue ScheduledExecutorService BlockingQueue TimeUnit)))
(defn process-poll-result! [cfg id-ident poll-result consumer]
@@ -28,18 +28,24 @@
(i/take! cfg)
(i/execute! cfg)))))
(catch Throwable t
- (log/error t "unexpected error in process-poll-result!")))))))))
+ (log/error t "Unexpected error in process-poll-result!")))))))))
(defn report-queue-listener [running?
ready?
^ScheduledExecutorService pool
config-atom]
- (let [conn (:conn @config-atom)
- ^BlockingQueue q (d/tx-report-queue conn)
+ (let [cfg @config-atom
+ conn (:conn cfg)
+ tx-report-queue-given (contains? cfg :tx-report-queue)
+ ^BlockingQueue q (if tx-report-queue-given
+ (get cfg :tx-report-queue)
+ (d/tx-report-queue conn))
id-ident (d/q '[:find ?e .
:where [?e :db/ident :com.github.ivarref.yoltq/id]]
(d/db conn))]
+ (assert (instance? BlockingQueue q))
+ (log/debug "tx-report-queue-given:" tx-report-queue-given)
(try
(while @running?
(when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)]
@@ -49,9 +55,118 @@
(fn [f]
(when @running?
(.execute ^ScheduledExecutorService pool f)))))
- (deliver ready? true))
+ (deliver ready? :ready))
(catch Throwable t
- (log/error t "unexpected error in report-queue-listener"))
+ (log/error t "Unexpected error in report-queue-listener:" (.getMessage t)))
(finally
- (log/debug "remove tx-report-queue")
- (d/remove-tx-report-queue conn))))) \ No newline at end of file
+ (if tx-report-queue-given
+ (log/debug "Remove tx-report-queue handled elsewhere")
+ (do
+ (log/debug "Remove tx-report-queue")
+ (d/remove-tx-report-queue conn)))))))
+
+(defonce ^:private multicast-state-lock (Object.))
+
+(defonce ^:private multicast-state (atom {}))
+
+(defn- start-multicaster! [conn]
+ (let [multicaster-ready? (promise)]
+ (future
+ (log/debug "Multicaster starting for conn" conn)
+ (try
+ (let [input-queue (d/tx-report-queue conn)]
+ (loop []
+ (when-let [mcast-state (get @multicast-state conn)]
+ (when-let [dest-queues (vals mcast-state)]
+ (let [element (.poll ^BlockingQueue input-queue 1 TimeUnit/SECONDS)]
+ (deliver multicaster-ready? :ready)
+ (when (some? element)
+ (doseq [q dest-queues]
+ (let [ok-offer (.offer ^BlockingQueue q element 30 TimeUnit/MINUTES)]
+ (when (false? ok-offer)
+ (log/error "Failed to offer item in multicaster for connection" conn))))))
+ (recur)))))
+ (catch Throwable t
+ (deliver multicaster-ready? :error)
+ (log/error t "Unexpected error in multicaster:" (.getMessage t)))
+ (finally
+ (d/remove-tx-report-queue conn)
+ (log/debug "Multicaster exiting for conn" conn))))
+ multicaster-ready?))
+
+(defn get-tx-report-queue-multicast!
+ "Multicast the datomic.api/tx-report-queue to different consumers.
+ The multicaster is started on demand. `conn` and `id` identifies the consumer.
+
+ Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`."
+ [conn id]
+ (assert (instance? Connection conn))
+ (assert (keyword? id))
+ (locking multicast-state-lock
+ (assert (map? @multicast-state))
+ (if-let [existing-q (get-in @multicast-state [conn id])]
+ (do
+ (log/debug "returning existing queue for id" id)
+ (assert (instance? BlockingQueue existing-q))
+ existing-q)
+ (let [needs-multicaster? (not (contains? @multicast-state conn))
+ new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [conn id] (LinkedBlockingQueue.))))]
+ (when needs-multicaster?
+ (let [multicaster-promise (start-multicaster! conn)
+ multicaster-result (deref multicaster-promise (* 30 60000) :timeout)]
+ (cond (= multicaster-result :timeout)
+ (do
+ (log/error "Timeout waiting for multicaster to start")
+ (throw (RuntimeException. "Timeout waiting for multicaster to start")))
+ (= multicaster-result :error)
+ (do
+ (log/error "Multicaster failed to start")
+ (throw (RuntimeException. "Multicaster failed to start")))
+ (= multicaster-result :ready)
+ (log/debug "Multicaster is ready")
+
+ :else
+ (do
+ (log/error "Unexpected state from multicaster:" multicaster-result)
+ (throw (RuntimeException. (str "Unexpected state from multicaster: " multicaster-result)))))))
+ (let [new-q (get-in new-state [conn id])]
+ (assert (instance? BlockingQueue new-q))
+ new-q)))))
+
+(defn stop-all-multicasters! []
+ (reset! multicast-state {}))
+
+(comment
+ (do
+ (require 'com.github.ivarref.yoltq.log-init)
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq"} :debug]
+ ;[#{"ivarref.yoltq*"} :info]
+ [#{"*"} :info]])
+ (defonce conn (let [uri (str "datomic:mem://demo")
+ _ (d/delete-database uri)
+ _ (d/create-database uri)
+ conn (d/connect uri)]
+ conn))))
+
+(comment
+ (defn drain! [^BlockingQueue q]
+ (loop [cnt 0]
+ (if (nil? (.poll q 1 TimeUnit/SECONDS))
+ cnt
+ (recur (inc cnt))))))
+
+(comment
+ (let [q-1 (get-tx-report-queue-multicast! conn :q1)
+ q-2 (get-tx-report-queue-multicast! conn :q2)]))
+
+(comment
+ (drain! (get-tx-report-queue-multicast! conn :q1)))
+
+(comment
+ (do
+ @(d/transact conn [{:db/doc "demo"}])
+ :yay)) \ No newline at end of file
diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj
index 1aa6c02..f3fb6dc 100644
--- a/test/com/github/ivarref/yoltq/log_init.clj
+++ b/test/com/github/ivarref/yoltq/log_init.clj
@@ -3,6 +3,8 @@
[taoensso.timbre :as timbre]
[clojure.string :as str]))
+(set! *warn-on-reflection* true)
+
(def level-colors
{;:warn colors/red
:error colors/red})