diff options
| author | Ivar Refsdal <refsdal.ivar@gmail.com> | 2022-03-27 22:41:49 +0200 |
|---|---|---|
| committer | Ivar Refsdal <refsdal.ivar@gmail.com> | 2022-03-27 22:41:49 +0200 |
| commit | ce96806e63ead7c926a348842a3b466eba01190c (patch) | |
| tree | baad5a96c8a8088137decd93691cfed0e81c09a1 | |
| parent | Release 0.2.46 (diff) | |
| download | fiinha-ce96806e63ead7c926a348842a3b466eba01190c.tar.gz fiinha-ce96806e63ead7c926a348842a3b466eba01190c.tar.xz | |
Auto migration should run in the background and not interfere with polling
| -rw-r--r-- | README.md | 4 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq.clj | 2 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/utils.clj | 25 |
3 files changed, 21 insertions, 10 deletions
@@ -333,6 +333,10 @@ easier. ## Change log +### 2022-03-27 v0.2.48 [diff](https://github.com/ivarref/yoltq/compare/v0.2.46...v0.2.48) +* Auto migration is done in the background. +* Only poll for current version of jobs, thus no races for auto migration. + ### 2022-03-27 v0.2.46 [diff](https://github.com/ivarref/yoltq/compare/v0.2.41...v0.2.46) * Critical bugfix that in some cases can lead to stalled jobs. ``` diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 1a60a45..4b324c4 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -111,7 +111,7 @@ (defn- do-start! [] (let [{:keys [poll-delay pool-size system-error-poll-delay auto-migrate?] :as cfg} @*config*] (when auto-migrate? - (migrate/migrate! cfg)) + (future (migrate/migrate! cfg))) (reset! threadpool (Executors/newScheduledThreadPool (+ 2 pool-size))) (let [pool @threadpool queue-listener-ready (promise)] diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj index ad2444a..39572a9 100644 --- a/src/com/github/ivarref/yoltq/utils.clj +++ b/src/com/github/ivarref/yoltq/utils.clj @@ -13,6 +13,7 @@ (def status-done :done) (def status-error :error) +(def current-version "2") (defn duration->millis [m] (reduce-kv (fn [o k v] @@ -85,17 +86,19 @@ "\nConfig was: " (str cfg))) (let [db (or db (d/db conn))] (if-let [ids (->> (d/q '[:find ?id ?lock - :in $ ?queue-name ?backoff + :in $ ?queue-name ?backoff ?current-version :where [?e :com.github.ivarref.yoltq/status :init] [?e :com.github.ivarref.yoltq/queue-name ?queue-name] [?e :com.github.ivarref.yoltq/init-time ?init-time] [(>= ?backoff ?init-time)] [?e :com.github.ivarref.yoltq/id ?id] - [?e :com.github.ivarref.yoltq/lock ?lock]] + [?e :com.github.ivarref.yoltq/lock ?lock] + [?e :com.github.ivarref.yoltq/version ?current-version]] db queue-name - (- (now-ms) init-backoff-time)) + (- (now-ms) init-backoff-time) + current-version) (not-empty))] (let [[id old-lock] (rand-nth (into [] ids))] (prepare-processing db id queue-name old-lock :init)) @@ -109,7 +112,7 @@ (let [db (or db (d/db conn)) max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)] (when-let [ids (->> (d/q '[:find ?id ?lock - :in $ ?queue-name ?backoff ?max-tries + :in $ ?queue-name ?backoff ?max-tries ?current-version :where [?e :com.github.ivarref.yoltq/status :error] [?e :com.github.ivarref.yoltq/queue-name ?queue-name] @@ -118,11 +121,13 @@ [?e :com.github.ivarref.yoltq/tries ?tries] [(> ?max-tries ?tries)] [?e :com.github.ivarref.yoltq/id ?id] - [?e :com.github.ivarref.yoltq/lock ?lock]] + [?e :com.github.ivarref.yoltq/lock ?lock] + [?e :com.github.ivarref.yoltq/version ?current-version]] db queue-name (- (now-ms) error-backoff-time) - (inc max-retries)) + (inc max-retries) + current-version) (not-empty))] (let [[id old-lock] (rand-nth (into [] ids))] (prepare-processing db id queue-name old-lock :error))))) @@ -136,7 +141,7 @@ max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries) db (or db (d/db conn))] (when-let [ids (->> (d/q '[:find ?id ?lock ?tries - :in $ ?qname ?backoff + :in $ ?qname ?backoff ?current-version :where [?e :com.github.ivarref.yoltq/status :processing] [?e :com.github.ivarref.yoltq/queue-name ?qname] @@ -144,10 +149,12 @@ [(>= ?backoff ?time)] [?e :com.github.ivarref.yoltq/tries ?tries] [?e :com.github.ivarref.yoltq/id ?id] - [?e :com.github.ivarref.yoltq/lock ?lock]] + [?e :com.github.ivarref.yoltq/lock ?lock] + [?e :com.github.ivarref.yoltq/version ?current-version]] db queue-name - (- now hung-backoff-time)) + (- now hung-backoff-time) + current-version) (not-empty))] (let [new-lock (random-uuid) [id old-lock tries _t] (rand-nth (into [] ids)) |
