diff options
Diffstat (limited to 'src/com/github')
| -rw-r--r-- | src/com/github/ivarref/yoltq.clj | 121 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/slow_executor_detector.clj | 38 |
2 files changed, 95 insertions, 64 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index f4c2bf7..32693c3 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -1,17 +1,18 @@ (ns com.github.ivarref.yoltq - (:require [clojure.tools.logging :as log] - [com.github.ivarref.yoltq.error-poller :as errpoller] - [com.github.ivarref.yoltq.impl :as i] - [com.github.ivarref.yoltq.migrate :as migrate] - [com.github.ivarref.yoltq.poller :as poller] - [com.github.ivarref.yoltq.report-queue :as rq] - [com.github.ivarref.yoltq.slow-executor-detector :as slow-executor] - [com.github.ivarref.yoltq.utils :as u] - [datomic.api :as d]) + (:require + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.error-poller :as errpoller] + [com.github.ivarref.yoltq.impl :as i] + [com.github.ivarref.yoltq.migrate :as migrate] + [com.github.ivarref.yoltq.poller :as poller] + [com.github.ivarref.yoltq.report-queue :as rq] + [com.github.ivarref.yoltq.slow-executor-detector :as slow-executor] + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d]) (:import (datomic Connection) + (java.lang.management ManagementFactory) (java.time Duration) - (java.util.concurrent ExecutorService Executors TimeUnit) - (java.lang.management ManagementFactory))) + (java.util.concurrent ExecutorService Executors TimeUnit))) (defonce ^:dynamic *config* (atom nil)) @@ -70,7 +71,7 @@ ; Should old, possibly stalled jobs be automatically be migrated ; as part of `start!`? - :auto-migrate? true} + :auto-migrate? true} u/duration->millis)) @@ -82,10 +83,12 @@ (let [new-cfg (swap! *config* (fn [old-conf] (-> (merge-with (fn [_ b] b) - {:running-queues (atom #{}) - :start-execute-time (atom {}) - :system-error (atom {}) - :healthy? (atom nil)} + {:running-queues (atom #{}) + :start-execute-time (atom {}) + :system-error (atom {}) + :healthy? (atom nil) + :slow? (atom nil) + :slow-thread-watcher-done? (promise)} default-opts (if *test-mode* old-conf (select-keys old-conf [:handlers])) cfg) @@ -122,17 +125,19 @@ (defn- do-start! [] - (let [{:keys [poll-delay pool-size system-error-poll-delay auto-migrate?] :as cfg} @*config*] + (let [{:keys [poll-delay pool-size system-error-poll-delay auto-migrate? slow-thread-watcher-done?] :as cfg} @*config*] (when auto-migrate? (future (migrate/migrate! cfg))) - (reset! threadpool (Executors/newScheduledThreadPool (+ 2 pool-size))) - (let [pool @threadpool + (let [pool (reset! threadpool (Executors/newScheduledThreadPool (+ 1 pool-size))) queue-listener-ready (promise)] (reset! *running?* true) (.scheduleAtFixedRate pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS) (.scheduleAtFixedRate pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS) (.execute pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*))) - (.execute pool (fn [] (slow-executor/show-slow-threads *running?* *config*))) + (future (try + (slow-executor/show-slow-threads pool *config*) + (finally + (deliver slow-thread-watcher-done? :done)))) @queue-listener-ready))) @@ -160,23 +165,38 @@ (do (reset! *running?* false) (when-let [^ExecutorService tp @threadpool] - (log/debug "shutting down old threadpool") + (log/debug "shutting down threadpool") (.shutdown tp) (while (not (.awaitTermination tp 1 TimeUnit/SECONDS)) - (log/debug "waiting for threadpool to stop")) + (log/trace "waiting for threadpool to stop")) (log/debug "stopped!") - (reset! threadpool nil)))))) + (reset! threadpool nil)) + (when-let [wait-slow-threads (some->> *config* deref :slow-thread-watcher-done?)] + (log/debug "waiting for slow-thread-watcher to stop ...") + @wait-slow-threads + (log/debug "waiting for slow-thread-watcher to stop ... OK")))))) (defn healthy? [] - (or + (cond (< (.toMinutes (Duration/ofMillis (.getUptime (ManagementFactory/getRuntimeMXBean)))) 10) - (some->> @*config* - :healthy? - (deref)))) + true + + (false? (some->> @*config* + :healthy? + (deref))) + false + + (true? (some->> @*config* + :slow? + (deref))) + false + + :else + true)) (defn unhealthy? - "Returns `true` if there are queues in error and application has been up for over 10 minutes, otherwise `false`." + "Returns `true` if there are queues in error or a thread is slow, and the application has been up for over 10 minutes, otherwise `false`." [] (false? (healthy?))) @@ -228,26 +248,29 @@ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] [#{"ivarref.yoltq.report-queue"} :info] [#{"ivarref.yoltq.poller"} :info] - [#{"ivarref.yoltq*"} :info] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] [#{"*"} :info]]) (stop!) - (let [received (atom []) - uri (str "datomic:mem://demo")] - (d/delete-database uri) - (d/create-database uri) - (let [ok-items (atom []) - conn (d/connect uri) - n 1] - (init! {:conn conn - :error-backoff-time (Duration/ofSeconds 1) - :poll-delay (Duration/ofSeconds 1)}) - (add-consumer! :q (fn [payload] - #_(when (> (Math/random) 0.5) - (throw (ex-info "oops" {}))) - (if (= n (count (swap! received conj (:work payload)))) - (log/info "... and we are done!") - (log/info "got payload" payload "total ok:" (count @received))))) - (start!) - (dotimes [x n] - @(d/transact conn [(put :q {:work 123})])) - nil)))) + (future (let [received (atom []) + uri (str "datomic:mem://demo")] + (d/delete-database uri) + (d/create-database uri) + (let [conn (d/connect uri) + started-consuming? (promise) + n 1] + (init! {:conn conn + :error-backoff-time (Duration/ofSeconds 1) + :poll-delay (Duration/ofSeconds 1) + :max-execute-time (Duration/ofSeconds 3) + :slow-thread-show-stacktrace? false}) + (add-consumer! :q (fn [_] + (deliver started-consuming? true) + (log/info "sleeping...") + (Thread/sleep (.toMillis (Duration/ofSeconds 60))) + (log/info "done sleeping"))) + (start!) + @(d/transact conn [(put :q {:work 123})]) + @started-consuming? + (stop!) + nil))))) diff --git a/src/com/github/ivarref/yoltq/slow_executor_detector.clj b/src/com/github/ivarref/yoltq/slow_executor_detector.clj index 80d3718..53dfe89 100644 --- a/src/com/github/ivarref/yoltq/slow_executor_detector.clj +++ b/src/com/github/ivarref/yoltq/slow_executor_detector.clj @@ -1,28 +1,36 @@ (ns com.github.ivarref.yoltq.slow-executor-detector - (:require [com.github.ivarref.yoltq.ext-sys :as ext] + (:require [clojure.string :as str] [clojure.tools.logging :as log] - [clojure.string :as str])) - + [com.github.ivarref.yoltq.ext-sys :as ext]) + (:import (java.util.concurrent ExecutorService))) (defn- do-show-slow-threads [{:keys [start-execute-time - max-execute-time]}] - (doseq [[^Thread thread [start-time queue-id queue-name]] @start-execute-time] - (when (> (ext/now-ms) (+ start-time max-execute-time)) - (log/error "thread" (.getName thread) "spent too much time on" - "queue item" (str queue-id) - "for queue" queue-name - "stacktrace: \n" - (str/join "\n" (mapv str (seq (.getStackTrace thread)))))))) - + max-execute-time + slow? + slow-thread-show-stacktrace?] + :or {slow-thread-show-stacktrace? true}}] + (let [new-slow-val (atom false)] + (doseq [[^Thread thread [start-time queue-id queue-name]] @start-execute-time] + (when (> (ext/now-ms) (+ start-time max-execute-time)) + (reset! new-slow-val true) + (log/error "thread" (.getName thread) "spent too much time on" + "queue item" (str queue-id) + "for queue" queue-name + (if slow-thread-show-stacktrace? + (str "stacktrace: \n" (str/join "\n" (mapv str (seq (.getStackTrace thread))))) + "")))) + (reset! slow? @new-slow-val))) -(defn show-slow-threads [running? config-atom] +(defn show-slow-threads [^ExecutorService pool config-atom] (try - (while @running? + (while (not (.isTerminated pool)) (try (do-show-slow-threads @config-atom) (catch Throwable t (log/error t "do-show-slow-threads crashed:" (ex-message t)))) (dotimes [_ 3] - (when @running? (Thread/sleep 1000)))) + (when (not (.isTerminated pool)) + (Thread/sleep 1000)))) + (log/debug "show-slow-threads exiting") (catch Throwable t (log/error t "reap! crashed:" (ex-message t))))) |
