diff options
Diffstat (limited to 'test/com/github/ivarref/yoltq/virtual_test.clj')
| -rw-r--r-- | test/com/github/ivarref/yoltq/virtual_test.clj | 474 |
1 files changed, 474 insertions, 0 deletions
diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj new file mode 100644 index 0000000..a2ed269 --- /dev/null +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -0,0 +1,474 @@ +(ns com.github.ivarref.yoltq.virtual-test + (:require + [clojure.string :as str] + [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]] + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq :as yq] + [com.github.ivarref.yoltq.error-poller :as error-poller] + [com.github.ivarref.yoltq.ext-sys :as ext] + [com.github.ivarref.yoltq.impl :as i] + [com.github.ivarref.yoltq.migrate :as migrate] + [com.github.ivarref.yoltq.test-queue :as tq] + [com.github.ivarref.yoltq.test-utils :as u] + [com.github.ivarref.yoltq.utils :as uu] + [datomic-schema.core] + [datomic.api :as d] + [taoensso.nippy :as nippy] + [taoensso.timbre :as timbre]) + (:import (java.time Duration LocalDateTime))) + + +(use-fixtures :each tq/call-with-virtual-queue!) + + +(deftest happy-case-1 + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q {:work 123})]) + (is (= {:work 123} (tq/consume! :q))))) + +(deftest happy-case-no-migration-for-new-entities + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q {:work 123})]) + (is (= {:work 123} (tq/consume! :q))) + (is (= [] (migrate/migrate! @yq/*config*))))) + +(deftest happy-case-tx-report-q + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q {:work 123})]) + (is (= {:work 123} (:retval (tq/run-one-report-queue!)))) + (is (= 1 (u/done-count))))) + + +(deftest happy-case-poller + (let [conn (u/empty-conn)] + (yq/init! {:conn conn + :handlers {:q {:f (fn [payload] payload)}}}) + (u/put-transact! :q {:work 123}) + (u/advance! (:init-backoff-time yq/default-opts)) + (is (= {:work 123} (some->> (u/get-init :q) + (u/take!) + (u/execute!) + :retval))))) + + +(deftest happy-case-queue-fn-allow-cas-failure + (let [conn (u/empty-conn) + invoke-count (atom 0)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q + (fn [{:keys [id]}] + (swap! invoke-count inc) + @(d/transact conn [[:db/cas [:e/id id] :e/val nil "janei"]])) + {:allow-cas-failure? #{:e/val}}) + @(d/transact conn #d/schema [[:e/id :one :string :id] + [:e/val :one :string]]) + @(d/transact conn [{:e/id "demo"} + (yq/put :q {:id "demo"})]) + (u/advance! (:init-backoff-time yq/default-opts)) + (swap! yq/*config* assoc :mark-status-fn! (fn [_ _ new-status] + (log/info "mark-status! doing nothing for new status" new-status))) + (is (nil? (some->> (u/get-init :q) + (u/take!) + (u/execute!)))) + (swap! yq/*config* dissoc :mark-status-fn!) + + ; (mark-status! :done) failed but f succeeded. + (is (nil? (u/get-hung :q))) + (u/advance! (:hung-backoff-time @yq/*config*)) + (is (some? (u/get-hung :q))) + (is (true? (some->> (u/get-hung :q) + (u/take!) + (u/execute!) + :allow-cas-failure?))) + (is (= 2 @invoke-count)))) + + +(deftest backoff-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn + :init-backoff-time (:init-backoff-time yq/default-opts) + :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}}) + (u/put-transact! :q {:work 123}) + (is (nil? (u/get-init :q))) + + (u/advance! (dec (:init-backoff-time yq/default-opts))) + (is (nil? (u/get-init :q))) + (u/advance! 1) + (is (some? (u/get-init :q))) + + (is (some? (some->> (u/get-init :q) + (u/take!) + (u/execute!) + :exception))) + + (u/advance! (dec (:error-backoff-time @yq/*config*))) + (is (nil? (u/get-error :q))) + (u/advance! 1) + (is (some? (u/get-error :q))))) + + +(deftest get-hung-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn + :init-backoff-time (:init-backoff-time yq/default-opts) + :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}}) + (u/put-transact! :q {:work 123}) + (u/advance! (:init-backoff-time yq/default-opts)) + (is (some? (u/get-init :q))) + + (is (= :processing (some->> (u/get-init :q) + (u/take!) + :com.github.ivarref.yoltq/status))) + + (is (nil? (u/get-hung :q))) + (u/advance! (dec (:hung-backoff-time yq/default-opts))) + (is (nil? (u/get-hung :q))) + (u/advance! 1) + (is (some? (u/get-hung :q))))) + + +(deftest basic-locking + (let [conn (u/empty-conn)] + (yq/init! {:conn conn + :init-backoff-time (:init-backoff-time yq/default-opts) + :cas-failures (atom 0) + :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}}) + (u/put-transact! :q {:work 123}) + (u/advance! (:init-backoff-time yq/default-opts)) + (is (some? (u/get-init :q))) + + (let [job (u/get-init :q)] + (is (= :processing (some->> job (u/take!) :com.github.ivarref.yoltq/status))) + (u/take! job) + (is (= 1 @(:cas-failures @yq/*config*)))))) + + +(deftest retry-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn + :init-backoff-time (:init-backoff-time yq/default-opts) + :handlers {:q {:f + (let [c (atom 0)] + (fn [_] + (if (<= (swap! c inc) 2) + (throw (ex-info "janei" {})) + ::ok)))}}}) + (u/put-transact! :q {:work 123}) + + (u/advance! (:init-backoff-time yq/default-opts)) + (is (some? (some->> (u/get-init :q) (u/take!) (u/execute!) :exception))) + + (u/advance! (:error-backoff-time @yq/*config*)) + (is (some? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception))) + + (u/advance! (:error-backoff-time @yq/*config*)) + (is (nil? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception))))) + + +(deftest max-retries-test + (let [conn (u/empty-conn) + call-count (atom 0)] + (yq/init! {:conn conn + :error-backoff-time 0}) + (yq/add-consumer! :q (fn [_] + (swap! call-count inc) + (throw (ex-info "janei" {}))) + {:max-retries 1}) + (tq/put! :q {:work 123}) + (is (some? (:exception (tq/run-one-report-queue!)))) + + (dotimes [_ 10] + (tq/run-queue-once! :q :error)) + (is (= 2 @call-count)))) + + +(deftest max-retries-test-two + (let [conn (u/empty-conn) + call-count (atom 0)] + (yq/init! {:conn conn + :error-backoff-time 0}) + (yq/add-consumer! :q (fn [_] + (swap! call-count inc) + (throw (ex-info "janei" {}))) + {:max-retries 3}) + (tq/put! :q {:work 123}) + (is (some? (:exception (tq/run-one-report-queue!)))) + + (timbre/with-level :fatal + (dotimes [_ 20] + (tq/run-queue-once! :q :error))) + (is (= 4 @call-count)))) + + +(deftest hung-to-error + (let [conn (u/empty-conn) + call-count (atom 0) + missed-mark-status (atom nil)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q + (fn [_] + (if (= 1 (swap! call-count inc)) + (throw (ex-info "error" {})) + (log/info "return OK")))) + (tq/put! :q {:id "demo"}) + (tq/run-one-report-queue!) ; now in status :error + + + (swap! yq/*config* assoc :mark-status-fn! (fn [_ _ new-status] + (reset! missed-mark-status new-status) + (log/info "mark-status! doing nothing for new status" new-status))) + (u/advance! (:error-backoff-time @yq/*config*)) + (tq/run-queue-once! :q :error) + (swap! yq/*config* dissoc :mark-status-fn!) + (is (= :done @missed-mark-status)) + + (is (nil? (uu/get-hung @yq/*config* :q))) + (u/advance! (:hung-backoff-time @yq/*config*)) + + (is (some? (uu/get-hung @yq/*config* :q))) + + (is (= 2 @call-count)) + + (is (true? (some->> (uu/get-hung (assoc-in @yq/*config* [:handlers :q :max-retries] 1) :q) + (i/take! @yq/*config*) + (i/execute! @yq/*config*) + :failed?))) + + (u/advance! (:error-backoff-time @yq/*config*)) + (is (some? (uu/get-error @yq/*config* :q))) + (is (nil? (uu/get-error (assoc-in @yq/*config* [:handlers :q :max-retries] 1) :q))))) + + +(deftest consume-expect-test + (let [conn (u/empty-conn) + seen (atom #{})] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [payload] + (when (= #{1 2} (swap! seen conj payload)) + (throw (ex-info "oops" {}))) + payload)) + + @(d/transact conn [(yq/put :q 1)]) + @(d/transact conn [(yq/put :q 2)]) + + (is (= 1 (tq/consume-expect! :q :done))) + (tq/consume-expect! :q :error))) + + +(def ^:dynamic *some-binding* nil) + + +(deftest binding-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn + :capture-bindings [#'*some-binding* #'timbre/*context*]}) + (yq/add-consumer! :q (fn [_] *some-binding*)) + (binding [timbre/*context* {:x-request-id "wooho"}] + (binding [*some-binding* 1] + @(d/transact conn [(yq/put :q nil)])) + (binding [*some-binding* 2] + @(d/transact conn [(yq/put :q nil)])) + @(d/transact conn [(yq/put :q nil)])) + + (is (= 1 (tq/consume-expect! :q :done))) + (is (= 2 (tq/consume-expect! :q :done))) + (is (nil? (tq/consume-expect! :q :done))))) + + +(deftest default-binding-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [_] (:x-request-id timbre/*context*))) + (binding [timbre/*context* {:x-request-id "123"}] + @(d/transact conn [(yq/put :q nil)])) + (is (= "123" (tq/consume-expect! :q :done))))) + + +(deftest force-retry-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (let [cnt (atom 0)] + (fn [_] (swap! cnt inc)))) + @(d/transact conn [(yq/put :q nil)]) + (is (= 1 (tq/consume! :q))) + (is (= 2 (tq/force-retry! :q))) + (is (= 3 (tq/force-retry! :q))))) + + +(deftest ext-id-no-duplicates + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q nil {:id "123"})]) + (is (thrown? Exception @(d/transact conn [(yq/put :q nil {:id "123"})]))))) + + +(deftest depends-on + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :a identity) + (yq/add-consumer! :b identity) + @(d/transact conn [(yq/put :a {:id "a1"} {:id "a1"})]) + @(d/transact conn [(yq/put :b {:id "b1"} {:depends-on [:a "a1"]})]) + + ; can't consume :b yet: + (is (= {:depends-on [:a "a1"]} (tq/consume! :b))) + + (is (= {:id "a1"} (tq/consume! :a))) + (is (= {:id "b1"} (tq/consume! :b))))) + + +(deftest depends-on-queue-level + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :a identity) + (yq/add-consumer! :b identity {:depends-on (fn [{:keys [id]}] [:a id])}) + @(d/transact conn [(yq/put :a {:id "1"} {:id "1"})]) + @(d/transact conn [(yq/put :b {:id "1"})]) + + ; can't consume :b yet: + (is (= {:depends-on [:a "1"]} (tq/consume! :b))) + + (is (= {:id "1"} (tq/consume! :a))) + (is (= {:id "1"} (tq/consume! :b))))) + + +(deftest verify-can-read-string + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :a identity) + (timbre/with-level :fatal + (is (thrown? Exception @(d/transact conn [(yq/put :a {:broken #'=})])))))) + + +(deftest payload-verifier + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity + {:valid-payload? (fn [{:keys [id]}] + (some? id))}) + @(d/transact conn [(yq/put :q {:id "a"})]) + (timbre/with-level :fatal + (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) + + +(defn my-consumer + {:yoltq/queue-id :some-q} + [state payload] + (swap! state conj payload)) + +(deftest queue-id-can-be-var + (let [conn (u/empty-conn) + received (atom #{})] + (yq/init! {:conn conn}) + (yq/add-consumer! #'my-consumer (partial my-consumer received)) + @(d/transact conn [(yq/put #'my-consumer {:id "a"})]) + (tq/consume! :some-q) + (is (= #{{:id "a"}} @received)) + #_(timbre/with-level :fatal + (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) + +(deftest healthy-allowed-error-time-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [_] (throw (ex-info "" {})))) + @(d/transact conn [(yq/put :q {:work 123})]) + (tq/consume-expect! :q :error) + (is (= 0 (error-poller/do-poll-errors @yq/*config* (ext/now-ms)))) + (is (= 0 (error-poller/do-poll-errors @yq/*config* (+ (dec (.toMillis (Duration/ofMinutes 15))) (ext/now-ms))))) + (is (= 1 (error-poller/do-poll-errors @yq/*config* (+ (.toMillis (Duration/ofMinutes 15)) (ext/now-ms))))))) + +(deftest global-encode-decode + (let [conn (u/empty-conn) + ldt (LocalDateTime/now) + got-work (atom nil)] + (yq/init! {:conn conn + :encode nippy/freeze + :decode nippy/thaw}) + (yq/add-consumer! :q (fn [work] (reset! got-work work))) + @(d/transact conn [(yq/put :q {:work ldt})]) + (tq/consume! :q) + (is (= @got-work {:work ldt})))) + +(deftest queue-encode-decode + (let [conn (u/empty-conn) + ldt (LocalDateTime/now) + got-work (atom nil)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [work] (reset! got-work work)) + {:encode nippy/freeze + :decode nippy/thaw}) + @(d/transact conn [(yq/put :q {:work ldt})]) + (tq/consume! :q) + (is (= @got-work {:work ldt})))) + +(deftest global-partition + (let [conn (u/empty-conn) + got-work (atom nil)] + (yq/init! {:conn conn + :partition-fn (fn [_queue-name] :my-part)}) + (yq/add-consumer! :q (fn [work] (reset! got-work work))) + @(d/transact conn [(yq/put :q {:work 123})]) + (tq/consume! :q) + (is (some? (d/q '[:find ?e . + :in $ ?part + :where + [?e :db/ident ?part]] + (d/db conn) + :my-part))) + (is (= @got-work {:work 123})))) + +(deftest partition-per-queue + (let [conn (u/empty-conn) + got-work (atom nil)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [work] (reset! got-work work)) + {:partition-fn (fn [_queue-name] :my-part)}) + @(d/transact conn [(yq/put :q {:work 123})]) + (tq/consume! :q) + (is (some? (d/q '[:find ?e . + :in $ ?part + :where + [?e :db/ident ?part]] + (d/db conn) + :my-part))) + (is (= @got-work {:work 123})))) + +(deftest string-encode-decode + (let [conn (u/empty-conn) + got-work (atom nil)] + (yq/init! {:conn conn + :encode (fn [x] (str/join (reverse x))) + :decode (fn [x] (str/join (reverse x)))}) + (yq/add-consumer! :q (fn [work] (reset! got-work work))) + @(d/transact conn [(yq/put :q "asdf")]) + (tq/consume! :q) + (is (= @got-work "asdf")))) + +(deftest job-group-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q1 identity) + (yq/add-consumer! :q2 identity) + @(d/transact conn [(yq/put :q1 {:work 123} {:job-group :group1}) + (yq/put :q1 {:work 456} {:job-group :group2}) + (yq/put :q2 {:work 789} {:job-group :group1})]) + (is (= [{:qname :q1 + :job-group :group1 + :status :init + :count 1}] + (yq/job-group-progress :q1 :group1))) + + (is (= {:work 123} (tq/consume! :q1))) + + (is (= [{:qname :q1 + :job-group :group1 + :status :done + :count 1}] + (yq/job-group-progress :q1 :group1))))) |
