aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorIvar Refsdal <refsdal.ivar@gmail.com>2022-08-18 13:00:02 +0200
committerIvar Refsdal <refsdal.ivar@gmail.com>2022-08-18 13:00:02 +0200
commit812a07b3b9f2d212f80499433b638fb5b4a78f70 (patch)
treef9f72f784ccf2a9cccaf255b84ac323861c277b5 /src
parentRelease 0.2.59 (diff)
downloadfiinha-812a07b3b9f2d212f80499433b638fb5b4a78f70.tar.gz
fiinha-812a07b3b9f2d212f80499433b638fb5b4a78f70.tar.xz
Release 0.2.60
Warn about not setting connection/socket-timeout when using clj-http https://github.com/ivarref/yoltq/issues/2 Add :healthy-allowed-error-time configuration option, default is 15 minutes
Diffstat (limited to 'src')
-rw-r--r--src/com/github/ivarref/yoltq.clj14
-rw-r--r--src/com/github/ivarref/yoltq/error_poller.clj36
2 files changed, 32 insertions, 18 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index 32693c3..89112a6 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -44,6 +44,12 @@
; otherwise occur if competing with the tx-report-queue listener.
:init-backoff-time (Duration/ofSeconds 60)
+ ; If you are dealing with a flaky downstream service, you may not want
+ ; yoltq to mark itself as unhealthy on the first failure encounter with
+ ; the downstream service. Change this setting to let yoltq mark itself
+ ; as healthy even though a queue item has been failing for some time.
+ :healthy-allowed-error-time (Duration/ofMinutes 15)
+
; How frequent polling for init, error and hung jobs should be done.
:poll-delay (Duration/ofSeconds 10)
@@ -259,10 +265,10 @@
(let [conn (d/connect uri)
started-consuming? (promise)
n 1]
- (init! {:conn conn
- :error-backoff-time (Duration/ofSeconds 1)
- :poll-delay (Duration/ofSeconds 1)
- :max-execute-time (Duration/ofSeconds 3)
+ (init! {:conn conn
+ :error-backoff-time (Duration/ofSeconds 1)
+ :poll-delay (Duration/ofSeconds 1)
+ :max-execute-time (Duration/ofSeconds 3)
:slow-thread-show-stacktrace? false})
(add-consumer! :q (fn [_]
(deliver started-consuming? true)
diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj
index ee6359e..dffff28 100644
--- a/src/com/github/ivarref/yoltq/error_poller.clj
+++ b/src/com/github/ivarref/yoltq/error_poller.clj
@@ -1,8 +1,8 @@
(ns com.github.ivarref.yoltq.error-poller
- (:require [datomic.api :as d]
- [com.github.ivarref.yoltq.utils :as u]
+ (:require [clojure.tools.logging :as log]
[com.github.ivarref.yoltq.ext-sys :as ext]
- [clojure.tools.logging :as log]))
+ [com.github.ivarref.yoltq.utils :as u]
+ [datomic.api :as d]))
(defn get-state [v]
@@ -64,31 +64,39 @@
{:run-callback :recovery}))))))
-(defn do-poll-errors [{:keys [conn system-error
+(defn do-poll-errors [{:keys [conn
+ system-error
on-system-error
on-system-recovery
- healthy?]
+ healthy?
+ healthy-allowed-error-time]
:or {on-system-error (fn []
(log/error "There are yoltq queues which have errors")
nil)
on-system-recovery (fn []
(log/info "Yoltq recovered"))}
- :as config}]
+ :as config}
+ now-ms]
(assert (some? conn) "expected :conn to be present")
(assert (some? system-error) "expected :system-error to be present")
- (let [error-count (or (d/q '[:find (count ?e) .
- :in $ ?status
+ (assert (nat-int? healthy-allowed-error-time) "expected :healthy-allowed-error-time to be present")
+ (let [max-init-time (- now-ms healthy-allowed-error-time)
+ error-count (or (d/q '[:find (count ?e) .
+ :in $ ?status ?max-init-time
:where
- [?e :com.github.ivarref.yoltq/status ?status]]
+ [?e :com.github.ivarref.yoltq/status ?status]
+ [?e :com.github.ivarref.yoltq/init-time ?init-time]
+ [(<= ?init-time ?max-init-time)]]
(d/db conn)
- u/status-error)
+ u/status-error
+ max-init-time)
0)]
(if (pos-int? error-count)
(do
(log/debug "poll-errors found" error-count "errors in system")
(reset! healthy? false))
(reset! healthy? true))
- (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ms) error-count)]
+ (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config now-ms error-count)]
(when run-callback
(cond (= run-callback :error)
(on-system-error)
@@ -99,18 +107,18 @@
:else
(log/error "unhandled callback-type" run-callback))
(log/debug "run-callback is" run-callback))
- new-state)))
+ error-count)))
(defn poll-errors [running? config-atom]
(try
(when @running?
- (do-poll-errors @config-atom))
+ (do-poll-errors @config-atom (ext/now-ms)))
(catch Throwable t
(log/error t "unexpected error in poll-errors:" (ex-message t))
nil)))
(comment
- (do-poll-errors @com.github.ivarref.yoltq/*config*))
+ (do-poll-errors @com.github.ivarref.yoltq/*config* (ext/now-ms)))