aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq
diff options
context:
space:
mode:
authorIvar Refsdal <refsdal.ivar@gmail.com>2022-08-15 08:37:39 +0200
committerIvar Refsdal <refsdal.ivar@gmail.com>2022-08-15 08:37:39 +0200
commit7d4477c318eefa711c7b7be46fd902419826e4c2 (patch)
tree12caa9727280296a87a3b35678e17468bc4b389f /src/com/github/ivarref/yoltq
parentRelease 0.2.58 (diff)
downloadfiinha-7d4477c318eefa711c7b7be46fd902419826e4c2.tar.gz
fiinha-7d4477c318eefa711c7b7be46fd902419826e4c2.tar.xz
Release 0.2.59
Fix slow thread watcher race condition when used with stop! https://github.com/ivarref/yoltq/issues/2
Diffstat (limited to 'src/com/github/ivarref/yoltq')
-rw-r--r--src/com/github/ivarref/yoltq/slow_executor_detector.clj38
1 files changed, 23 insertions, 15 deletions
diff --git a/src/com/github/ivarref/yoltq/slow_executor_detector.clj b/src/com/github/ivarref/yoltq/slow_executor_detector.clj
index 80d3718..53dfe89 100644
--- a/src/com/github/ivarref/yoltq/slow_executor_detector.clj
+++ b/src/com/github/ivarref/yoltq/slow_executor_detector.clj
@@ -1,28 +1,36 @@
(ns com.github.ivarref.yoltq.slow-executor-detector
- (:require [com.github.ivarref.yoltq.ext-sys :as ext]
+ (:require [clojure.string :as str]
[clojure.tools.logging :as log]
- [clojure.string :as str]))
-
+ [com.github.ivarref.yoltq.ext-sys :as ext])
+ (:import (java.util.concurrent ExecutorService)))
(defn- do-show-slow-threads [{:keys [start-execute-time
- max-execute-time]}]
- (doseq [[^Thread thread [start-time queue-id queue-name]] @start-execute-time]
- (when (> (ext/now-ms) (+ start-time max-execute-time))
- (log/error "thread" (.getName thread) "spent too much time on"
- "queue item" (str queue-id)
- "for queue" queue-name
- "stacktrace: \n"
- (str/join "\n" (mapv str (seq (.getStackTrace thread))))))))
-
+ max-execute-time
+ slow?
+ slow-thread-show-stacktrace?]
+ :or {slow-thread-show-stacktrace? true}}]
+ (let [new-slow-val (atom false)]
+ (doseq [[^Thread thread [start-time queue-id queue-name]] @start-execute-time]
+ (when (> (ext/now-ms) (+ start-time max-execute-time))
+ (reset! new-slow-val true)
+ (log/error "thread" (.getName thread) "spent too much time on"
+ "queue item" (str queue-id)
+ "for queue" queue-name
+ (if slow-thread-show-stacktrace?
+ (str "stacktrace: \n" (str/join "\n" (mapv str (seq (.getStackTrace thread)))))
+ ""))))
+ (reset! slow? @new-slow-val)))
-(defn show-slow-threads [running? config-atom]
+(defn show-slow-threads [^ExecutorService pool config-atom]
(try
- (while @running?
+ (while (not (.isTerminated pool))
(try
(do-show-slow-threads @config-atom)
(catch Throwable t
(log/error t "do-show-slow-threads crashed:" (ex-message t))))
(dotimes [_ 3]
- (when @running? (Thread/sleep 1000))))
+ (when (not (.isTerminated pool))
+ (Thread/sleep 1000))))
+ (log/debug "show-slow-threads exiting")
(catch Throwable t
(log/error t "reap! crashed:" (ex-message t)))))