aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorIvar Refsdal <ivar.refsdal@nsd.no>2021-09-17 14:08:22 +0200
committerIvar Refsdal <ivar.refsdal@nsd.no>2021-09-17 14:08:22 +0200
commit3800a43e538e9d140ef0227f2417430171865605 (patch)
treedc15620e1af74b97c0dd4a4345662f9b15bea7ff /src
parentMore documentation (diff)
downloadfiinha-3800a43e538e9d140ef0227f2417430171865605.tar.gz
fiinha-3800a43e538e9d140ef0227f2417430171865605.tar.xz
Add consume-expect!
Diffstat (limited to 'src')
-rw-r--r--src/com/github/ivarref/yoltq/virtual_queue.clj88
1 files changed, 78 insertions, 10 deletions
diff --git a/src/com/github/ivarref/yoltq/virtual_queue.clj b/src/com/github/ivarref/yoltq/virtual_queue.clj
index e49aca3..f133bde 100644
--- a/src/com/github/ivarref/yoltq/virtual_queue.clj
+++ b/src/com/github/ivarref/yoltq/virtual_queue.clj
@@ -2,10 +2,14 @@
(:require [clojure.tools.logging :as log]
[com.github.ivarref.yoltq.report-queue :as rq]
[com.github.ivarref.yoltq.ext-sys :as ext]
- [com.github.ivarref.yoltq :as dq]
+ [com.github.ivarref.yoltq :as yq]
[datomic.api :as d]
- [com.github.ivarref.yoltq.poller :as poller])
- (:import (java.util.concurrent BlockingQueue TimeUnit)))
+ [com.github.ivarref.yoltq.poller :as poller]
+ [clojure.test :as test]
+ [com.github.ivarref.yoltq.utils :as u]
+ [com.github.ivarref.yoltq.impl :as i])
+ (:import (java.util.concurrent BlockingQueue TimeUnit)
+ (datomic Datom)))
(defn bootstrap-poller! [txq running? poller-exited? bootstrapped? conn]
@@ -39,9 +43,9 @@
:init-backoff-time 0
:hung-log-level :warn
:tx-queue txq#})]
- (with-bindings {#'dq/*config* config#
- #'dq/*running?* (atom false)
- #'dq/*test-mode* true
+ (with-bindings {#'yq/*config* config#
+ #'yq/*running?* (atom false)
+ #'yq/*test-mode* true
#'ext/*now-ns-atom* (atom 0)
#'ext/*random-atom* (atom 0)
#'ext/*squuid-atom* (atom 0)}
@@ -60,7 +64,7 @@
(defn run-report-queue! [min-items]
- (let [{:keys [tx-queue conn]} @dq/*config*
+ (let [{:keys [tx-queue conn]} @yq/*config*
id-ident (d/q '[:find ?e .
:where [?e :db/ident :com.github.ivarref.yoltq/id]]
(d/db conn))]
@@ -75,7 +79,7 @@
(let [res (atom [])]
(doseq [itm (first (swap-vals! tx-queue (constantly [])))]
(rq/process-poll-result!
- @dq/*config*
+ @yq/*config*
id-ident
itm
(fn [f] (swap! res conj (f)))))
@@ -87,8 +91,72 @@
(defn run-queue-once! [q status]
- (poller/poll-once! @dq/*config* q status))
+ (poller/poll-once! @yq/*config* q status))
(defn put! [q payload]
- @(d/transact (:conn @dq/*config*) [(dq/put q payload)])) \ No newline at end of file
+ @(d/transact (:conn @yq/*config*) [(yq/put q payload)]))
+
+
+(defn transact-result->maps [{:keys [tx-data db-after]}]
+ (let [m (->> tx-data
+ (group-by (fn [^Datom d] (.e d)))
+ (vals)
+ (mapv (fn [datoms]
+ (reduce (fn [o ^Datom d]
+ (if (.added d)
+ (assoc o (d/q '[:find ?r .
+ :in $ ?e
+ :where [?e :db/ident ?r]]
+ db-after
+ (.a d))
+ (.v d))
+ o))
+ {}
+ datoms))))]
+ m))
+
+(defn contains-queue-job?
+ [queue-id conn {::yq/keys [id queue-name status] :as m}]
+ (when (and (= queue-id queue-name)
+ (= status :init)
+ (d/q '[:find ?e .
+ :in $ ?id
+ :where
+ [?e ::yq/id ?id]
+ [?e ::yq/status :init]]
+ (d/db conn)
+ id))
+ m))
+
+
+(defn get-tx-q-job [q-id]
+ (let [{:keys [tx-queue conn]} @yq/*config*]
+ (loop [timeout (+ 3000 (System/currentTimeMillis))]
+ (if-let [job (->> @tx-queue
+ (mapcat transact-result->maps)
+ (filter (partial contains-queue-job? q-id conn))
+ (first))]
+ (u/get-queue-item (d/db conn) (::yq/id job))
+ (if (< (System/currentTimeMillis) timeout)
+ (do (Thread/sleep 10)
+ (recur timeout))
+ nil)))))
+
+(defmacro consume-expect! [queue-name expected-status]
+ `(if-let [job# (get-tx-q-job ~queue-name)]
+ (try
+ (let [res# (some->>
+ (u/prepare-processing (:com.github.ivarref.yoltq/id job#)
+ ~queue-name
+ (:com.github.ivarref.yoltq/lock job#)
+ (:com.github.ivarref.yoltq/status job#))
+ (i/take! @yq/*config*)
+ (i/execute! @yq/*config*))]
+ (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#)))
+ (if (:retval res#)
+ (:retval res#)
+ (:exception res#)))
+ (catch Throwable t#
+ (log/error t# "unexpected error in consume-expect:" (ex-message t#))))
+ (test/is nil (str "No job found for queue " ~queue-name)))) \ No newline at end of file