diff options
Diffstat (limited to 'src/com/github/ivarref/yoltq.clj')
| -rw-r--r-- | src/com/github/ivarref/yoltq.clj | 146 |
1 files changed, 135 insertions, 11 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 1ba286e..ccd9062 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -12,8 +12,7 @@ (:import (datomic Connection) (java.lang.management ManagementFactory) (java.time Duration Instant ZoneOffset ZonedDateTime) - (java.util.concurrent ExecutorService Executors TimeUnit))) - + (java.util.concurrent BlockingQueue ExecutorService Executors ScheduledExecutorService TimeUnit))) (defonce ^:dynamic *config* (atom nil)) (defonce threadpool (atom nil)) @@ -27,7 +26,7 @@ ; If you want no limit on the number of retries, specify ; the value `0`. That will set the effective retry limit to ; 9223372036854775807 times. - :max-retries 10000 + :max-retries 9223372036854775807 ; Minimum amount of time to wait before a failed queue job is retried :error-backoff-time (Duration/ofSeconds 5) @@ -85,8 +84,11 @@ u/duration->millis)) -(defn init! [{:keys [conn] :as cfg}] +(defn init! [{:keys [conn tx-report-queue] :as cfg}] (assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil"))) + (when (some? tx-report-queue) + (assert (instance? BlockingQueue tx-report-queue) + (str "Expected :tx-report-queue to be of type java.util.concurrent.BlockingQueue"))) (locking threadpool @(d/transact conn i/schema) (let [new-cfg (swap! *config* @@ -140,14 +142,39 @@ (let [pool (reset! threadpool (Executors/newScheduledThreadPool (+ 1 pool-size))) queue-listener-ready (promise)] (reset! *running?* true) - (.scheduleAtFixedRate pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS) - (.scheduleAtFixedRate pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS) - (.execute pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*))) + (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS) + (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS) + (.execute ^ScheduledExecutorService pool + (fn [] + (try + (log/debug "report-queue-listener starting") + (rq/report-queue-listener *running?* queue-listener-ready pool *config*) + (finally + (log/debug "report-queue-listener exiting") + (deliver queue-listener-ready :finally))))) (future (try (slow-executor/show-slow-threads pool *config*) (finally (deliver slow-thread-watcher-done? :done)))) - @queue-listener-ready))) + (let [q-listener-retval (deref queue-listener-ready 30000 :timeout)] + (cond (= :timeout q-listener-retval) + (do + (log/error "Timed out waiting for report-queue-listener to start") + (throw (IllegalStateException. "Timed out waiting for report-queue-listener to start"))) + + (= :finally q-listener-retval) + (do + (log/error "report-queue-listener did not start") + (throw (IllegalStateException. "report-queue-listener did not start"))) + + (= :ready q-listener-retval) + (do + (log/debug "report-queue-listener is ready")) + + :else + (do + (log/error (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval))) + (throw (IllegalStateException. (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval)))))))))) (defn start! [] @@ -348,9 +375,9 @@ :p95 ... :p99 ...}}" [{:keys [age-days queue-name now db duration->long] - :or {age-days 30 - now (ZonedDateTime/now ZoneOffset/UTC) - duration->long (fn [duration] (.toSeconds duration))}}] + :or {age-days 30 + now (ZonedDateTime/now ZoneOffset/UTC) + duration->long (fn [duration] (.toSeconds ^Duration duration))}}] (let [{:keys [conn]} @*config* db (or db (d/db conn)) ->zdt #(.atZone (Instant/ofEpochMilli %) ZoneOffset/UTC)] @@ -380,6 +407,66 @@ :min (apply min values))}))) (into (sorted-map))))) +(defn get-tx-report-queue-multicast! + "Multicast the datomic.api/tx-report-queue to different consumers. + A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer. + Repeated calls using the same `conn` and `id` returns the same queue. + + The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread + to send `:end` if the queue is stopped. + The default value for `send-end-token?` is `false`. + + A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`. + + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + ([conn id] + (get-tx-report-queue-multicast! conn id false)) + ([conn id send-end-token?] + (assert (instance? Connection conn)) + (assert (boolean? send-end-token?)) + (rq/get-tx-report-queue-multicast! conn id send-end-token?))) + +(defn stop-multicast-consumer-id! + "Stop forwarding reports from datomic.api/tx-report-queue to the queue identified by `conn` and `id`. + If this is the last report destination for the given `conn`, the multicaster thread will exit. + Repeated calls are no-op. + + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if the queue was stopped. + Return `false` if the queue does not exist." + [conn id] + (assert (instance? Connection conn)) + (rq/stop-multicast-consumer-id! conn id)) + +(defn stop-multicaster! + "Stop forwarding reports from datomic.api/tx-report-queue to any queues belonging to `conn`. + The multicaster thread will exit. + Repeated calls are no-op. + + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if any queue belonging to `conn` was stopped. + Returns `false` is `conn` did not have any associated queues." + [conn] + (assert (instance? Connection conn)) + (rq/stop-multicaster! conn)) + +(defn stop-all-multicasters! + "Stop forwarding all reports from datomic.api/tx-report-queue for any `conn`. + All multicaster threads will exit. + Repeated calls are no-op. + + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if any queue was stopped. + Returns `false` if no queues existed." + [] + (rq/stop-all-multicasters!)) + (comment (do (require 'com.github.ivarref.yoltq.log-init) @@ -413,3 +500,40 @@ @started-consuming? (stop!) nil))))) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq.migrate"} :warn] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (stop!) + (let [received (atom []) + uri (str "datomic:mem://demo")] + (d/delete-database uri) + (d/create-database uri) + (let [conn (d/connect uri) + started-consuming? (promise) + n 1] + (init! {:conn conn + :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq true) + :slow-thread-show-stacktrace? false}) + (add-consumer! :q (fn [_] + (deliver started-consuming? true))) + (log/info "begin start! ...") + (start!) + (log/info "begin start! ... Done") + (Thread/sleep 2000) + (log/info "*******************************************") + @(d/transact conn [(put :q {:work 123})]) + @started-consuming? + (stop-multicaster! conn) + (log/info "*******************************************") + (stop!) + (log/info "stop! done") + nil))))
\ No newline at end of file |
