(ns com.github.ivarref.yoltq
(:require
[clojure.tools.logging :as log]
[com.github.ivarref.yoltq.error-poller :as errpoller]
[com.github.ivarref.yoltq.impl :as i]
[com.github.ivarref.yoltq.migrate :as migrate]
[com.github.ivarref.yoltq.poller :as poller]
[com.github.ivarref.yoltq.report-queue :as rq]
[com.github.ivarref.yoltq.slow-executor-detector :as slow-executor]
[com.github.ivarref.yoltq.utils :as u]
[datomic.api :as d])
(:import (datomic Connection)
(java.lang.management ManagementFactory)
(java.time Duration Instant ZoneOffset ZonedDateTime)
(java.util.concurrent BlockingQueue ExecutorService Executors ScheduledExecutorService TimeUnit)))
; Atom holding the active configuration map. Replaced by `init!` and
; extended with consumer handlers by `add-consumer!`.
(defonce ^:dynamic *config* (atom nil))
; Atom holding the scheduled thread pool created by `start!`; reset to nil by `stop!`.
; The atom itself is also used as the monitor for `locking` in `init!`, `start!` and `stop!`.
(defonce threadpool (atom nil))
; Atom that is set to true when `start!` starts the pool and back to false by `stop!`.
(defonce ^:dynamic *running?* (atom false))
; When true, `start!` and `stop!` only log and do nothing, and `put` invokes
; :bootstrap-poller! from the config (if present) before enqueueing.
(defonce ^:dynamic *test-mode* false)
(def default-opts
; Default configuration. `init!` merges the user supplied configuration on top of this map.
; The map is passed through `u/duration->millis` (presumably converting every
; java.time.Duration value to milliseconds; `do-start!` hands :poll-delay to the
; scheduler as TimeUnit/MILLISECONDS).
(-> {; Default number of times a queue job will be retried before giving up.
; Can be overridden on a per-consumer basis with
; (yq/add-consumer! :q (fn [payload] ...) {:max-retries 200})
; If you want no limit on the number of retries, specify
; the value `0`. That will set the effective retry limit to
; 9223372036854775807 (Long/MAX_VALUE) times, which is also the default.
:max-retries 9223372036854775807
; Minimum amount of time to wait before a failed queue job is retried.
:error-backoff-time (Duration/ofSeconds 5)
; Max time a queue job can execute before an error is logged.
:max-execute-time (Duration/ofMinutes 5)
; Amount of time an in-progress queue job can run before it is considered failed
; and will be marked as such.
:hung-backoff-time (Duration/ofMinutes 30)
; Most queue jobs in init state will be consumed by the tx-report-queue listener.
; However, in the case where an init job was added right before the application
; was shut down and did not have time to be processed by the tx-report-queue listener,
; it will be consumed by the init poller. This init poller backs off by
; :init-backoff-time to avoid unnecessary compare-and-swap lock failures that could
; otherwise occur if competing with the tx-report-queue listener.
:init-backoff-time (Duration/ofSeconds 60)
; If you are dealing with a flaky downstream service, you may not want
; yoltq to mark itself as unhealthy on the first failure encountered with
; the downstream service. Change this setting to let yoltq mark itself
; as healthy even though a queue item has been failing for some time.
:healthy-allowed-error-time (Duration/ofMinutes 15)
; How frequently polling for init, error and hung jobs should be done.
:poll-delay (Duration/ofSeconds 10)
; Specifies the number of threads available for executing queue and polling jobs.
; The scheduled thread pool is created with this size + 1:
;
; One thread is permanently allocated for listening to the
; tx-report-queue.
;
; In addition, a separate thread outside of the pool (started with `future`)
; is permanently allocated for checking :max-execute-time.
; This means that if all executing queue jobs are stuck and the thread pool is unavailable
; as such, at least an error will be logged about this. The log entry will
; contain the stacktrace of the stuck threads.
:pool-size 4
; Vector of vars. Defaults to the var `taoensso.timbre/*context*` when it can be
; resolved (i.e. timbre is loaded), otherwise to an empty vector.
; NOTE(review): presumably these are the dynamic vars whose bindings are conveyed
; to the consumer function; confirm in the impl namespace.
:capture-bindings (if-let [s (resolve (symbol "taoensso.timbre/*context*"))]
[s]
[])
; How often the system should be polled for failed queue jobs.
:system-error-poll-delay (Duration/ofMinutes 1)
; How often the system error callback may be invoked.
; NOTE(review): the original comment was cut off after "invoke"; confirm the exact
; semantics in the error-poller namespace.
:system-error-callback-backoff (Duration/ofHours 1)
; Should old, possibly stalled jobs automatically be migrated
; as part of `start!`?
:auto-migrate? true}
u/duration->millis))
(defn init!
  "Initialise yoltq with the configuration map `cfg`.

  `cfg` must contain `:conn` (a datomic Connection) and may contain
  `:tx-report-queue` (a java.util.concurrent.BlockingQueue) as well as
  overrides for any key in `default-opts`.

  Transacts the yoltq schema and replaces the global configuration.
  Later sources win when merging: fresh runtime state, `default-opts`,
  the previous configuration, then `cfg`. Outside of test mode only
  `:handlers` is carried over from the previous configuration; in test
  mode the whole previous configuration is carried over.

  Returns the new configuration map."
  [{:keys [conn tx-report-queue] :as cfg}]
  (assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil")))
  (when (some? tx-report-queue)
    (assert (instance? BlockingQueue tx-report-queue)
            "Expected :tx-report-queue to be of type java.util.concurrent.BlockingQueue"))
  (locking threadpool
    @(d/transact conn i/schema)
    (letfn [(fresh-state []
              {:running-queues            (atom #{})
               :start-execute-time        (atom {})
               :system-error              (atom {})
               :healthy?                  (atom nil)
               :slow?                     (atom nil)
               :slow-thread-watcher-done? (promise)})
            (rebuild [previous]
              (u/duration->millis
                (merge (fresh-state)
                       default-opts
                       (if *test-mode*
                         previous
                         (select-keys previous [:handlers]))
                       cfg)))]
      (swap! *config* rebuild))))
(defn get-queue-id
  "Resolve a queue id keyword from `queue-id-or-var`.

  Accepts either a keyword, which is returned as is, or a var whose
  metadata has a keyword under `:yoltq/queue-id`, in which case that
  keyword is returned. Anything else throws an ex-info carrying the
  offending value under `:queue-id`."
  [queue-id-or-var]
  (let [id-from-var (when (var? queue-id-or-var)
                      (:yoltq/queue-id (meta queue-id-or-var)))]
    (cond
      (keyword? id-from-var) id-from-var
      (keyword? queue-id-or-var) queue-id-or-var
      :else (throw (ex-info (str "Could not get queue-id for " queue-id-or-var)
                            {:queue-id queue-id-or-var})))))
(defn add-consumer!
  "Register `f` as the consumer function for the queue `queue-id`.

  `queue-id` is resolved with `get-queue-id`, so it may be a keyword or a
  var tagged with `:yoltq/queue-id`. `opts` is an optional map of
  per-consumer settings; it is stored together with `f` (under `:f`) at
  `[:handlers <queue-id>]` in the global configuration.

  Returns the updated configuration map."
  ([queue-id f]
   (add-consumer! queue-id f {}))
  ([queue-id f opts]
   (let [handler-path [:handlers (get-queue-id queue-id)]
         handler      (merge opts {:f f})]
     (swap! *config* assoc-in handler-path handler))))
(defn put
  "Build the enqueue data for `payload` on the queue `queue-id`.

  `queue-id` is resolved with `get-queue-id`. `opts` defaults to an empty
  map and is handed to `i/put` unchanged. In test mode the configured
  `:bootstrap-poller!` function (if any) is first called with the
  connection. Returns whatever `i/put` returns."
  ([queue-id payload]
   (put queue-id payload {}))
  ([queue-id payload opts]
   (let [cfg        @*config*
         bootstrap! (:bootstrap-poller! cfg)]
     (when (and *test-mode* bootstrap!)
       (bootstrap! (:conn cfg)))
     (i/put cfg (get-queue-id queue-id) payload opts))))
(defn- do-start!
  "Start the background machinery for the current `*config*`.

  In order:
  * kicks off `migrate/migrate!` on a `future` when `:auto-migrate?` is set,
  * creates a scheduled thread pool of `:pool-size` + 1 threads and stores it in `threadpool`,
  * marks the system as running,
  * schedules the queue poller and the error poller at fixed rates,
  * runs the tx-report-queue listener on the pool,
  * runs the slow thread watcher on a separate `future`,
  * waits up to 30 seconds for the listener to report in.

  Throws IllegalStateException unless the listener reports `:ready`."
  []
  (let [{:keys [poll-delay pool-size system-error-poll-delay auto-migrate? slow-thread-watcher-done?]
         :as   cfg} @*config*]
    (when auto-migrate?
      (future (migrate/migrate! cfg)))
    (let [^ScheduledExecutorService pool (reset! threadpool (Executors/newScheduledThreadPool (+ 1 pool-size)))
          queue-listener-ready (promise)
          ; Log the problem and abort start-up with the same message.
          fail! (fn [^String message]
                  (log/error message)
                  (throw (IllegalStateException. message)))]
      (reset! *running?* true)
      (.scheduleAtFixedRate pool
                            (fn [] (poller/poll-all-queues! *running?* *config* pool))
                            0 poll-delay TimeUnit/MILLISECONDS)
      (.scheduleAtFixedRate pool
                            (fn [] (errpoller/poll-errors *running?* *config*))
                            0 system-error-poll-delay TimeUnit/MILLISECONDS)
      (.execute pool
                (fn []
                  (try
                    (log/debug "report-queue-listener starting")
                    (rq/report-queue-listener *running?* queue-listener-ready pool *config*)
                    (finally
                      (log/debug "report-queue-listener exiting")
                      ; Only has an effect if the listener exited before delivering anything itself.
                      (deliver queue-listener-ready :finally)))))
      (future
        (try
          (slow-executor/show-slow-threads pool *config*)
          (finally
            ; `stop!` waits on this promise.
            (deliver slow-thread-watcher-done? :done))))
      (let [listener-state (deref queue-listener-ready 30000 :timeout)]
        (case listener-state
          :ready (log/debug "report-queue-listener is ready")
          :timeout (fail! "Timed out waiting for report-queue-listener to start")
          :finally (fail! "report-queue-listener did not start")
          (fail! (str "Unexpected queue-listener-retval: " (pr-str listener-state))))))))
(defn start!
  "Start processing queue jobs.

  Does nothing but log when `*test-mode*` is true, and does nothing at all
  when the system is already running. Otherwise delegates to `do-start!`
  while holding the `threadpool` lock."
  []
  (locking threadpool
    (if (true? *test-mode*)
      (log/info "test mode enabled, doing nothing for start!")
      (when (false? @*running?*)
        (do-start!)))))
(defn stop!
  "Stop processing queue jobs and wait for the background threads to finish.

  Does nothing but log when `*test-mode*` is true, and does nothing at all
  when the system is not running. Otherwise, while holding the `threadpool`
  lock: flags the system as stopped, shuts the thread pool down and blocks
  until it has terminated, then blocks until the slow thread watcher has
  signalled that it is done."
  []
  (locking threadpool
    (if (true? *test-mode*)
      (log/info "test mode enabled, doing nothing for stop!")
      (when (true? @*running?*)
        (reset! *running?* false)
        (when-let [^ExecutorService tp @threadpool]
          (log/debug "shutting down threadpool")
          (.shutdown tp)
          ; Re-check once per second until every pool thread has exited.
          (loop []
            (when-not (.awaitTermination tp 1 TimeUnit/SECONDS)
              (log/trace "waiting for threadpool to stop")
              (recur)))
          (log/debug "stopped!")
          (reset! threadpool nil))
        (when-let [watcher-done (:slow-thread-watcher-done? @*config*)]
          (log/debug "waiting for slow-thread-watcher to stop ...")
          @watcher-done
          (log/debug "waiting for slow-thread-watcher to stop ... OK"))))))
(defn healthy?
  "Returns `true` when yoltq considers itself healthy, otherwise `false`.

  Always `true` during the first 10 minutes of JVM uptime. After that it is
  `false` when the `:healthy?` flag in the configuration holds `false` or
  the `:slow?` flag holds `true`, and `true` in every other case (including
  when no configuration or flag is present)."
  []
  (let [uptime-minutes (.toMinutes (Duration/ofMillis (.getUptime (ManagementFactory/getRuntimeMXBean))))
        ; Current value of the flag atom stored under `k`, or nil when absent.
        flag           (fn [k] (some-> @*config* (get k) deref))]
    (cond
      (< uptime-minutes 10) true
      (false? (flag :healthy?)) false
      (true? (flag :slow?)) false
      :else true)))
(defn unhealthy?
  "The negation of `healthy?`.

  Returns `true` if there are queues in error or a thread is slow, and the
  application has been up for over 10 minutes, otherwise `false`."
  []
  (-> (healthy?)
      false?))
(defn queue-stats
  "Count queue jobs per queue name and status.

  Reads the current database value of the configured connection and returns
  a vector of maps with the keys `:qname`, `:status` and `:count`, sorted by
  queue name and then status."
  []
  (let [db   (d/db (:conn @*config*))
        rows (d/q '[:find ?e ?qname ?status
                    :in $
                    :where
                    [?e :com.github.ivarref.yoltq/queue-name ?qname]
                    [?e :com.github.ivarref.yoltq/status ?status]]
                  db)]
    (->> rows
         ; The entity id only keeps rows distinct in the query result; drop it before counting.
         (map (fn [[_ qname status]] [qname status]))
         (frequencies)
         (mapv (fn [[[qname status] n]]
                 (array-map
                   :qname qname
                   :status status
                   :count n)))
         (sort-by (juxt :qname :status))
         (vec))))
(defn job-group-progress
  "Count the jobs of `job-group` on the queue `queue-name` per status.

  Reads the current database value of the configured connection and returns
  a vector of maps with the keys `:qname`, `:job-group`, `:status` and
  `:count`, sorted by queue name, job group and status."
  [queue-name job-group]
  (let [db   (d/db (:conn @*config*))
        rows (d/q '[:find ?e ?qname ?job-group ?status
                    :keys :e :qname :job-group :status
                    :in $ ?qname ?job-group
                    :where
                    [?e :com.github.ivarref.yoltq/queue-name ?qname]
                    [?e :com.github.ivarref.yoltq/job-group ?job-group]
                    [?e :com.github.ivarref.yoltq/status ?status]]
                  db queue-name job-group)]
    (->> rows
         ; Count rows by everything but the entity id.
         (map (juxt :qname :job-group :status))
         (frequencies)
         (mapv (fn [[[qname group status] n]]
                 (array-map
                   :qname qname
                   :job-group group
                   :status status
                   :count n)))
         (sort-by (juxt :qname :job-group :status))
         (vec))))
