diff options
Diffstat (limited to 'src/com/github/ivarref/yoltq.clj')
| -rw-r--r-- | src/com/github/ivarref/yoltq.clj | 539 |
1 files changed, 539 insertions, 0 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj new file mode 100644 index 0000000..8c8ca7a --- /dev/null +++ b/src/com/github/ivarref/yoltq.clj @@ -0,0 +1,539 @@ +(ns com.github.ivarref.yoltq + (:require + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.error-poller :as errpoller] + [com.github.ivarref.yoltq.impl :as i] + [com.github.ivarref.yoltq.migrate :as migrate] + [com.github.ivarref.yoltq.poller :as poller] + [com.github.ivarref.yoltq.report-queue :as rq] + [com.github.ivarref.yoltq.slow-executor-detector :as slow-executor] + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d]) + (:import (datomic Connection) + (java.lang.management ManagementFactory) + (java.time Duration Instant ZoneOffset ZonedDateTime) + (java.util.concurrent BlockingQueue ExecutorService Executors ScheduledExecutorService TimeUnit))) + +(defonce ^:dynamic *config* (atom nil)) +(defonce threadpool (atom nil)) +(defonce ^:dynamic *running?* (atom false)) +(defonce ^:dynamic *test-mode* false) + +(def default-opts + (-> {; Default number of times a queue job will be retried before giving up + ; Can be overridden on a per-consumer basis with + ; (yq/add-consumer! :q (fn [payload] ...) {:max-retries 200}) + ; If you want no limit on the number of retries, specify + ; the value `0`. That will set the effective retry limit to + ; 9223372036854775807 times. + :max-retries 9223372036854775807 + + ; Minimum amount of time to wait before a failed queue job is retried + :error-backoff-time (Duration/ofSeconds 5) + + ; Max time a queue job can execute before an error is logged + :max-execute-time (Duration/ofMinutes 5) + + ; Amount of time an in progress queue job can run before it is considered failed + ; and will be marked as such. + :hung-backoff-time (Duration/ofMinutes 30) + + ; Most queue jobs in init state will be consumed by the tx-report-queue listener. + ; However, in the case where an init job was added right before the application + ; was shut down and did not have time to be processed by the tx-report-queue listener, + ; it will be consumer by the init poller. This init poller backs off by + ; :init-backoff-time to avoid unnecessary compare-and-swap lock failures that could + ; otherwise occur if competing with the tx-report-queue listener. + :init-backoff-time (Duration/ofSeconds 60) + + ; If you are dealing with a flaky downstream service, you may not want + ; yoltq to mark itself as unhealthy on the first failure encounter with + ; the downstream service. Change this setting to let yoltq mark itself + ; as healthy even though a queue item has been failing for some time. + :healthy-allowed-error-time (Duration/ofMinutes 15) + + ; How frequent polling for init, error and hung jobs should be done. + :poll-delay (Duration/ofSeconds 10) + + ; Specifies the number of threads available for executing queue and polling jobs. + ; The final thread pool will be this size + 2. + ; + ; One thread is permanently allocated for listening to the + ; tx-report-queue. + ; + ; Another thread is permanently allocated for checking :max-execute-time. + ; This means that if all executing queue jobs are stuck and the thread pool is unavailable + ; as such, at least an error will be logged about this. The log entry will + ; contain the stacktrace of the stuck threads. + :pool-size 4 + + :capture-bindings (if-let [s (resolve (symbol "taoensso.timbre/*context*"))] + [s] + []) + + ; How often should the system be polled for failed queue jobs + :system-error-poll-delay (Duration/ofMinutes 1) + + ; How often should the system invoke + :system-error-callback-backoff (Duration/ofHours 1) + + ; Should old, possibly stalled jobs be automatically be migrated + ; as part of `start!`? + :auto-migrate? true} + + u/duration->millis)) + + +(defn init! [{:keys [conn tx-report-queue] :as cfg}] + (assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil"))) + (when (some? tx-report-queue) + (assert (instance? BlockingQueue tx-report-queue) + (str "Expected :tx-report-queue to be of type java.util.concurrent.BlockingQueue"))) + (locking threadpool + @(d/transact conn i/schema) + (let [new-cfg (swap! *config* + (fn [old-conf] + (-> (merge-with (fn [_ b] b) + {:running-queues (atom #{}) + :start-execute-time (atom {}) + :system-error (atom {}) + :healthy? (atom nil) + :slow? (atom nil) + :slow-thread-watcher-done? (promise)} + default-opts + (if *test-mode* old-conf (select-keys old-conf [:handlers])) + cfg) + u/duration->millis)))] + new-cfg))) + + +(defn get-queue-id + [queue-id-or-var] + (cond (and (var? queue-id-or-var) + (keyword? (:yoltq/queue-id (meta queue-id-or-var)))) + (:yoltq/queue-id (meta queue-id-or-var)) + + (keyword? queue-id-or-var) + queue-id-or-var + + :else + (throw (ex-info (str "Could not get queue-id for " queue-id-or-var) {:queue-id queue-id-or-var})))) + +(defn add-consumer! + ([queue-id f] + (add-consumer! queue-id f {})) + ([queue-id f opts] + (swap! *config* (fn [old-config] (assoc-in old-config [:handlers (get-queue-id queue-id)] (merge opts {:f f})))))) + + +(defn put + ([queue-id payload] (put queue-id payload {})) + ([queue-id payload opts] + (let [{:keys [bootstrap-poller! conn] :as cfg} @*config*] + (when (and *test-mode* bootstrap-poller!) + (bootstrap-poller! conn)) + (i/put cfg (get-queue-id queue-id) payload opts)))) + + +(defn- do-start! [] + (let [{:keys [poll-delay pool-size system-error-poll-delay auto-migrate? slow-thread-watcher-done?] :as cfg} @*config*] + (when auto-migrate? + (future (migrate/migrate! cfg))) + (let [pool (reset! threadpool (Executors/newScheduledThreadPool (+ 1 pool-size))) + queue-listener-ready (promise)] + (reset! *running?* true) + (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS) + (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS) + (.execute ^ScheduledExecutorService pool + (fn [] + (try + (log/debug "report-queue-listener starting") + (rq/report-queue-listener *running?* queue-listener-ready pool *config*) + (finally + (log/debug "report-queue-listener exiting") + (deliver queue-listener-ready :finally))))) + (future (try + (slow-executor/show-slow-threads pool *config*) + (finally + (deliver slow-thread-watcher-done? :done)))) + (let [q-listener-retval (deref queue-listener-ready 30000 :timeout)] + (cond (= :timeout q-listener-retval) + (do + (log/error "Timed out waiting for report-queue-listener to start") + (throw (IllegalStateException. "Timed out waiting for report-queue-listener to start"))) + + (= :finally q-listener-retval) + (do + (log/error "report-queue-listener did not start") + (throw (IllegalStateException. "report-queue-listener did not start"))) + + (= :ready q-listener-retval) + (do + (log/debug "report-queue-listener is ready")) + + :else + (do + (log/error (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval))) + (throw (IllegalStateException. (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval)))))))))) + + +(defn start! [] + (locking threadpool + (cond (true? *test-mode*) + (log/info "test mode enabled, doing nothing for start!") + + (true? @*running?*) + nil + + (false? @*running?*) + (do-start!)))) + + +(defn stop! [] + (locking threadpool + (cond (true? *test-mode*) + (log/info "test mode enabled, doing nothing for stop!") + + (false? @*running?*) + nil + + (true? @*running?*) + (do + (reset! *running?* false) + (when-let [^ExecutorService tp @threadpool] + (log/debug "shutting down threadpool") + (.shutdown tp) + (while (not (.awaitTermination tp 1 TimeUnit/SECONDS)) + (log/trace "waiting for threadpool to stop")) + (log/debug "stopped!") + (reset! threadpool nil)) + (when-let [wait-slow-threads (some->> *config* deref :slow-thread-watcher-done?)] + (log/debug "waiting for slow-thread-watcher to stop ...") + @wait-slow-threads + (log/debug "waiting for slow-thread-watcher to stop ... OK")))))) + + +(defn healthy? [] + (cond + (< (.toMinutes (Duration/ofMillis (.getUptime (ManagementFactory/getRuntimeMXBean)))) 10) + true + + (false? (some->> @*config* + :healthy? + (deref))) + false + + (true? (some->> @*config* + :slow? + (deref))) + false + + :else + true)) + +(defn unhealthy? + "Returns `true` if there are queues in error or a thread is slow, and the application has been up for over 10 minutes, otherwise `false`." + [] + (false? (healthy?))) + +(defn queue-stats [] + (let [{:keys [conn]} @*config* + db (d/db conn)] + (->> (d/q '[:find ?e ?qname ?status + :in $ + :where + [?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/status ?status]] + db) + (mapv (partial zipmap [:e :qname :status])) + (mapv #(select-keys % [:qname :status])) + (mapv (fn [qitem] {qitem 1})) + (reduce (partial merge-with +) {}) + (mapv (fn [[{:keys [qname status]} v]] + (array-map + :qname qname + :status status + :count v))) + (sort-by (juxt :qname :status)) + (vec)))) + +(defn job-group-progress [queue-name job-group] + (let [{:keys [conn]} @*config* + db (d/db conn)] + (->> (d/q '[:find ?e ?qname ?job-group ?status + :keys :e :qname :job-group :status + :in $ ?qname ?job-group + :where + [?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/job-group ?job-group] + [?e :com.github.ivarref.yoltq/status ?status]] + db queue-name job-group) + (mapv #(select-keys % [:qname :job-group :status])) + (mapv (fn [qitem] {qitem 1})) + (reduce (partial merge-with +) {}) + (mapv (fn [[{:keys [qname job-group status]} v]] + (array-map + :qname qname + :job-group job-group + :status status + :count v))) + (sort-by (juxt :qname :job-group :status)) + (vec)))) + +(defn get-errors [qname] + (let [{:keys [conn]} @*config* + db (d/db conn)] + (->> (d/q '[:find [?id ...] + :in $ ?qname ?status + :where + [?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/status ?status] + [?e :com.github.ivarref.yoltq/id ?id]] + db + qname + :error) + (mapv (partial u/get-queue-item db))))) + +(defn retry-one-error! [qname] + (let [{:keys [handlers] :as cfg} @*config* + _ (assert (contains? handlers qname) "Queue not found") + cfg (assoc-in cfg [:handlers qname :max-retries] Long/MAX_VALUE)] + (poller/poll-once! cfg qname :error))) + +(defn retry-stats + "Gather retry statistics. + + Optional keyword arguments: + * :age-days — last number of days to look at data from. Defaults to 30. + * :queue-name — only gather statistics for this queue name. Defaults to nil, meaning all queues. + + Example return value: + {:queue-a {:ok 100, :retries 2, :retry-percentage 2.0} + :queue-b {:ok 100, :retries 75, :retry-percentage 75.0}} + + From the example value above, we can see that :queue-b fails at a much higher rate than :queue-a. + Assuming that the queue consumers are correctly implemented, this means that the service representing :queue-b + is much more unstable than the one representing :queue-a. This again implies + that you will probably want to fix the downstream service of :queue-b, if that is possible. + " + [{:keys [age-days queue-name now db] + :or {age-days 30 + now (ZonedDateTime/now ZoneOffset/UTC)}}] + (let [{:keys [conn]} @*config* + db (or db (d/db conn))] + (->> (d/query {:query {:find '[?qname ?status ?tries ?init-time] + :in (into '[$] (when queue-name '[?qname])) + :where '[[?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/status ?status] + [?e :com.github.ivarref.yoltq/tries ?tries] + [?e :com.github.ivarref.yoltq/init-time ?init-time]]} + :args (remove nil? [db queue-name])}) + (mapv (partial zipmap [:qname :status :tries :init-time])) + (mapv #(update % :init-time (fn [init-time] (.atZone (Instant/ofEpochMilli init-time) ZoneOffset/UTC)))) + (mapv #(assoc % :age-days (.toDays (Duration/between (:init-time %) now)))) + (filter #(<= (:age-days %) age-days)) + (group-by :qname) + (mapv (fn [[q values]] + {q (let [{:keys [ok retries] :as m} (->> values + (mapv (fn [{:keys [tries status]}] + (condp = status + u/status-init {} + u/status-processing {:processing 1 :retries (dec tries)} + u/status-done {:ok 1 :retries (dec tries)} + u/status-error {:error 1 :retries (dec tries)}))) + (reduce (partial merge-with +) {}))] + (into (sorted-map) (merge m + (when (pos-int? ok) + {:retry-percentage (double (* 100 (/ retries ok)))}))))})) + (into (sorted-map))))) + +(defn- percentile [n values] + (let [idx (int (Math/floor (* (count values) (/ n 100))))] + (nth values idx))) + +(defn processing-time-stats + "Gather processing time statistics. Default unit is seconds. + + Optional keyword arguments: + * :age-days — last number of days to look at data from. Defaults to 30. + Use nil to have no limit. + + * :queue-name — only gather statistics for this queue name. Defaults to nil, meaning all queues. + + * :duration->long - Specify what unit should be used for values. + Must take a java.time.Duration as input and return a long. + + Defaults to (fn [duration] (.toSeconds duration). + I.e. the default unit is seconds. + + Example return value: + {:queue-a {:avg 1 + :max 10 + :min 0 + :p50 ... + :p90 ... + :p95 ... + :p99 ...}}" + [{:keys [age-days queue-name now db duration->long] + :or {age-days 30 + now (ZonedDateTime/now ZoneOffset/UTC) + duration->long (fn [duration] (.toSeconds ^Duration duration))}}] + (let [{:keys [conn]} @*config* + db (or db (d/db conn)) + ->zdt #(.atZone (Instant/ofEpochMilli %) ZoneOffset/UTC)] + (->> (d/query {:query {:find '[?qname ?status ?init-time ?done-time] + :in (into '[$ ?status] (when queue-name '[?qname])) + :where '[[?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/status ?status] + [?e :com.github.ivarref.yoltq/init-time ?init-time] + [?e :com.github.ivarref.yoltq/done-time ?done-time]]} + :args (vec (remove nil? [db u/status-done queue-name]))}) + (mapv (partial zipmap [:qname :status :init-time :done-time])) + (mapv #(update % :init-time ->zdt)) + (mapv #(update % :done-time ->zdt)) + (mapv #(assoc % :age-days (.toDays (Duration/between (:init-time %) now)))) + (mapv #(assoc % :spent-time (duration->long (Duration/between (:init-time %) (:done-time %))))) + (filter #(or (nil? age-days) (<= (:age-days %) age-days))) + (group-by :qname) + (mapv (fn [[q values]] + (let [values (vec (sort (mapv :spent-time values)))] + {q (sorted-map + :max (apply max values) + :avg (int (Math/floor (/ (reduce + 0 values) (count values)))) + :p50 (percentile 50 values) + :p90 (percentile 90 values) + :p95 (percentile 95 values) + :p99 (percentile 99 values) + :min (apply min values))}))) + (into (sorted-map))))) + +(defn get-tx-report-queue-multicast! + "Multicast the datomic.api/tx-report-queue to different consumers. + A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer. + Repeated calls using the same `conn` and `id` returns the same queue. + + The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread + to send `:end` if the queue is stopped. + The default value for `send-end-token?` is `false`. + + A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`. + + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + ([conn id] + (get-tx-report-queue-multicast! conn id false)) + ([conn id send-end-token?] + (assert (instance? Connection conn)) + (assert (boolean? send-end-token?)) + (rq/get-tx-report-queue-multicast! conn id send-end-token?))) + +(defn stop-multicast-consumer-id! + "Stop forwarding reports from datomic.api/tx-report-queue to the queue identified by `conn` and `id`. + If this is the last report destination for the given `conn`, the multicaster thread will exit. + Repeated calls are no-op. + + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if the queue was stopped. + Return `false` if the queue does not exist." + [conn id] + (assert (instance? Connection conn)) + (rq/stop-multicast-consumer-id! conn id)) + +(defn stop-multicaster! + "Stop forwarding reports from datomic.api/tx-report-queue to any queues belonging to `conn`. + The multicaster thread will exit. + Repeated calls are no-op. + + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if any queue belonging to `conn` was stopped. + Returns `false` is `conn` did not have any associated queues." + [conn] + (assert (instance? Connection conn)) + (rq/stop-multicaster! conn)) + +(defn stop-all-multicasters! + "Stop forwarding all reports from datomic.api/tx-report-queue for any `conn`. + All multicaster threads will exit. + Repeated calls are no-op. + + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if any queue was stopped. + Returns `false` if no queues existed." + [] + (rq/stop-all-multicasters!)) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"ivarref.yoltq.report-queue"} :info] + [#{"ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (stop!) + (future (let [received (atom []) + uri (str "datomic:mem://demo")] + (d/delete-database uri) + (d/create-database uri) + (let [conn (d/connect uri) + started-consuming? (promise) + n 1] + (init! {:conn conn + :error-backoff-time (Duration/ofSeconds 1) + :poll-delay (Duration/ofSeconds 1) + :max-execute-time (Duration/ofSeconds 3) + :slow-thread-show-stacktrace? false}) + (add-consumer! :q (fn [_] + (deliver started-consuming? true) + (log/info "sleeping...") + (Thread/sleep (.toMillis (Duration/ofSeconds 60))) + (log/info "done sleeping"))) + (start!) + @(d/transact conn [(put :q {:work 123})]) + @started-consuming? + (stop!) + nil))))) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq.migrate"} :warn] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (stop!) + (let [received (atom []) + uri (str "datomic:mem://demo")] + (d/delete-database uri) + (d/create-database uri) + (let [conn (d/connect uri) + started-consuming? (promise) + n 1] + (init! {:conn conn + :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq true) + :slow-thread-show-stacktrace? false}) + (add-consumer! :q (fn [_] + (deliver started-consuming? true))) + (log/info "begin start! ...") + (start!) + (log/info "begin start! ... Done") + (Thread/sleep 2000) + (log/info "*******************************************") + @(d/transact conn [(put :q {:work 123})]) + @started-consuming? + (stop-multicaster! conn) + (log/info "*******************************************") + (stop!) + (log/info "stop! done") + nil))))
\ No newline at end of file |
