diff options
Diffstat (limited to 'src/com/github/ivarref/yoltq/utils.clj')
| -rw-r--r-- | src/com/github/ivarref/yoltq/utils.clj | 25 |
1 files changed, 16 insertions, 9 deletions
diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj index ad2444a..39572a9 100644 --- a/src/com/github/ivarref/yoltq/utils.clj +++ b/src/com/github/ivarref/yoltq/utils.clj @@ -13,6 +13,7 @@ (def status-done :done) (def status-error :error) +(def current-version "2") (defn duration->millis [m] (reduce-kv (fn [o k v] @@ -85,17 +86,19 @@ "\nConfig was: " (str cfg))) (let [db (or db (d/db conn))] (if-let [ids (->> (d/q '[:find ?id ?lock - :in $ ?queue-name ?backoff + :in $ ?queue-name ?backoff ?current-version :where [?e :com.github.ivarref.yoltq/status :init] [?e :com.github.ivarref.yoltq/queue-name ?queue-name] [?e :com.github.ivarref.yoltq/init-time ?init-time] [(>= ?backoff ?init-time)] [?e :com.github.ivarref.yoltq/id ?id] - [?e :com.github.ivarref.yoltq/lock ?lock]] + [?e :com.github.ivarref.yoltq/lock ?lock] + [?e :com.github.ivarref.yoltq/version ?current-version]] db queue-name - (- (now-ms) init-backoff-time)) + (- (now-ms) init-backoff-time) + current-version) (not-empty))] (let [[id old-lock] (rand-nth (into [] ids))] (prepare-processing db id queue-name old-lock :init)) @@ -109,7 +112,7 @@ (let [db (or db (d/db conn)) max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)] (when-let [ids (->> (d/q '[:find ?id ?lock - :in $ ?queue-name ?backoff ?max-tries + :in $ ?queue-name ?backoff ?max-tries ?current-version :where [?e :com.github.ivarref.yoltq/status :error] [?e :com.github.ivarref.yoltq/queue-name ?queue-name] @@ -118,11 +121,13 @@ [?e :com.github.ivarref.yoltq/tries ?tries] [(> ?max-tries ?tries)] [?e :com.github.ivarref.yoltq/id ?id] - [?e :com.github.ivarref.yoltq/lock ?lock]] + [?e :com.github.ivarref.yoltq/lock ?lock] + [?e :com.github.ivarref.yoltq/version ?current-version]] db queue-name (- (now-ms) error-backoff-time) - (inc max-retries)) + (inc max-retries) + current-version) (not-empty))] (let [[id old-lock] (rand-nth (into [] ids))] (prepare-processing db id queue-name old-lock :error))))) @@ -136,7 +141,7 @@ max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries) db (or db (d/db conn))] (when-let [ids (->> (d/q '[:find ?id ?lock ?tries - :in $ ?qname ?backoff + :in $ ?qname ?backoff ?current-version :where [?e :com.github.ivarref.yoltq/status :processing] [?e :com.github.ivarref.yoltq/queue-name ?qname] @@ -144,10 +149,12 @@ [(>= ?backoff ?time)] [?e :com.github.ivarref.yoltq/tries ?tries] [?e :com.github.ivarref.yoltq/id ?id] - [?e :com.github.ivarref.yoltq/lock ?lock]] + [?e :com.github.ivarref.yoltq/lock ?lock] + [?e :com.github.ivarref.yoltq/version ?current-version]] db queue-name - (- now hung-backoff-time)) + (- now hung-backoff-time) + current-version) (not-empty))] (let [new-lock (random-uuid) [id old-lock tries _t] (rand-nth (into [] ids)) |
