aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/github/ivarref/yoltq')
-rw-r--r--src/com/github/ivarref/yoltq/impl.clj6
-rw-r--r--src/com/github/ivarref/yoltq/migrate.clj58
2 files changed, 62 insertions, 2 deletions
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj
index b4eef8d..6b14ffc 100644
--- a/src/com/github/ivarref/yoltq/impl.clj
+++ b/src/com/github/ivarref/yoltq/impl.clj
@@ -20,7 +20,8 @@
#:db{:ident :com.github.ivarref.yoltq/init-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
#:db{:ident :com.github.ivarref.yoltq/processing-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
#:db{:ident :com.github.ivarref.yoltq/done-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
- #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}])
+ #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
+ #:db{:ident :com.github.ivarref.yoltq/version, :cardinality :db.cardinality/one, :valueType :db.type/string, :index true}])
(defn pr-str-safe [what x]
@@ -63,7 +64,8 @@
:com.github.ivarref.yoltq/opts (pr-str-safe :opts opts)
:com.github.ivarref.yoltq/lock (u/random-uuid)
:com.github.ivarref.yoltq/tries 0
- :com.github.ivarref.yoltq/init-time (u/now-ms)}
+ :com.github.ivarref.yoltq/init-time (u/now-ms)
+ :com.github.ivarref.yoltq/version "2"}
(when-let [[q ext-id] (:depends-on opts)]
(when-not (d/q '[:find ?e .
:in $ ?ext-id
diff --git a/src/com/github/ivarref/yoltq/migrate.clj b/src/com/github/ivarref/yoltq/migrate.clj
new file mode 100644
index 0000000..89fc286
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/migrate.clj
@@ -0,0 +1,58 @@
+(ns com.github.ivarref.yoltq.migrate
+ (:require [datomic.api :as d]
+ [clojure.tools.logging :as log]))
+
+(defn to->v2-ent [{:keys [conn]} now-ms id]
+ (log/info "Migrating id" id)
+ (let [attr-val (fn [attr]
+ (when-let [old (d/q '[:find ?time .
+ :in $ ?e ?a
+ :where
+ [?e ?a ?time]]
+ (d/db conn)
+ [:com.github.ivarref.yoltq/id id]
+ attr)]
+ (let [now-ms (or now-ms
+ (.getTime (d/q '[:find (max ?txinst) .
+ :in $ ?e ?a
+ :where
+ [?e ?a _ ?tx true]
+ [?tx :db/txInstant ?txinst]]
+ (d/history (d/db conn))
+ [:com.github.ivarref.yoltq/id id]
+ attr)))]
+ (log/info "Updating" id attr "to" now-ms)
+ [[:db/cas [:com.github.ivarref.yoltq/id id]
+ attr old now-ms]])))]
+ (vec (concat [[:db/cas [:com.github.ivarref.yoltq/id id]
+ :com.github.ivarref.yoltq/version nil "2"]]
+ (mapcat attr-val [:com.github.ivarref.yoltq/init-time
+ :com.github.ivarref.yoltq/processing-time
+ :com.github.ivarref.yoltq/done-time
+ :com.github.ivarref.yoltq/error-time])))))
+
+(defn to->v2 [{:keys [conn loop? now-ms]
+ :or {loop? true}
+ :as cfg}]
+ (loop [tx-vec []]
+ (if-let [id (some->> (d/q '[:find [?id ...]
+ :in $
+ :where
+ [?e :com.github.ivarref.yoltq/id ?id]
+ [(missing? $ ?e :com.github.ivarref.yoltq/version)]]
+ (d/db conn))
+ (sort)
+ (not-empty)
+ (first))]
+ (let [tx (to->v2-ent cfg now-ms id)]
+ @(d/transact conn tx)
+ (if loop?
+ (recur (conj tx-vec tx))
+ tx))
+ (do
+ (log/info "No items left to migrate")
+ tx-vec))))
+
+
+(defn migrate! [cfg]
+ (to->v2 cfg))