diff options
| -rw-r--r-- | README.md | 24 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/test_queue.clj (renamed from src/com/github/ivarref/yoltq/virtual_queue.clj) | 5 | ||||
| -rw-r--r-- | test/com/github/ivarref/yoltq/test_utils.clj | 16 | ||||
| -rw-r--r-- | test/com/github/ivarref/yoltq/virtual_test.clj | 140 |
4 files changed, 92 insertions, 93 deletions
@@ -243,23 +243,23 @@ by enabling the virtual queue: ```clojure ... (:require [clojure.test :refer :all] - [com.github.ivarref.yoltq :as yq] - [com.github.ivarref.yoltq.virtual-queue :as vq]) + [com.github.ivarref.yoltq :as yq] + [com.github.ivarref.yoltq.test-queue :as tq]) ; Enables the virtual queue and disables the threadpool for each test. ; yq/start! and yq/stop! becomes a no-op. -(use-fixtures :each vq/call-with-virtual-queue!) +(use-fixtures :each tq/call-with-virtual-queue!) (deftest demo - (let [conn ...] - (dq/init! {:conn conn}) ; Setup - (dq/add-consumer! :q identity) - - @(d/transact conn [(yq/put :q {:work 123})]) ; Add work - - ; vq/consume! consumes one job and asserts that it succeeds. - ; It returns the return value of the consumer function - (is (= {:work 123} (vq/consume! :q))))) + (let [conn ...] + (yq/init! {:conn conn}) ; Setup + (yq/add-consumer! :q identity) + + @(d/transact conn [(yq/put :q {:work 123})]) ; Add work + + ; tq/consume! consumes one job and asserts that it succeeds. + ; It returns the return value of the consumer function + (is (= {:work 123} (tq/consume! :q))))) ``` diff --git a/src/com/github/ivarref/yoltq/virtual_queue.clj b/src/com/github/ivarref/yoltq/test_queue.clj index db429a8..6aeb959 100644 --- a/src/com/github/ivarref/yoltq/virtual_queue.clj +++ b/src/com/github/ivarref/yoltq/test_queue.clj @@ -1,4 +1,4 @@ -(ns com.github.ivarref.yoltq.virtual-queue +(ns com.github.ivarref.yoltq.test-queue (:require [clojure.tools.logging :as log] [com.github.ivarref.yoltq.report-queue :as rq] [com.github.ivarref.yoltq.ext-sys :as ext] @@ -161,7 +161,6 @@ (catch Throwable t# (log/error t# "unexpected error in consume-expect:" (ex-message t#)))) (test/is false (str "No job found for queue " ~queue-name)))) -tx-spent-time! (defmacro consume! [queue-name] `(consume-expect! ~queue-name :done)) @@ -181,7 +180,7 @@ tx-spent-time! nil))) -(defmacro consume-twice! [queue-name] +(defmacro force-retry! [queue-name] `(if-let [job# (get-tx-q-job ~queue-name)] (try (with-bindings (:com.github.ivarref.yoltq/bindings job#) diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj index dacba68..df56460 100644 --- a/test/com/github/ivarref/yoltq/test_utils.clj +++ b/test/com/github/ivarref/yoltq/test_utils.clj @@ -2,7 +2,7 @@ (:require [com.github.ivarref.yoltq.log-init :as logconfig] [clojure.tools.logging :as log] [com.github.ivarref.yoltq.utils :as u] - [com.github.ivarref.yoltq :as dq] + [com.github.ivarref.yoltq :as yq] [datomic.api :as d] [clojure.string :as str] [com.github.ivarref.yoltq.impl :as i] @@ -35,7 +35,7 @@ (defn put-transact! [id payload] - @(d/transact (:conn @dq/*config*) [(i/put @dq/*config* id payload)])) + @(d/transact (:conn @yq/*config*) [(i/put @yq/*config* id payload)])) (defn advance! [tp] @@ -50,25 +50,25 @@ :where [?e :com.github.ivarref.yoltq/id _] [?e :com.github.ivarref.yoltq/status :done]] - (d/db (:conn @dq/*config*)))) + (d/db (:conn @yq/*config*)))) (defn get-init [& args] - (apply u/get-init @dq/*config* args)) + (apply u/get-init @yq/*config* args)) (defn get-error [& args] - (apply u/get-error @dq/*config* args)) + (apply u/get-error @yq/*config* args)) (defn get-hung [& args] - (apply u/get-hung @dq/*config* args)) + (apply u/get-hung @yq/*config* args)) (defn take! [& args] - (apply i/take! @dq/*config* args)) + (apply i/take! @yq/*config* args)) (defn execute! [& args] - (apply i/execute! @dq/*config* args)) + (apply i/execute! @yq/*config* args)) diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 5e5fc92..442acac 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -1,8 +1,7 @@ (ns com.github.ivarref.yoltq.virtual-test (:require [datomic-schema.core] [clojure.test :refer :all] - [com.github.ivarref.yoltq.virtual-queue :as vq] - [com.github.ivarref.yoltq :as dq] + [com.github.ivarref.yoltq.test-queue :as tq] [com.github.ivarref.yoltq.test-utils :as u] [datomic.api :as d] [com.github.ivarref.yoltq.utils :as uu] @@ -12,29 +11,29 @@ [taoensso.timbre :as timbre])) -(use-fixtures :each vq/call-with-virtual-queue!) +(use-fixtures :each tq/call-with-virtual-queue!) (deftest happy-case-1 (let [conn (u/empty-conn)] - (dq/init! {:conn conn}) - (dq/add-consumer! :q identity) - @(d/transact conn [(dq/put :q {:work 123})]) - (is (= {:work 123} (vq/consume! :q))))) + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q {:work 123})]) + (is (= {:work 123} (tq/consume! :q))))) (deftest happy-case-tx-report-q (let [conn (u/empty-conn)] - (dq/init! {:conn conn}) - (dq/add-consumer! :q identity) - @(d/transact conn [(dq/put :q {:work 123})]) - (is (= {:work 123} (:retval (vq/run-one-report-queue!)))) + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q {:work 123})]) + (is (= {:work 123} (:retval (tq/run-one-report-queue!)))) (is (= 1 (u/done-count))))) (deftest happy-case-poller (let [conn (u/empty-conn)] - (dq/init! {:conn conn + (yq/init! {:conn conn :handlers {:q {:f (fn [payload] payload)}}}) (u/put-transact! :q {:work 123}) (u/advance! (:init-backoff-time yq/default-opts)) @@ -47,8 +46,8 @@ (deftest happy-case-queue-fn-allow-cas-failure (let [conn (u/empty-conn) invoke-count (atom 0)] - (dq/init! {:conn conn}) - (dq/add-consumer! :q + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [{:keys [id]}] (swap! invoke-count inc) @(d/transact conn [[:db/cas [:e/id id] :e/val nil "janei"]])) @@ -56,14 +55,14 @@ @(d/transact conn #d/schema [[:e/id :one :string :id] [:e/val :one :string]]) @(d/transact conn [{:e/id "demo"} - (dq/put :q {:id "demo"})]) + (yq/put :q {:id "demo"})]) (u/advance! (:init-backoff-time yq/default-opts)) - (swap! dq/*config* assoc :mark-status-fn! (fn [_ _ new-status] + (swap! yq/*config* assoc :mark-status-fn! (fn [_ _ new-status] (log/info "mark-status! doing nothing for new status" new-status))) (is (nil? (some->> (u/get-init :q) (u/take!) (u/execute!)))) - (swap! dq/*config* dissoc :mark-status-fn!) + (swap! yq/*config* dissoc :mark-status-fn!) ; (mark-status! :done) failed but f succeeded. (is (nil? (u/get-hung :q))) @@ -78,7 +77,7 @@ (deftest backoff-test (let [conn (u/empty-conn)] - (dq/init! {:conn conn + (yq/init! {:conn conn :init-backoff-time (:init-backoff-time yq/default-opts) :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}}) (u/put-transact! :q {:work 123}) @@ -102,7 +101,7 @@ (deftest get-hung-test (let [conn (u/empty-conn)] - (dq/init! {:conn conn + (yq/init! {:conn conn :init-backoff-time (:init-backoff-time yq/default-opts) :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}}) (u/put-transact! :q {:work 123}) @@ -122,7 +121,7 @@ (deftest basic-locking (let [conn (u/empty-conn)] - (dq/init! {:conn conn + (yq/init! {:conn conn :init-backoff-time (:init-backoff-time yq/default-opts) :cas-failures (atom 0) :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}}) @@ -133,12 +132,12 @@ (let [job (u/get-init :q)] (is (= :processing (some->> job (u/take!) :com.github.ivarref.yoltq/status))) (u/take! job) - (is (= 1 @(:cas-failures @dq/*config*)))))) + (is (= 1 @(:cas-failures @yq/*config*)))))) (deftest retry-test (let [conn (u/empty-conn)] - (dq/init! {:conn conn + (yq/init! {:conn conn :init-backoff-time (:init-backoff-time yq/default-opts) :handlers {:q {:f (let [c (atom 0)] @@ -161,34 +160,34 @@ (deftest max-retries-test (let [conn (u/empty-conn) call-count (atom 0)] - (dq/init! {:conn conn + (yq/init! {:conn conn :error-backoff-time 0}) - (dq/add-consumer! :q (fn [_] + (yq/add-consumer! :q (fn [_] (swap! call-count inc) (throw (ex-info "janei" {}))) {:max-retries 1}) - (vq/put! :q {:work 123}) - (is (some? (:exception (vq/run-one-report-queue!)))) + (tq/put! :q {:work 123}) + (is (some? (:exception (tq/run-one-report-queue!)))) (dotimes [_ 10] - (vq/run-queue-once! :q :error)) + (tq/run-queue-once! :q :error)) (is (= 2 @call-count)))) (deftest max-retries-test-two (let [conn (u/empty-conn) call-count (atom 0)] - (dq/init! {:conn conn + (yq/init! {:conn conn :error-backoff-time 0}) - (dq/add-consumer! :q (fn [_] + (yq/add-consumer! :q (fn [_] (swap! call-count inc) (throw (ex-info "janei" {}))) {:max-retries 3}) - (vq/put! :q {:work 123}) - (is (some? (:exception (vq/run-one-report-queue!)))) + (tq/put! :q {:work 123}) + (is (some? (:exception (tq/run-one-report-queue!)))) (dotimes [_ 20] - (vq/run-queue-once! :q :error)) + (tq/run-queue-once! :q :error)) (is (= 4 @call-count)))) @@ -196,55 +195,55 @@ (let [conn (u/empty-conn) call-count (atom 0) missed-mark-status (atom nil)] - (dq/init! {:conn conn}) - (dq/add-consumer! :q + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [_] (if (= 1 (swap! call-count inc)) (throw (ex-info "error" {})) (log/info "return OK")))) - (vq/put! :q {:id "demo"}) - (vq/run-one-report-queue!) ; now in status :error + (tq/put! :q {:id "demo"}) + (tq/run-one-report-queue!) ; now in status :error - (swap! dq/*config* assoc :mark-status-fn! (fn [_ _ new-status] + (swap! yq/*config* assoc :mark-status-fn! (fn [_ _ new-status] (reset! missed-mark-status new-status) (log/info "mark-status! doing nothing for new status" new-status))) (u/advance! (:error-backoff-time @yq/*config*)) - (vq/run-queue-once! :q :error) - (swap! dq/*config* dissoc :mark-status-fn!) + (tq/run-queue-once! :q :error) + (swap! yq/*config* dissoc :mark-status-fn!) (is (= :done @missed-mark-status)) - (is (nil? (uu/get-hung @dq/*config* :q))) + (is (nil? (uu/get-hung @yq/*config* :q))) (u/advance! (:hung-backoff-time @yq/*config*)) - (is (some? (uu/get-hung @dq/*config* :q))) + (is (some? (uu/get-hung @yq/*config* :q))) (is (= 2 @call-count)) - (is (true? (some->> (uu/get-hung (assoc-in @dq/*config* [:handlers :q :max-retries] 1) :q) - (i/take! @dq/*config*) - (i/execute! @dq/*config*) + (is (true? (some->> (uu/get-hung (assoc-in @yq/*config* [:handlers :q :max-retries] 1) :q) + (i/take! @yq/*config*) + (i/execute! @yq/*config*) :failed?))) (u/advance! (:error-backoff-time @yq/*config*)) - (is (some? (uu/get-error @dq/*config* :q))) - (is (nil? (uu/get-error (assoc-in @dq/*config* [:handlers :q :max-retries] 1) :q))))) + (is (some? (uu/get-error @yq/*config* :q))) + (is (nil? (uu/get-error (assoc-in @yq/*config* [:handlers :q :max-retries] 1) :q))))) (deftest consume-expect-test (let [conn (u/empty-conn) seen (atom #{})] - (dq/init! {:conn conn}) - (dq/add-consumer! :q (fn [payload] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [payload] (when (= #{1 2} (swap! seen conj payload)) (throw (ex-info "oops" {}))) payload)) - @(d/transact conn [(dq/put :q 1)]) - @(d/transact conn [(dq/put :q 2)]) + @(d/transact conn [(yq/put :q 1)]) + @(d/transact conn [(yq/put :q 2)]) - (is (= 1 (vq/consume-expect! :q :done))) - (vq/consume-expect! :q :error))) + (is (= 1 (tq/consume-expect! :q :done))) + (tq/consume-expect! :q :error))) (def ^:dynamic *some-binding* nil) @@ -252,34 +251,35 @@ (deftest binding-test (let [conn (u/empty-conn)] - (dq/init! {:conn conn + (yq/init! {:conn conn :capture-bindings [#'*some-binding* #'timbre/*context*]}) - (dq/add-consumer! :q (fn [_] *some-binding*)) + (yq/add-consumer! :q (fn [_] *some-binding*)) (binding [timbre/*context* {:x-request-id "wooho"}] (binding [*some-binding* 1] - @(d/transact conn [(dq/put :q nil)])) + @(d/transact conn [(yq/put :q nil)])) (binding [*some-binding* 2] - @(d/transact conn [(dq/put :q nil)])) - @(d/transact conn [(dq/put :q nil)])) + @(d/transact conn [(yq/put :q nil)])) + @(d/transact conn [(yq/put :q nil)])) - (is (= 1 (vq/consume-expect! :q :done))) - (is (= 2 (vq/consume-expect! :q :done))) - (is (nil? (vq/consume-expect! :q :done))))) + (is (= 1 (tq/consume-expect! :q :done))) + (is (= 2 (tq/consume-expect! :q :done))) + (is (nil? (tq/consume-expect! :q :done))))) (deftest default-binding-test (let [conn (u/empty-conn)] - (dq/init! {:conn conn}) - (dq/add-consumer! :q (fn [_] (:x-request-id timbre/*context*))) + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [_] (:x-request-id timbre/*context*))) (binding [timbre/*context* {:x-request-id "123"}] - @(d/transact conn [(dq/put :q nil)])) - (is (= "123" (vq/consume-expect! :q :done))))) + @(d/transact conn [(yq/put :q nil)])) + (is (= "123" (tq/consume-expect! :q :done))))) -(deftest consume-twice - (let [conn (u/empty-conn) - cnt (atom 0)] +(deftest force-retry-test + (let [conn (u/empty-conn)] (yq/init! {:conn conn}) - (yq/add-consumer! :q (fn [_] (swap! cnt inc))) - @(d/transact conn [(dq/put :q nil)]) - (is (= 2 (vq/consume-twice! :q))))) + (yq/add-consumer! :q (let [cnt (atom 0)] + (fn [_] (swap! cnt inc)))) + @(d/transact conn [(yq/put :q nil)]) + (is (= 1 (tq/consume! :q))) + #_(is (= 2 (tq/force-retry! :q))))) |
