aboutsummaryrefslogtreecommitdiff
path: root/src/com
diff options
context:
space:
mode:
authorIvar Refsdal <refsdal.ivar@gmail.com>2023-03-20 12:24:00 +0100
committerIvar Refsdal <refsdal.ivar@gmail.com>2023-03-20 12:24:00 +0100
commite848610ac341db31b804644a7dfaaf98389469d5 (patch)
treec56f4e7d47e6c7315c65bfc0743b05cb41947688 /src/com
parentRelease 0.2.63: Add support for :encode and :decode function. Add :partition-... (diff)
downloadfiinha-e848610ac341db31b804644a7dfaaf98389469d5.tar.gz
fiinha-e848610ac341db31b804644a7dfaaf98389469d5.tar.xz
Release 0.2.64: Allow for infinitive retries
Diffstat (limited to 'src/com')
-rw-r--r--src/com/github/ivarref/yoltq.clj7
-rw-r--r--src/com/github/ivarref/yoltq/utils.clj17
2 files changed, 16 insertions, 8 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index 9ffb3ad..379d701 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -24,7 +24,10 @@
(-> {; Default number of times a queue job will be retried before giving up
; Can be overridden on a per-consumer basis with
; (yq/add-consumer! :q (fn [payload] ...) {:max-retries 200})
- :max-retries 100
+ ; If you want no limit on the number of retries, specify
+ ; the value `0`. That will set the effective retry limit to
+ ; 9223372036854775807 times.
+ :max-retries 10000
; Minimum amount of time to wait before a failed queue job is retried
:error-backoff-time (Duration/ofSeconds 5)
@@ -244,7 +247,7 @@
(defn retry-one-error! [qname]
(let [{:keys [handlers] :as cfg} @*config*
_ (assert (contains? handlers qname) "Queue not found")
- cfg (assoc-in cfg [:handlers qname :max-retries] Integer/MAX_VALUE)]
+ cfg (assoc-in cfg [:handlers qname :max-retries] Long/MAX_VALUE)]
(poller/poll-once! cfg qname :error)))
(defn retry-stats
diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj
index 7665b6d..9defd0e 100644
--- a/src/com/github/ivarref/yoltq/utils.clj
+++ b/src/com/github/ivarref/yoltq/utils.clj
@@ -103,13 +103,18 @@
(prepare-processing db id queue-name old-lock :init))
(log/debug "no new-items in :init status for queue" queue-name))))
+(defn- get-max-retries [cfg queue-name]
+ (let [v (get-in cfg [:handlers queue-name :max-retries] (:max-retries cfg))]
+ (if (and (number? v) (pos-int? v))
+ v
+ Long/MAX_VALUE)))
-(defn get-error [{:keys [conn db error-backoff-time max-retries] :as cfg} queue-name]
+(defn get-error [{:keys [conn db error-backoff-time] :as cfg} queue-name]
(assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: "
(str (if (nil? conn) "nil" conn))
"\nConfig was: " (str cfg)))
(let [db (or db (d/db conn))
- max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)]
+ max-retries (get-max-retries cfg queue-name)]
(when-let [ids (->> (d/q '[:find ?id ?lock
:in $ ?queue-name ?backoff ?max-tries ?current-version
:where
@@ -118,26 +123,26 @@
[?e :com.github.ivarref.yoltq/error-time ?time]
[(>= ?backoff ?time)]
[?e :com.github.ivarref.yoltq/tries ?tries]
- [(> ?max-tries ?tries)]
+ [(>= ?max-tries ?tries)]
[?e :com.github.ivarref.yoltq/id ?id]
[?e :com.github.ivarref.yoltq/lock ?lock]
[?e :com.github.ivarref.yoltq/version ?current-version]]
db
queue-name
(- (now-ms) error-backoff-time)
- (inc max-retries)
+ max-retries
current-version)
(not-empty))]
(let [[id old-lock] (rand-nth (into [] ids))]
(prepare-processing db id queue-name old-lock :error)))))
-(defn get-hung [{:keys [conn db now hung-backoff-time max-retries] :as cfg} queue-name]
+(defn get-hung [{:keys [conn db now hung-backoff-time] :as cfg} queue-name]
(assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: "
(str (if (nil? conn) "nil" conn))
"\nConfig was: " (str cfg)))
(let [now (or now (now-ms))
- max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)
+ max-retries (get-max-retries cfg queue-name)
db (or db (d/db conn))]
(when-let [ids (->> (d/q '[:find ?id ?lock ?tries
:in $ ?qname ?backoff ?current-version