From ea40c5dbc2b47d6fd2a23236828dc9e4ab1f77dc Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Sat, 4 Sep 2021 13:23:07 +0200 Subject: Initial commit Add release script Release 0.1.3 Use com.github.ivarref.yoltq namespace Use com.github.ivarref.yoltq namespace --- src/com/github/ivarref/yoltq/error_poller.clj | 109 ++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 src/com/github/ivarref/yoltq/error_poller.clj (limited to 'src/com/github/ivarref/yoltq/error_poller.clj') diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj new file mode 100644 index 0000000..77339f7 --- /dev/null +++ b/src/com/github/ivarref/yoltq/error_poller.clj @@ -0,0 +1,109 @@ +(ns com.github.ivarref.yoltq.error-poller + (:require [datomic.api :as d] + [com.github.ivarref.yoltq.utils :as u] + [com.github.ivarref.yoltq.ext-sys :as ext] + [clojure.tools.logging :as log])) + + +(defn get-state [v] + (case v + [:error :none] :recovery + [:error :some] :error + [:error :all] :error + [:recovery :none] :recovery + [:recovery :some] :recovery + [:recovery :all] :error + nil)) + + +(defn handle-error-count [{:keys [errors last-notify state] + :or {errors [] + last-notify 0 + state :recovery}} + {:keys [system-error-min-count system-error-callback-backoff] + :or {system-error-min-count 3}} + now-ns + error-count] + (let [new-errors (->> (conj errors error-count) + (take-last system-error-min-count) + (vec)) + classify (fn [coll] + (cond + (not= system-error-min-count (count coll)) + :missing + + (every? pos-int? coll) + :all + + (every? zero? coll) + :none + + :else + :some)) + old-state state] + (merge + {:errors new-errors + :last-notify last-notify} + (when-let [new-state (get-state [old-state (classify new-errors)])] + (merge + {:state new-state} + (when (and (= old-state :recovery) + (= new-state :error)) + {:run-callback :error + :last-notify now-ns}) + + (when (and (= new-state :error) + (= old-state :error) + (> now-ns + (+ last-notify system-error-callback-backoff))) + {:run-callback :error + :last-notify now-ns}) + + (when (and (= new-state :recovery) + (= old-state :error)) + {:run-callback :recovery})))))) + + +(defn do-poll-errors [{:keys [conn system-error + on-system-error + on-system-recovery] + :or {on-system-error (fn [] nil) + on-system-recovery (fn [] nil)} + :as config}] + (assert (some? conn) "expected :conn to be present") + (assert (some? system-error) "expected :system-error to be present") + (let [error-count (or (d/q '[:find (count ?e) . + :in $ ?status + :where + [?e :com.github.ivarref.yoltq/status ?status]] + (d/db conn) + u/status-error) + 0)] + (when (pos-int? error-count) + (log/debug "poll-errors found" error-count "errors in system")) + (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ns) error-count)] + (when run-callback + (cond (= run-callback :error) + (on-system-error) + + (= run-callback :recovery) + (on-system-recovery) + + :else + (log/error "unhandled callback-type" run-callback)) + (log/debug "run-callback is" run-callback)) + new-state))) + + +(defn poll-errors [running? config-atom] + (try + (when @running? + (do-poll-errors @config-atom)) + (catch Throwable t + (log/error t "unexpected error in poll-erros:" (ex-message t)) + nil))) + + +(comment + (do-poll-errors @com.github.ivarref.yoltq/*config*)) + -- cgit v1.2.3 From b28837ea804fbc6abd14fae23a92933b9406d5e1 Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Sun, 27 Mar 2022 13:49:50 +0200 Subject: Add healthy?, queue-stats functions and default functions for :on-system-error and :on-system-recovery --- README.md | 30 +++++++++++++++++++- deps.edn | 8 ++++-- pom.xml | 4 +-- release.sh | 4 +-- src/com/github/ivarref/yoltq.clj | 32 ++++++++++++++++++++-- src/com/github/ivarref/yoltq/error_poller.clj | 19 +++++++++---- .../com/github/ivarref/yoltq/error_poller_test.clj | 2 +- 7 files changed, 80 insertions(+), 19 deletions(-) (limited to 'src/com/github/ivarref/yoltq/error_poller.clj') diff --git a/README.md b/README.md index 7e49431..f62d46c 100644 --- a/README.md +++ b/README.md @@ -331,6 +331,34 @@ These dynamic bindings will be in place when yoltq logs errors, warnings etc. about failing consumer functions, possibly making troubleshooting easier. +## Change log + +### 2022-03-27 v0.2.41 +``` + Added function `healthy?` that returns: + true if no errors + false if one or more errors + nil if error-poller is yet to be executed. + + Added default functions for `:on-system-error` and `:on-system-recovery` + that simply logs that the system is in error (ERROR level) or has + recovered (INFO level). + + Added function `queue-stats` that returns a nicely "formatted" + vector of queue stats, for example: + (queue-stats) + => + [{:qname :add-message-thread, :status :done, :count 10274} + {:qname :add-message-thread, :status :init, :count 30} + {:qname :add-message-thread, :status :processing, :count 1} + {:qname :send-message, :status :done, :count 21106} + {:qname :send-message, :status :init, :count 56}] +``` + +### 2021-09-27 v0.2.39: ? +### 2021-09-27 v0.2.37: ? + +### 2021-09-24 v0.2.33: First publicly announced release. ## License @@ -345,4 +373,4 @@ Licenses when the conditions for such availability set forth in the Eclipse Public License, v. 2.0 are satisfied: GNU General Public License as published by the Free Software Foundation, either version 2 of the License, or (at your option) any later version, with the GNU Classpath Exception which is available -at https://www.gnu.org/software/classpath/license.html. \ No newline at end of file +at https://www.gnu.org/software/classpath/license.html. diff --git a/deps.edn b/deps.edn index a457628..d0f0a26 100644 --- a/deps.edn +++ b/deps.edn @@ -22,8 +22,10 @@ :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} - :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.1.3"}} - :main-opts ["-m" "deps-deploy.deps-deploy" "deploy" - "target/out.jar" "true"]}} + :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}} + :exec-fn deps-deploy.deps-deploy/deploy + :exec-args {:installer :remote + :sign-releases? false + :artifact "target/out.jar"}}} :mvn/repos {"my.datomic.com" {:url "https://my.datomic.com/repo"}}} diff --git a/pom.xml b/pom.xml index 9784836..e486fb1 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ jar com.github.ivarref yoltq - 0.2.39 + 0.2.40 yoltq @@ -30,7 +30,7 @@ scm:git:git://github.com/ivarref/yoltq.git scm:git:ssh://git@github.com/ivarref/yoltq.git - v0.2.39 + v0.2.40 https://github.com/ivarref/yoltq \ No newline at end of file diff --git a/release.sh b/release.sh index 70f67b5..dec59a2 100755 --- a/release.sh +++ b/release.sh @@ -13,8 +13,6 @@ git commit -m "Release $VERSION" git tag -a v$VERSION -m "Release v$VERSION" git push --follow-tags -clojure -M:deploy +clojure -X:deploy echo "Released $VERSION" - -rm *.pom.asc \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 3164020..03a364f 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -17,7 +17,6 @@ (defonce ^:dynamic *running?* (atom false)) (defonce ^:dynamic *test-mode* false) - (def default-opts (-> {; Default number of times a queue job will be retried before giving up ; Can be overridden on a per consumer basis with @@ -79,7 +78,8 @@ (-> (merge-with (fn [a b] (or b a)) {:running-queues (atom #{}) :start-execute-time (atom {}) - :system-error (atom {})} + :system-error (atom {}) + :healthy? (atom nil)} default-opts (if *test-mode* old-conf (select-keys old-conf [:handlers])) cfg) @@ -148,6 +148,32 @@ (reset! threadpool nil)))))) +(defn healthy? [] + (some->> @*config* + :healthy? + (deref))) + +(defn queue-stats [] + (let [{:keys [conn]} @*config* + db (d/db conn)] + (->> (d/q '[:find ?e ?qname ?status + :in $ + :where + [?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/status ?status]] + db) + (mapv (partial zipmap [:e :qname :status])) + (mapv #(select-keys % [:qname :status])) + (mapv (fn [qitem] {qitem 1})) + (reduce (partial merge-with +) {}) + (mapv (fn [[{:keys [qname status]} v]] + (array-map + :qname qname + :status status + :count v))) + (sort-by (juxt :qname :status)) + (vec)))) + (comment (do (require 'com.github.ivarref.yoltq.log-init) @@ -177,4 +203,4 @@ (start!) (dotimes [x n] @(d/transact conn [(put :q {:work 123})])) - nil)))) \ No newline at end of file + nil)))) diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj index 77339f7..1268482 100644 --- a/src/com/github/ivarref/yoltq/error_poller.clj +++ b/src/com/github/ivarref/yoltq/error_poller.clj @@ -66,9 +66,13 @@ (defn do-poll-errors [{:keys [conn system-error on-system-error - on-system-recovery] - :or {on-system-error (fn [] nil) - on-system-recovery (fn [] nil)} + on-system-recovery + healthy?] + :or {on-system-error (fn [] + (log/error "There are yoltq queues which have errors") + nil) + on-system-recovery (fn [] + (log/info "Yoltq recovered"))} :as config}] (assert (some? conn) "expected :conn to be present") (assert (some? system-error) "expected :system-error to be present") @@ -79,8 +83,11 @@ (d/db conn) u/status-error) 0)] - (when (pos-int? error-count) - (log/debug "poll-errors found" error-count "errors in system")) + (if (pos-int? error-count) + (do + (log/debug "poll-errors found" error-count "errors in system") + (reset! healthy? false)) + (reset! healthy? true)) (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ns) error-count)] (when run-callback (cond (= run-callback :error) @@ -100,7 +107,7 @@ (when @running? (do-poll-errors @config-atom)) (catch Throwable t - (log/error t "unexpected error in poll-erros:" (ex-message t)) + (log/error t "unexpected error in poll-errors:" (ex-message t)) nil))) diff --git a/test/com/github/ivarref/yoltq/error_poller_test.clj b/test/com/github/ivarref/yoltq/error_poller_test.clj index 2e0873e..18f0aa7 100644 --- a/test/com/github/ivarref/yoltq/error_poller_test.clj +++ b/test/com/github/ivarref/yoltq/error_poller_test.clj @@ -1,5 +1,5 @@ (ns com.github.ivarref.yoltq.error-poller-test - (:require [clojure.test :refer :all] + (:require [clojure.test :refer [deftest is]] [com.github.ivarref.yoltq.error-poller :as ep] [clojure.tools.logging :as log] [com.github.ivarref.yoltq.log-init :as logconfig] -- cgit v1.2.3 From 6c26a3b6871286510bb8e9770ee7f7e3abf97abe Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Sun, 27 Mar 2022 18:39:44 +0200 Subject: Start use current millis in the database, not nano offset --- .gitignore | 3 +- README.md | 38 ++++++++++++++-------- deps.edn | 2 +- src/com/github/ivarref/yoltq.clj | 8 ++--- src/com/github/ivarref/yoltq/error_poller.clj | 10 +++--- src/com/github/ivarref/yoltq/ext_sys.clj | 13 ++++---- src/com/github/ivarref/yoltq/impl.clj | 14 ++++---- src/com/github/ivarref/yoltq/poller.clj | 19 +++++++---- .../ivarref/yoltq/slow_executor_detector.clj | 4 +-- src/com/github/ivarref/yoltq/test_queue.clj | 2 +- src/com/github/ivarref/yoltq/utils.clj | 19 ++++++----- test/com/github/ivarref/yoltq/test_utils.clj | 9 ++--- 12 files changed, 81 insertions(+), 60 deletions(-) (limited to 'src/com/github/ivarref/yoltq/error_poller.clj') diff --git a/.gitignore b/.gitignore index cb9a7ca..c82fdd7 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ tree.txt .stage-url.txt *.pom.asc *.pom -temp/ \ No newline at end of file +temp/ +.clj-kondo/ diff --git a/README.md b/README.md index 45ba8c4..9c5669c 100644 --- a/README.md +++ b/README.md @@ -333,12 +333,21 @@ easier. ## Change log -### 2022-03-27 [v0.2.41](https://github.com/ivarref/yoltq/compare/v0.2.39...v0.2.41) +### 20..-..-.. vHEAD [diff](https://github.com/ivarref/yoltq/compare/v0.2.41...HEAD) +* Critical bugfix. +``` +Started using (System/currentTimeMillis) and not (System/nanoTime) +when storing time in the database. +``` + +* Bump Clojure to `1.11.0`. + +### 2022-03-27 v0.2.41 [diff](https://github.com/ivarref/yoltq/compare/v0.2.39...v0.2.41) * Added function `healthy?` that returns: ``` - true if no errors - false if one or more errors - nil if error-poller is yet to be executed. + true if no errors + false if one or more errors + nil if error-poller is yet to be executed. ``` * Added default functions for `:on-system-error` and `:on-system-recovery` @@ -348,22 +357,23 @@ easier. * Added function `queue-stats` that returns a nicely "formatted" vector of queue stats, for example: ``` -(queue-stats) -=> -[{:qname :add-message-thread, :status :done, :count 10274} - {:qname :add-message-thread, :status :init, :count 30} - {:qname :add-message-thread, :status :processing, :count 1} - {:qname :send-message, :status :done, :count 21106} - {:qname :send-message, :status :init, :count 56}] + (queue-stats) + => + [{:qname :add-message-thread, :status :done, :count 10274} + {:qname :add-message-thread, :status :init, :count 30} + {:qname :add-message-thread, :status :processing, :count 1} + {:qname :send-message, :status :done, :count 21106} + {:qname :send-message, :status :init, :count 56}] ``` -### 2021-09-27 [v0.2.39](https://github.com/ivarref/yoltq/compare/v0.2.37...v0.2.39) +### 2021-09-27 v0.2.39 [diff](https://github.com/ivarref/yoltq/compare/v0.2.37...v0.2.39) Added `:valid-payload?` option for queue consumers. -### 2021-09-27 [v0.2.37](https://github.com/ivarref/yoltq/compare/v0.2.33...v0.2.37) +### 2021-09-27 v0.2.37 [diff](https://github.com/ivarref/yoltq/compare/v0.2.33...v0.2.37) Improved error reporting. -### 2021-09-24 v0.2.33: First publicly announced release. +### 2021-09-24 v0.2.33 +First publicly announced release. ## License diff --git a/deps.edn b/deps.edn index d0f0a26..8e769e1 100644 --- a/deps.edn +++ b/deps.edn @@ -1,5 +1,5 @@ {:deps {org.clojure/tools.logging {:mvn/version "1.1.0"} - org.clojure/clojure {:mvn/version "1.10.3"}} + org.clojure/clojure {:mvn/version "1.11.0"}} :paths ["src"] diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 03a364f..17aa40a 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -19,7 +19,7 @@ (def default-opts (-> {; Default number of times a queue job will be retried before giving up - ; Can be overridden on a per consumer basis with + ; Can be overridden on a per-consumer basis with ; (yq/add-consumer! :q (fn [payload] ...) {:max-retries 200}) :max-retries 100 @@ -34,7 +34,7 @@ :hung-backoff-time (Duration/ofMinutes 30) ; Most queue jobs in init state will be consumed by the tx-report-queue listener. - ; However in the case where a init job was added right before the application + ; However, in the case where an init job was added right before the application ; was shut down and did not have time to be processed by the tx-report-queue listener, ; it will be consumer by the init poller. This init poller backs off by ; :init-backoff-time to avoid unnecessary compare-and-swap lock failures that could @@ -66,7 +66,7 @@ ; How often should the system invoke :system-error-callback-backoff (Duration/ofHours 1)} - u/duration->nanos)) + u/duration->millis)) (defn init! [{:keys [conn] :as cfg}] @@ -83,7 +83,7 @@ default-opts (if *test-mode* old-conf (select-keys old-conf [:handlers])) cfg) - u/duration->nanos)))] + u/duration->millis)))] new-cfg))) diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj index 1268482..ee6359e 100644 --- a/src/com/github/ivarref/yoltq/error_poller.clj +++ b/src/com/github/ivarref/yoltq/error_poller.clj @@ -22,7 +22,7 @@ state :recovery}} {:keys [system-error-min-count system-error-callback-backoff] :or {system-error-min-count 3}} - now-ns + now-ms error-count] (let [new-errors (->> (conj errors error-count) (take-last system-error-min-count) @@ -50,14 +50,14 @@ (when (and (= old-state :recovery) (= new-state :error)) {:run-callback :error - :last-notify now-ns}) + :last-notify now-ms}) (when (and (= new-state :error) (= old-state :error) - (> now-ns + (> now-ms (+ last-notify system-error-callback-backoff))) {:run-callback :error - :last-notify now-ns}) + :last-notify now-ms}) (when (and (= new-state :recovery) (= old-state :error)) @@ -88,7 +88,7 @@ (log/debug "poll-errors found" error-count "errors in system") (reset! healthy? false)) (reset! healthy? true)) - (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ns) error-count)] + (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ms) error-count)] (when run-callback (cond (= run-callback :error) (on-system-error) diff --git a/src/com/github/ivarref/yoltq/ext_sys.clj b/src/com/github/ivarref/yoltq/ext_sys.clj index 3480475..692b934 100644 --- a/src/com/github/ivarref/yoltq/ext_sys.clj +++ b/src/com/github/ivarref/yoltq/ext_sys.clj @@ -1,17 +1,18 @@ (ns com.github.ivarref.yoltq.ext-sys (:require [datomic.api :as d]) + (:refer-clojure :exclude [random-uuid]) (:import (java.util UUID))) -(def ^:dynamic *now-ns-atom* nil) +(def ^:dynamic *now-ms-atom* nil) (def ^:dynamic *squuid-atom* nil) (def ^:dynamic *random-atom* nil) -(defn now-ns [] - (if *now-ns-atom* - @*now-ns-atom* - (System/nanoTime))) +(defn now-ms [] + (if *now-ms-atom* + @*now-ms-atom* + (System/currentTimeMillis))) (defn squuid [] @@ -23,4 +24,4 @@ (defn random-uuid [] (if *random-atom* (UUID/fromString (str "00000000-0000-0000-0000-" (format "%012d" (swap! *random-atom* inc)))) - (UUID/randomUUID))) \ No newline at end of file + (UUID/randomUUID))) diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index 8b75fc3..b4eef8d 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -63,7 +63,7 @@ :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts) :com.github.ivarref.yoltq/lock (u/random-uuid) :com.github.ivarref.yoltq/tries 0 - :com.github.ivarref.yoltq/init-time (u/now-ns)} + :com.github.ivarref.yoltq/init-time (u/now-ms)} (when-let [[q ext-id] (:depends-on opts)] (when-not (d/q '[:find ?e . :in $ ?ext-id @@ -138,8 +138,8 @@ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing new-status] (if (= new-status u/status-done) - {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ns)} - {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ns)})] + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ms)} + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ms)})] start-time (System/nanoTime) {:keys [db-after]} @(d/transact conn tx)] (when tx-spent-time! (tx-spent-time! (- (System/nanoTime) start-time))) @@ -171,7 +171,7 @@ (log/debug "queue item" (str id) "for queue" queue-name "is now processing") (let [{:keys [retval exception]} (try - (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ns) id queue-name]) + (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ms) id queue-name]) (let [v (f payload)] {:retval v}) (catch Throwable t @@ -188,7 +188,7 @@ (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)] (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item] (log/info (fmt id queue-name u/status-done tries (- done-time init-time))) - (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time))) + (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time))) (assoc q-item :retval retval :success? true :allow-cas-failure? true))) (some? exception) @@ -198,14 +198,14 @@ (log/logp level exception (fmt id queue-name u/status-error tries (- error-time init-time))) (log/logp level exception "error message was:" (str \" (ex-message exception) \") "for queue-item" (str id)) (log/logp level exception "ex-data was:" (ex-data exception) "for queue-item" (str id)) - (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time))) + (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time))) (assoc q-item :exception exception))) :else (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)] (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item] (log/info (fmt id queue-name u/status-done tries (- done-time init-time))) - (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time))) + (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time))) (assoc q-item :retval retval :success? true)))))) (do (log/error "no handler for queue" queue-name) diff --git a/src/com/github/ivarref/yoltq/poller.clj b/src/com/github/ivarref/yoltq/poller.clj index 28b158f..9cf81c7 100644 --- a/src/com/github/ivarref/yoltq/poller.clj +++ b/src/com/github/ivarref/yoltq/poller.clj @@ -25,17 +25,16 @@ (if-not (contains? old q) (try (log/debug "polling queue" queue-name "for status" status) - (let [start-time (u/now-ns) + (let [start-time (u/now-ms) last-res (loop [prev-res nil] (when @running? (let [res (poll-once! cfg queue-name status)] + (log/debug "poll-once! returned" res) (if (and res (:success? res)) (recur res) prev-res))))] - (let [spent-ns (- (u/now-ns) start-time)] - (log/trace "done polling queue" q "in" - (format "%.1f" (double (/ spent-ns 1e6))) - "ms")) + (let [spent-ms (- (u/now-ms) start-time)] + (log/trace "done polling queue" q "in" spent-ms "ms")) last-res) (finally (swap! running-queues disj q))) @@ -44,6 +43,14 @@ (log/error t "poll-queue! crashed:" (ex-message t))) (finally))) +(comment + (def cfg @com.github.ivarref.yoltq/*config*)) + +(comment + (poll-queue! + (atom true) + @com.github.ivarref.yoltq/*config* + [:add-message-thread :init])) (defn poll-all-queues! [running? config-atom pool] (try @@ -54,4 +61,4 @@ [q-name status])))] (.execute pool (fn [] (poll-queue! running? @config-atom q)))))) (catch Throwable t - (log/error t "poll-all-queues! crashed:" (ex-message t))))) \ No newline at end of file + (log/error t "poll-all-queues! crashed:" (ex-message t))))) diff --git a/src/com/github/ivarref/yoltq/slow_executor_detector.clj b/src/com/github/ivarref/yoltq/slow_executor_detector.clj index f15ef7d..80d3718 100644 --- a/src/com/github/ivarref/yoltq/slow_executor_detector.clj +++ b/src/com/github/ivarref/yoltq/slow_executor_detector.clj @@ -7,7 +7,7 @@ (defn- do-show-slow-threads [{:keys [start-execute-time max-execute-time]}] (doseq [[^Thread thread [start-time queue-id queue-name]] @start-execute-time] - (when (> (ext/now-ns) (+ start-time max-execute-time)) + (when (> (ext/now-ms) (+ start-time max-execute-time)) (log/error "thread" (.getName thread) "spent too much time on" "queue item" (str queue-id) "for queue" queue-name @@ -25,4 +25,4 @@ (dotimes [_ 3] (when @running? (Thread/sleep 1000)))) (catch Throwable t - (log/error t "reap! crashed:" (ex-message t))))) \ No newline at end of file + (log/error t "reap! crashed:" (ex-message t))))) diff --git a/src/com/github/ivarref/yoltq/test_queue.clj b/src/com/github/ivarref/yoltq/test_queue.clj index 6183216..ee9cd54 100644 --- a/src/com/github/ivarref/yoltq/test_queue.clj +++ b/src/com/github/ivarref/yoltq/test_queue.clj @@ -47,7 +47,7 @@ (with-bindings {#'yq/*config* config# #'yq/*running?* (atom false) #'yq/*test-mode* true - #'ext/*now-ns-atom* (atom 0) + #'ext/*now-ms-atom* (atom 0) #'ext/*random-atom* (atom 0) #'ext/*squuid-atom* (atom 0)} (try diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj index d551510..ad2444a 100644 --- a/src/com/github/ivarref/yoltq/utils.clj +++ b/src/com/github/ivarref/yoltq/utils.clj @@ -3,6 +3,7 @@ [clojure.edn :as edn] [com.github.ivarref.yoltq.ext-sys :as ext] [clojure.tools.logging :as log]) + (:refer-clojure :exclude [random-uuid]) (:import (datomic Connection) (java.time Duration))) @@ -13,10 +14,10 @@ (def status-error :error) -(defn duration->nanos [m] +(defn duration->millis [m] (reduce-kv (fn [o k v] (if (instance? Duration v) - (assoc o k (.toNanos v)) + (assoc o k (.toMillis v)) (assoc o k v))) {} m)) @@ -30,8 +31,8 @@ (ext/random-uuid)) -(defn now-ns [] - (ext/now-ns)) +(defn now-ms [] + (ext/now-ms)) (defn root-cause [e] @@ -75,7 +76,7 @@ :bindings (get (get-queue-item db id) :com.github.ivarref.yoltq/bindings {}) :tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock] [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status old-status status-processing] - {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ns)}]})) + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ms)}]})) (defn get-init [{:keys [conn db init-backoff-time] :as cfg} queue-name] @@ -94,11 +95,11 @@ [?e :com.github.ivarref.yoltq/lock ?lock]] db queue-name - (- (now-ns) init-backoff-time)) + (- (now-ms) init-backoff-time)) (not-empty))] (let [[id old-lock] (rand-nth (into [] ids))] (prepare-processing db id queue-name old-lock :init)) - (log/trace "no new-items in :init status for queue" queue-name)))) + (log/debug "no new-items in :init status for queue" queue-name)))) (defn get-error [{:keys [conn db error-backoff-time max-retries] :as cfg} queue-name] @@ -120,7 +121,7 @@ [?e :com.github.ivarref.yoltq/lock ?lock]] db queue-name - (- (now-ns) error-backoff-time) + (- (now-ms) error-backoff-time) (inc max-retries)) (not-empty))] (let [[id old-lock] (rand-nth (into [] ids))] @@ -131,7 +132,7 @@ (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: " (str (if (nil? conn) "nil" conn)) "\nConfig was: " (str cfg))) - (let [now (or now (now-ns)) + (let [now (or now (now-ms)) max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries) db (or db (d/db conn))] (when-let [ids (->> (d/q '[:find ?id ?lock ?tries diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj index 5427ff5..e4151c2 100644 --- a/test/com/github/ivarref/yoltq/test_utils.clj +++ b/test/com/github/ivarref/yoltq/test_utils.clj @@ -8,7 +8,8 @@ [com.github.ivarref.yoltq.impl :as i] [clojure.edn :as edn] [com.github.ivarref.yoltq.ext-sys :as ext]) - (:import (java.util UUID))) + (:import (java.util UUID) + (java.time Duration))) (logconfig/init-logging! @@ -39,10 +40,10 @@ (defn advance! [tp] - (assert (some? ext/*now-ns-atom*) "Expected to be running in test-mode!") - (swap! ext/*now-ns-atom* + (if (number? tp) + (assert (some? ext/*now-ms-atom*) "Expected to be running in test-mode!") + (swap! ext/*now-ms-atom* + (if (number? tp) tp - (.toNanos tp)))) + (.toMillis ^Duration tp)))) (defn done-count [] -- cgit v1.2.3 From 812a07b3b9f2d212f80499433b638fb5b4a78f70 Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Thu, 18 Aug 2022 13:00:02 +0200 Subject: Release 0.2.60 Warn about not setting connection/socket-timeout when using clj-http https://github.com/ivarref/yoltq/issues/2 Add :healthy-allowed-error-time configuration option, default is 15 minutes --- README.md | 28 +++++++++++++++-- pom.xml | 4 +-- src/com/github/ivarref/yoltq.clj | 14 ++++++--- src/com/github/ivarref/yoltq/error_poller.clj | 36 +++++++++++++--------- .../com/github/ivarref/yoltq/error_poller_test.clj | 8 ++--- test/com/github/ivarref/yoltq/virtual_test.clj | 15 ++++++++- 6 files changed, 77 insertions(+), 28 deletions(-) (limited to 'src/com/github/ivarref/yoltq/error_poller.clj') diff --git a/README.md b/README.md index ade8650..05e7033 100644 --- a/README.md +++ b/README.md @@ -62,18 +62,25 @@ Imagine the following code: ```clojure (defn post-handler [user-input] (let [db-item (process user-input) - ext-ref (clj-http.client/post ext-service {...})] ; may throw exception + ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; timeout in milliseconds + :socket-timeout 10000 ; timeout in milliseconds + ...})] ; may throw exception @(d/transact conn [(assoc db-item :some/ext-ref ext-ref)]))) ``` What if the POST request fails? Should it be retried? For how long? Should it be allowed to fail? How do you then process failures later? +PS: If you do not set connection/socket-timeout, there is a chance that +clj-http/client will wait for all eternity in the case of a dropped TCP connection. + The queue way to solve this would be: ```clojure (defn get-ext-ref [{:keys [id]}] - (let [ext-ref (clj-http.client/post ext-service {...})] ; may throw exception + (let [ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; timeout in milliseconds + :socket-timeout 10000 ; timeout in milliseconds + ...})] ; may throw exception @(d/transact conn [[:db/cas [:some/id id] :some/ext-ref nil @@ -82,7 +89,7 @@ The queue way to solve this would be: (yq/add-consumer! :get-ext-ref get-ext-ref {:allow-cas-failure? true}) (defn post-handler [user-input] - (let [{:some/keys [id] :as db-item} (process user-input) + (let [{:some/keys [id] :as db-item} (process user-input)] @(d/transact conn [db-item (yq/put :get-ext-ref {:id id})]))) ``` @@ -371,6 +378,21 @@ Note: I have not tried these libraries myself. ## Change log +#### 2022-08-18 v0.2.60 [diff](https://github.com/ivarref/yoltq/compare/v0.2.59...v0.2.60) +Improved: Added config option `:healthy-allowed-error-time`: +``` + ; If you are dealing with a flaky downstream service, you may not want + ; yoltq to mark itself as unhealthy on the first failure encounter with + ; the downstream service. Change this setting to let yoltq mark itself + ; as healthy even though a queue item has been failing for some time. + :healthy-allowed-error-time (Duration/ofMinutes 15) +``` + +#### 2022-08-15 v0.2.59 [diff](https://github.com/ivarref/yoltq/compare/v0.2.58...v0.2.59) +Fixed: +* Race condition that made the following possible: `stop!` would terminate the slow thread +watcher, and a stuck thread could keep `stop!` from completing! + #### 2022-06-30 v0.2.58 [diff](https://github.com/ivarref/yoltq/compare/v0.2.57...v0.2.58) Slightly more safe EDN printing and parsing. Recommended reading: diff --git a/pom.xml b/pom.xml index 187b8ad..719b0e7 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ jar com.github.ivarref yoltq - 0.2.59 + 0.2.60 yoltq @@ -30,7 +30,7 @@ scm:git:git://github.com/ivarref/yoltq.git scm:git:ssh://git@github.com/ivarref/yoltq.git - v0.2.59 + v0.2.60 https://github.com/ivarref/yoltq \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 32693c3..89112a6 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -44,6 +44,12 @@ ; otherwise occur if competing with the tx-report-queue listener. :init-backoff-time (Duration/ofSeconds 60) + ; If you are dealing with a flaky downstream service, you may not want + ; yoltq to mark itself as unhealthy on the first failure encounter with + ; the downstream service. Change this setting to let yoltq mark itself + ; as healthy even though a queue item has been failing for some time. + :healthy-allowed-error-time (Duration/ofMinutes 15) + ; How frequent polling for init, error and hung jobs should be done. :poll-delay (Duration/ofSeconds 10) @@ -259,10 +265,10 @@ (let [conn (d/connect uri) started-consuming? (promise) n 1] - (init! {:conn conn - :error-backoff-time (Duration/ofSeconds 1) - :poll-delay (Duration/ofSeconds 1) - :max-execute-time (Duration/ofSeconds 3) + (init! {:conn conn + :error-backoff-time (Duration/ofSeconds 1) + :poll-delay (Duration/ofSeconds 1) + :max-execute-time (Duration/ofSeconds 3) :slow-thread-show-stacktrace? false}) (add-consumer! :q (fn [_] (deliver started-consuming? true) diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj index ee6359e..dffff28 100644 --- a/src/com/github/ivarref/yoltq/error_poller.clj +++ b/src/com/github/ivarref/yoltq/error_poller.clj @@ -1,8 +1,8 @@ (ns com.github.ivarref.yoltq.error-poller - (:require [datomic.api :as d] - [com.github.ivarref.yoltq.utils :as u] + (:require [clojure.tools.logging :as log] [com.github.ivarref.yoltq.ext-sys :as ext] - [clojure.tools.logging :as log])) + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d])) (defn get-state [v] @@ -64,31 +64,39 @@ {:run-callback :recovery})))))) -(defn do-poll-errors [{:keys [conn system-error +(defn do-poll-errors [{:keys [conn + system-error on-system-error on-system-recovery - healthy?] + healthy? + healthy-allowed-error-time] :or {on-system-error (fn [] (log/error "There are yoltq queues which have errors") nil) on-system-recovery (fn [] (log/info "Yoltq recovered"))} - :as config}] + :as config} + now-ms] (assert (some? conn) "expected :conn to be present") (assert (some? system-error) "expected :system-error to be present") - (let [error-count (or (d/q '[:find (count ?e) . - :in $ ?status + (assert (nat-int? healthy-allowed-error-time) "expected :healthy-allowed-error-time to be present") + (let [max-init-time (- now-ms healthy-allowed-error-time) + error-count (or (d/q '[:find (count ?e) . + :in $ ?status ?max-init-time :where - [?e :com.github.ivarref.yoltq/status ?status]] + [?e :com.github.ivarref.yoltq/status ?status] + [?e :com.github.ivarref.yoltq/init-time ?init-time] + [(<= ?init-time ?max-init-time)]] (d/db conn) - u/status-error) + u/status-error + max-init-time) 0)] (if (pos-int? error-count) (do (log/debug "poll-errors found" error-count "errors in system") (reset! healthy? false)) (reset! healthy? true)) - (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ms) error-count)] + (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config now-ms error-count)] (when run-callback (cond (= run-callback :error) (on-system-error) @@ -99,18 +107,18 @@ :else (log/error "unhandled callback-type" run-callback)) (log/debug "run-callback is" run-callback)) - new-state))) + error-count))) (defn poll-errors [running? config-atom] (try (when @running? - (do-poll-errors @config-atom)) + (do-poll-errors @config-atom (ext/now-ms))) (catch Throwable t (log/error t "unexpected error in poll-errors:" (ex-message t)) nil))) (comment - (do-poll-errors @com.github.ivarref.yoltq/*config*)) + (do-poll-errors @com.github.ivarref.yoltq/*config* (ext/now-ms))) diff --git a/test/com/github/ivarref/yoltq/error_poller_test.clj b/test/com/github/ivarref/yoltq/error_poller_test.clj index 18f0aa7..4d92b81 100644 --- a/test/com/github/ivarref/yoltq/error_poller_test.clj +++ b/test/com/github/ivarref/yoltq/error_poller_test.clj @@ -1,9 +1,9 @@ (ns com.github.ivarref.yoltq.error-poller-test - (:require [clojure.test :refer [deftest is]] - [com.github.ivarref.yoltq.error-poller :as ep] + (:require [clojure.edn :as edn] + [clojure.test :refer [deftest is]] [clojure.tools.logging :as log] - [com.github.ivarref.yoltq.log-init :as logconfig] - [clojure.edn :as edn])) + [com.github.ivarref.yoltq.error-poller :as ep] + [com.github.ivarref.yoltq.log-init :as logconfig])) (deftest error-poller diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index e077517..996792e 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -2,6 +2,8 @@ (:require [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]] [clojure.tools.logging :as log] [com.github.ivarref.yoltq :as yq] + [com.github.ivarref.yoltq.error-poller :as error-poller] + [com.github.ivarref.yoltq.ext-sys :as ext] [com.github.ivarref.yoltq.impl :as i] [com.github.ivarref.yoltq.migrate :as migrate] [com.github.ivarref.yoltq.test-queue :as tq] @@ -9,7 +11,8 @@ [com.github.ivarref.yoltq.utils :as uu] [datomic-schema.core] [datomic.api :as d] - [taoensso.timbre :as timbre])) + [taoensso.timbre :as timbre]) + (:import (java.time Duration))) (use-fixtures :each tq/call-with-virtual-queue!) @@ -367,3 +370,13 @@ (is (= #{{:id "a"}} @received)) #_(timbre/with-level :fatal (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) + +(deftest healthy-allowed-error-time-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [_] (throw (ex-info "" {})))) + @(d/transact conn [(yq/put :q {:work 123})]) + (tq/consume-expect! :q :error) + (is (= 0 (error-poller/do-poll-errors @yq/*config* (ext/now-ms)))) + (is (= 0 (error-poller/do-poll-errors @yq/*config* (+ (dec (.toMillis (Duration/ofMinutes 15))) (ext/now-ms))))) + (is (= 1 (error-poller/do-poll-errors @yq/*config* (+ (.toMillis (Duration/ofMinutes 15)) (ext/now-ms))))))) -- cgit v1.2.3