(defn get-errors
  "Return the queue items of the queue `qname` that have the status `:error`.

  Looks up the ids of the matching jobs in the current database value of
  the configured connection and returns a vector with the result of
  `u/get-queue-item` for each of them."
  [qname]
  (let [db        (d/db (:conn @*config*))
        error-ids (d/q '[:find [?id ...]
                         :in $ ?qname ?status
                         :where
                         [?e :com.github.ivarref.yoltq/queue-name ?qname]
                         [?e :com.github.ivarref.yoltq/status ?status]
                         [?e :com.github.ivarref.yoltq/id ?id]]
                       db
                       qname
                       :error)]
    (mapv (fn [id] (u/get-queue-item db id)) error-ids)))
(defn retry-one-error!
  "Poll the queue `qname` once for a job in the `:error` state.

  A consumer must have been registered for `qname`, otherwise an
  AssertionError with the message \"Queue not found\" is thrown. For this
  single poll the handler's `:max-retries` is overridden with
  Long/MAX_VALUE. Returns whatever `poller/poll-once!` returns."
  [qname]
  (let [cfg @*config*]
    (assert (contains? (:handlers cfg) qname) "Queue not found")
    (-> cfg
        (assoc-in [:handlers qname :max-retries] Long/MAX_VALUE)
        (poller/poll-once! qname :error))))
