diff options
| author | Ivar Refsdal <refsdal.ivar@gmail.com> | 2022-08-18 13:00:02 +0200 |
|---|---|---|
| committer | Ivar Refsdal <refsdal.ivar@gmail.com> | 2022-08-18 13:00:02 +0200 |
| commit | 812a07b3b9f2d212f80499433b638fb5b4a78f70 (patch) | |
| tree | f9f72f784ccf2a9cccaf255b84ac323861c277b5 /src | |
| parent | Release 0.2.59 (diff) | |
| download | fiinha-812a07b3b9f2d212f80499433b638fb5b4a78f70.tar.gz fiinha-812a07b3b9f2d212f80499433b638fb5b4a78f70.tar.xz | |
Release 0.2.60
Warn about not setting connection/socket-timeout when using clj-http https://github.com/ivarref/yoltq/issues/2
Add :healthy-allowed-error-time configuration option, default is 15 minutes
Diffstat (limited to 'src')
| -rw-r--r-- | src/com/github/ivarref/yoltq.clj | 14 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/error_poller.clj | 36 |
2 files changed, 32 insertions, 18 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 32693c3..89112a6 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -44,6 +44,12 @@ ; otherwise occur if competing with the tx-report-queue listener. :init-backoff-time (Duration/ofSeconds 60) + ; If you are dealing with a flaky downstream service, you may not want + ; yoltq to mark itself as unhealthy on the first failure encounter with + ; the downstream service. Change this setting to let yoltq mark itself + ; as healthy even though a queue item has been failing for some time. + :healthy-allowed-error-time (Duration/ofMinutes 15) + ; How frequent polling for init, error and hung jobs should be done. :poll-delay (Duration/ofSeconds 10) @@ -259,10 +265,10 @@ (let [conn (d/connect uri) started-consuming? (promise) n 1] - (init! {:conn conn - :error-backoff-time (Duration/ofSeconds 1) - :poll-delay (Duration/ofSeconds 1) - :max-execute-time (Duration/ofSeconds 3) + (init! {:conn conn + :error-backoff-time (Duration/ofSeconds 1) + :poll-delay (Duration/ofSeconds 1) + :max-execute-time (Duration/ofSeconds 3) :slow-thread-show-stacktrace? false}) (add-consumer! :q (fn [_] (deliver started-consuming? true) diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj index ee6359e..dffff28 100644 --- a/src/com/github/ivarref/yoltq/error_poller.clj +++ b/src/com/github/ivarref/yoltq/error_poller.clj @@ -1,8 +1,8 @@ (ns com.github.ivarref.yoltq.error-poller - (:require [datomic.api :as d] - [com.github.ivarref.yoltq.utils :as u] + (:require [clojure.tools.logging :as log] [com.github.ivarref.yoltq.ext-sys :as ext] - [clojure.tools.logging :as log])) + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d])) (defn get-state [v] @@ -64,31 +64,39 @@ {:run-callback :recovery})))))) -(defn do-poll-errors [{:keys [conn system-error +(defn do-poll-errors [{:keys [conn + system-error on-system-error on-system-recovery - healthy?] + healthy? + healthy-allowed-error-time] :or {on-system-error (fn [] (log/error "There are yoltq queues which have errors") nil) on-system-recovery (fn [] (log/info "Yoltq recovered"))} - :as config}] + :as config} + now-ms] (assert (some? conn) "expected :conn to be present") (assert (some? system-error) "expected :system-error to be present") - (let [error-count (or (d/q '[:find (count ?e) . - :in $ ?status + (assert (nat-int? healthy-allowed-error-time) "expected :healthy-allowed-error-time to be present") + (let [max-init-time (- now-ms healthy-allowed-error-time) + error-count (or (d/q '[:find (count ?e) . + :in $ ?status ?max-init-time :where - [?e :com.github.ivarref.yoltq/status ?status]] + [?e :com.github.ivarref.yoltq/status ?status] + [?e :com.github.ivarref.yoltq/init-time ?init-time] + [(<= ?init-time ?max-init-time)]] (d/db conn) - u/status-error) + u/status-error + max-init-time) 0)] (if (pos-int? error-count) (do (log/debug "poll-errors found" error-count "errors in system") (reset! healthy? false)) (reset! healthy? true)) - (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ms) error-count)] + (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config now-ms error-count)] (when run-callback (cond (= run-callback :error) (on-system-error) @@ -99,18 +107,18 @@ :else (log/error "unhandled callback-type" run-callback)) (log/debug "run-callback is" run-callback)) - new-state))) + error-count))) (defn poll-errors [running? config-atom] (try (when @running? - (do-poll-errors @config-atom)) + (do-poll-errors @config-atom (ext/now-ms))) (catch Throwable t (log/error t "unexpected error in poll-errors:" (ex-message t)) nil))) (comment - (do-poll-errors @com.github.ivarref.yoltq/*config*)) + (do-poll-errors @com.github.ivarref.yoltq/*config* (ext/now-ms))) |
