diff options
Diffstat (limited to 'test')
| -rw-r--r-- | test/com/github/ivarref/yoltq/error_poller_test.clj | 35 | ||||
| -rw-r--r-- | test/com/github/ivarref/yoltq/http_hang_demo.clj | 45 | ||||
| -rw-r--r-- | test/com/github/ivarref/yoltq/log_init.clj | 66 | ||||
| -rw-r--r-- | test/com/github/ivarref/yoltq/migrate_test.clj | 92 | ||||
| -rw-r--r-- | test/com/github/ivarref/yoltq/readme_demo.clj | 48 | ||||
| -rw-r--r-- | test/com/github/ivarref/yoltq/test_utils.clj | 80 | ||||
| -rw-r--r-- | test/com/github/ivarref/yoltq/virtual_test.clj | 474 |
7 files changed, 840 insertions, 0 deletions
diff --git a/test/com/github/ivarref/yoltq/error_poller_test.clj b/test/com/github/ivarref/yoltq/error_poller_test.clj new file mode 100644 index 0000000..4d92b81 --- /dev/null +++ b/test/com/github/ivarref/yoltq/error_poller_test.clj @@ -0,0 +1,35 @@ +(ns com.github.ivarref.yoltq.error-poller-test + (:require [clojure.edn :as edn] + [clojure.test :refer [deftest is]] + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.error-poller :as ep] + [com.github.ivarref.yoltq.log-init :as logconfig])) + + +(deftest error-poller + (logconfig/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"*"} (edn/read-string + (System/getProperty "TAOENSSO_TIMBRE_MIN_LEVEL_EDN" ":info"))]]) + (let [cfg {:system-error-callback-backoff 100} + time (atom 0) + tick! (fn [& [amount]] + (swap! time + (or amount 1))) + verify (fn [state now-ns error-count expected-callback] + (let [{:keys [errors state run-callback] :as res} (ep/handle-error-count state cfg now-ns error-count)] + (log/info errors "=>" state "::" run-callback) + (is (= expected-callback run-callback)) + res))] + (-> {} + (verify (tick!) 0 nil) + (verify (tick!) 1 nil) + (verify (tick!) 1 nil) + (verify (tick!) 1 :error) + (verify (tick! 100) 0 nil) + (verify (tick!) 0 :error) + (verify (tick!) 0 :recovery) + (verify (tick!) 1 nil) + (verify (tick!) 1 nil) + (verify (tick!) 1 :error) + (verify (tick! 100) 1 nil) + (verify (tick!) 1 :error)))) diff --git a/test/com/github/ivarref/yoltq/http_hang_demo.clj b/test/com/github/ivarref/yoltq/http_hang_demo.clj new file mode 100644 index 0000000..06d877b --- /dev/null +++ b/test/com/github/ivarref/yoltq/http_hang_demo.clj @@ -0,0 +1,45 @@ +(ns com.github.ivarref.yoltq.http-hang-demo + (:require [datomic.api :as d] + [com.github.ivarref.yoltq :as yq] + [com.github.ivarref.yoltq.log-init]) + (:import (java.net.http HttpClient HttpRequest HttpResponse$BodyHandlers))) + +(comment + (do + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq*"} :debug] + [#{"*"} :info]]) + (yq/stop!) + (let [received (atom []) + uri (str "datomic:mem://hello-world-" (java.util.UUID/randomUUID))] + (d/delete-database uri) + (d/create-database uri) + (let [conn (d/connect uri)] + (init! {:conn conn + :error-backoff-time (Duration/ofSeconds 5) + :poll-delay 5 + :system-error-poll-interval [1 TimeUnit/MINUTES] + :system-error-callback-backoff (Duration/ofHours 1) + :max-execute-time (Duration/ofSeconds 3) + :on-system-error (fn [] (log/error "system in error state")) + :on-system-recovery (fn [] (log/info "system recovered"))}) + (yq/add-consumer! :slow (fn [_payload] + (log/info "start slow handler...") + ; sudo tc qdisc del dev wlp3s0 root netem + ; sudo tc qdisc add dev wlp3s0 root netem delay 10000ms + ; https://jvns.ca/blog/2017/04/01/slow-down-your-internet-with-tc/ + (let [request (-> (HttpRequest/newBuilder) + (.uri (java.net.URI. "https://postman-echo.com/get")) + (.timeout (Duration/ofSeconds 10)) + (.GET) + (.build))] + (log/info "body is:" (-> (.send (HttpClient/newHttpClient) request (HttpResponse$BodyHandlers/ofString)) + (.body)))) + (log/info "slow handler is done!"))) + (yq/start!) + @(d/transact conn [(put :slow {:work 123})]) + #_(dotimes [x 1] @(d/transact conn [(put :q {:work x})])) + nil))))
\ No newline at end of file diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj new file mode 100644 index 0000000..7eae557 --- /dev/null +++ b/test/com/github/ivarref/yoltq/log_init.clj @@ -0,0 +1,66 @@ +(ns com.github.ivarref.yoltq.log-init + (:require [clojure.term.colors :as colors] + [taoensso.timbre :as timbre] + [clojure.string :as str])) + +(set! *warn-on-reflection* true) + +(def level-colors + {;:warn colors/red + :error colors/red}) + +(def ^:dynamic *override-color* nil) + +(defn min-length [n s] + (loop [s s] + (if (>= (count s) n) + s + (recur (str s " "))))) + +(defn local-console-format-fn + "A simpler log format, suitable for readable logs during development. colorized stacktraces" + [data] + (try + (let [{:keys [level ?err msg_ ?ns-str ?file hostname_ + timestamp_ ?line context]} data + ts (force timestamp_)] + (let [color-f (if (nil? *override-color*) + (get level-colors level str) + colors/green) + maybe-stacktrace (when ?err + (str "\n" (timbre/stacktrace ?err)))] + (cond-> (str #_(str ?ns-str ":" ?line) + #_(min-length (count "[yoltq:326] ") + (str + "[" + (some-> ?ns-str + (str/split #"\.") + (last)) + ":" ?line)) + ts + " " + (color-f (min-length 5 (str/upper-case (name level)))) + " " + + (when-let [x-req-id (:x-request-id context)] + (str "[" x-req-id "] ")) + #_(.getName ^Thread (Thread/currentThread)) + + (color-f (force msg_)) + + maybe-stacktrace)))) + + + (catch Throwable t + (println "error in local-console-format-fn:" (ex-message t)) + nil))) + + +(defn init-logging! [min-levels] + (timbre/merge-config! + {:min-level min-levels + :timestamp-opts {:pattern "HH:mm:ss.SSS" + :timezone :jvm-default} + :output-fn (fn [data] (local-console-format-fn data)) + :appenders {:println (timbre/println-appender {:stream :std-out})}})) + diff --git a/test/com/github/ivarref/yoltq/migrate_test.clj b/test/com/github/ivarref/yoltq/migrate_test.clj new file mode 100644 index 0000000..0063631 --- /dev/null +++ b/test/com/github/ivarref/yoltq/migrate_test.clj @@ -0,0 +1,92 @@ +(ns com.github.ivarref.yoltq.migrate-test + (:require [clojure.test :refer [deftest is]] + [com.github.ivarref.yoltq.ext-sys :as ext] + [com.github.ivarref.yoltq.migrate :as m] + [com.github.ivarref.yoltq.impl :as impl] + [com.github.ivarref.yoltq.test-utils :as tu] + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d])) + + +(deftest to-v2-migration + (with-bindings {#'ext/*squuid-atom* (atom 0)} + (let [conn (tu/empty-conn)] + @(d/transact conn impl/schema) + @(d/transact conn [{:com.github.ivarref.yoltq/id (u/squuid) + :com.github.ivarref.yoltq/queue-name :dummy + :com.github.ivarref.yoltq/status u/status-processing + :com.github.ivarref.yoltq/init-time 1 + :com.github.ivarref.yoltq/processing-time 2}]) + @(d/transact conn [{:com.github.ivarref.yoltq/id (u/squuid) + :com.github.ivarref.yoltq/queue-name :dummy + :com.github.ivarref.yoltq/status u/status-init + :com.github.ivarref.yoltq/init-time 3}]) + (is (= [[[:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000001"] + :com.github.ivarref.yoltq/version + nil + "2"] + [:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000001"] + :com.github.ivarref.yoltq/init-time + 1 + 1000] + [:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000001"] + :com.github.ivarref.yoltq/processing-time + 2 + 1000]] + [[:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000002"] + :com.github.ivarref.yoltq/version + nil + "2"] + [:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000002"] + :com.github.ivarref.yoltq/init-time + 3 + 1000]]] + (m/migrate! {:conn conn + :now-ms 1000 + :loop? true}))) + (is (= [] + (m/migrate! {:conn conn + :now-ms 1000 + :loop? true})))))) + + +(deftest to-v2-migration-real-time + (with-bindings {#'ext/*squuid-atom* (atom 0)} + (let [conn (tu/empty-conn) + id (u/squuid)] + @(d/transact conn impl/schema) + @(d/transact conn [{:com.github.ivarref.yoltq/id id + :com.github.ivarref.yoltq/queue-name :dummy + :com.github.ivarref.yoltq/status u/status-init + :com.github.ivarref.yoltq/init-time 1}]) + (Thread/sleep 100) + @(d/transact conn [{:com.github.ivarref.yoltq/id id + :com.github.ivarref.yoltq/init-time 2}]) + (let [tx-times (->> (d/q '[:find [?txinst ...] + :in $ ?e + :where + [?e :com.github.ivarref.yoltq/init-time _ ?tx true] + [?tx :db/txInstant ?txinst]] + (d/history (d/db conn)) + [:com.github.ivarref.yoltq/id id]) + (sort) + (vec))] + (is (= 2 (count tx-times))) + (m/migrate! {:conn conn}) + (is (= (.getTime (last tx-times)) + (d/q '[:find ?init-time . + :in $ ?e + :where + [?e :com.github.ivarref.yoltq/init-time ?init-time]] + (d/db conn) + [:com.github.ivarref.yoltq/id id]))))))) diff --git a/test/com/github/ivarref/yoltq/readme_demo.clj b/test/com/github/ivarref/yoltq/readme_demo.clj new file mode 100644 index 0000000..eae0a3e --- /dev/null +++ b/test/com/github/ivarref/yoltq/readme_demo.clj @@ -0,0 +1,48 @@ +(ns com.github.ivarref.yoltq.readme-demo + (:require [clojure.tools.logging :as log] + [datomic.api :as d] + [com.github.ivarref.yoltq :as yq]) + (:import (java.util UUID))) + + +(defonce conn + (let [uri (str "datomic:mem://hello-world-" (UUID/randomUUID))] + (d/delete-database uri) + (d/create-database uri) + (d/connect uri))) + + +(com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq*"} :debug] + [#{"*"} :info]]) + + +(yq/stop!) + +(yq/init! {:conn conn}) + + +(yq/add-consumer! :q + (let [cnt (atom 0)] + (fn [payload] + (when (= 1 (swap! cnt inc)) + (throw (ex-info "failed" {}))) + (log/info "got payload" payload)))) + +(yq/start!) + +@(d/transact conn [(yq/put :q {:work 123})]) + +(comment + (yq/add-consumer! :q (fn [_] (throw (ex-info "always fail" {}))))) + +(comment + @(d/transact conn [(yq/put :q {:work 123})])) + +(comment + (do + (yq/add-consumer! :q (fn [_] :ok)) + nil)) diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj new file mode 100644 index 0000000..0c1b2f0 --- /dev/null +++ b/test/com/github/ivarref/yoltq/test_utils.clj @@ -0,0 +1,80 @@ +(ns com.github.ivarref.yoltq.test-utils + (:require [com.github.ivarref.yoltq.log-init :as logconfig] + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.utils :as u] + [com.github.ivarref.yoltq :as yq] + [datomic.api :as d] + [clojure.string :as str] + [com.github.ivarref.yoltq.impl :as i] + [clojure.edn :as edn] + [com.github.ivarref.yoltq.ext-sys :as ext] + [clojure.pprint :as pp]) + (:import (java.util UUID) + (java.time Duration))) + + +(logconfig/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"*"} (edn/read-string + (System/getProperty "TAOENSSO_TIMBRE_MIN_LEVEL_EDN" ":info"))]]) + + +(defn empty-conn [] + (let [uri (str "datomic:mem://hello-world-" (UUID/randomUUID))] + (d/delete-database uri) + (d/create-database uri) + (d/connect uri))) + + +(defn break [] + (log/info (str/join "*" (repeat 60 "")))) + + +(defn clear [] + (.print System/out "\033[H\033[2J") + (.flush System/out) + (break)) + + +(defn put-transact! [id payload] + @(d/transact (:conn @yq/*config*) [(i/put @yq/*config* id payload {})])) + + +(defn advance! [tp] + (assert (some? ext/*now-ms-atom*) "Expected to be running in test-mode!") + (swap! ext/*now-ms-atom* + (if (number? tp) + tp + (.toMillis ^Duration tp)))) + + +(defn done-count [] + (d/q '[:find (count ?e) . + :where + [?e :com.github.ivarref.yoltq/id _] + [?e :com.github.ivarref.yoltq/status :done]] + (d/db (:conn @yq/*config*)))) + + +(defn pp [x] + (pp/pprint x) + x) + +(defn get-init [& args] + (apply u/get-init @yq/*config* args)) + + +(defn get-error [& args] + (apply u/get-error @yq/*config* args)) + + +(defn get-hung [& args] + (apply u/get-hung @yq/*config* args)) + + +(defn take! [& args] + (apply i/take! @yq/*config* args)) + + +(defn execute! [& args] + (apply i/execute! @yq/*config* args)) + diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj new file mode 100644 index 0000000..a2ed269 --- /dev/null +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -0,0 +1,474 @@ +(ns com.github.ivarref.yoltq.virtual-test + (:require + [clojure.string :as str] + [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]] + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq :as yq] + [com.github.ivarref.yoltq.error-poller :as error-poller] + [com.github.ivarref.yoltq.ext-sys :as ext] + [com.github.ivarref.yoltq.impl :as i] + [com.github.ivarref.yoltq.migrate :as migrate] + [com.github.ivarref.yoltq.test-queue :as tq] + [com.github.ivarref.yoltq.test-utils :as u] + [com.github.ivarref.yoltq.utils :as uu] + [datomic-schema.core] + [datomic.api :as d] + [taoensso.nippy :as nippy] + [taoensso.timbre :as timbre]) + (:import (java.time Duration LocalDateTime))) + + +(use-fixtures :each tq/call-with-virtual-queue!) + + +(deftest happy-case-1 + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q {:work 123})]) + (is (= {:work 123} (tq/consume! :q))))) + +(deftest happy-case-no-migration-for-new-entities + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q {:work 123})]) + (is (= {:work 123} (tq/consume! :q))) + (is (= [] (migrate/migrate! @yq/*config*))))) + +(deftest happy-case-tx-report-q + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q {:work 123})]) + (is (= {:work 123} (:retval (tq/run-one-report-queue!)))) + (is (= 1 (u/done-count))))) + + +(deftest happy-case-poller + (let [conn (u/empty-conn)] + (yq/init! {:conn conn + :handlers {:q {:f (fn [payload] payload)}}}) + (u/put-transact! :q {:work 123}) + (u/advance! (:init-backoff-time yq/default-opts)) + (is (= {:work 123} (some->> (u/get-init :q) + (u/take!) + (u/execute!) + :retval))))) + + +(deftest happy-case-queue-fn-allow-cas-failure + (let [conn (u/empty-conn) + invoke-count (atom 0)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q + (fn [{:keys [id]}] + (swap! invoke-count inc) + @(d/transact conn [[:db/cas [:e/id id] :e/val nil "janei"]])) + {:allow-cas-failure? #{:e/val}}) + @(d/transact conn #d/schema [[:e/id :one :string :id] + [:e/val :one :string]]) + @(d/transact conn [{:e/id "demo"} + (yq/put :q {:id "demo"})]) + (u/advance! (:init-backoff-time yq/default-opts)) + (swap! yq/*config* assoc :mark-status-fn! (fn [_ _ new-status] + (log/info "mark-status! doing nothing for new status" new-status))) + (is (nil? (some->> (u/get-init :q) + (u/take!) + (u/execute!)))) + (swap! yq/*config* dissoc :mark-status-fn!) + + ; (mark-status! :done) failed but f succeeded. + (is (nil? (u/get-hung :q))) + (u/advance! (:hung-backoff-time @yq/*config*)) + (is (some? (u/get-hung :q))) + (is (true? (some->> (u/get-hung :q) + (u/take!) + (u/execute!) + :allow-cas-failure?))) + (is (= 2 @invoke-count)))) + + +(deftest backoff-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn + :init-backoff-time (:init-backoff-time yq/default-opts) + :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}}) + (u/put-transact! :q {:work 123}) + (is (nil? (u/get-init :q))) + + (u/advance! (dec (:init-backoff-time yq/default-opts))) + (is (nil? (u/get-init :q))) + (u/advance! 1) + (is (some? (u/get-init :q))) + + (is (some? (some->> (u/get-init :q) + (u/take!) + (u/execute!) + :exception))) + + (u/advance! (dec (:error-backoff-time @yq/*config*))) + (is (nil? (u/get-error :q))) + (u/advance! 1) + (is (some? (u/get-error :q))))) + + +(deftest get-hung-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn + :init-backoff-time (:init-backoff-time yq/default-opts) + :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}}) + (u/put-transact! :q {:work 123}) + (u/advance! (:init-backoff-time yq/default-opts)) + (is (some? (u/get-init :q))) + + (is (= :processing (some->> (u/get-init :q) + (u/take!) + :com.github.ivarref.yoltq/status))) + + (is (nil? (u/get-hung :q))) + (u/advance! (dec (:hung-backoff-time yq/default-opts))) + (is (nil? (u/get-hung :q))) + (u/advance! 1) + (is (some? (u/get-hung :q))))) + + +(deftest basic-locking + (let [conn (u/empty-conn)] + (yq/init! {:conn conn + :init-backoff-time (:init-backoff-time yq/default-opts) + :cas-failures (atom 0) + :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}}) + (u/put-transact! :q {:work 123}) + (u/advance! (:init-backoff-time yq/default-opts)) + (is (some? (u/get-init :q))) + + (let [job (u/get-init :q)] + (is (= :processing (some->> job (u/take!) :com.github.ivarref.yoltq/status))) + (u/take! job) + (is (= 1 @(:cas-failures @yq/*config*)))))) + + +(deftest retry-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn + :init-backoff-time (:init-backoff-time yq/default-opts) + :handlers {:q {:f + (let [c (atom 0)] + (fn [_] + (if (<= (swap! c inc) 2) + (throw (ex-info "janei" {})) + ::ok)))}}}) + (u/put-transact! :q {:work 123}) + + (u/advance! (:init-backoff-time yq/default-opts)) + (is (some? (some->> (u/get-init :q) (u/take!) (u/execute!) :exception))) + + (u/advance! (:error-backoff-time @yq/*config*)) + (is (some? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception))) + + (u/advance! (:error-backoff-time @yq/*config*)) + (is (nil? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception))))) + + +(deftest max-retries-test + (let [conn (u/empty-conn) + call-count (atom 0)] + (yq/init! {:conn conn + :error-backoff-time 0}) + (yq/add-consumer! :q (fn [_] + (swap! call-count inc) + (throw (ex-info "janei" {}))) + {:max-retries 1}) + (tq/put! :q {:work 123}) + (is (some? (:exception (tq/run-one-report-queue!)))) + + (dotimes [_ 10] + (tq/run-queue-once! :q :error)) + (is (= 2 @call-count)))) + + +(deftest max-retries-test-two + (let [conn (u/empty-conn) + call-count (atom 0)] + (yq/init! {:conn conn + :error-backoff-time 0}) + (yq/add-consumer! :q (fn [_] + (swap! call-count inc) + (throw (ex-info "janei" {}))) + {:max-retries 3}) + (tq/put! :q {:work 123}) + (is (some? (:exception (tq/run-one-report-queue!)))) + + (timbre/with-level :fatal + (dotimes [_ 20] + (tq/run-queue-once! :q :error))) + (is (= 4 @call-count)))) + + +(deftest hung-to-error + (let [conn (u/empty-conn) + call-count (atom 0) + missed-mark-status (atom nil)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q + (fn [_] + (if (= 1 (swap! call-count inc)) + (throw (ex-info "error" {})) + (log/info "return OK")))) + (tq/put! :q {:id "demo"}) + (tq/run-one-report-queue!) ; now in status :error + + + (swap! yq/*config* assoc :mark-status-fn! (fn [_ _ new-status] + (reset! missed-mark-status new-status) + (log/info "mark-status! doing nothing for new status" new-status))) + (u/advance! (:error-backoff-time @yq/*config*)) + (tq/run-queue-once! :q :error) + (swap! yq/*config* dissoc :mark-status-fn!) + (is (= :done @missed-mark-status)) + + (is (nil? (uu/get-hung @yq/*config* :q))) + (u/advance! (:hung-backoff-time @yq/*config*)) + + (is (some? (uu/get-hung @yq/*config* :q))) + + (is (= 2 @call-count)) + + (is (true? (some->> (uu/get-hung (assoc-in @yq/*config* [:handlers :q :max-retries] 1) :q) + (i/take! @yq/*config*) + (i/execute! @yq/*config*) + :failed?))) + + (u/advance! (:error-backoff-time @yq/*config*)) + (is (some? (uu/get-error @yq/*config* :q))) + (is (nil? (uu/get-error (assoc-in @yq/*config* [:handlers :q :max-retries] 1) :q))))) + + +(deftest consume-expect-test + (let [conn (u/empty-conn) + seen (atom #{})] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [payload] + (when (= #{1 2} (swap! seen conj payload)) + (throw (ex-info "oops" {}))) + payload)) + + @(d/transact conn [(yq/put :q 1)]) + @(d/transact conn [(yq/put :q 2)]) + + (is (= 1 (tq/consume-expect! :q :done))) + (tq/consume-expect! :q :error))) + + +(def ^:dynamic *some-binding* nil) + + +(deftest binding-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn + :capture-bindings [#'*some-binding* #'timbre/*context*]}) + (yq/add-consumer! :q (fn [_] *some-binding*)) + (binding [timbre/*context* {:x-request-id "wooho"}] + (binding [*some-binding* 1] + @(d/transact conn [(yq/put :q nil)])) + (binding [*some-binding* 2] + @(d/transact conn [(yq/put :q nil)])) + @(d/transact conn [(yq/put :q nil)])) + + (is (= 1 (tq/consume-expect! :q :done))) + (is (= 2 (tq/consume-expect! :q :done))) + (is (nil? (tq/consume-expect! :q :done))))) + + +(deftest default-binding-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [_] (:x-request-id timbre/*context*))) + (binding [timbre/*context* {:x-request-id "123"}] + @(d/transact conn [(yq/put :q nil)])) + (is (= "123" (tq/consume-expect! :q :done))))) + + +(deftest force-retry-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (let [cnt (atom 0)] + (fn [_] (swap! cnt inc)))) + @(d/transact conn [(yq/put :q nil)]) + (is (= 1 (tq/consume! :q))) + (is (= 2 (tq/force-retry! :q))) + (is (= 3 (tq/force-retry! :q))))) + + +(deftest ext-id-no-duplicates + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q nil {:id "123"})]) + (is (thrown? Exception @(d/transact conn [(yq/put :q nil {:id "123"})]))))) + + +(deftest depends-on + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :a identity) + (yq/add-consumer! :b identity) + @(d/transact conn [(yq/put :a {:id "a1"} {:id "a1"})]) + @(d/transact conn [(yq/put :b {:id "b1"} {:depends-on [:a "a1"]})]) + + ; can't consume :b yet: + (is (= {:depends-on [:a "a1"]} (tq/consume! :b))) + + (is (= {:id "a1"} (tq/consume! :a))) + (is (= {:id "b1"} (tq/consume! :b))))) + + +(deftest depends-on-queue-level + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :a identity) + (yq/add-consumer! :b identity {:depends-on (fn [{:keys [id]}] [:a id])}) + @(d/transact conn [(yq/put :a {:id "1"} {:id "1"})]) + @(d/transact conn [(yq/put :b {:id "1"})]) + + ; can't consume :b yet: + (is (= {:depends-on [:a "1"]} (tq/consume! :b))) + + (is (= {:id "1"} (tq/consume! :a))) + (is (= {:id "1"} (tq/consume! :b))))) + + +(deftest verify-can-read-string + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :a identity) + (timbre/with-level :fatal + (is (thrown? Exception @(d/transact conn [(yq/put :a {:broken #'=})])))))) + + +(deftest payload-verifier + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity + {:valid-payload? (fn [{:keys [id]}] + (some? id))}) + @(d/transact conn [(yq/put :q {:id "a"})]) + (timbre/with-level :fatal + (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) + + +(defn my-consumer + {:yoltq/queue-id :some-q} + [state payload] + (swap! state conj payload)) + +(deftest queue-id-can-be-var + (let [conn (u/empty-conn) + received (atom #{})] + (yq/init! {:conn conn}) + (yq/add-consumer! #'my-consumer (partial my-consumer received)) + @(d/transact conn [(yq/put #'my-consumer {:id "a"})]) + (tq/consume! :some-q) + (is (= #{{:id "a"}} @received)) + #_(timbre/with-level :fatal + (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) + +(deftest healthy-allowed-error-time-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [_] (throw (ex-info "" {})))) + @(d/transact conn [(yq/put :q {:work 123})]) + (tq/consume-expect! :q :error) + (is (= 0 (error-poller/do-poll-errors @yq/*config* (ext/now-ms)))) + (is (= 0 (error-poller/do-poll-errors @yq/*config* (+ (dec (.toMillis (Duration/ofMinutes 15))) (ext/now-ms))))) + (is (= 1 (error-poller/do-poll-errors @yq/*config* (+ (.toMillis (Duration/ofMinutes 15)) (ext/now-ms))))))) + +(deftest global-encode-decode + (let [conn (u/empty-conn) + ldt (LocalDateTime/now) + got-work (atom nil)] + (yq/init! {:conn conn + :encode nippy/freeze + :decode nippy/thaw}) + (yq/add-consumer! :q (fn [work] (reset! got-work work))) + @(d/transact conn [(yq/put :q {:work ldt})]) + (tq/consume! :q) + (is (= @got-work {:work ldt})))) + +(deftest queue-encode-decode + (let [conn (u/empty-conn) + ldt (LocalDateTime/now) + got-work (atom nil)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [work] (reset! got-work work)) + {:encode nippy/freeze + :decode nippy/thaw}) + @(d/transact conn [(yq/put :q {:work ldt})]) + (tq/consume! :q) + (is (= @got-work {:work ldt})))) + +(deftest global-partition + (let [conn (u/empty-conn) + got-work (atom nil)] + (yq/init! {:conn conn + :partition-fn (fn [_queue-name] :my-part)}) + (yq/add-consumer! :q (fn [work] (reset! got-work work))) + @(d/transact conn [(yq/put :q {:work 123})]) + (tq/consume! :q) + (is (some? (d/q '[:find ?e . + :in $ ?part + :where + [?e :db/ident ?part]] + (d/db conn) + :my-part))) + (is (= @got-work {:work 123})))) + +(deftest partition-per-queue + (let [conn (u/empty-conn) + got-work (atom nil)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [work] (reset! got-work work)) + {:partition-fn (fn [_queue-name] :my-part)}) + @(d/transact conn [(yq/put :q {:work 123})]) + (tq/consume! :q) + (is (some? (d/q '[:find ?e . + :in $ ?part + :where + [?e :db/ident ?part]] + (d/db conn) + :my-part))) + (is (= @got-work {:work 123})))) + +(deftest string-encode-decode + (let [conn (u/empty-conn) + got-work (atom nil)] + (yq/init! {:conn conn + :encode (fn [x] (str/join (reverse x))) + :decode (fn [x] (str/join (reverse x)))}) + (yq/add-consumer! :q (fn [work] (reset! got-work work))) + @(d/transact conn [(yq/put :q "asdf")]) + (tq/consume! :q) + (is (= @got-work "asdf")))) + +(deftest job-group-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q1 identity) + (yq/add-consumer! :q2 identity) + @(d/transact conn [(yq/put :q1 {:work 123} {:job-group :group1}) + (yq/put :q1 {:work 456} {:job-group :group2}) + (yq/put :q2 {:work 789} {:job-group :group1})]) + (is (= [{:qname :q1 + :job-group :group1 + :status :init + :count 1}] + (yq/job-group-progress :q1 :group1))) + + (is (= {:work 123} (tq/consume! :q1))) + + (is (= [{:qname :q1 + :job-group :group1 + :status :done + :count 1}] + (yq/job-group-progress :q1 :group1))))) |
