diff options
| author | ire <refsdal.ivar@gmail.com> | 2025-05-13 21:39:07 +0200 |
|---|---|---|
| committer | ire <refsdal.ivar@gmail.com> | 2025-05-13 21:39:07 +0200 |
| commit | ae49a7ec82ecd3988e0f7825b0adead1dc77c911 (patch) | |
| tree | b484cb08144fd3385db77e3edb4766e7b04d991e /src/com/github/ivarref/yoltq.clj | |
| parent | Fix reflection warnings (diff) | |
| download | fiinha-ae49a7ec82ecd3988e0f7825b0adead1dc77c911.tar.gz fiinha-ae49a7ec82ecd3988e0f7825b0adead1dc77c911.tar.xz | |
Fix tx-report-queue sharing #7
Diffstat (limited to 'src/com/github/ivarref/yoltq.clj')
| -rw-r--r-- | src/com/github/ivarref/yoltq.clj | 86 |
1 files changed, 72 insertions, 14 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index a7dcddf..32298b7 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -12,7 +12,7 @@ (:import (datomic Connection) (java.lang.management ManagementFactory) (java.time Duration Instant ZoneOffset ZonedDateTime) - (java.util.concurrent ExecutorService Executors ScheduledExecutorService TimeUnit))) + (java.util.concurrent BlockingQueue ExecutorService Executors ScheduledExecutorService TimeUnit))) (defonce ^:dynamic *config* (atom nil)) (defonce threadpool (atom nil)) @@ -26,7 +26,7 @@ ; If you want no limit on the number of retries, specify ; the value `0`. That will set the effective retry limit to ; 9223372036854775807 times. - :max-retries 10000 + :max-retries 9223372036854775807 ; Minimum amount of time to wait before a failed queue job is retried :error-backoff-time (Duration/ofSeconds 5) @@ -86,6 +86,9 @@ (defn init! [{:keys [conn tx-report-queue] :as cfg}] (assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil"))) + (when (some? tx-report-queue) + (assert (instance? BlockingQueue tx-report-queue) + (str "Expected :tx-report-queue to be of type java.util.concurrent.BlockingQueue"))) (locking threadpool @(d/transact conn i/schema) (let [new-cfg (swap! *config* @@ -96,9 +99,6 @@ :system-error (atom {}) :healthy? (atom nil) :slow? (atom nil) - :get-tx-report-queue (fn [] - (or tx-report-queue - (d/tx-report-queue conn))) :slow-thread-watcher-done? (promise)} default-opts (if *test-mode* old-conf (select-keys old-conf [:handlers])) @@ -144,12 +144,37 @@ (reset! *running?* true) (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS) (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS) - (.execute ^ScheduledExecutorService pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*))) + (.execute ^ScheduledExecutorService pool + (fn [] + (try + (log/debug "report-queue-listener starting") + (rq/report-queue-listener *running?* queue-listener-ready pool *config*) + (finally + (log/debug "report-queue-listener exiting") + (deliver queue-listener-ready :finally))))) (future (try (slow-executor/show-slow-threads pool *config*) (finally (deliver slow-thread-watcher-done? :done)))) - @queue-listener-ready))) + (let [q-listener-retval (deref queue-listener-ready 30000 :timeout)] + (cond (= :timeout q-listener-retval) + (do + (log/error "Timed out waiting for report-queue-listener to start") + (throw (IllegalStateException. "Timed out waiting for report-queue-listener to start"))) + + (= :finally q-listener-retval) + (do + (log/error "report-queue-listener did not start") + (throw (IllegalStateException. "report-queue-listener did not start"))) + + (= :ready q-listener-retval) + (do + (log/debug "report-queue-listener is ready")) + + :else + (do + (log/error (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval))) + (throw (IllegalStateException. (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval)))))))))) (defn start! [] @@ -359,14 +384,13 @@ :min (apply min values))}))) (into (sorted-map))))) +(defn get-tx-report-queue-multicast! + "Multicast the datomic.api/tx-report-queue to different consumers. + The multicaster is started on demand. `conn` and `id` identifies the consumer. - -(defn add-tx-report-queue! - ([conn] - (add-tx-report-queue! conn :default)) - ([conn id] - (if @*config* - :...))) + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + [conn id] + (rq/get-tx-report-queue-multicast! conn id)) (comment (do @@ -401,3 +425,37 @@ @started-consuming? (stop!) nil))))) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq.migrate"} :warn] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (stop!) + (let [received (atom []) + uri (str "datomic:mem://demo")] + (d/delete-database uri) + (d/create-database uri) + (let [conn (d/connect uri) + started-consuming? (promise) + n 1] + (init! {:conn conn + :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq) + :slow-thread-show-stacktrace? false}) + (add-consumer! :q (fn [_] + (deliver started-consuming? true))) + (log/info "begin start! ...") + (start!) + (log/info "begin start! ... Done") + (Thread/sleep 2000) + @(d/transact conn [(put :q {:work 123})]) + @started-consuming? + (stop!) + (log/info "stop! done") + nil))))
\ No newline at end of file |
