aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq.clj
diff options
context:
space:
mode:
authorIvar Refsdal <ivar.refsdal@nsd.no>2021-09-04 13:23:07 +0200
committerIvar Refsdal <ivar.refsdal@nsd.no>2021-09-14 12:52:42 +0200
commitea40c5dbc2b47d6fd2a23236828dc9e4ab1f77dc (patch)
tree38db9a13c41576dd39a18ec4f4b2d498322a30c2 /src/com/github/ivarref/yoltq.clj
downloadfiinha-ea40c5dbc2b47d6fd2a23236828dc9e4ab1f77dc.tar.gz
fiinha-ea40c5dbc2b47d6fd2a23236828dc9e4ab1f77dc.tar.xz
Initial commit
Add release script Release 0.1.3 Use com.github.ivarref.yoltq namespace Use com.github.ivarref.yoltq namespace
Diffstat (limited to 'src/com/github/ivarref/yoltq.clj')
-rw-r--r--src/com/github/ivarref/yoltq.clj175
1 files changed, 175 insertions, 0 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
new file mode 100644
index 0000000..565c01d
--- /dev/null
+++ b/src/com/github/ivarref/yoltq.clj
@@ -0,0 +1,175 @@
+(ns com.github.ivarref.yoltq
+ (:require [datomic-schema.core]
+ [datomic.api :as d]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.impl :as i]
+ [com.github.ivarref.yoltq.report-queue :as rq]
+ [com.github.ivarref.yoltq.poller :as poller]
+ [com.github.ivarref.yoltq.error-poller :as errpoller]
+ [com.github.ivarref.yoltq.slow-executor-detector :as slow-executor]
+ [com.github.ivarref.yoltq.utils :as u])
+ (:import (datomic Connection)
+ (java.util.concurrent Executors TimeUnit ExecutorService)
+ (java.time Duration)))
+
+
+(defonce ^:dynamic *config* (atom nil))
+(defonce threadpool (atom nil))
+(defonce ^:dynamic *running?* (atom false))
+(defonce ^:dynamic *test-mode* false)
+
+
+(def default-opts
+ (-> {; Default number of times a queue job will be retried before giving up
+ ; Can be overridden on a per consumer basis with
+ ; (yq/add-consumer! :q (fn [payload] ...) {:max-retries 200})
+ :max-retries 100
+
+ ; Minimum amount of time to wait before a failed queue job is retried
+ :error-backoff-time (Duration/ofSeconds 5)
+
+ ; Max time a queue job can execute before an error is logged
+ :max-execute-time (Duration/ofMinutes 5)
+
+ ; Amount of time an in progress queue job can run before it is considered failed
+ ; and will be marked as such.
+ :hung-backoff-time (Duration/ofMinutes 30)
+
+ ; Most queue jobs in init state will be consumed by the tx-report-queue listener.
+ ; However in the case where a init job was added right before the application
+ ; was shut down and did not have time to be processed by the tx-report-queue listener,
+ ; it will be consumer by the init poller. This init poller backs off by
+ ; :init-backoff-time to avoid unnecessary compare-and-swap lock failures that could
+ ; otherwise occur if competing with the tx-report-queue listener.
+ :init-backoff-time (Duration/ofSeconds 60)
+
+ ; How frequent polling for init, error and hung jobs should be done.
+ :poll-delay (Duration/ofSeconds 10)
+
+ ; Specifies the number of threads available for executing queue and polling jobs.
+ ; The final thread pool will be this size + 2.
+ ;
+ ; One thread is permanently allocated for listening to the
+ ; tx-report-queue.
+ ;
+ ; Another thread is permanently allocated for checking :max-execute-time.
+ ; This means that if all executing queue jobs are stuck and the thread pool is unavailable
+ ; as such, at least an error will be logged about this. The log entry will
+ ; contain the stacktrace of the stuck threads.
+ :pool-size 4
+
+ ; How often should the system be polled for failed queue jobs
+ :system-error-poll-delay (Duration/ofMinutes 1)
+
+ ; How often should the system invoke
+ :system-error-callback-backoff (Duration/ofHours 1)}
+
+ u/duration->nanos))
+
+
+(defn init! [{:keys [conn] :as cfg}]
+ (assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil")))
+ (locking threadpool
+ @(d/transact conn i/schema)
+ (let [new-cfg (swap! *config*
+ (fn [old-conf]
+ (-> (merge-with (fn [a b] (or b a))
+ {:running-queues (atom #{})
+ :start-execute-time (atom {})}
+ default-opts
+ old-conf
+ cfg)
+ (assoc :system-error (atom {}))
+ u/duration->nanos)))]
+ new-cfg)))
+
+
+(defn add-consumer!
+ ([queue-id f]
+ (add-consumer! queue-id f {}))
+ ([queue-id f opts]
+ (swap! *config* (fn [old-config] (assoc-in old-config [:handlers queue-id] (merge opts {:f f}))))))
+
+
+(defn put [id payload]
+ (let [{:keys [bootstrap-poller! conn] :as cfg} @*config*]
+ (when (and *test-mode* bootstrap-poller!)
+ (bootstrap-poller! conn))
+ (i/put cfg id payload)))
+
+
+(defn- do-start! []
+ (let [{:keys [poll-delay pool-size system-error-poll-delay]} @*config*]
+ (reset! threadpool (Executors/newScheduledThreadPool (+ 2 pool-size)))
+ (let [pool @threadpool
+ queue-listener-ready (promise)]
+ (reset! *running?* true)
+ (.scheduleAtFixedRate pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/NANOSECONDS)
+ (.scheduleAtFixedRate pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/NANOSECONDS)
+ (.execute pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*)))
+ (.execute pool (fn [] (slow-executor/show-slow-threads *running?* *config*)))
+ @queue-listener-ready)))
+
+
+(defn start! []
+ (locking threadpool
+ (cond (true? *test-mode*)
+ (log/info "test mode enabled, doing nothing for start!")
+
+ (true? @*running?*)
+ nil
+
+ (false? @*running?*)
+ (do-start!))))
+
+
+(defn stop! []
+ (locking threadpool
+ (cond (true? *test-mode*)
+ (log/info "test mode enabled, doing nothing for stop!")
+
+ (false? @*running?*)
+ nil
+
+ (true? @*running?*)
+ (do
+ (reset! *running?* false)
+ (when-let [^ExecutorService tp @threadpool]
+ (log/debug "shutting down old threadpool")
+ (.shutdown tp)
+ (while (not (.awaitTermination tp 1 TimeUnit/SECONDS))
+ (log/debug "waiting for threadpool to stop"))
+ (log/debug "stopped!")
+ (reset! threadpool nil))))))
+
+
+(comment
+ (do
+ (require 'com.github.ivarref.yoltq.log-init)
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"ivarref.yoltq.report-queue"} :info]
+ [#{"ivarref.yoltq.poller"} :info]
+ [#{"ivarref.yoltq*"} :info]
+ [#{"*"} :info]])
+ (stop!)
+ (let [received (atom [])
+ uri (str "datomic:mem://demo")]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (let [ok-items (atom [])
+ conn (d/connect uri)
+ n 100]
+ (init! {:conn conn
+ :error-backoff-time (Duration/ofSeconds 1)
+ :poll-delay (Duration/ofSeconds 1)})
+ (add-consumer! :q (fn [payload]
+ (when (> (Math/random) 0.5)
+ (throw (ex-info "oops" {})))
+ (if (= n (count (swap! received conj (:work payload))))
+ (log/info "... and we are done!")
+ (log/info "got payload" payload "total ok:" (count @received)))))
+ (start!)
+ (dotimes [x n]
+ @(d/transact conn [(put :q {:work x})]))
+ nil)))) \ No newline at end of file