about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- pom.xml 4
-rw-r--r-- src/com/github/ivarref/yoltq.clj 121
-rw-r--r-- src/com/github/ivarref/yoltq/slow_executor_detector.clj 38
3 files changed, 97 insertions, 66 deletions
diff --git a/pom.xml b/pom.xml
index cb293b7..187b8ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
<packaging>jar</packaging>
<groupId>com.github.ivarref</groupId>
<artifactId>yoltq</artifactId>
- <version>0.2.58</version>
+ <version>0.2.59</version>
<name>yoltq</name>
<dependencies>
<dependency>
@@ -30,7 +30,7 @@
<scm>
<connection>scm:git:git://github.com/ivarref/yoltq.git</connection>
<developerConnection>scm:git:ssh://git@github.com/ivarref/yoltq.git</developerConnection>
- <tag>v0.2.58</tag>
+ <tag>v0.2.59</tag>
<url>https://github.com/ivarref/yoltq</url>
</scm>
</project>
\ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index f4c2bf7..32693c3 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -1,17 +1,18 @@
(ns com.github.ivarref.yoltq
- (:require [clojure.tools.logging :as log]
- [com.github.ivarref.yoltq.error-poller :as errpoller]
- [com.github.ivarref.yoltq.impl :as i]
- [com.github.ivarref.yoltq.migrate :as migrate]
- [com.github.ivarref.yoltq.poller :as poller]
- [com.github.ivarref.yoltq.report-queue :as rq]
- [com.github.ivarref.yoltq.slow-executor-detector :as slow-executor]
- [com.github.ivarref.yoltq.utils :as u]
- [datomic.api :as d])
+ (:require
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.error-poller :as errpoller]
+ [com.github.ivarref.yoltq.impl :as i]
+ [com.github.ivarref.yoltq.migrate :as migrate]
+ [com.github.ivarref.yoltq.poller :as poller]
+ [com.github.ivarref.yoltq.report-queue :as rq]
+ [com.github.ivarref.yoltq.slow-executor-detector :as slow-executor]
+ [com.github.ivarref.yoltq.utils :as u]
+ [datomic.api :as d])
(:import (datomic Connection)
+ (java.lang.management ManagementFactory)
(java.time Duration)
- (java.util.concurrent ExecutorService Executors TimeUnit)
- (java.lang.management ManagementFactory)))
+ (java.util.concurrent ExecutorService Executors TimeUnit)))
(defonce ^:dynamic *config* (atom nil))
@@ -70,7 +71,7 @@
; Should old, possibly stalled jobs automatically be migrated
; as part of `start!`?
- :auto-migrate? true}
+ :auto-migrate? true}
u/duration->millis))
@@ -82,10 +83,12 @@
(let [new-cfg (swap! *config*
(fn [old-conf]
(-> (merge-with (fn [_ b] b)
- {:running-queues (atom #{})
- :start-execute-time (atom {})
- :system-error (atom {})
- :healthy? (atom nil)}
+ {:running-queues (atom #{})
+ :start-execute-time (atom {})
+ :system-error (atom {})
+ :healthy? (atom nil)
+ :slow? (atom nil)
+ :slow-thread-watcher-done? (promise)}
default-opts
(if *test-mode* old-conf (select-keys old-conf [:handlers]))
cfg)
@@ -122,17 +125,19 @@
(defn- do-start! []
- (let [{:keys [poll-delay pool-size system-error-poll-delay auto-migrate?] :as cfg} @*config*]
+ (let [{:keys [poll-delay pool-size system-error-poll-delay auto-migrate? slow-thread-watcher-done?] :as cfg} @*config*]
(when auto-migrate?
(future (migrate/migrate! cfg)))
- (reset! threadpool (Executors/newScheduledThreadPool (+ 2 pool-size)))
- (let [pool @threadpool
+ (let [pool (reset! threadpool (Executors/newScheduledThreadPool (+ 1 pool-size)))
queue-listener-ready (promise)]
(reset! *running?* true)
(.scheduleAtFixedRate pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS)
(.scheduleAtFixedRate pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS)
(.execute pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*)))
- (.execute pool (fn [] (slow-executor/show-slow-threads *running?* *config*)))
+ (future (try
+ (slow-executor/show-slow-threads pool *config*)
+ (finally
+ (deliver slow-thread-watcher-done? :done))))
@queue-listener-ready)))
@@ -160,23 +165,38 @@
(do
(reset! *running?* false)
(when-let [^ExecutorService tp @threadpool]
- (log/debug "shutting down old threadpool")
+ (log/debug "shutting down threadpool")
(.shutdown tp)
(while (not (.awaitTermination tp 1 TimeUnit/SECONDS))
- (log/debug "waiting for threadpool to stop"))
+ (log/trace "waiting for threadpool to stop"))
(log/debug "stopped!")
- (reset! threadpool nil))))))
+ (reset! threadpool nil))
+ (when-let [wait-slow-threads (some->> *config* deref :slow-thread-watcher-done?)]
+ (log/debug "waiting for slow-thread-watcher to stop ...")
+ @wait-slow-threads
+ (log/debug "waiting for slow-thread-watcher to stop ... OK"))))))
(defn healthy? []
- (or
+ (cond
(< (.toMinutes (Duration/ofMillis (.getUptime (ManagementFactory/getRuntimeMXBean)))) 10)
- (some->> @*config*
- :healthy?
- (deref))))
+ true
+
+ (false? (some->> @*config*
+ :healthy?
+ (deref)))
+ false
+
+ (true? (some->> @*config*
+ :slow?
+ (deref)))
+ false
+
+ :else
+ true))
(defn unhealthy?
- "Returns `true` if there are queues in error and application has been up for over 10 minutes, otherwise `false`."
+ "Returns `true` if there are queues in error or a thread is slow, and the application has been up for over 10 minutes, otherwise `false`."
[]
(false? (healthy?)))
@@ -228,26 +248,29 @@
[[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
[#{"ivarref.yoltq.report-queue"} :info]
[#{"ivarref.yoltq.poller"} :info]
- [#{"ivarref.yoltq*"} :info]
+ [#{"com.github.ivarref.yoltq"} :debug]
+ ;[#{"ivarref.yoltq*"} :info]
[#{"*"} :info]])
(stop!)
- (let [received (atom [])
- uri (str "datomic:mem://demo")]
- (d/delete-database uri)
- (d/create-database uri)
- (let [ok-items (atom [])
- conn (d/connect uri)
- n 1]
- (init! {:conn conn
- :error-backoff-time (Duration/ofSeconds 1)
- :poll-delay (Duration/ofSeconds 1)})
- (add-consumer! :q (fn [payload]
- #_(when (> (Math/random) 0.5)
- (throw (ex-info "oops" {})))
- (if (= n (count (swap! received conj (:work payload))))
- (log/info "... and we are done!")
- (log/info "got payload" payload "total ok:" (count @received)))))
- (start!)
- (dotimes [x n]
- @(d/transact conn [(put :q {:work 123})]))
- nil))))
+ (future (let [received (atom [])
+ uri (str "datomic:mem://demo")]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (let [conn (d/connect uri)
+ started-consuming? (promise)
+ n 1]
+ (init! {:conn conn
+ :error-backoff-time (Duration/ofSeconds 1)
+ :poll-delay (Duration/ofSeconds 1)
+ :max-execute-time (Duration/ofSeconds 3)
+ :slow-thread-show-stacktrace? false})
+ (add-consumer! :q (fn [_]
+ (deliver started-consuming? true)
+ (log/info "sleeping...")
+ (Thread/sleep (.toMillis (Duration/ofSeconds 60)))
+ (log/info "done sleeping")))
+ (start!)
+ @(d/transact conn [(put :q {:work 123})])
+ @started-consuming?
+ (stop!)
+ nil)))))
diff --git a/src/com/github/ivarref/yoltq/slow_executor_detector.clj b/src/com/github/ivarref/yoltq/slow_executor_detector.clj
index 80d3718..53dfe89 100644
--- a/src/com/github/ivarref/yoltq/slow_executor_detector.clj
+++ b/src/com/github/ivarref/yoltq/slow_executor_detector.clj
@@ -1,28 +1,36 @@
(ns com.github.ivarref.yoltq.slow-executor-detector
- (:require [com.github.ivarref.yoltq.ext-sys :as ext]
+ (:require [clojure.string :as str]
[clojure.tools.logging :as log]
- [clojure.string :as str]))
-
+ [com.github.ivarref.yoltq.ext-sys :as ext])
+ (:import (java.util.concurrent ExecutorService)))
(defn- do-show-slow-threads [{:keys [start-execute-time
- max-execute-time]}]
- (doseq [[^Thread thread [start-time queue-id queue-name]] @start-execute-time]
- (when (> (ext/now-ms) (+ start-time max-execute-time))
- (log/error "thread" (.getName thread) "spent too much time on"
- "queue item" (str queue-id)
- "for queue" queue-name
- "stacktrace: \n"
- (str/join "\n" (mapv str (seq (.getStackTrace thread))))))))
-
+ max-execute-time
+ slow?
+ slow-thread-show-stacktrace?]
+ :or {slow-thread-show-stacktrace? true}}]
+ (let [new-slow-val (atom false)]
+ (doseq [[^Thread thread [start-time queue-id queue-name]] @start-execute-time]
+ (when (> (ext/now-ms) (+ start-time max-execute-time))
+ (reset! new-slow-val true)
+ (log/error "thread" (.getName thread) "spent too much time on"
+ "queue item" (str queue-id)
+ "for queue" queue-name
+ (if slow-thread-show-stacktrace?
+ (str "stacktrace: \n" (str/join "\n" (mapv str (seq (.getStackTrace thread)))))
+ ""))))
+ (reset! slow? @new-slow-val)))
-(defn show-slow-threads [running? config-atom]
+(defn show-slow-threads [^ExecutorService pool config-atom]
(try
- (while @running?
+ (while (not (.isTerminated pool))
(try
(do-show-slow-threads @config-atom)
(catch Throwable t
(log/error t "do-show-slow-threads crashed:" (ex-message t))))
(dotimes [_ 3]
- (when @running? (Thread/sleep 1000))))
+ (when (not (.isTerminated pool))
+ (Thread/sleep 1000))))
+ (log/debug "show-slow-threads exiting")
(catch Throwable t
(log/error t "reap! crashed:" (ex-message t)))))