aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvar Refsdal <refsdal.ivar@gmail.com>2022-03-27 22:41:49 +0200
committerIvar Refsdal <refsdal.ivar@gmail.com>2022-03-27 22:41:49 +0200
commitce96806e63ead7c926a348842a3b466eba01190c (patch)
treebaad5a96c8a8088137decd93691cfed0e81c09a1
parentRelease 0.2.46 (diff)
downloadfiinha-ce96806e63ead7c926a348842a3b466eba01190c.tar.gz
fiinha-ce96806e63ead7c926a348842a3b466eba01190c.tar.xz
Auto migration should run in the background and not interfere with polling
-rw-r--r--README.md4
-rw-r--r--src/com/github/ivarref/yoltq.clj2
-rw-r--r--src/com/github/ivarref/yoltq/utils.clj25
3 files changed, 21 insertions, 10 deletions
diff --git a/README.md b/README.md
index 334af5d..2385d15 100644
--- a/README.md
+++ b/README.md
@@ -333,6 +333,10 @@ easier.
## Change log
+### 2022-03-27 v0.2.48 [diff](https://github.com/ivarref/yoltq/compare/v0.2.46...v0.2.48)
+* Auto migration is done in the background.
+* Only poll for current version of jobs, thus no races for auto migration.
+
### 2022-03-27 v0.2.46 [diff](https://github.com/ivarref/yoltq/compare/v0.2.41...v0.2.46)
* Critical bugfix that in some cases can lead to stalled jobs.
```
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index 1a60a45..4b324c4 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -111,7 +111,7 @@
(defn- do-start! []
(let [{:keys [poll-delay pool-size system-error-poll-delay auto-migrate?] :as cfg} @*config*]
(when auto-migrate?
- (migrate/migrate! cfg))
+ (future (migrate/migrate! cfg)))
(reset! threadpool (Executors/newScheduledThreadPool (+ 2 pool-size)))
(let [pool @threadpool
queue-listener-ready (promise)]
diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj
index ad2444a..39572a9 100644
--- a/src/com/github/ivarref/yoltq/utils.clj
+++ b/src/com/github/ivarref/yoltq/utils.clj
@@ -13,6 +13,7 @@
(def status-done :done)
(def status-error :error)
+(def current-version "2")
(defn duration->millis [m]
(reduce-kv (fn [o k v]
@@ -85,17 +86,19 @@
"\nConfig was: " (str cfg)))
(let [db (or db (d/db conn))]
(if-let [ids (->> (d/q '[:find ?id ?lock
- :in $ ?queue-name ?backoff
+ :in $ ?queue-name ?backoff ?current-version
:where
[?e :com.github.ivarref.yoltq/status :init]
[?e :com.github.ivarref.yoltq/queue-name ?queue-name]
[?e :com.github.ivarref.yoltq/init-time ?init-time]
[(>= ?backoff ?init-time)]
[?e :com.github.ivarref.yoltq/id ?id]
- [?e :com.github.ivarref.yoltq/lock ?lock]]
+ [?e :com.github.ivarref.yoltq/lock ?lock]
+ [?e :com.github.ivarref.yoltq/version ?current-version]]
db
queue-name
- (- (now-ms) init-backoff-time))
+ (- (now-ms) init-backoff-time)
+ current-version)
(not-empty))]
(let [[id old-lock] (rand-nth (into [] ids))]
(prepare-processing db id queue-name old-lock :init))
@@ -109,7 +112,7 @@
(let [db (or db (d/db conn))
max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)]
(when-let [ids (->> (d/q '[:find ?id ?lock
- :in $ ?queue-name ?backoff ?max-tries
+ :in $ ?queue-name ?backoff ?max-tries ?current-version
:where
[?e :com.github.ivarref.yoltq/status :error]
[?e :com.github.ivarref.yoltq/queue-name ?queue-name]
@@ -118,11 +121,13 @@
[?e :com.github.ivarref.yoltq/tries ?tries]
[(> ?max-tries ?tries)]
[?e :com.github.ivarref.yoltq/id ?id]
- [?e :com.github.ivarref.yoltq/lock ?lock]]
+ [?e :com.github.ivarref.yoltq/lock ?lock]
+ [?e :com.github.ivarref.yoltq/version ?current-version]]
db
queue-name
(- (now-ms) error-backoff-time)
- (inc max-retries))
+ (inc max-retries)
+ current-version)
(not-empty))]
(let [[id old-lock] (rand-nth (into [] ids))]
(prepare-processing db id queue-name old-lock :error)))))
@@ -136,7 +141,7 @@
max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)
db (or db (d/db conn))]
(when-let [ids (->> (d/q '[:find ?id ?lock ?tries
- :in $ ?qname ?backoff
+ :in $ ?qname ?backoff ?current-version
:where
[?e :com.github.ivarref.yoltq/status :processing]
[?e :com.github.ivarref.yoltq/queue-name ?qname]
@@ -144,10 +149,12 @@
[(>= ?backoff ?time)]
[?e :com.github.ivarref.yoltq/tries ?tries]
[?e :com.github.ivarref.yoltq/id ?id]
- [?e :com.github.ivarref.yoltq/lock ?lock]]
+ [?e :com.github.ivarref.yoltq/lock ?lock]
+ [?e :com.github.ivarref.yoltq/version ?current-version]]
db
queue-name
- (- now hung-backoff-time))
+ (- now hung-backoff-time)
+ current-version)
(not-empty))]
(let [new-lock (random-uuid)
[id old-lock tries _t] (rand-nth (into [] ids))