(defn retry-stats
  "Gather retry statistics.

  Optional keyword arguments:
  * :age-days — last number of days to look at data from. Defaults to 30.
                Use nil to have no limit.
  * :queue-name — only gather statistics for this queue name. Defaults to nil, meaning all queues.
  * :now — the ZonedDateTime that job age is measured against. Defaults to the current time in UTC.
  * :db — the database value to query. Defaults to the current database of the configured connection.

  Example return value:
  {:queue-a {:ok 100, :retries 2, :retry-percentage 2.0}
   :queue-b {:ok 100, :retries 75, :retry-percentage 75.0}}

  Besides :ok, a queue may also have :processing and :error counts.
  :retry-percentage is only present when at least one job of the queue is done.

  From the example value above, we can see that :queue-b fails at a much higher rate than :queue-a.
  Assuming that the queue consumers are correctly implemented, this means that the service representing :queue-b
  is much more unstable than the one representing :queue-a. This again implies
  that you will probably want to fix the downstream service of :queue-b, if that is possible.
  "
  [{:keys [age-days queue-name now db]
    :or   {age-days 30
           now      (ZonedDateTime/now ZoneOffset/UTC)}}]
  (let [{:keys [conn]} @*config*
        db (or db (d/db conn))]
    (->> (d/query {:query {:find  '[?qname ?status ?tries ?init-time]
                           ; Query results are sets. Without `:with`, jobs that share queue name,
                           ; status, tries and init-time (e.g. jobs enqueued in the same millisecond)
                           ; would collapse into a single row and be under-counted.
                           :with  '[?e]
                           :in    (into '[$] (when queue-name '[?qname]))
                           :where '[[?e :com.github.ivarref.yoltq/queue-name ?qname]
                                    [?e :com.github.ivarref.yoltq/status ?status]
                                    [?e :com.github.ivarref.yoltq/tries ?tries]
                                    [?e :com.github.ivarref.yoltq/init-time ?init-time]]}
                   :args  (vec (remove nil? [db queue-name]))})
         (mapv (partial zipmap [:qname :status :tries :init-time]))
         (mapv #(update % :init-time (fn [init-time] (.atZone (Instant/ofEpochMilli init-time) ZoneOffset/UTC))))
         (mapv #(assoc % :age-days (.toDays (Duration/between (:init-time %) now))))
         ; nil means no age limit, matching `processing-time-stats`.
         (filter #(or (nil? age-days) (<= (:age-days %) age-days)))
         (group-by :qname)
         (mapv (fn [[q values]]
                 {q (let [{:keys [ok retries] :as m} (->> values
                                                          (mapv (fn [{:keys [tries status]}]
                                                                  ; The first attempt is not a retry, hence `dec`.
                                                                  (condp = status
                                                                    u/status-init {}
                                                                    u/status-processing {:processing 1 :retries (dec tries)}
                                                                    u/status-done {:ok 1 :retries (dec tries)}
                                                                    u/status-error {:error 1 :retries (dec tries)})))
                                                          (reduce (partial merge-with +) {}))]
                      (into (sorted-map) (merge m
                                                (when (pos-int? ok)
                                                  {:retry-percentage (double (* 100 (/ retries ok)))}))))}))
         (into (sorted-map)))))
(defn- percentile
  "Return the `n`th percentile (0-100) of `values`, which must be a sorted,
  non-empty indexed collection.

  The element at index floor(count * n / 100) is picked. The index is
  clamped to the last element so that n = 100 returns the maximum instead
  of indexing one past the end."
  [n values]
  (let [size (count values)
        idx  (int (Math/floor (* size (/ n 100))))]
    (nth values (min idx (dec size)))))
(defn processing-time-stats
  "Gather processing time statistics for done jobs. Default unit is seconds.

  The processing time of a job is the time between its init-time and its done-time.

  Optional keyword arguments:
  * :age-days — last number of days to look at data from. Defaults to 30.
                Use nil to have no limit.
  * :queue-name — only gather statistics for this queue name. Defaults to nil, meaning all queues.
  * :duration->long - Specify what unit should be used for values.
                      Must take a java.time.Duration as input and return a long.
                      Defaults to (fn [duration] (.toSeconds duration)).
                      I.e. the default unit is seconds.
  * :now — the ZonedDateTime that job age is measured against. Defaults to the current time in UTC.
  * :db — the database value to query. Defaults to the current database of the configured connection.

  Example return value:
  {:queue-a {:avg 1
             :max 10
             :min 0
             :p50 ...
             :p90 ...
             :p95 ...
             :p99 ...}}

  :avg is rounded down and returned as a long."
  [{:keys [age-days queue-name now db duration->long]
    :or   {age-days       30
           now            (ZonedDateTime/now ZoneOffset/UTC)
           duration->long (fn [duration] (.toSeconds ^Duration duration))}}]
  (let [{:keys [conn]} @*config*
        db (or db (d/db conn))
        ->zdt #(.atZone (Instant/ofEpochMilli %) ZoneOffset/UTC)]
    (->> (d/query {:query {:find  '[?qname ?status ?init-time ?done-time]
                           ; Query results are sets. Without `:with`, jobs of a queue that share
                           ; both init-time and done-time would collapse into a single sample.
                           :with  '[?e]
                           :in    (into '[$ ?status] (when queue-name '[?qname]))
                           :where '[[?e :com.github.ivarref.yoltq/queue-name ?qname]
                                    [?e :com.github.ivarref.yoltq/status ?status]
                                    [?e :com.github.ivarref.yoltq/init-time ?init-time]
                                    [?e :com.github.ivarref.yoltq/done-time ?done-time]]}
                   :args  (vec (remove nil? [db u/status-done queue-name]))})
         (mapv (partial zipmap [:qname :status :init-time :done-time]))
         (mapv #(update % :init-time ->zdt))
         (mapv #(update % :done-time ->zdt))
         (mapv #(assoc % :age-days (.toDays (Duration/between (:init-time %) now))))
         (mapv #(assoc % :spent-time (duration->long (Duration/between (:init-time %) (:done-time %)))))
         (filter #(or (nil? age-days) (<= (:age-days %) age-days)))
         (group-by :qname)
         (mapv (fn [[q values]]
                 ; `percentile` requires the samples to be sorted.
                 (let [values (vec (sort (mapv :spent-time values)))]
                   {q (sorted-map
                        :max (apply max values)
                        ; `long` rather than `int`: with a fine-grained unit such as
                        ; nanoseconds the average easily exceeds the int range, and
                        ; `int` throws on out-of-range values.
                        :avg (long (Math/floor (/ (reduce + 0 values) (count values))))
                        :p50 (percentile 50 values)
                        :p90 (percentile 90 values)
                        :p95 (percentile 95 values)
                        :p99 (percentile 99 values)
                        :min (apply min values))})))
         (into (sorted-map)))))
(defn get-tx-report-queue-multicast!
  "Multicast the datomic.api/tx-report-queue to different consumers.

  A multicaster thread is started on demand per connection. `conn` and `id`
  identify the consumer, and repeated calls using the same `conn` and `id`
  return the same queue.

  The optional third parameter, `send-end-token?`, must be a boolean and
  defaults to `false`. If it is `true`, the multicaster thread sends `:end`
  when the queue is stopped.

  A queue may be stopped using `stop-multicast-consumer-id!`,
  `stop-multicaster!` or `stop-all-multicasters!`.

  Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`."
  ([conn id]
   (get-tx-report-queue-multicast! conn id false))
  ([conn id send-end-token?]
   (assert (instance? Connection conn))
   (assert (boolean? send-end-token?))
   (rq/get-tx-report-queue-multicast! conn id send-end-token?)))
(defn stop-multicast-consumer-id!
  "Stop forwarding reports from datomic.api/tx-report-queue to the queue
  identified by `conn` and `id`.

  If this is the last report destination for the given `conn`, the
  multicaster thread will exit. Repeated calls are no-ops.

  The multicaster thread will send `:end` if `send-end-token?` was `true`
  when `get-tx-report-queue-multicast!` was called.

  Returns `true` if the queue was stopped.
  Returns `false` if the queue does not exist."
  [conn id]
  (assert (instance? Connection conn))
  (rq/stop-multicast-consumer-id! conn id))
(defn stop-multicaster!
  "Stop forwarding reports from datomic.api/tx-report-queue to every queue
  belonging to `conn`.

  The multicaster thread will exit. Repeated calls are no-ops.

  The multicaster thread will send `:end` if `send-end-token?` was `true`
  when `get-tx-report-queue-multicast!` was called.

  Returns `true` if any queue belonging to `conn` was stopped.
  Returns `false` if `conn` did not have any associated queues."
  [conn]
  (assert (instance? Connection conn))
  (rq/stop-multicaster! conn))
(defn stop-all-multicasters!
  "Stop forwarding all reports from datomic.api/tx-report-queue, for every `conn`.

  All multicaster threads will exit. Repeated calls are no-ops.

  The multicaster thread will send `:end` if `send-end-token?` was `true`
  when `get-tx-report-queue-multicast!` was called.

  Returns `true` if any queue was stopped.
  Returns `false` if no queues existed."
  []
  (rq/stop-all-multicasters!))
; REPL scratch code, never evaluated when the namespace is loaded.
; Scenario: call `stop!` while a consumer is still busy. The consumer sleeps for
; 60 seconds, :max-execute-time is 3 seconds, and `stop!` is called as soon as the
; consumer has started.
(comment
(do
(require 'com.github.ivarref.yoltq.log-init)
(com.github.ivarref.yoltq.log-init/init-logging!
[[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
; NOTE(review): the two patterns below lack the "com.github." prefix used by the
; other scratch block in this file; confirm that they still match the intended loggers.
[#{"ivarref.yoltq.report-queue"} :info]
[#{"ivarref.yoltq.poller"} :info]
[#{"com.github.ivarref.yoltq"} :debug]
;[#{"ivarref.yoltq*"} :info]
[#{"*"} :info]])
; Make sure nothing is left running from an earlier evaluation.
(stop!)
; `received` and `n` below are bound but never used in this block.
(future (let [received (atom [])
uri (str "datomic:mem://demo")]
; Start from an empty in-memory database.
(d/delete-database uri)
(d/create-database uri)
(let [conn (d/connect uri)
started-consuming? (promise)
n 1]
(init! {:conn conn
:error-backoff-time (Duration/ofSeconds 1)
:poll-delay (Duration/ofSeconds 1)
:max-execute-time (Duration/ofSeconds 3)
:slow-thread-show-stacktrace? false})
; The consumer signals that it has started and then sleeps far longer than :max-execute-time.
(add-consumer! :q (fn [_]
(deliver started-consuming? true)
(log/info "sleeping...")
(Thread/sleep (.toMillis (Duration/ofSeconds 60)))
(log/info "done sleeping")))
(start!)
@(d/transact conn [(put :q {:work 123})])
; Wait until the consumer has picked the job up, then stop while it is still sleeping.
@started-consuming?
(stop!)
nil)))))
; REPL scratch code, never evaluated when the namespace is loaded.
; Scenario: run yoltq on a multicast tx-report-queue (created with send-end-token? = true),
; consume one job, then stop the multicaster before calling `stop!`.
(comment
(do
(require 'com.github.ivarref.yoltq.log-init)
(com.github.ivarref.yoltq.log-init/init-logging!
[[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
[#{"com.github.ivarref.yoltq.report-queue"} :debug]
[#{"com.github.ivarref.yoltq.poller"} :info]
[#{"com.github.ivarref.yoltq.migrate"} :warn]
[#{"com.github.ivarref.yoltq"} :debug]
;[#{"ivarref.yoltq*"} :info]
[#{"*"} :info]])
; Make sure nothing is left running from an earlier evaluation.
(stop!)
; `received` and `n` below are bound but never used in this block.
(let [received (atom [])
uri (str "datomic:mem://demo")]
; Start from an empty in-memory database.
(d/delete-database uri)
(d/create-database uri)
(let [conn (d/connect uri)
started-consuming? (promise)
n 1]
(init! {:conn conn
:tx-report-queue (get-tx-report-queue-multicast! conn :yoltq true)
:slow-thread-show-stacktrace? false})
(add-consumer! :q (fn [_]
(deliver started-consuming? true)))
(log/info "begin start! ...")
(start!)
(log/info "begin start! ... Done")
(Thread/sleep 2000)
(log/info "*******************************************")
@(d/transact conn [(put :q {:work 123})])
; Wait until the consumer has been invoked before tearing things down.
@started-consuming?
; Stop the multicaster first, then yoltq itself.
(stop-multicaster! conn)
(log/info "*******************************************")
(stop!)
(log/info "stop! done")
nil))))