From 60f7371f4d2dd43c5b177039406eeaab00ba27cc Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Thu, 23 Sep 2021 11:06:33 +0200 Subject: To test-queue namespace --- src/com/github/ivarref/yoltq/test_queue.clj | 197 ++++++++++++++++++++++++ src/com/github/ivarref/yoltq/virtual_queue.clj | 198 ------------------------- 2 files changed, 197 insertions(+), 198 deletions(-) create mode 100644 src/com/github/ivarref/yoltq/test_queue.clj delete mode 100644 src/com/github/ivarref/yoltq/virtual_queue.clj (limited to 'src') diff --git a/src/com/github/ivarref/yoltq/test_queue.clj b/src/com/github/ivarref/yoltq/test_queue.clj new file mode 100644 index 0000000..6aeb959 --- /dev/null +++ b/src/com/github/ivarref/yoltq/test_queue.clj @@ -0,0 +1,197 @@ +(ns com.github.ivarref.yoltq.test-queue + (:require [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.report-queue :as rq] + [com.github.ivarref.yoltq.ext-sys :as ext] + [com.github.ivarref.yoltq :as yq] + [datomic.api :as d] + [com.github.ivarref.yoltq.poller :as poller] + [clojure.test :as test] + [com.github.ivarref.yoltq.utils :as u] + [com.github.ivarref.yoltq.impl :as i]) + (:import (java.util.concurrent BlockingQueue TimeUnit) + (datomic Datom))) + + +(defn bootstrap-poller! [txq running? poller-exited? bootstrapped? conn] + (let [ready? (promise)] + (future + (let [q (d/tx-report-queue conn)] + (try + (while @running? + (when-let [poll-result (.poll ^BlockingQueue q 10 TimeUnit/MILLISECONDS)] + (swap! txq conj poll-result)) + (deliver ready? true) + (reset! bootstrapped? true)) + (catch Throwable t + (log/error t "test-poller crashed: " (ex-message t))) + (finally + (try + (d/remove-tx-report-queue conn) + (catch Throwable t + (log/warn t "remove-tx-report-queue failed:" (ex-message t)))) + (deliver poller-exited? true))))) + @ready?)) + + +(defmacro with-virtual-queue! + [& body] + `(let [txq# (atom []) + poller-exited?# (promise) + bootstrapped?# (atom false) + running?# (atom true) + config# (atom {:bootstrap-poller! (partial bootstrap-poller! txq# running?# poller-exited?# bootstrapped?#) + :init-backoff-time 0 + :hung-log-level :warn + :tx-queue txq#})] + (with-bindings {#'yq/*config* config# + #'yq/*running?* (atom false) + #'yq/*test-mode* true + #'ext/*now-ns-atom* (atom 0) + #'ext/*random-atom* (atom 0) + #'ext/*squuid-atom* (atom 0)} + (try + ~@body + (finally + (reset! running?# false) + (when @bootstrapped?# + @poller-exited?#)))))) + + +(defn call-with-virtual-queue! + [f] + (with-virtual-queue! + (f))) + + +(defn run-report-queue! [min-items] + (let [{:keys [tx-queue conn]} @yq/*config* + id-ident (d/q '[:find ?e . + :where [?e :db/ident :com.github.ivarref.yoltq/id]] + (d/db conn))] + (let [timeout (+ 3000 (System/currentTimeMillis))] + (while (and (< (System/currentTimeMillis) timeout) + (< (count @tx-queue) min-items)) + (Thread/sleep 10))) + (when (< (count @tx-queue) min-items) + (let [msg (str "run-report-queue: timeout waiting for " min-items " items")] + (log/error msg) + (throw (ex-info msg {})))) + (let [res (atom [])] + (doseq [itm (first (swap-vals! tx-queue (constantly [])))] + (rq/process-poll-result! + @yq/*config* + id-ident + itm + (fn [f] (swap! res conj (f))))) + @res))) + + +(defn run-one-report-queue! [] + (first (run-report-queue! 1))) + + +(defn run-queue-once! [q status] + (poller/poll-once! @yq/*config* q status)) + + +(defn put! [q payload] + @(d/transact (:conn @yq/*config*) [(yq/put q payload)])) + + +(defn transact-result->maps [{:keys [tx-data db-after]}] + (let [m (->> tx-data + (group-by (fn [^Datom d] (.e d))) + (vals) + (mapv (fn [datoms] + (reduce (fn [o ^Datom d] + (if (.added d) + (assoc o (d/q '[:find ?r . + :in $ ?e + :where [?e :db/ident ?r]] + db-after + (.a d)) + (.v d)) + o)) + {} + datoms))))] + m)) + +(defn contains-queue-job? + [queue-id conn {::yq/keys [id queue-name status] :as m}] + (when (and (= queue-id queue-name) + (= status :init) + (d/q '[:find ?e . + :in $ ?id + :where + [?e ::yq/id ?id] + [?e ::yq/status :init]] + (d/db conn) + id)) + m)) + + +(defn get-tx-q-job [q-id] + (let [{:keys [tx-queue conn]} @yq/*config*] + (loop [timeout (+ 3000 (System/currentTimeMillis))] + (if-let [job (->> @tx-queue + (mapcat transact-result->maps) + (filter (partial contains-queue-job? q-id conn)) + (first))] + (u/get-queue-item (d/db conn) (::yq/id job)) + (if (< (System/currentTimeMillis) timeout) + (do (Thread/sleep 10) + (recur timeout)) + nil))))) + +(defmacro consume-expect! [queue-name expected-status] + `(if-let [job# (get-tx-q-job ~queue-name)] + (try + (with-bindings (:com.github.ivarref.yoltq/bindings job#) + (let [res# (some->> (u/prepare-processing (d/db (:conn @yq/*config*)) + (:com.github.ivarref.yoltq/id job#) + ~queue-name + (:com.github.ivarref.yoltq/lock job#) + (:com.github.ivarref.yoltq/status job#)) + (i/take! @yq/*config*) + (i/execute! @yq/*config*))] + (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#))) + (if (:retval res#) + (:retval res#) + (:exception res#)))) + (catch Throwable t# + (log/error t# "unexpected error in consume-expect:" (ex-message t#)))) + (test/is false (str "No job found for queue " ~queue-name)))) + +(defmacro consume! [queue-name] + `(consume-expect! ~queue-name :done)) + + +(defn mark-fails! [{:keys [conn]} + {:com.github.ivarref.yoltq/keys [id lock tries]} + _] + (try + (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing :init]]] + @(d/transact conn tx) + nil) + (catch Throwable t + (log/error t "unexpected error in mark-status!: " (ex-message t)) + nil))) + + +(defmacro force-retry! [queue-name] + `(if-let [job# (get-tx-q-job ~queue-name)] + (try + (with-bindings (:com.github.ivarref.yoltq/bindings job#) + (some->> (u/prepare-processing (d/db (:conn @yq/*config*)) + (:com.github.ivarref.yoltq/id job#) + ~queue-name + (:com.github.ivarref.yoltq/lock job#) + (:com.github.ivarref.yoltq/status job#)) + (i/take! @yq/*config*) + (i/execute! (assoc @yq/*config* :mark-status-fn! mark-fails!))) + (consume! ~queue-name)) + (catch Throwable t# + (log/error t# "unexpected error in consume-twice!:" (ex-message t#)))) + (test/is false (str "No job found for queue " ~queue-name)))) \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/virtual_queue.clj b/src/com/github/ivarref/yoltq/virtual_queue.clj deleted file mode 100644 index db429a8..0000000 --- a/src/com/github/ivarref/yoltq/virtual_queue.clj +++ /dev/null @@ -1,198 +0,0 @@ -(ns com.github.ivarref.yoltq.virtual-queue - (:require [clojure.tools.logging :as log] - [com.github.ivarref.yoltq.report-queue :as rq] - [com.github.ivarref.yoltq.ext-sys :as ext] - [com.github.ivarref.yoltq :as yq] - [datomic.api :as d] - [com.github.ivarref.yoltq.poller :as poller] - [clojure.test :as test] - [com.github.ivarref.yoltq.utils :as u] - [com.github.ivarref.yoltq.impl :as i]) - (:import (java.util.concurrent BlockingQueue TimeUnit) - (datomic Datom))) - - -(defn bootstrap-poller! [txq running? poller-exited? bootstrapped? conn] - (let [ready? (promise)] - (future - (let [q (d/tx-report-queue conn)] - (try - (while @running? - (when-let [poll-result (.poll ^BlockingQueue q 10 TimeUnit/MILLISECONDS)] - (swap! txq conj poll-result)) - (deliver ready? true) - (reset! bootstrapped? true)) - (catch Throwable t - (log/error t "test-poller crashed: " (ex-message t))) - (finally - (try - (d/remove-tx-report-queue conn) - (catch Throwable t - (log/warn t "remove-tx-report-queue failed:" (ex-message t)))) - (deliver poller-exited? true))))) - @ready?)) - - -(defmacro with-virtual-queue! - [& body] - `(let [txq# (atom []) - poller-exited?# (promise) - bootstrapped?# (atom false) - running?# (atom true) - config# (atom {:bootstrap-poller! (partial bootstrap-poller! txq# running?# poller-exited?# bootstrapped?#) - :init-backoff-time 0 - :hung-log-level :warn - :tx-queue txq#})] - (with-bindings {#'yq/*config* config# - #'yq/*running?* (atom false) - #'yq/*test-mode* true - #'ext/*now-ns-atom* (atom 0) - #'ext/*random-atom* (atom 0) - #'ext/*squuid-atom* (atom 0)} - (try - ~@body - (finally - (reset! running?# false) - (when @bootstrapped?# - @poller-exited?#)))))) - - -(defn call-with-virtual-queue! - [f] - (with-virtual-queue! - (f))) - - -(defn run-report-queue! [min-items] - (let [{:keys [tx-queue conn]} @yq/*config* - id-ident (d/q '[:find ?e . - :where [?e :db/ident :com.github.ivarref.yoltq/id]] - (d/db conn))] - (let [timeout (+ 3000 (System/currentTimeMillis))] - (while (and (< (System/currentTimeMillis) timeout) - (< (count @tx-queue) min-items)) - (Thread/sleep 10))) - (when (< (count @tx-queue) min-items) - (let [msg (str "run-report-queue: timeout waiting for " min-items " items")] - (log/error msg) - (throw (ex-info msg {})))) - (let [res (atom [])] - (doseq [itm (first (swap-vals! tx-queue (constantly [])))] - (rq/process-poll-result! - @yq/*config* - id-ident - itm - (fn [f] (swap! res conj (f))))) - @res))) - - -(defn run-one-report-queue! [] - (first (run-report-queue! 1))) - - -(defn run-queue-once! [q status] - (poller/poll-once! @yq/*config* q status)) - - -(defn put! [q payload] - @(d/transact (:conn @yq/*config*) [(yq/put q payload)])) - - -(defn transact-result->maps [{:keys [tx-data db-after]}] - (let [m (->> tx-data - (group-by (fn [^Datom d] (.e d))) - (vals) - (mapv (fn [datoms] - (reduce (fn [o ^Datom d] - (if (.added d) - (assoc o (d/q '[:find ?r . - :in $ ?e - :where [?e :db/ident ?r]] - db-after - (.a d)) - (.v d)) - o)) - {} - datoms))))] - m)) - -(defn contains-queue-job? - [queue-id conn {::yq/keys [id queue-name status] :as m}] - (when (and (= queue-id queue-name) - (= status :init) - (d/q '[:find ?e . - :in $ ?id - :where - [?e ::yq/id ?id] - [?e ::yq/status :init]] - (d/db conn) - id)) - m)) - - -(defn get-tx-q-job [q-id] - (let [{:keys [tx-queue conn]} @yq/*config*] - (loop [timeout (+ 3000 (System/currentTimeMillis))] - (if-let [job (->> @tx-queue - (mapcat transact-result->maps) - (filter (partial contains-queue-job? q-id conn)) - (first))] - (u/get-queue-item (d/db conn) (::yq/id job)) - (if (< (System/currentTimeMillis) timeout) - (do (Thread/sleep 10) - (recur timeout)) - nil))))) - -(defmacro consume-expect! [queue-name expected-status] - `(if-let [job# (get-tx-q-job ~queue-name)] - (try - (with-bindings (:com.github.ivarref.yoltq/bindings job#) - (let [res# (some->> (u/prepare-processing (d/db (:conn @yq/*config*)) - (:com.github.ivarref.yoltq/id job#) - ~queue-name - (:com.github.ivarref.yoltq/lock job#) - (:com.github.ivarref.yoltq/status job#)) - (i/take! @yq/*config*) - (i/execute! @yq/*config*))] - (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#))) - (if (:retval res#) - (:retval res#) - (:exception res#)))) - (catch Throwable t# - (log/error t# "unexpected error in consume-expect:" (ex-message t#)))) - (test/is false (str "No job found for queue " ~queue-name)))) -tx-spent-time! - -(defmacro consume! [queue-name] - `(consume-expect! ~queue-name :done)) - - -(defn mark-fails! [{:keys [conn]} - {:com.github.ivarref.yoltq/keys [id lock tries]} - _] - (try - (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)] - [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] - [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing :init]]] - @(d/transact conn tx) - nil) - (catch Throwable t - (log/error t "unexpected error in mark-status!: " (ex-message t)) - nil))) - - -(defmacro consume-twice! [queue-name] - `(if-let [job# (get-tx-q-job ~queue-name)] - (try - (with-bindings (:com.github.ivarref.yoltq/bindings job#) - (some->> (u/prepare-processing (d/db (:conn @yq/*config*)) - (:com.github.ivarref.yoltq/id job#) - ~queue-name - (:com.github.ivarref.yoltq/lock job#) - (:com.github.ivarref.yoltq/status job#)) - (i/take! @yq/*config*) - (i/execute! (assoc @yq/*config* :mark-status-fn! mark-fails!))) - (consume! ~queue-name)) - (catch Throwable t# - (log/error t# "unexpected error in consume-twice!:" (ex-message t#)))) - (test/is false (str "No job found for queue " ~queue-name)))) \ No newline at end of file -- cgit v1.2.3