aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvar Refsdal <ivar.refsdal@nsd.no>2021-09-23 13:01:23 +0200
committerIvar Refsdal <ivar.refsdal@nsd.no>2021-09-23 13:01:23 +0200
commitdc2e14b4e1e91e6fefecc01c312a44c0033640c9 (patch)
treeca51f0e1830abc41672cc565487ddaaa3135ec89
parentDocument force-retry! (diff)
downloadfiinha-dc2e14b4e1e91e6fefecc01c312a44c0033640c9.tar.gz
fiinha-dc2e14b4e1e91e6fefecc01c312a44c0033640c9.tar.xz
Basic depends-on works for test queue
-rw-r--r--src/com/github/ivarref/yoltq.clj12
-rw-r--r--src/com/github/ivarref/yoltq/impl.clj51
-rw-r--r--src/com/github/ivarref/yoltq/test_queue.clj27
-rw-r--r--src/com/github/ivarref/yoltq/utils.clj1
-rw-r--r--test/com/github/ivarref/yoltq/test_utils.clj2
-rw-r--r--test/com/github/ivarref/yoltq/virtual_test.clj27
6 files changed, 92 insertions, 28 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index 58efca1..3164020 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -94,11 +94,13 @@
(swap! *config* (fn [old-config] (assoc-in old-config [:handlers queue-id] (merge opts {:f f}))))))
-(defn put [queue-id payload]
- (let [{:keys [bootstrap-poller! conn] :as cfg} @*config*]
- (when (and *test-mode* bootstrap-poller!)
- (bootstrap-poller! conn))
- (i/put cfg queue-id payload)))
+(defn put
+ ([queue-id payload] (put queue-id payload {}))
+ ([queue-id payload opts]
+ (let [{:keys [bootstrap-poller! conn] :as cfg} @*config*]
+ (when (and *test-mode* bootstrap-poller!)
+ (bootstrap-poller! conn))
+ (i/put cfg queue-id payload opts))))
(defn- do-start! []
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj
index 9c95cff..9811c93 100644
--- a/src/com/github/ivarref/yoltq/impl.clj
+++ b/src/com/github/ivarref/yoltq/impl.clj
@@ -8,9 +8,11 @@
(def schema
[#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity}
+ #:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value}
#:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
#:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
#:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string}
+ #:db{:ident :com.github.ivarref.yoltq/opts, :cardinality :db.cardinality/one, :valueType :db.type/string}
#:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string}
#:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true}
#:db{:ident :com.github.ivarref.yoltq/lock, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :noHistory true}
@@ -20,8 +22,10 @@
#:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}])
-(defn put [{:keys [capture-bindings] :as config}
- queue-name payload]
+(defn put [{:keys [capture-bindings conn] :as config}
+ queue-name
+ payload
+ opts]
(if-let [_ (get-in config [:handlers queue-name])]
(let [id (u/squuid)
str-bindings (->> (reduce (fn [o k]
@@ -30,19 +34,46 @@
(or capture-bindings []))
(pr-str))]
(log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init)
- {:com.github.ivarref.yoltq/id id
- :com.github.ivarref.yoltq/queue-name queue-name
- :com.github.ivarref.yoltq/status u/status-init
- :com.github.ivarref.yoltq/payload (pr-str payload)
- :com.github.ivarref.yoltq/bindings str-bindings
- :com.github.ivarref.yoltq/lock (u/random-uuid)
- :com.github.ivarref.yoltq/tries 0
- :com.github.ivarref.yoltq/init-time (u/now-ns)})
+ (merge
+ {:com.github.ivarref.yoltq/id id
+ :com.github.ivarref.yoltq/queue-name queue-name
+ :com.github.ivarref.yoltq/status u/status-init
+ :com.github.ivarref.yoltq/payload (pr-str payload)
+ :com.github.ivarref.yoltq/bindings str-bindings
+ :com.github.ivarref.yoltq/opts (pr-str (or opts {}))
+ :com.github.ivarref.yoltq/lock (u/random-uuid)
+ :com.github.ivarref.yoltq/tries 0
+ :com.github.ivarref.yoltq/init-time (u/now-ns)}
+ (when-let [[q ext-id] (:depends-on opts)]
+ (when-not (d/q '[:find ?e .
+ :in $ ?ext-id
+ :where
+ [?e :com.github.ivarref.yoltq/ext-id ?ext-id]]
+ (d/db conn)
+ (pr-str [q ext-id]))
+ (throw (ex-info ":depends-on not found in database" opts))))
+ (when-let [ext-id (:id opts)]
+ {:com.github.ivarref.yoltq/ext-id (pr-str [queue-name ext-id])})))
(do
(log/error "Did not find registered handler for queue" queue-name)
(throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name})))))
+(defn depends-on-waiting? [{:keys [conn]}
+ {:keys [id]}]
+ (let [db (d/db conn)]
+ (when-let [{:com.github.ivarref.yoltq/keys [opts]} (u/get-queue-item db id)]
+ (when-let [[q id :as depends-on] (:depends-on opts)]
+ (when-not (d/q '[:find ?e .
+ :in $ ?ext-id
+ :where
+ [?e :com.github.ivarref.yoltq/ext-id ?ext-id]
+ [?e :com.github.ivarref.yoltq/status :done]]
+ db
+ (pr-str [q id]))
+ {:depends-on depends-on})))))
+
+
(defn take! [{:keys [conn cas-failures hung-log-level tx-spent-time!]
:or {hung-log-level :error}}
{:keys [tx id queue-name was-hung? to-error?] :as queue-item-info}]
diff --git a/src/com/github/ivarref/yoltq/test_queue.clj b/src/com/github/ivarref/yoltq/test_queue.clj
index 4c4f903..6183216 100644
--- a/src/com/github/ivarref/yoltq/test_queue.clj
+++ b/src/com/github/ivarref/yoltq/test_queue.clj
@@ -148,18 +148,21 @@
`(if-let [job# (get-tx-q-job ~queue-name)]
(try
(with-bindings (:com.github.ivarref.yoltq/bindings job#)
- (let [res# (some->> (u/prepare-processing (d/db (:conn @yq/*config*))
- (:com.github.ivarref.yoltq/id job#)
- ~queue-name
- (:com.github.ivarref.yoltq/lock job#)
- (:com.github.ivarref.yoltq/status job#))
- (i/take! @yq/*config*)
- (i/execute! @yq/*config*))]
- (swap! (:prev-consumed @yq/*config*) assoc ~queue-name res#)
- (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#)))
- (if (:retval res#)
- (:retval res#)
- (:exception res#))))
+ (let [prep# (u/prepare-processing (d/db (:conn @yq/*config*))
+ (:com.github.ivarref.yoltq/id job#)
+ ~queue-name
+ (:com.github.ivarref.yoltq/lock job#)
+ (:com.github.ivarref.yoltq/status job#))]
+ (if-let [depends-on# (i/depends-on-waiting? @yq/*config* prep#)]
+ depends-on#
+ (let [res# (some->> prep#
+ (i/take! @yq/*config*)
+ (i/execute! @yq/*config*))]
+ (swap! (:prev-consumed @yq/*config*) assoc ~queue-name res#)
+ (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#)))
+ (if (:retval res#)
+ (:retval res#)
+ (:exception res#))))))
(catch Throwable t#
(log/error t# "unexpected error in consume-expect:" (ex-message t#))
(throw t#)))
diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj
index 9501343..d551510 100644
--- a/src/com/github/ivarref/yoltq/utils.clj
+++ b/src/com/github/ivarref/yoltq/utils.clj
@@ -56,6 +56,7 @@
(-> (d/pull db '[:*] [:com.github.ivarref.yoltq/id id])
(dissoc :db/id)
(update :com.github.ivarref.yoltq/payload edn/read-string)
+ (update :com.github.ivarref.yoltq/opts (fn [s] (or (when s (edn/read-string s)) {})))
(update :com.github.ivarref.yoltq/bindings
(fn [s]
(when s
diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj
index df56460..5427ff5 100644
--- a/test/com/github/ivarref/yoltq/test_utils.clj
+++ b/test/com/github/ivarref/yoltq/test_utils.clj
@@ -35,7 +35,7 @@
(defn put-transact! [id payload]
- @(d/transact (:conn @yq/*config*) [(i/put @yq/*config* id payload)]))
+ @(d/transact (:conn @yq/*config*) [(i/put @yq/*config* id payload {})]))
(defn advance! [tp]
diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj
index 2b67e5e..789e5b4 100644
--- a/test/com/github/ivarref/yoltq/virtual_test.clj
+++ b/test/com/github/ivarref/yoltq/virtual_test.clj
@@ -284,3 +284,30 @@
(is (= 1 (tq/consume! :q)))
(is (= 2 (tq/force-retry! :q)))
(is (= 3 (tq/force-retry! :q)))))
+
+
+(deftest ext-id-no-duplicates
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q identity)
+ @(d/transact conn [(yq/put :q nil {:id "123"})])
+ (is (thrown? Exception @(d/transact conn [(yq/put :q nil {:id "123"})])))))
+
+
+(deftest depends-on
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :a identity)
+ (yq/add-consumer! :b identity)
+ @(d/transact conn [(yq/put :a "a" {:id "1"})])
+ (is (thrown? Exception @(d/transact conn [(yq/put :b "b" {:depends-on [:a "0"]})])))
+ @(d/transact conn [(yq/put :b "b" {:depends-on [:a "1"]})])
+
+ ; can't consume :b yet:
+ (is (= {:depends-on [:a "1"]} (tq/consume! :b)))
+ (is (= {:depends-on [:a "1"]} (tq/consume! :b)))
+
+ (is (= "a" (tq/consume! :a)))
+ (is (= "b" (tq/consume! :b)))
+ (is (= "b" (tq/force-retry! :b)))))
+