aboutsummaryrefslogtreecommitdiff
path: root/test/com/github/ivarref/yoltq/virtual_test.clj
diff options
context:
space:
mode:
Diffstat (limited to 'test/com/github/ivarref/yoltq/virtual_test.clj')
-rw-r--r--test/com/github/ivarref/yoltq/virtual_test.clj232
1 files changed, 232 insertions, 0 deletions
diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj
new file mode 100644
index 0000000..41d2461
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/virtual_test.clj
@@ -0,0 +1,232 @@
+(ns com.github.ivarref.yoltq.virtual-test
+ (:require [datomic-schema.core]
+ [clojure.test :refer :all]
+ [com.github.ivarref.yoltq.virtual-queue :as vq]
+ [com.github.ivarref.yoltq :as dq]
+ [com.github.ivarref.yoltq.test-utils :as u]
+ [datomic.api :as d]
+ [com.github.ivarref.yoltq.utils :as uu]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.impl :as i]
+ [com.github.ivarref.yoltq :as yq]))
+
+
+(use-fixtures :each vq/call-with-virtual-queue!)
+
+
+(deftest happy-case-1
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn})
+ (dq/add-consumer! :q identity)
+ @(d/transact conn [(dq/put :q {:work 123})])
+ (is (= {:work 123} (:retval (vq/run-queue-once! :q :init))))))
+
+
+(deftest happy-case-tx-report-q
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn})
+ (dq/add-consumer! :q identity)
+ @(d/transact conn [(dq/put :q {:work 123})])
+ (is (= {:work 123} (:retval (vq/run-one-report-queue!))))
+ (is (= 1 (u/done-count)))))
+
+
+(deftest happy-case-poller
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn
+ :handlers {:q {:f (fn [payload] payload)}}})
+ (u/put-transact! :q {:work 123})
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (= {:work 123} (some->> (u/get-init :q)
+ (u/take!)
+ (u/execute!)
+ :retval)))))
+
+
+(deftest happy-case-queue-fn-allow-cas-failure
+ (let [conn (u/empty-conn)
+ invoke-count (atom 0)]
+ (dq/init! {:conn conn})
+ (dq/add-consumer! :q
+ (fn [{:keys [id]}]
+ (swap! invoke-count inc)
+ @(d/transact conn [[:db/cas [:e/id id] :e/val nil "janei"]]))
+ {:allow-cas-failure? #{:e/val}})
+ @(d/transact conn #d/schema [[:e/id :one :string :id]
+ [:e/val :one :string]])
+ @(d/transact conn [{:e/id "demo"}
+ (dq/put :q {:id "demo"})])
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (swap! dq/*config* assoc :mark-status-fn! (fn [_ _ new-status]
+ (log/info "mark-status! doing nothing for new status" new-status)))
+ (is (nil? (some->> (u/get-init :q)
+ (u/take!)
+ (u/execute!))))
+ (swap! dq/*config* dissoc :mark-status-fn!)
+
+ ; (mark-status! :done) failed but f succeeded.
+ (is (nil? (u/get-hung :q)))
+ (u/advance! (:hung-backoff-time @yq/*config*))
+ (is (some? (u/get-hung :q)))
+ (is (true? (some->> (u/get-hung :q)
+ (u/take!)
+ (u/execute!)
+ :allow-cas-failure?)))
+ (is (= 2 @invoke-count))))
+
+
+(deftest backoff-test
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
+ (u/put-transact! :q {:work 123})
+ (is (nil? (u/get-init :q)))
+
+ (u/advance! (dec (:init-backoff-time yq/default-opts)))
+ (is (nil? (u/get-init :q)))
+ (u/advance! 1)
+ (is (some? (u/get-init :q)))
+
+ (is (some? (some->> (u/get-init :q)
+ (u/take!)
+ (u/execute!)
+ :exception)))
+
+ (u/advance! (dec (:error-backoff-time @yq/*config*)))
+ (is (nil? (u/get-error :q)))
+ (u/advance! 1)
+ (is (some? (u/get-error :q)))))
+
+
+(deftest get-hung-test
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
+ (u/put-transact! :q {:work 123})
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (some? (u/get-init :q)))
+
+ (is (= :processing (some->> (u/get-init :q)
+ (u/take!)
+ :com.github.ivarref.yoltq/status)))
+
+ (is (nil? (u/get-hung :q)))
+ (u/advance! (dec (:hung-backoff-time yq/default-opts)))
+ (is (nil? (u/get-hung :q)))
+ (u/advance! 1)
+ (is (some? (u/get-hung :q)))))
+
+
+(deftest basic-locking
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :cas-failures (atom 0)
+ :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
+ (u/put-transact! :q {:work 123})
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (some? (u/get-init :q)))
+
+ (let [job (u/get-init :q)]
+ (is (= :processing (some->> job (u/take!) :com.github.ivarref.yoltq/status)))
+ (u/take! job)
+ (is (= 1 @(:cas-failures @dq/*config*))))))
+
+
+(deftest retry-test
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :handlers {:q {:f
+ (let [c (atom 0)]
+ (fn [_]
+ (if (<= (swap! c inc) 2)
+ (throw (ex-info "janei" {}))
+ ::ok)))}}})
+ (u/put-transact! :q {:work 123})
+
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (some? (some->> (u/get-init :q) (u/take!) (u/execute!) :exception)))
+
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (is (some? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception)))
+
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (is (nil? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception)))))
+
+
+(deftest max-retries-test
+ (let [conn (u/empty-conn)
+ call-count (atom 0)]
+ (dq/init! {:conn conn
+ :error-backoff-time 0})
+ (dq/add-consumer! :q (fn [_]
+ (swap! call-count inc)
+ (throw (ex-info "janei" {})))
+ {:max-retries 1})
+ (vq/put! :q {:work 123})
+ (is (some? (:exception (vq/run-one-report-queue!))))
+
+ (dotimes [_ 10]
+ (vq/run-queue-once! :q :error))
+ (is (= 2 @call-count))))
+
+
+(deftest max-retries-test-two
+ (let [conn (u/empty-conn)
+ call-count (atom 0)]
+ (dq/init! {:conn conn
+ :error-backoff-time 0})
+ (dq/add-consumer! :q (fn [_]
+ (swap! call-count inc)
+ (throw (ex-info "janei" {})))
+ {:max-retries 3})
+ (vq/put! :q {:work 123})
+ (is (some? (:exception (vq/run-one-report-queue!))))
+
+ (dotimes [_ 20]
+ (vq/run-queue-once! :q :error))
+ (is (= 4 @call-count))))
+
+
+(deftest hung-to-error
+ (let [conn (u/empty-conn)
+ call-count (atom 0)
+ missed-mark-status (atom nil)]
+ (dq/init! {:conn conn})
+ (dq/add-consumer! :q
+ (fn [_]
+ (if (= 1 (swap! call-count inc))
+ (throw (ex-info "error" {}))
+ (log/info "return OK"))))
+ (vq/put! :q {:id "demo"})
+ (vq/run-one-report-queue!) ; now in status :error
+
+
+ (swap! dq/*config* assoc :mark-status-fn! (fn [_ _ new-status]
+ (reset! missed-mark-status new-status)
+ (log/info "mark-status! doing nothing for new status" new-status)))
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (vq/run-queue-once! :q :error)
+ (swap! dq/*config* dissoc :mark-status-fn!)
+ (is (= :done @missed-mark-status))
+
+ (is (nil? (uu/get-hung @dq/*config* :q)))
+ (u/advance! (:hung-backoff-time @yq/*config*))
+
+ (is (some? (uu/get-hung @dq/*config* :q)))
+
+ (is (= 2 @call-count))
+
+ (is (true? (some->> (uu/get-hung (assoc-in @dq/*config* [:handlers :q :max-retries] 1) :q)
+ (i/take! @dq/*config*)
+ (i/execute! @dq/*config*)
+ :failed?)))
+
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (is (some? (uu/get-error @dq/*config* :q)))
+ (is (nil? (uu/get-error (assoc-in @dq/*config* [:handlers :q :max-retries] 1) :q)))))
+
+