diff options
| author | Ivar Refsdal <refsdal.ivar@gmail.com> | 2022-08-15 08:37:39 +0200 |
|---|---|---|
| committer | Ivar Refsdal <refsdal.ivar@gmail.com> | 2022-08-15 08:37:39 +0200 |
| commit | 7d4477c318eefa711c7b7be46fd902419826e4c2 (patch) | |
| tree | 12caa9727280296a87a3b35678e17468bc4b389f /src/com/github/ivarref/yoltq.clj | |
| parent | Release 0.2.58 (diff) | |
| download | fiinha-7d4477c318eefa711c7b7be46fd902419826e4c2.tar.gz fiinha-7d4477c318eefa711c7b7be46fd902419826e4c2.tar.xz | |
Release 0.2.59
Fix slow thread watcher race condition when used with stop! https://github.com/ivarref/yoltq/issues/2
Diffstat (limited to 'src/com/github/ivarref/yoltq.clj')
| -rw-r--r-- | src/com/github/ivarref/yoltq.clj | 121 |
1 files changed, 72 insertions, 49 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index f4c2bf7..32693c3 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -1,17 +1,18 @@ (ns com.github.ivarref.yoltq - (:require [clojure.tools.logging :as log] - [com.github.ivarref.yoltq.error-poller :as errpoller] - [com.github.ivarref.yoltq.impl :as i] - [com.github.ivarref.yoltq.migrate :as migrate] - [com.github.ivarref.yoltq.poller :as poller] - [com.github.ivarref.yoltq.report-queue :as rq] - [com.github.ivarref.yoltq.slow-executor-detector :as slow-executor] - [com.github.ivarref.yoltq.utils :as u] - [datomic.api :as d]) + (:require + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.error-poller :as errpoller] + [com.github.ivarref.yoltq.impl :as i] + [com.github.ivarref.yoltq.migrate :as migrate] + [com.github.ivarref.yoltq.poller :as poller] + [com.github.ivarref.yoltq.report-queue :as rq] + [com.github.ivarref.yoltq.slow-executor-detector :as slow-executor] + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d]) (:import (datomic Connection) + (java.lang.management ManagementFactory) (java.time Duration) - (java.util.concurrent ExecutorService Executors TimeUnit) - (java.lang.management ManagementFactory))) + (java.util.concurrent ExecutorService Executors TimeUnit))) (defonce ^:dynamic *config* (atom nil)) @@ -70,7 +71,7 @@ ; Should old, possibly stalled jobs be automatically be migrated ; as part of `start!`? - :auto-migrate? true} + :auto-migrate? true} u/duration->millis)) @@ -82,10 +83,12 @@ (let [new-cfg (swap! *config* (fn [old-conf] (-> (merge-with (fn [_ b] b) - {:running-queues (atom #{}) - :start-execute-time (atom {}) - :system-error (atom {}) - :healthy? (atom nil)} + {:running-queues (atom #{}) + :start-execute-time (atom {}) + :system-error (atom {}) + :healthy? (atom nil) + :slow? (atom nil) + :slow-thread-watcher-done? (promise)} default-opts (if *test-mode* old-conf (select-keys old-conf [:handlers])) cfg) @@ -122,17 +125,19 @@ (defn- do-start! [] - (let [{:keys [poll-delay pool-size system-error-poll-delay auto-migrate?] :as cfg} @*config*] + (let [{:keys [poll-delay pool-size system-error-poll-delay auto-migrate? slow-thread-watcher-done?] :as cfg} @*config*] (when auto-migrate? (future (migrate/migrate! cfg))) - (reset! threadpool (Executors/newScheduledThreadPool (+ 2 pool-size))) - (let [pool @threadpool + (let [pool (reset! threadpool (Executors/newScheduledThreadPool (+ 1 pool-size))) queue-listener-ready (promise)] (reset! *running?* true) (.scheduleAtFixedRate pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS) (.scheduleAtFixedRate pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS) (.execute pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*))) - (.execute pool (fn [] (slow-executor/show-slow-threads *running?* *config*))) + (future (try + (slow-executor/show-slow-threads pool *config*) + (finally + (deliver slow-thread-watcher-done? :done)))) @queue-listener-ready))) @@ -160,23 +165,38 @@ (do (reset! *running?* false) (when-let [^ExecutorService tp @threadpool] - (log/debug "shutting down old threadpool") + (log/debug "shutting down threadpool") (.shutdown tp) (while (not (.awaitTermination tp 1 TimeUnit/SECONDS)) - (log/debug "waiting for threadpool to stop")) + (log/trace "waiting for threadpool to stop")) (log/debug "stopped!") - (reset! threadpool nil)))))) + (reset! threadpool nil)) + (when-let [wait-slow-threads (some->> *config* deref :slow-thread-watcher-done?)] + (log/debug "waiting for slow-thread-watcher to stop ...") + @wait-slow-threads + (log/debug "waiting for slow-thread-watcher to stop ... OK")))))) (defn healthy? [] - (or + (cond (< (.toMinutes (Duration/ofMillis (.getUptime (ManagementFactory/getRuntimeMXBean)))) 10) - (some->> @*config* - :healthy? - (deref)))) + true + + (false? (some->> @*config* + :healthy? + (deref))) + false + + (true? (some->> @*config* + :slow? + (deref))) + false + + :else + true)) (defn unhealthy? - "Returns `true` if there are queues in error and application has been up for over 10 minutes, otherwise `false`." + "Returns `true` if there are queues in error or a thread is slow, and the application has been up for over 10 minutes, otherwise `false`." [] (false? (healthy?))) @@ -228,26 +248,29 @@ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] [#{"ivarref.yoltq.report-queue"} :info] [#{"ivarref.yoltq.poller"} :info] - [#{"ivarref.yoltq*"} :info] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] [#{"*"} :info]]) (stop!) - (let [received (atom []) - uri (str "datomic:mem://demo")] - (d/delete-database uri) - (d/create-database uri) - (let [ok-items (atom []) - conn (d/connect uri) - n 1] - (init! {:conn conn - :error-backoff-time (Duration/ofSeconds 1) - :poll-delay (Duration/ofSeconds 1)}) - (add-consumer! :q (fn [payload] - #_(when (> (Math/random) 0.5) - (throw (ex-info "oops" {}))) - (if (= n (count (swap! received conj (:work payload)))) - (log/info "... and we are done!") - (log/info "got payload" payload "total ok:" (count @received))))) - (start!) - (dotimes [x n] - @(d/transact conn [(put :q {:work 123})])) - nil)))) + (future (let [received (atom []) + uri (str "datomic:mem://demo")] + (d/delete-database uri) + (d/create-database uri) + (let [conn (d/connect uri) + started-consuming? (promise) + n 1] + (init! {:conn conn + :error-backoff-time (Duration/ofSeconds 1) + :poll-delay (Duration/ofSeconds 1) + :max-execute-time (Duration/ofSeconds 3) + :slow-thread-show-stacktrace? false}) + (add-consumer! :q (fn [_] + (deliver started-consuming? true) + (log/info "sleeping...") + (Thread/sleep (.toMillis (Duration/ofSeconds 60))) + (log/info "done sleeping"))) + (start!) + @(d/transact conn [(put :q {:work 123})]) + @started-consuming? + (stop!) + nil))))) |
