aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvar Refsdal <refsdal.ivar@gmail.com>2022-03-27 21:33:00 +0200
committerIvar Refsdal <refsdal.ivar@gmail.com>2022-03-27 21:33:00 +0200
commit41c9e08d63176cf7c239574d1d07f2b302a2d3ec (patch)
tree7e3b36871f75d90c28a2b1f79ecdb084e5f284f1
parentStart use current millis in the database, not nano offset (diff)
downloadfiinha-41c9e08d63176cf7c239574d1d07f2b302a2d3ec.tar.gz
fiinha-41c9e08d63176cf7c239574d1d07f2b302a2d3ec.tar.xz
Fix use current millis in the database, not nano offset
-rw-r--r--src/com/github/ivarref/yoltq.clj11
-rw-r--r--src/com/github/ivarref/yoltq/impl.clj6
-rw-r--r--src/com/github/ivarref/yoltq/migrate.clj58
-rw-r--r--test/com/github/ivarref/yoltq/migrate_test.clj92
-rw-r--r--test/com/github/ivarref/yoltq/test_utils.clj7
-rw-r--r--test/com/github/ivarref/yoltq/virtual_test.clj14
6 files changed, 180 insertions, 8 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index 17aa40a..1a60a45 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -6,6 +6,7 @@
[com.github.ivarref.yoltq.poller :as poller]
[com.github.ivarref.yoltq.error-poller :as errpoller]
[com.github.ivarref.yoltq.slow-executor-detector :as slow-executor]
+ [com.github.ivarref.yoltq.migrate :as migrate]
[com.github.ivarref.yoltq.utils :as u])
(:import (datomic Connection)
(java.util.concurrent Executors TimeUnit ExecutorService)
@@ -64,7 +65,11 @@
:system-error-poll-delay (Duration/ofMinutes 1)
; How often should the system invoke
- :system-error-callback-backoff (Duration/ofHours 1)}
+ :system-error-callback-backoff (Duration/ofHours 1)
+
+ ; Should old, possibly stalled jobs be automatically be migrated
+ ; as part of `start!`?
+ :auto-migrate? true}
u/duration->millis))
@@ -104,7 +109,9 @@
(defn- do-start! []
- (let [{:keys [poll-delay pool-size system-error-poll-delay]} @*config*]
+ (let [{:keys [poll-delay pool-size system-error-poll-delay auto-migrate?] :as cfg} @*config*]
+ (when auto-migrate?
+ (migrate/migrate! cfg))
(reset! threadpool (Executors/newScheduledThreadPool (+ 2 pool-size)))
(let [pool @threadpool
queue-listener-ready (promise)]
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj
index b4eef8d..6b14ffc 100644
--- a/src/com/github/ivarref/yoltq/impl.clj
+++ b/src/com/github/ivarref/yoltq/impl.clj
@@ -20,7 +20,8 @@
#:db{:ident :com.github.ivarref.yoltq/init-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
#:db{:ident :com.github.ivarref.yoltq/processing-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
#:db{:ident :com.github.ivarref.yoltq/done-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
- #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}])
+ #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
+ #:db{:ident :com.github.ivarref.yoltq/version, :cardinality :db.cardinality/one, :valueType :db.type/string, :index true}])
(defn pr-str-safe [what x]
@@ -63,7 +64,8 @@
:com.github.ivarref.yoltq/opts (pr-str-safe :opts opts)
:com.github.ivarref.yoltq/lock (u/random-uuid)
:com.github.ivarref.yoltq/tries 0
- :com.github.ivarref.yoltq/init-time (u/now-ms)}
+ :com.github.ivarref.yoltq/init-time (u/now-ms)
+ :com.github.ivarref.yoltq/version "2"}
(when-let [[q ext-id] (:depends-on opts)]
(when-not (d/q '[:find ?e .
:in $ ?ext-id
diff --git a/src/com/github/ivarref/yoltq/migrate.clj b/src/com/github/ivarref/yoltq/migrate.clj
new file mode 100644
index 0000000..89fc286
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/migrate.clj
@@ -0,0 +1,58 @@
+(ns com.github.ivarref.yoltq.migrate
+ (:require [datomic.api :as d]
+ [clojure.tools.logging :as log]))
+
+(defn to->v2-ent [{:keys [conn]} now-ms id]
+ (log/info "Migrating id" id)
+ (let [attr-val (fn [attr]
+ (when-let [old (d/q '[:find ?time .
+ :in $ ?e ?a
+ :where
+ [?e ?a ?time]]
+ (d/db conn)
+ [:com.github.ivarref.yoltq/id id]
+ attr)]
+ (let [now-ms (or now-ms
+ (.getTime (d/q '[:find (max ?txinst) .
+ :in $ ?e ?a
+ :where
+ [?e ?a _ ?tx true]
+ [?tx :db/txInstant ?txinst]]
+ (d/history (d/db conn))
+ [:com.github.ivarref.yoltq/id id]
+ attr)))]
+ (log/info "Updating" id attr "to" now-ms)
+ [[:db/cas [:com.github.ivarref.yoltq/id id]
+ attr old now-ms]])))]
+ (vec (concat [[:db/cas [:com.github.ivarref.yoltq/id id]
+ :com.github.ivarref.yoltq/version nil "2"]]
+ (mapcat attr-val [:com.github.ivarref.yoltq/init-time
+ :com.github.ivarref.yoltq/processing-time
+ :com.github.ivarref.yoltq/done-time
+ :com.github.ivarref.yoltq/error-time])))))
+
+(defn to->v2 [{:keys [conn loop? now-ms]
+ :or {loop? true}
+ :as cfg}]
+ (loop [tx-vec []]
+ (if-let [id (some->> (d/q '[:find [?id ...]
+ :in $
+ :where
+ [?e :com.github.ivarref.yoltq/id ?id]
+ [(missing? $ ?e :com.github.ivarref.yoltq/version)]]
+ (d/db conn))
+ (sort)
+ (not-empty)
+ (first))]
+ (let [tx (to->v2-ent cfg now-ms id)]
+ @(d/transact conn tx)
+ (if loop?
+ (recur (conj tx-vec tx))
+ tx))
+ (do
+ (log/info "No items left to migrate")
+ tx-vec))))
+
+
+(defn migrate! [cfg]
+ (to->v2 cfg))
diff --git a/test/com/github/ivarref/yoltq/migrate_test.clj b/test/com/github/ivarref/yoltq/migrate_test.clj
new file mode 100644
index 0000000..0063631
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/migrate_test.clj
@@ -0,0 +1,92 @@
+(ns com.github.ivarref.yoltq.migrate-test
+ (:require [clojure.test :refer [deftest is]]
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [com.github.ivarref.yoltq.migrate :as m]
+ [com.github.ivarref.yoltq.impl :as impl]
+ [com.github.ivarref.yoltq.test-utils :as tu]
+ [com.github.ivarref.yoltq.utils :as u]
+ [datomic.api :as d]))
+
+
+(deftest to-v2-migration
+ (with-bindings {#'ext/*squuid-atom* (atom 0)}
+ (let [conn (tu/empty-conn)]
+ @(d/transact conn impl/schema)
+ @(d/transact conn [{:com.github.ivarref.yoltq/id (u/squuid)
+ :com.github.ivarref.yoltq/queue-name :dummy
+ :com.github.ivarref.yoltq/status u/status-processing
+ :com.github.ivarref.yoltq/init-time 1
+ :com.github.ivarref.yoltq/processing-time 2}])
+ @(d/transact conn [{:com.github.ivarref.yoltq/id (u/squuid)
+ :com.github.ivarref.yoltq/queue-name :dummy
+ :com.github.ivarref.yoltq/status u/status-init
+ :com.github.ivarref.yoltq/init-time 3}])
+ (is (= [[[:db/cas
+ [:com.github.ivarref.yoltq/id
+ #uuid "00000000-0000-0000-0000-000000000001"]
+ :com.github.ivarref.yoltq/version
+ nil
+ "2"]
+ [:db/cas
+ [:com.github.ivarref.yoltq/id
+ #uuid "00000000-0000-0000-0000-000000000001"]
+ :com.github.ivarref.yoltq/init-time
+ 1
+ 1000]
+ [:db/cas
+ [:com.github.ivarref.yoltq/id
+ #uuid "00000000-0000-0000-0000-000000000001"]
+ :com.github.ivarref.yoltq/processing-time
+ 2
+ 1000]]
+ [[:db/cas
+ [:com.github.ivarref.yoltq/id
+ #uuid "00000000-0000-0000-0000-000000000002"]
+ :com.github.ivarref.yoltq/version
+ nil
+ "2"]
+ [:db/cas
+ [:com.github.ivarref.yoltq/id
+ #uuid "00000000-0000-0000-0000-000000000002"]
+ :com.github.ivarref.yoltq/init-time
+ 3
+ 1000]]]
+ (m/migrate! {:conn conn
+ :now-ms 1000
+ :loop? true})))
+ (is (= []
+ (m/migrate! {:conn conn
+ :now-ms 1000
+ :loop? true}))))))
+
+
+(deftest to-v2-migration-real-time
+ (with-bindings {#'ext/*squuid-atom* (atom 0)}
+ (let [conn (tu/empty-conn)
+ id (u/squuid)]
+ @(d/transact conn impl/schema)
+ @(d/transact conn [{:com.github.ivarref.yoltq/id id
+ :com.github.ivarref.yoltq/queue-name :dummy
+ :com.github.ivarref.yoltq/status u/status-init
+ :com.github.ivarref.yoltq/init-time 1}])
+ (Thread/sleep 100)
+ @(d/transact conn [{:com.github.ivarref.yoltq/id id
+ :com.github.ivarref.yoltq/init-time 2}])
+ (let [tx-times (->> (d/q '[:find [?txinst ...]
+ :in $ ?e
+ :where
+ [?e :com.github.ivarref.yoltq/init-time _ ?tx true]
+ [?tx :db/txInstant ?txinst]]
+ (d/history (d/db conn))
+ [:com.github.ivarref.yoltq/id id])
+ (sort)
+ (vec))]
+ (is (= 2 (count tx-times)))
+ (m/migrate! {:conn conn})
+ (is (= (.getTime (last tx-times))
+ (d/q '[:find ?init-time .
+ :in $ ?e
+ :where
+ [?e :com.github.ivarref.yoltq/init-time ?init-time]]
+ (d/db conn)
+ [:com.github.ivarref.yoltq/id id])))))))
diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj
index e4151c2..0c1b2f0 100644
--- a/test/com/github/ivarref/yoltq/test_utils.clj
+++ b/test/com/github/ivarref/yoltq/test_utils.clj
@@ -7,7 +7,8 @@
[clojure.string :as str]
[com.github.ivarref.yoltq.impl :as i]
[clojure.edn :as edn]
- [com.github.ivarref.yoltq.ext-sys :as ext])
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [clojure.pprint :as pp])
(:import (java.util UUID)
(java.time Duration)))
@@ -54,6 +55,10 @@
(d/db (:conn @yq/*config*))))
+(defn pp [x]
+ (pp/pprint x)
+ x)
+
(defn get-init [& args]
(apply u/get-init @yq/*config* args))
diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj
index acd3eb7..34c9026 100644
--- a/test/com/github/ivarref/yoltq/virtual_test.clj
+++ b/test/com/github/ivarref/yoltq/virtual_test.clj
@@ -1,6 +1,6 @@
(ns com.github.ivarref.yoltq.virtual-test
(:require [datomic-schema.core]
- [clojure.test :refer :all]
+ [clojure.test :refer [use-fixtures deftest is] :refer-macros [thrown?]]
[com.github.ivarref.yoltq.test-queue :as tq]
[com.github.ivarref.yoltq.test-utils :as u]
[datomic.api :as d]
@@ -8,7 +8,8 @@
[clojure.tools.logging :as log]
[com.github.ivarref.yoltq.impl :as i]
[com.github.ivarref.yoltq :as yq]
- [taoensso.timbre :as timbre]))
+ [taoensso.timbre :as timbre]
+ [com.github.ivarref.yoltq.migrate :as migrate]))
(use-fixtures :each tq/call-with-virtual-queue!)
@@ -21,6 +22,13 @@
@(d/transact conn [(yq/put :q {:work 123})])
(is (= {:work 123} (tq/consume! :q)))))
+(deftest happy-case-no-migration-for-new-entities
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q identity)
+ @(d/transact conn [(yq/put :q {:work 123})])
+ (is (= {:work 123} (tq/consume! :q)))
+ (is (= [] (migrate/migrate! @yq/*config*)))))
(deftest happy-case-tx-report-q
(let [conn (u/empty-conn)]
@@ -341,4 +349,4 @@
(some? id))})
@(d/transact conn [(yq/put :q {:id "a"})])
(timbre/with-level :fatal
- (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) \ No newline at end of file
+ (is (thrown? Exception @(d/transact conn [(yq/put :q {})]))))))