aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorIvar Refsdal <refsdal.ivar@gmail.com>2022-03-27 21:33:00 +0200
committerIvar Refsdal <refsdal.ivar@gmail.com>2022-03-27 21:33:00 +0200
commit41c9e08d63176cf7c239574d1d07f2b302a2d3ec (patch)
tree7e3b36871f75d90c28a2b1f79ecdb084e5f284f1 /src
parentStart use current millis in the database, not nano offset (diff)
downloadfiinha-41c9e08d63176cf7c239574d1d07f2b302a2d3ec.tar.gz
fiinha-41c9e08d63176cf7c239574d1d07f2b302a2d3ec.tar.xz
Fix use current millis in the database, not nano offset
Diffstat (limited to 'src')
-rw-r--r--src/com/github/ivarref/yoltq.clj11
-rw-r--r--src/com/github/ivarref/yoltq/impl.clj6
-rw-r--r--src/com/github/ivarref/yoltq/migrate.clj58
3 files changed, 71 insertions, 4 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index 17aa40a..1a60a45 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -6,6 +6,7 @@
[com.github.ivarref.yoltq.poller :as poller]
[com.github.ivarref.yoltq.error-poller :as errpoller]
[com.github.ivarref.yoltq.slow-executor-detector :as slow-executor]
+ [com.github.ivarref.yoltq.migrate :as migrate]
[com.github.ivarref.yoltq.utils :as u])
(:import (datomic Connection)
(java.util.concurrent Executors TimeUnit ExecutorService)
@@ -64,7 +65,11 @@
:system-error-poll-delay (Duration/ofMinutes 1)
; How often should the system invoke
- :system-error-callback-backoff (Duration/ofHours 1)}
+ :system-error-callback-backoff (Duration/ofHours 1)
+
+ ; Should old, possibly stalled jobs be automatically be migrated
+ ; as part of `start!`?
+ :auto-migrate? true}
u/duration->millis))
@@ -104,7 +109,9 @@
(defn- do-start! []
- (let [{:keys [poll-delay pool-size system-error-poll-delay]} @*config*]
+ (let [{:keys [poll-delay pool-size system-error-poll-delay auto-migrate?] :as cfg} @*config*]
+ (when auto-migrate?
+ (migrate/migrate! cfg))
(reset! threadpool (Executors/newScheduledThreadPool (+ 2 pool-size)))
(let [pool @threadpool
queue-listener-ready (promise)]
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj
index b4eef8d..6b14ffc 100644
--- a/src/com/github/ivarref/yoltq/impl.clj
+++ b/src/com/github/ivarref/yoltq/impl.clj
@@ -20,7 +20,8 @@
#:db{:ident :com.github.ivarref.yoltq/init-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
#:db{:ident :com.github.ivarref.yoltq/processing-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
#:db{:ident :com.github.ivarref.yoltq/done-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
- #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}])
+ #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
+ #:db{:ident :com.github.ivarref.yoltq/version, :cardinality :db.cardinality/one, :valueType :db.type/string, :index true}])
(defn pr-str-safe [what x]
@@ -63,7 +64,8 @@
:com.github.ivarref.yoltq/opts (pr-str-safe :opts opts)
:com.github.ivarref.yoltq/lock (u/random-uuid)
:com.github.ivarref.yoltq/tries 0
- :com.github.ivarref.yoltq/init-time (u/now-ms)}
+ :com.github.ivarref.yoltq/init-time (u/now-ms)
+ :com.github.ivarref.yoltq/version "2"}
(when-let [[q ext-id] (:depends-on opts)]
(when-not (d/q '[:find ?e .
:in $ ?ext-id
diff --git a/src/com/github/ivarref/yoltq/migrate.clj b/src/com/github/ivarref/yoltq/migrate.clj
new file mode 100644
index 0000000..89fc286
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/migrate.clj
@@ -0,0 +1,58 @@
+(ns com.github.ivarref.yoltq.migrate
+ (:require [datomic.api :as d]
+ [clojure.tools.logging :as log]))
+
+(defn to->v2-ent [{:keys [conn]} now-ms id]
+ (log/info "Migrating id" id)
+ (let [attr-val (fn [attr]
+ (when-let [old (d/q '[:find ?time .
+ :in $ ?e ?a
+ :where
+ [?e ?a ?time]]
+ (d/db conn)
+ [:com.github.ivarref.yoltq/id id]
+ attr)]
+ (let [now-ms (or now-ms
+ (.getTime (d/q '[:find (max ?txinst) .
+ :in $ ?e ?a
+ :where
+ [?e ?a _ ?tx true]
+ [?tx :db/txInstant ?txinst]]
+ (d/history (d/db conn))
+ [:com.github.ivarref.yoltq/id id]
+ attr)))]
+ (log/info "Updating" id attr "to" now-ms)
+ [[:db/cas [:com.github.ivarref.yoltq/id id]
+ attr old now-ms]])))]
+ (vec (concat [[:db/cas [:com.github.ivarref.yoltq/id id]
+ :com.github.ivarref.yoltq/version nil "2"]]
+ (mapcat attr-val [:com.github.ivarref.yoltq/init-time
+ :com.github.ivarref.yoltq/processing-time
+ :com.github.ivarref.yoltq/done-time
+ :com.github.ivarref.yoltq/error-time])))))
+
+(defn to->v2 [{:keys [conn loop? now-ms]
+ :or {loop? true}
+ :as cfg}]
+ (loop [tx-vec []]
+ (if-let [id (some->> (d/q '[:find [?id ...]
+ :in $
+ :where
+ [?e :com.github.ivarref.yoltq/id ?id]
+ [(missing? $ ?e :com.github.ivarref.yoltq/version)]]
+ (d/db conn))
+ (sort)
+ (not-empty)
+ (first))]
+ (let [tx (to->v2-ent cfg now-ms id)]
+ @(d/transact conn tx)
+ (if loop?
+ (recur (conj tx-vec tx))
+ tx))
+ (do
+ (log/info "No items left to migrate")
+ tx-vec))))
+
+
+(defn migrate! [cfg]
+ (to->v2 cfg))