diff options
| author | Ivar Refsdal <ivar.refsdal@nsd.no> | 2021-09-04 13:23:07 +0200 |
|---|---|---|
| committer | Ivar Refsdal <ivar.refsdal@nsd.no> | 2021-09-14 12:52:42 +0200 |
| commit | ea40c5dbc2b47d6fd2a23236828dc9e4ab1f77dc (patch) | |
| tree | 38db9a13c41576dd39a18ec4f4b2d498322a30c2 /src | |
| download | fiinha-ea40c5dbc2b47d6fd2a23236828dc9e4ab1f77dc.tar.gz fiinha-ea40c5dbc2b47d6fd2a23236828dc9e4ab1f77dc.tar.xz | |
Initial commit
Add release script
Release 0.1.3
Use com.github.ivarref.yoltq namespace
Use com.github.ivarref.yoltq namespace
Diffstat (limited to 'src')
| -rw-r--r-- | src/com/github/ivarref/yoltq.clj | 175 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/error_poller.clj | 109 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/ext_sys.clj | 26 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/impl.clj | 147 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/poller.clj | 51 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/report_queue.clj | 54 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/slow_executor_detector.clj | 28 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/utils.clj | 154 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/virtual_queue.clj | 94 |
9 files changed, 838 insertions, 0 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj new file mode 100644 index 0000000..565c01d --- /dev/null +++ b/src/com/github/ivarref/yoltq.clj @@ -0,0 +1,175 @@ +(ns com.github.ivarref.yoltq + (:require [datomic-schema.core] + [datomic.api :as d] + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.impl :as i] + [com.github.ivarref.yoltq.report-queue :as rq] + [com.github.ivarref.yoltq.poller :as poller] + [com.github.ivarref.yoltq.error-poller :as errpoller] + [com.github.ivarref.yoltq.slow-executor-detector :as slow-executor] + [com.github.ivarref.yoltq.utils :as u]) + (:import (datomic Connection) + (java.util.concurrent Executors TimeUnit ExecutorService) + (java.time Duration))) + + +(defonce ^:dynamic *config* (atom nil)) +(defonce threadpool (atom nil)) +(defonce ^:dynamic *running?* (atom false)) +(defonce ^:dynamic *test-mode* false) + + +(def default-opts + (-> {; Default number of times a queue job will be retried before giving up + ; Can be overridden on a per consumer basis with + ; (yq/add-consumer! :q (fn [payload] ...) {:max-retries 200}) + :max-retries 100 + + ; Minimum amount of time to wait before a failed queue job is retried + :error-backoff-time (Duration/ofSeconds 5) + + ; Max time a queue job can execute before an error is logged + :max-execute-time (Duration/ofMinutes 5) + + ; Amount of time an in progress queue job can run before it is considered failed + ; and will be marked as such. + :hung-backoff-time (Duration/ofMinutes 30) + + ; Most queue jobs in init state will be consumed by the tx-report-queue listener. + ; However in the case where a init job was added right before the application + ; was shut down and did not have time to be processed by the tx-report-queue listener, + ; it will be consumer by the init poller. This init poller backs off by + ; :init-backoff-time to avoid unnecessary compare-and-swap lock failures that could + ; otherwise occur if competing with the tx-report-queue listener. + :init-backoff-time (Duration/ofSeconds 60) + + ; How frequent polling for init, error and hung jobs should be done. + :poll-delay (Duration/ofSeconds 10) + + ; Specifies the number of threads available for executing queue and polling jobs. + ; The final thread pool will be this size + 2. + ; + ; One thread is permanently allocated for listening to the + ; tx-report-queue. + ; + ; Another thread is permanently allocated for checking :max-execute-time. + ; This means that if all executing queue jobs are stuck and the thread pool is unavailable + ; as such, at least an error will be logged about this. The log entry will + ; contain the stacktrace of the stuck threads. + :pool-size 4 + + ; How often should the system be polled for failed queue jobs + :system-error-poll-delay (Duration/ofMinutes 1) + + ; How often should the system invoke + :system-error-callback-backoff (Duration/ofHours 1)} + + u/duration->nanos)) + + +(defn init! [{:keys [conn] :as cfg}] + (assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil"))) + (locking threadpool + @(d/transact conn i/schema) + (let [new-cfg (swap! *config* + (fn [old-conf] + (-> (merge-with (fn [a b] (or b a)) + {:running-queues (atom #{}) + :start-execute-time (atom {})} + default-opts + old-conf + cfg) + (assoc :system-error (atom {})) + u/duration->nanos)))] + new-cfg))) + + +(defn add-consumer! + ([queue-id f] + (add-consumer! queue-id f {})) + ([queue-id f opts] + (swap! *config* (fn [old-config] (assoc-in old-config [:handlers queue-id] (merge opts {:f f})))))) + + +(defn put [id payload] + (let [{:keys [bootstrap-poller! conn] :as cfg} @*config*] + (when (and *test-mode* bootstrap-poller!) + (bootstrap-poller! conn)) + (i/put cfg id payload))) + + +(defn- do-start! [] + (let [{:keys [poll-delay pool-size system-error-poll-delay]} @*config*] + (reset! threadpool (Executors/newScheduledThreadPool (+ 2 pool-size))) + (let [pool @threadpool + queue-listener-ready (promise)] + (reset! *running?* true) + (.scheduleAtFixedRate pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/NANOSECONDS) + (.scheduleAtFixedRate pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/NANOSECONDS) + (.execute pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*))) + (.execute pool (fn [] (slow-executor/show-slow-threads *running?* *config*))) + @queue-listener-ready))) + + +(defn start! [] + (locking threadpool + (cond (true? *test-mode*) + (log/info "test mode enabled, doing nothing for start!") + + (true? @*running?*) + nil + + (false? @*running?*) + (do-start!)))) + + +(defn stop! [] + (locking threadpool + (cond (true? *test-mode*) + (log/info "test mode enabled, doing nothing for stop!") + + (false? @*running?*) + nil + + (true? @*running?*) + (do + (reset! *running?* false) + (when-let [^ExecutorService tp @threadpool] + (log/debug "shutting down old threadpool") + (.shutdown tp) + (while (not (.awaitTermination tp 1 TimeUnit/SECONDS)) + (log/debug "waiting for threadpool to stop")) + (log/debug "stopped!") + (reset! threadpool nil)))))) + + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"ivarref.yoltq.report-queue"} :info] + [#{"ivarref.yoltq.poller"} :info] + [#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (stop!) + (let [received (atom []) + uri (str "datomic:mem://demo")] + (d/delete-database uri) + (d/create-database uri) + (let [ok-items (atom []) + conn (d/connect uri) + n 100] + (init! {:conn conn + :error-backoff-time (Duration/ofSeconds 1) + :poll-delay (Duration/ofSeconds 1)}) + (add-consumer! :q (fn [payload] + (when (> (Math/random) 0.5) + (throw (ex-info "oops" {}))) + (if (= n (count (swap! received conj (:work payload)))) + (log/info "... and we are done!") + (log/info "got payload" payload "total ok:" (count @received))))) + (start!) + (dotimes [x n] + @(d/transact conn [(put :q {:work x})])) + nil))))
\ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj new file mode 100644 index 0000000..77339f7 --- /dev/null +++ b/src/com/github/ivarref/yoltq/error_poller.clj @@ -0,0 +1,109 @@ +(ns com.github.ivarref.yoltq.error-poller + (:require [datomic.api :as d] + [com.github.ivarref.yoltq.utils :as u] + [com.github.ivarref.yoltq.ext-sys :as ext] + [clojure.tools.logging :as log])) + + +(defn get-state [v] + (case v + [:error :none] :recovery + [:error :some] :error + [:error :all] :error + [:recovery :none] :recovery + [:recovery :some] :recovery + [:recovery :all] :error + nil)) + + +(defn handle-error-count [{:keys [errors last-notify state] + :or {errors [] + last-notify 0 + state :recovery}} + {:keys [system-error-min-count system-error-callback-backoff] + :or {system-error-min-count 3}} + now-ns + error-count] + (let [new-errors (->> (conj errors error-count) + (take-last system-error-min-count) + (vec)) + classify (fn [coll] + (cond + (not= system-error-min-count (count coll)) + :missing + + (every? pos-int? coll) + :all + + (every? zero? coll) + :none + + :else + :some)) + old-state state] + (merge + {:errors new-errors + :last-notify last-notify} + (when-let [new-state (get-state [old-state (classify new-errors)])] + (merge + {:state new-state} + (when (and (= old-state :recovery) + (= new-state :error)) + {:run-callback :error + :last-notify now-ns}) + + (when (and (= new-state :error) + (= old-state :error) + (> now-ns + (+ last-notify system-error-callback-backoff))) + {:run-callback :error + :last-notify now-ns}) + + (when (and (= new-state :recovery) + (= old-state :error)) + {:run-callback :recovery})))))) + + +(defn do-poll-errors [{:keys [conn system-error + on-system-error + on-system-recovery] + :or {on-system-error (fn [] nil) + on-system-recovery (fn [] nil)} + :as config}] + (assert (some? conn) "expected :conn to be present") + (assert (some? system-error) "expected :system-error to be present") + (let [error-count (or (d/q '[:find (count ?e) . + :in $ ?status + :where + [?e :com.github.ivarref.yoltq/status ?status]] + (d/db conn) + u/status-error) + 0)] + (when (pos-int? error-count) + (log/debug "poll-errors found" error-count "errors in system")) + (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ns) error-count)] + (when run-callback + (cond (= run-callback :error) + (on-system-error) + + (= run-callback :recovery) + (on-system-recovery) + + :else + (log/error "unhandled callback-type" run-callback)) + (log/debug "run-callback is" run-callback)) + new-state))) + + +(defn poll-errors [running? config-atom] + (try + (when @running? + (do-poll-errors @config-atom)) + (catch Throwable t + (log/error t "unexpected error in poll-erros:" (ex-message t)) + nil))) + + +(comment + (do-poll-errors @com.github.ivarref.yoltq/*config*)) + diff --git a/src/com/github/ivarref/yoltq/ext_sys.clj b/src/com/github/ivarref/yoltq/ext_sys.clj new file mode 100644 index 0000000..3480475 --- /dev/null +++ b/src/com/github/ivarref/yoltq/ext_sys.clj @@ -0,0 +1,26 @@ +(ns com.github.ivarref.yoltq.ext-sys + (:require [datomic.api :as d]) + (:import (java.util UUID))) + + +(def ^:dynamic *now-ns-atom* nil) +(def ^:dynamic *squuid-atom* nil) +(def ^:dynamic *random-atom* nil) + + +(defn now-ns [] + (if *now-ns-atom* + @*now-ns-atom* + (System/nanoTime))) + + +(defn squuid [] + (if *squuid-atom* + (UUID/fromString (str "00000000-0000-0000-0000-" (format "%012d" (swap! *squuid-atom* inc)))) + (d/squuid))) + + +(defn random-uuid [] + (if *random-atom* + (UUID/fromString (str "00000000-0000-0000-0000-" (format "%012d" (swap! *random-atom* inc)))) + (UUID/randomUUID)))
\ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj new file mode 100644 index 0000000..2acc83d --- /dev/null +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -0,0 +1,147 @@ +(ns com.github.ivarref.yoltq.impl + (:require [datomic.api :as d] + [clojure.tools.logging :as log] + [clojure.string :as str] + [com.github.ivarref.yoltq.utils :as u] + [com.github.ivarref.yoltq.ext-sys :as ext])) + + +(def schema + [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity} + #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} + #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} + #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string} + #:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string} + #:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true} + #:db{:ident :com.github.ivarref.yoltq/lock, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :noHistory true} + #:db{:ident :com.github.ivarref.yoltq/init-time, :cardinality :db.cardinality/one, :valueType :db.type/long} + #:db{:ident :com.github.ivarref.yoltq/processing-time, :cardinality :db.cardinality/one, :valueType :db.type/long} + #:db{:ident :com.github.ivarref.yoltq/done-time, :cardinality :db.cardinality/one, :valueType :db.type/long} + #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}]) + + +(defn put [config queue-name payload] + (if-let [_ (get-in config [:handlers queue-name])] + (let [id (u/squuid)] + (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init) + {:com.github.ivarref.yoltq/id id + :com.github.ivarref.yoltq/queue-name queue-name + :com.github.ivarref.yoltq/status u/status-init + :com.github.ivarref.yoltq/payload (pr-str payload) + :com.github.ivarref.yoltq/bindings (pr-str {}) + :com.github.ivarref.yoltq/lock (u/random-uuid) + :com.github.ivarref.yoltq/tries 0 + :com.github.ivarref.yoltq/init-time (u/now-ns)}) + (do + (log/error "Did not find registered handler for queue" queue-name) + (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) + + +(defn take! [{:keys [conn cas-failures hung-log-level] + :or {hung-log-level :error}} + {:keys [tx id queue-name was-hung? to-error?] :as queue-item-info}] + (when queue-item-info + (try + (cond to-error? + (log/logp hung-log-level "queue-item" (str id) "was hung and retried too many times. Giving up!") + + was-hung? + (log/logp hung-log-level "queue-item" (str id) "was hung, retrying ...") + + :else + nil) + (let [{:keys [db-after]} @(d/transact conn tx) + {:com.github.ivarref.yoltq/keys [status] :as q-item} (u/get-queue-item db-after id)] + (log/debug "queue item" (str id) "for queue" queue-name "now has status" status) + q-item) + (catch Throwable t + (let [{:db/keys [error] :as m} (u/db-error-map t)] + (cond + (= :db.error/cas-failed error) + (do + (log/info ":db.error/cas-failed for queue item" (str id) "and attribute" (:a m)) + (when cas-failures + (swap! cas-failures inc)) + nil) + + :else + (do + (log/error t "Unexpected failure for queue item" (str id) ":" (ex-message t)) + nil))))))) + + +(defn mark-status! [{:keys [conn]} + {:com.github.ivarref.yoltq/keys [id lock tries]} + new-status] + (try + (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing new-status] + (if (= new-status u/status-done) + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ns)} + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ns)})] + {:keys [db-after]} @(d/transact conn tx)] + (u/get-queue-item db-after id)) + (catch Throwable t + (log/error t "unexpected error in mark-status!: " (ex-message t)) + nil))) + + +(defn fmt [id queue-name new-status tries spent-ns] + (str/join " " ["queue-item" (str id) + "for queue" queue-name + "now has status" new-status + "after" tries (if (= 1 tries) + "try" + "tries") + "in" (format "%.1f" (double (/ spent-ns 1e6))) "ms"])) + + +(defn execute! [{:keys [handlers mark-status-fn! start-execute-time] + :or {mark-status-fn! mark-status!} + :as cfg} + {:com.github.ivarref.yoltq/keys [status id queue-name payload] :as queue-item}] + (when queue-item + (if (= :error status) + (assoc queue-item :failed? true) + (if-let [queue (get handlers queue-name)] + (let [{:keys [f allow-cas-failure?]} queue] + (log/debug "queue item" (str id) "for queue" queue-name "is now processing") + (let [{:keys [retval exception]} + (try + (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ns) id queue-name]) + (let [v (f payload)] + {:retval v}) + (catch Throwable t + {:exception t}) + (finally + (swap! start-execute-time dissoc (Thread/currentThread)))) + {:db/keys [error] :as m} (u/db-error-map exception)] + (cond + (and (some? exception) + allow-cas-failure? + (= :db.error/cas-failed error) + (or (true? allow-cas-failure?) + (allow-cas-failure? (:a m)))) + (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)] + (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item] + (log/info (fmt id queue-name u/status-done tries (- done-time init-time))) + (assoc q-item :retval retval :success? true :allow-cas-failure? true))) + + (some? exception) + (when-let [q-item (mark-status-fn! cfg queue-item u/status-error)] + (let [{:com.github.ivarref.yoltq/keys [init-time error-time tries]} q-item + level (if (>= tries 3) :error :warn)] + (log/logp level exception (fmt id queue-name u/status-error tries (- error-time init-time))) + (log/logp level exception "error message was:" (str \" (ex-message exception) \") "for queue-item" (str id)) + (log/logp level exception "ex-data was:" (ex-data exception) "for queue-item" (str id)) + (assoc q-item :exception exception))) + + :else + (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)] + (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item] + (log/info (fmt id queue-name u/status-done tries (- done-time init-time))) + (assoc q-item :retval retval :success? true)))))) + (do + (log/error "no handler for queue" queue-name) + nil))))) diff --git a/src/com/github/ivarref/yoltq/poller.clj b/src/com/github/ivarref/yoltq/poller.clj new file mode 100644 index 0000000..ad9d32a --- /dev/null +++ b/src/com/github/ivarref/yoltq/poller.clj @@ -0,0 +1,51 @@ +(ns com.github.ivarref.yoltq.poller + (:require [com.github.ivarref.yoltq.utils :as u] + [com.github.ivarref.yoltq.impl :as i] + [clojure.tools.logging :as log])) + + +(defn poll-once! [cfg q status] + (case status + :init (some->> (u/get-init cfg q) (i/take! cfg) (i/execute! cfg)) + :error (some->> (u/get-error cfg q) (i/take! cfg) (i/execute! cfg)) + :hung (some->> (u/get-hung cfg q) (i/take! cfg) (i/execute! cfg)))) + + +(defn poll-queue! [running? + {:keys [running-queues] :as cfg} + [queue-name status :as q]] + (try + (let [[old _] (swap-vals! running-queues conj q)] + (if-not (contains? old q) + (try + (log/debug "polling queue" queue-name "for status" status) + (let [start-time (u/now-ns) + last-res (loop [prev-res nil] + (when @running? + (let [res (poll-once! cfg queue-name status)] + (if (and res (:success? res)) + (recur res) + prev-res))))] + (let [spent-ns (- (u/now-ns) start-time)] + (log/trace "done polling queue" q "in" + (format "%.1f" (double (/ spent-ns 1e6))) + "ms")) + last-res) + (finally + (swap! running-queues disj q))) + (log/debug "queue" q "is already being polled, doing nothing..."))) + (catch Throwable t + (log/error t "poll-queue! crashed:" (ex-message t))) + (finally))) + + +(defn poll-all-queues! [running? config-atom pool] + (try + (when @running? + (let [{:keys [handlers]} @config-atom] + (doseq [q (shuffle (vec (for [q-name (keys handlers) + status [:init :error :hung]] + [q-name status])))] + (.execute pool (fn [] (poll-queue! running? @config-atom q)))))) + (catch Throwable t + (log/error t "poll-all-queues! crashed:" (ex-message t)))))
\ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj new file mode 100644 index 0000000..a40d29a --- /dev/null +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -0,0 +1,54 @@ +(ns com.github.ivarref.yoltq.report-queue + (:require [com.github.ivarref.yoltq.utils :as u] + [com.github.ivarref.yoltq.impl :as i] + [datomic.api :as d] + [clojure.tools.logging :as log]) + (:import (datomic Datom) + (java.util.concurrent ScheduledExecutorService BlockingQueue TimeUnit))) + + +(defn process-poll-result! [cfg id-ident poll-result consumer] + (let [{:keys [tx-data db-after]} poll-result] + (when-let [new-ids (->> tx-data + (filter (fn [^Datom datom] (and + (= (.a datom) id-ident) + (.added datom)))) + (mapv (fn [^Datom datom] (.v datom))) + (into []) + (not-empty))] + (doseq [id new-ids] + (consumer (fn [] + (try + (let [{:com.github.ivarref.yoltq/keys [lock id status queue-name]} (u/get-queue-item db-after id)] + (some->> + (u/prepare-processing id queue-name lock status) + (i/take! cfg) + (i/execute! cfg))) + (catch Throwable t + (log/error t "unexpected error in process-poll-result!"))))))))) + + +(defn report-queue-listener [running? + ready? + ^ScheduledExecutorService pool + config-atom] + (let [conn (:conn @config-atom) + ^BlockingQueue q (d/tx-report-queue conn) + id-ident (d/q '[:find ?e . + :where [?e :db/ident :com.github.ivarref.yoltq/id]] + (d/db conn))] + (try + (while @running? + (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] + (process-poll-result! @config-atom + id-ident + poll-result + (fn [f] + (when @running? + (.execute ^ScheduledExecutorService pool f))))) + (deliver ready? true)) + (catch Throwable t + (log/error t "unexpected error in report-queue-listener")) + (finally + (log/debug "remove tx-report-queue") + (d/remove-tx-report-queue conn)))))
\ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/slow_executor_detector.clj b/src/com/github/ivarref/yoltq/slow_executor_detector.clj new file mode 100644 index 0000000..f15ef7d --- /dev/null +++ b/src/com/github/ivarref/yoltq/slow_executor_detector.clj @@ -0,0 +1,28 @@ +(ns com.github.ivarref.yoltq.slow-executor-detector + (:require [com.github.ivarref.yoltq.ext-sys :as ext] + [clojure.tools.logging :as log] + [clojure.string :as str])) + + +(defn- do-show-slow-threads [{:keys [start-execute-time + max-execute-time]}] + (doseq [[^Thread thread [start-time queue-id queue-name]] @start-execute-time] + (when (> (ext/now-ns) (+ start-time max-execute-time)) + (log/error "thread" (.getName thread) "spent too much time on" + "queue item" (str queue-id) + "for queue" queue-name + "stacktrace: \n" + (str/join "\n" (mapv str (seq (.getStackTrace thread)))))))) + + +(defn show-slow-threads [running? config-atom] + (try + (while @running? + (try + (do-show-slow-threads @config-atom) + (catch Throwable t + (log/error t "do-show-slow-threads crashed:" (ex-message t)))) + (dotimes [_ 3] + (when @running? (Thread/sleep 1000)))) + (catch Throwable t + (log/error t "reap! crashed:" (ex-message t)))))
\ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj new file mode 100644 index 0000000..c96d1dc --- /dev/null +++ b/src/com/github/ivarref/yoltq/utils.clj @@ -0,0 +1,154 @@ +(ns com.github.ivarref.yoltq.utils + (:require [datomic.api :as d] + [clojure.edn :as edn] + [com.github.ivarref.yoltq.ext-sys :as ext] + [clojure.tools.logging :as log]) + (:import (datomic Connection) + (java.time Duration))) + + +(def status-init :init) +(def status-processing :processing) +(def status-done :done) +(def status-error :error) + + +(defn duration->nanos [m] + (reduce-kv (fn [o k v] + (if (instance? Duration v) + (assoc o k (.toNanos v)) + (assoc o k v))) + {} + m)) + + +(defn squuid [] + (ext/squuid)) + + +(defn random-uuid [] + (ext/random-uuid)) + + +(defn now-ns [] + (ext/now-ns)) + + +(defn root-cause [e] + (if-let [root (ex-cause e)] + (root-cause root) + e)) + + +(defn db-error-map [^Throwable t] + (loop [e t] + (cond (nil? e) nil + + (and (map? (ex-data e)) + (contains? (ex-data e) :db/error)) + (ex-data e) + + :else + (recur (ex-cause e))))) + + +(defn get-queue-item [db id] + (-> (d/pull db '[:*] [:com.github.ivarref.yoltq/id id]) + (dissoc :db/id) + (update :com.github.ivarref.yoltq/payload edn/read-string) + (update :com.github.ivarref.yoltq/bindings edn/read-string))) + + +(defn prepare-processing [id queue-name old-lock old-status] + (let [new-lock (random-uuid)] + {:id id + :lock new-lock + :queue-name queue-name + :tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status old-status status-processing] + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ns)}]})) + + +(defn get-init [{:keys [conn db init-backoff-time] :as cfg} queue-name] + (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: " + (str (if (nil? conn) "nil" conn)) + "\nConfig was: " (str cfg))) + (if-let [ids (->> (d/q '[:find ?id ?lock + :in $ ?queue-name ?backoff + :where + [?e :com.github.ivarref.yoltq/status :init] + [?e :com.github.ivarref.yoltq/queue-name ?queue-name] + [?e :com.github.ivarref.yoltq/init-time ?init-time] + [(>= ?backoff ?init-time)] + [?e :com.github.ivarref.yoltq/id ?id] + [?e :com.github.ivarref.yoltq/lock ?lock]] + (or db (d/db conn)) + queue-name + (- (now-ns) init-backoff-time)) + (not-empty))] + (let [[id old-lock] (rand-nth (into [] ids))] + (prepare-processing id queue-name old-lock :init)) + (log/trace "no new-items in :init status for queue" queue-name))) + + +(defn get-error [{:keys [conn db error-backoff-time max-retries] :as cfg} queue-name] + (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: " + (str (if (nil? conn) "nil" conn)) + "\nConfig was: " (str cfg))) + (let [max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)] + (when-let [ids (->> (d/q '[:find ?id ?lock + :in $ ?queue-name ?backoff ?max-tries + :where + [?e :com.github.ivarref.yoltq/status :error] + [?e :com.github.ivarref.yoltq/queue-name ?queue-name] + [?e :com.github.ivarref.yoltq/error-time ?time] + [(>= ?backoff ?time)] + [?e :com.github.ivarref.yoltq/tries ?tries] + [(> ?max-tries ?tries)] + [?e :com.github.ivarref.yoltq/id ?id] + [?e :com.github.ivarref.yoltq/lock ?lock]] + (or db (d/db conn)) + queue-name + (- (now-ns) error-backoff-time) + (inc max-retries)) + (not-empty))] + (let [[id old-lock] (rand-nth (into [] ids))] + (prepare-processing id queue-name old-lock :error))))) + + +(defn get-hung [{:keys [conn db now hung-backoff-time max-retries] :as cfg} queue-name] + (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: " + (str (if (nil? conn) "nil" conn)) + "\nConfig was: " (str cfg))) + (let [now (or now (now-ns)) + max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)] + (when-let [ids (->> (d/q '[:find ?id ?lock ?tries + :in $ ?qname ?backoff + :where + [?e :com.github.ivarref.yoltq/status :processing] + [?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/processing-time ?time] + [(>= ?backoff ?time)] + [?e :com.github.ivarref.yoltq/tries ?tries] + [?e :com.github.ivarref.yoltq/id ?id] + [?e :com.github.ivarref.yoltq/lock ?lock]] + (or db (d/db conn)) + queue-name + (- now hung-backoff-time)) + (not-empty))] + (let [new-lock (random-uuid) + [id old-lock tries _t] (rand-nth (into [] ids)) + to-error? (>= tries max-retries)] + {:id id + :lock new-lock + :queue-name queue-name + :was-hung? true + :to-error? to-error? + :tx (if (not to-error?) + [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time now}] + [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status status-processing status-error] + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time now}])})))) diff --git a/src/com/github/ivarref/yoltq/virtual_queue.clj b/src/com/github/ivarref/yoltq/virtual_queue.clj new file mode 100644 index 0000000..e49aca3 --- /dev/null +++ b/src/com/github/ivarref/yoltq/virtual_queue.clj @@ -0,0 +1,94 @@ +(ns com.github.ivarref.yoltq.virtual-queue + (:require [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.report-queue :as rq] + [com.github.ivarref.yoltq.ext-sys :as ext] + [com.github.ivarref.yoltq :as dq] + [datomic.api :as d] + [com.github.ivarref.yoltq.poller :as poller]) + (:import (java.util.concurrent BlockingQueue TimeUnit))) + + +(defn bootstrap-poller! [txq running? poller-exited? bootstrapped? conn] + (let [ready? (promise)] + (future + (let [q (d/tx-report-queue conn)] + (try + (while @running? + (when-let [poll-result (.poll ^BlockingQueue q 10 TimeUnit/MILLISECONDS)] + (swap! txq conj poll-result)) + (deliver ready? true) + (reset! bootstrapped? true)) + (catch Throwable t + (log/error t "test-poller crashed: " (ex-message t))) + (finally + (try + (d/remove-tx-report-queue conn) + (catch Throwable t + (log/warn t "remove-tx-report-queue failed:" (ex-message t)))) + (deliver poller-exited? true))))) + @ready?)) + + +(defmacro with-virtual-queue! + [& body] + `(let [txq# (atom []) + poller-exited?# (promise) + bootstrapped?# (atom false) + running?# (atom true) + config# (atom {:bootstrap-poller! (partial bootstrap-poller! txq# running?# poller-exited?# bootstrapped?#) + :init-backoff-time 0 + :hung-log-level :warn + :tx-queue txq#})] + (with-bindings {#'dq/*config* config# + #'dq/*running?* (atom false) + #'dq/*test-mode* true + #'ext/*now-ns-atom* (atom 0) + #'ext/*random-atom* (atom 0) + #'ext/*squuid-atom* (atom 0)} + (try + ~@body + (finally + (reset! running?# false) + (when @bootstrapped?# + @poller-exited?#)))))) + + +(defn call-with-virtual-queue! + [f] + (with-virtual-queue! + (f))) + + +(defn run-report-queue! [min-items] + (let [{:keys [tx-queue conn]} @dq/*config* + id-ident (d/q '[:find ?e . + :where [?e :db/ident :com.github.ivarref.yoltq/id]] + (d/db conn))] + (let [timeout (+ 3000 (System/currentTimeMillis))] + (while (and (< (System/currentTimeMillis) timeout) + (< (count @tx-queue) min-items)) + (Thread/sleep 10))) + (when (< (count @tx-queue) min-items) + (let [msg (str "run-report-queue: timeout waiting for " min-items " items")] + (log/error msg) + (throw (ex-info msg {})))) + (let [res (atom [])] + (doseq [itm (first (swap-vals! tx-queue (constantly [])))] + (rq/process-poll-result! + @dq/*config* + id-ident + itm + (fn [f] (swap! res conj (f))))) + @res))) + + +(defn run-one-report-queue! [] + (first (run-report-queue! 1))) + + +(defn run-queue-once! [q status] + (poller/poll-once! @dq/*config* q status)) + + +(defn put! [q payload] + @(d/transact (:conn @dq/*config*) [(dq/put q payload)]))
\ No newline at end of file |
