diff options
| author | Ivar Refsdal <ivar.refsdal@nsd.no> | 2021-09-17 14:08:22 +0200 |
|---|---|---|
| committer | Ivar Refsdal <ivar.refsdal@nsd.no> | 2021-09-17 14:08:22 +0200 |
| commit | 3800a43e538e9d140ef0227f2417430171865605 (patch) | |
| tree | dc15620e1af74b97c0dd4a4345662f9b15bea7ff /src | |
| parent | More documentation (diff) | |
| download | fiinha-3800a43e538e9d140ef0227f2417430171865605.tar.gz fiinha-3800a43e538e9d140ef0227f2417430171865605.tar.xz | |
Add consume-expect!
Diffstat (limited to 'src')
| -rw-r--r-- | src/com/github/ivarref/yoltq/virtual_queue.clj | 88 |
1 files changed, 78 insertions, 10 deletions
diff --git a/src/com/github/ivarref/yoltq/virtual_queue.clj b/src/com/github/ivarref/yoltq/virtual_queue.clj index e49aca3..f133bde 100644 --- a/src/com/github/ivarref/yoltq/virtual_queue.clj +++ b/src/com/github/ivarref/yoltq/virtual_queue.clj @@ -2,10 +2,14 @@ (:require [clojure.tools.logging :as log] [com.github.ivarref.yoltq.report-queue :as rq] [com.github.ivarref.yoltq.ext-sys :as ext] - [com.github.ivarref.yoltq :as dq] + [com.github.ivarref.yoltq :as yq] [datomic.api :as d] - [com.github.ivarref.yoltq.poller :as poller]) - (:import (java.util.concurrent BlockingQueue TimeUnit))) + [com.github.ivarref.yoltq.poller :as poller] + [clojure.test :as test] + [com.github.ivarref.yoltq.utils :as u] + [com.github.ivarref.yoltq.impl :as i]) + (:import (java.util.concurrent BlockingQueue TimeUnit) + (datomic Datom))) (defn bootstrap-poller! [txq running? poller-exited? bootstrapped? conn] @@ -39,9 +43,9 @@ :init-backoff-time 0 :hung-log-level :warn :tx-queue txq#})] - (with-bindings {#'dq/*config* config# - #'dq/*running?* (atom false) - #'dq/*test-mode* true + (with-bindings {#'yq/*config* config# + #'yq/*running?* (atom false) + #'yq/*test-mode* true #'ext/*now-ns-atom* (atom 0) #'ext/*random-atom* (atom 0) #'ext/*squuid-atom* (atom 0)} @@ -60,7 +64,7 @@ (defn run-report-queue! [min-items] - (let [{:keys [tx-queue conn]} @dq/*config* + (let [{:keys [tx-queue conn]} @yq/*config* id-ident (d/q '[:find ?e . :where [?e :db/ident :com.github.ivarref.yoltq/id]] (d/db conn))] @@ -75,7 +79,7 @@ (let [res (atom [])] (doseq [itm (first (swap-vals! tx-queue (constantly [])))] (rq/process-poll-result! - @dq/*config* + @yq/*config* id-ident itm (fn [f] (swap! res conj (f))))) @@ -87,8 +91,72 @@ (defn run-queue-once! [q status] - (poller/poll-once! @dq/*config* q status)) + (poller/poll-once! @yq/*config* q status)) (defn put! [q payload] - @(d/transact (:conn @dq/*config*) [(dq/put q payload)]))
\ No newline at end of file + @(d/transact (:conn @yq/*config*) [(yq/put q payload)])) + + +(defn transact-result->maps [{:keys [tx-data db-after]}] + (let [m (->> tx-data + (group-by (fn [^Datom d] (.e d))) + (vals) + (mapv (fn [datoms] + (reduce (fn [o ^Datom d] + (if (.added d) + (assoc o (d/q '[:find ?r . + :in $ ?e + :where [?e :db/ident ?r]] + db-after + (.a d)) + (.v d)) + o)) + {} + datoms))))] + m)) + +(defn contains-queue-job? + [queue-id conn {::yq/keys [id queue-name status] :as m}] + (when (and (= queue-id queue-name) + (= status :init) + (d/q '[:find ?e . + :in $ ?id + :where + [?e ::yq/id ?id] + [?e ::yq/status :init]] + (d/db conn) + id)) + m)) + + +(defn get-tx-q-job [q-id] + (let [{:keys [tx-queue conn]} @yq/*config*] + (loop [timeout (+ 3000 (System/currentTimeMillis))] + (if-let [job (->> @tx-queue + (mapcat transact-result->maps) + (filter (partial contains-queue-job? q-id conn)) + (first))] + (u/get-queue-item (d/db conn) (::yq/id job)) + (if (< (System/currentTimeMillis) timeout) + (do (Thread/sleep 10) + (recur timeout)) + nil))))) + +(defmacro consume-expect! [queue-name expected-status] + `(if-let [job# (get-tx-q-job ~queue-name)] + (try + (let [res# (some->> + (u/prepare-processing (:com.github.ivarref.yoltq/id job#) + ~queue-name + (:com.github.ivarref.yoltq/lock job#) + (:com.github.ivarref.yoltq/status job#)) + (i/take! @yq/*config*) + (i/execute! @yq/*config*))] + (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#))) + (if (:retval res#) + (:retval res#) + (:exception res#))) + (catch Throwable t# + (log/error t# "unexpected error in consume-expect:" (ex-message t#)))) + (test/is nil (str "No job found for queue " ~queue-name))))
\ No newline at end of file |
