aboutsummaryrefslogtreecommitdiff
path: root/test/com/github
diff options
context:
space:
mode:
Diffstat (limited to 'test/com/github')
-rw-r--r--test/com/github/ivarref/yoltq/error_poller_test.clj35
-rw-r--r--test/com/github/ivarref/yoltq/http_hang_demo.clj45
-rw-r--r--test/com/github/ivarref/yoltq/log_init.clj66
-rw-r--r--test/com/github/ivarref/yoltq/migrate_test.clj92
-rw-r--r--test/com/github/ivarref/yoltq/readme_demo.clj48
-rw-r--r--test/com/github/ivarref/yoltq/test_utils.clj80
-rw-r--r--test/com/github/ivarref/yoltq/virtual_test.clj474
7 files changed, 840 insertions, 0 deletions
diff --git a/test/com/github/ivarref/yoltq/error_poller_test.clj b/test/com/github/ivarref/yoltq/error_poller_test.clj
new file mode 100644
index 0000000..4d92b81
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/error_poller_test.clj
@@ -0,0 +1,35 @@
+(ns com.github.ivarref.yoltq.error-poller-test
+ (:require [clojure.edn :as edn]
+ [clojure.test :refer [deftest is]]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.error-poller :as ep]
+ [com.github.ivarref.yoltq.log-init :as logconfig]))
+
+
+(deftest error-poller
+ (logconfig/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"*"} (edn/read-string
+ (System/getProperty "TAOENSSO_TIMBRE_MIN_LEVEL_EDN" ":info"))]])
+ (let [cfg {:system-error-callback-backoff 100}
+ time (atom 0)
+ tick! (fn [& [amount]]
+ (swap! time + (or amount 1)))
+ verify (fn [state now-ns error-count expected-callback]
+ (let [{:keys [errors state run-callback] :as res} (ep/handle-error-count state cfg now-ns error-count)]
+ (log/info errors "=>" state "::" run-callback)
+ (is (= expected-callback run-callback))
+ res))]
+ (-> {}
+ (verify (tick!) 0 nil)
+ (verify (tick!) 1 nil)
+ (verify (tick!) 1 nil)
+ (verify (tick!) 1 :error)
+ (verify (tick! 100) 0 nil)
+ (verify (tick!) 0 :error)
+ (verify (tick!) 0 :recovery)
+ (verify (tick!) 1 nil)
+ (verify (tick!) 1 nil)
+ (verify (tick!) 1 :error)
+ (verify (tick! 100) 1 nil)
+ (verify (tick!) 1 :error))))
diff --git a/test/com/github/ivarref/yoltq/http_hang_demo.clj b/test/com/github/ivarref/yoltq/http_hang_demo.clj
new file mode 100644
index 0000000..06d877b
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/http_hang_demo.clj
@@ -0,0 +1,45 @@
+(ns com.github.ivarref.yoltq.http-hang-demo
+ (:require [datomic.api :as d]
+ [com.github.ivarref.yoltq :as yq]
+ [com.github.ivarref.yoltq.log-init])
+ (:import (java.net.http HttpClient HttpRequest HttpResponse$BodyHandlers)))
+
+(comment
+ (do
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq*"} :debug]
+ [#{"*"} :info]])
+ (yq/stop!)
+ (let [received (atom [])
+ uri (str "datomic:mem://hello-world-" (java.util.UUID/randomUUID))]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (let [conn (d/connect uri)]
+ (init! {:conn conn
+ :error-backoff-time (Duration/ofSeconds 5)
+ :poll-delay 5
+ :system-error-poll-interval [1 TimeUnit/MINUTES]
+ :system-error-callback-backoff (Duration/ofHours 1)
+ :max-execute-time (Duration/ofSeconds 3)
+ :on-system-error (fn [] (log/error "system in error state"))
+ :on-system-recovery (fn [] (log/info "system recovered"))})
+ (yq/add-consumer! :slow (fn [_payload]
+ (log/info "start slow handler...")
+ ; sudo tc qdisc del dev wlp3s0 root netem
+ ; sudo tc qdisc add dev wlp3s0 root netem delay 10000ms
+ ; https://jvns.ca/blog/2017/04/01/slow-down-your-internet-with-tc/
+ (let [request (-> (HttpRequest/newBuilder)
+ (.uri (java.net.URI. "https://postman-echo.com/get"))
+ (.timeout (Duration/ofSeconds 10))
+ (.GET)
+ (.build))]
+ (log/info "body is:" (-> (.send (HttpClient/newHttpClient) request (HttpResponse$BodyHandlers/ofString))
+ (.body))))
+ (log/info "slow handler is done!")))
+ (yq/start!)
+ @(d/transact conn [(put :slow {:work 123})])
+ #_(dotimes [x 1] @(d/transact conn [(put :q {:work x})]))
+ nil)))) \ No newline at end of file
diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj
new file mode 100644
index 0000000..7eae557
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/log_init.clj
@@ -0,0 +1,66 @@
+(ns com.github.ivarref.yoltq.log-init
+ (:require [clojure.term.colors :as colors]
+ [taoensso.timbre :as timbre]
+ [clojure.string :as str]))
+
+(set! *warn-on-reflection* true)
+
+(def level-colors
+ {;:warn colors/red
+ :error colors/red})
+
+(def ^:dynamic *override-color* nil)
+
+(defn min-length [n s]
+ (loop [s s]
+ (if (>= (count s) n)
+ s
+ (recur (str s " ")))))
+
+(defn local-console-format-fn
+ "A simpler log format, suitable for readable logs during development. colorized stacktraces"
+ [data]
+ (try
+ (let [{:keys [level ?err msg_ ?ns-str ?file hostname_
+ timestamp_ ?line context]} data
+ ts (force timestamp_)]
+ (let [color-f (if (nil? *override-color*)
+ (get level-colors level str)
+ colors/green)
+ maybe-stacktrace (when ?err
+ (str "\n" (timbre/stacktrace ?err)))]
+ (cond-> (str #_(str ?ns-str ":" ?line)
+ #_(min-length (count "[yoltq:326] ")
+ (str
+ "["
+ (some-> ?ns-str
+ (str/split #"\.")
+ (last))
+ ":" ?line))
+ ts
+ " "
+ (color-f (min-length 5 (str/upper-case (name level))))
+ " "
+
+ (when-let [x-req-id (:x-request-id context)]
+ (str "[" x-req-id "] "))
+ #_(.getName ^Thread (Thread/currentThread))
+
+ (color-f (force msg_))
+
+ maybe-stacktrace))))
+
+
+ (catch Throwable t
+ (println "error in local-console-format-fn:" (ex-message t))
+ nil)))
+
+
+(defn init-logging! [min-levels]
+ (timbre/merge-config!
+ {:min-level min-levels
+ :timestamp-opts {:pattern "HH:mm:ss.SSS"
+ :timezone :jvm-default}
+ :output-fn (fn [data] (local-console-format-fn data))
+ :appenders {:println (timbre/println-appender {:stream :std-out})}}))
+
diff --git a/test/com/github/ivarref/yoltq/migrate_test.clj b/test/com/github/ivarref/yoltq/migrate_test.clj
new file mode 100644
index 0000000..0063631
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/migrate_test.clj
@@ -0,0 +1,92 @@
+(ns com.github.ivarref.yoltq.migrate-test
+ (:require [clojure.test :refer [deftest is]]
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [com.github.ivarref.yoltq.migrate :as m]
+ [com.github.ivarref.yoltq.impl :as impl]
+ [com.github.ivarref.yoltq.test-utils :as tu]
+ [com.github.ivarref.yoltq.utils :as u]
+ [datomic.api :as d]))
+
+
+(deftest to-v2-migration
+ (with-bindings {#'ext/*squuid-atom* (atom 0)}
+ (let [conn (tu/empty-conn)]
+ @(d/transact conn impl/schema)
+ @(d/transact conn [{:com.github.ivarref.yoltq/id (u/squuid)
+ :com.github.ivarref.yoltq/queue-name :dummy
+ :com.github.ivarref.yoltq/status u/status-processing
+ :com.github.ivarref.yoltq/init-time 1
+ :com.github.ivarref.yoltq/processing-time 2}])
+ @(d/transact conn [{:com.github.ivarref.yoltq/id (u/squuid)
+ :com.github.ivarref.yoltq/queue-name :dummy
+ :com.github.ivarref.yoltq/status u/status-init
+ :com.github.ivarref.yoltq/init-time 3}])
+ (is (= [[[:db/cas
+ [:com.github.ivarref.yoltq/id
+ #uuid "00000000-0000-0000-0000-000000000001"]
+ :com.github.ivarref.yoltq/version
+ nil
+ "2"]
+ [:db/cas
+ [:com.github.ivarref.yoltq/id
+ #uuid "00000000-0000-0000-0000-000000000001"]
+ :com.github.ivarref.yoltq/init-time
+ 1
+ 1000]
+ [:db/cas
+ [:com.github.ivarref.yoltq/id
+ #uuid "00000000-0000-0000-0000-000000000001"]
+ :com.github.ivarref.yoltq/processing-time
+ 2
+ 1000]]
+ [[:db/cas
+ [:com.github.ivarref.yoltq/id
+ #uuid "00000000-0000-0000-0000-000000000002"]
+ :com.github.ivarref.yoltq/version
+ nil
+ "2"]
+ [:db/cas
+ [:com.github.ivarref.yoltq/id
+ #uuid "00000000-0000-0000-0000-000000000002"]
+ :com.github.ivarref.yoltq/init-time
+ 3
+ 1000]]]
+ (m/migrate! {:conn conn
+ :now-ms 1000
+ :loop? true})))
+ (is (= []
+ (m/migrate! {:conn conn
+ :now-ms 1000
+ :loop? true}))))))
+
+
+(deftest to-v2-migration-real-time
+ (with-bindings {#'ext/*squuid-atom* (atom 0)}
+ (let [conn (tu/empty-conn)
+ id (u/squuid)]
+ @(d/transact conn impl/schema)
+ @(d/transact conn [{:com.github.ivarref.yoltq/id id
+ :com.github.ivarref.yoltq/queue-name :dummy
+ :com.github.ivarref.yoltq/status u/status-init
+ :com.github.ivarref.yoltq/init-time 1}])
+ (Thread/sleep 100)
+ @(d/transact conn [{:com.github.ivarref.yoltq/id id
+ :com.github.ivarref.yoltq/init-time 2}])
+ (let [tx-times (->> (d/q '[:find [?txinst ...]
+ :in $ ?e
+ :where
+ [?e :com.github.ivarref.yoltq/init-time _ ?tx true]
+ [?tx :db/txInstant ?txinst]]
+ (d/history (d/db conn))
+ [:com.github.ivarref.yoltq/id id])
+ (sort)
+ (vec))]
+ (is (= 2 (count tx-times)))
+ (m/migrate! {:conn conn})
+ (is (= (.getTime (last tx-times))
+ (d/q '[:find ?init-time .
+ :in $ ?e
+ :where
+ [?e :com.github.ivarref.yoltq/init-time ?init-time]]
+ (d/db conn)
+ [:com.github.ivarref.yoltq/id id])))))))
diff --git a/test/com/github/ivarref/yoltq/readme_demo.clj b/test/com/github/ivarref/yoltq/readme_demo.clj
new file mode 100644
index 0000000..eae0a3e
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/readme_demo.clj
@@ -0,0 +1,48 @@
+(ns com.github.ivarref.yoltq.readme-demo
+ (:require [clojure.tools.logging :as log]
+ [datomic.api :as d]
+ [com.github.ivarref.yoltq :as yq])
+ (:import (java.util UUID)))
+
+
+(defonce conn
+ (let [uri (str "datomic:mem://hello-world-" (UUID/randomUUID))]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (d/connect uri)))
+
+
+(com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq*"} :debug]
+ [#{"*"} :info]])
+
+
+(yq/stop!)
+
+(yq/init! {:conn conn})
+
+
+(yq/add-consumer! :q
+ (let [cnt (atom 0)]
+ (fn [payload]
+ (when (= 1 (swap! cnt inc))
+ (throw (ex-info "failed" {})))
+ (log/info "got payload" payload))))
+
+(yq/start!)
+
+@(d/transact conn [(yq/put :q {:work 123})])
+
+(comment
+ (yq/add-consumer! :q (fn [_] (throw (ex-info "always fail" {})))))
+
+(comment
+ @(d/transact conn [(yq/put :q {:work 123})]))
+
+(comment
+ (do
+ (yq/add-consumer! :q (fn [_] :ok))
+ nil))
diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj
new file mode 100644
index 0000000..0c1b2f0
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/test_utils.clj
@@ -0,0 +1,80 @@
+(ns com.github.ivarref.yoltq.test-utils
+ (:require [com.github.ivarref.yoltq.log-init :as logconfig]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.utils :as u]
+ [com.github.ivarref.yoltq :as yq]
+ [datomic.api :as d]
+ [clojure.string :as str]
+ [com.github.ivarref.yoltq.impl :as i]
+ [clojure.edn :as edn]
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [clojure.pprint :as pp])
+ (:import (java.util UUID)
+ (java.time Duration)))
+
+
+(logconfig/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"*"} (edn/read-string
+ (System/getProperty "TAOENSSO_TIMBRE_MIN_LEVEL_EDN" ":info"))]])
+
+
+(defn empty-conn []
+ (let [uri (str "datomic:mem://hello-world-" (UUID/randomUUID))]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (d/connect uri)))
+
+
+(defn break []
+ (log/info (str/join "*" (repeat 60 ""))))
+
+
+(defn clear []
+ (.print System/out "\033[H\033[2J")
+ (.flush System/out)
+ (break))
+
+
+(defn put-transact! [id payload]
+ @(d/transact (:conn @yq/*config*) [(i/put @yq/*config* id payload {})]))
+
+
+(defn advance! [tp]
+ (assert (some? ext/*now-ms-atom*) "Expected to be running in test-mode!")
+ (swap! ext/*now-ms-atom* + (if (number? tp)
+ tp
+ (.toMillis ^Duration tp))))
+
+
+(defn done-count []
+ (d/q '[:find (count ?e) .
+ :where
+ [?e :com.github.ivarref.yoltq/id _]
+ [?e :com.github.ivarref.yoltq/status :done]]
+ (d/db (:conn @yq/*config*))))
+
+
+(defn pp [x]
+ (pp/pprint x)
+ x)
+
+(defn get-init [& args]
+ (apply u/get-init @yq/*config* args))
+
+
+(defn get-error [& args]
+ (apply u/get-error @yq/*config* args))
+
+
+(defn get-hung [& args]
+ (apply u/get-hung @yq/*config* args))
+
+
+(defn take! [& args]
+ (apply i/take! @yq/*config* args))
+
+
+(defn execute! [& args]
+ (apply i/execute! @yq/*config* args))
+
diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj
new file mode 100644
index 0000000..a2ed269
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/virtual_test.clj
@@ -0,0 +1,474 @@
+(ns com.github.ivarref.yoltq.virtual-test
+ (:require
+ [clojure.string :as str]
+ [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq :as yq]
+ [com.github.ivarref.yoltq.error-poller :as error-poller]
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [com.github.ivarref.yoltq.impl :as i]
+ [com.github.ivarref.yoltq.migrate :as migrate]
+ [com.github.ivarref.yoltq.test-queue :as tq]
+ [com.github.ivarref.yoltq.test-utils :as u]
+ [com.github.ivarref.yoltq.utils :as uu]
+ [datomic-schema.core]
+ [datomic.api :as d]
+ [taoensso.nippy :as nippy]
+ [taoensso.timbre :as timbre])
+ (:import (java.time Duration LocalDateTime)))
+
+
+(use-fixtures :each tq/call-with-virtual-queue!)
+
+
+(deftest happy-case-1
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q identity)
+ @(d/transact conn [(yq/put :q {:work 123})])
+ (is (= {:work 123} (tq/consume! :q)))))
+
+(deftest happy-case-no-migration-for-new-entities
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q identity)
+ @(d/transact conn [(yq/put :q {:work 123})])
+ (is (= {:work 123} (tq/consume! :q)))
+ (is (= [] (migrate/migrate! @yq/*config*)))))
+
+(deftest happy-case-tx-report-q
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q identity)
+ @(d/transact conn [(yq/put :q {:work 123})])
+ (is (= {:work 123} (:retval (tq/run-one-report-queue!))))
+ (is (= 1 (u/done-count)))))
+
+
+(deftest happy-case-poller
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn
+ :handlers {:q {:f (fn [payload] payload)}}})
+ (u/put-transact! :q {:work 123})
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (= {:work 123} (some->> (u/get-init :q)
+ (u/take!)
+ (u/execute!)
+ :retval)))))
+
+
+(deftest happy-case-queue-fn-allow-cas-failure
+ (let [conn (u/empty-conn)
+ invoke-count (atom 0)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q
+ (fn [{:keys [id]}]
+ (swap! invoke-count inc)
+ @(d/transact conn [[:db/cas [:e/id id] :e/val nil "janei"]]))
+ {:allow-cas-failure? #{:e/val}})
+ @(d/transact conn #d/schema [[:e/id :one :string :id]
+ [:e/val :one :string]])
+ @(d/transact conn [{:e/id "demo"}
+ (yq/put :q {:id "demo"})])
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (swap! yq/*config* assoc :mark-status-fn! (fn [_ _ new-status]
+ (log/info "mark-status! doing nothing for new status" new-status)))
+ (is (nil? (some->> (u/get-init :q)
+ (u/take!)
+ (u/execute!))))
+ (swap! yq/*config* dissoc :mark-status-fn!)
+
+ ; (mark-status! :done) failed but f succeeded.
+ (is (nil? (u/get-hung :q)))
+ (u/advance! (:hung-backoff-time @yq/*config*))
+ (is (some? (u/get-hung :q)))
+ (is (true? (some->> (u/get-hung :q)
+ (u/take!)
+ (u/execute!)
+ :allow-cas-failure?)))
+ (is (= 2 @invoke-count))))
+
+
+(deftest backoff-test
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
+ (u/put-transact! :q {:work 123})
+ (is (nil? (u/get-init :q)))
+
+ (u/advance! (dec (:init-backoff-time yq/default-opts)))
+ (is (nil? (u/get-init :q)))
+ (u/advance! 1)
+ (is (some? (u/get-init :q)))
+
+ (is (some? (some->> (u/get-init :q)
+ (u/take!)
+ (u/execute!)
+ :exception)))
+
+ (u/advance! (dec (:error-backoff-time @yq/*config*)))
+ (is (nil? (u/get-error :q)))
+ (u/advance! 1)
+ (is (some? (u/get-error :q)))))
+
+
+(deftest get-hung-test
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
+ (u/put-transact! :q {:work 123})
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (some? (u/get-init :q)))
+
+ (is (= :processing (some->> (u/get-init :q)
+ (u/take!)
+ :com.github.ivarref.yoltq/status)))
+
+ (is (nil? (u/get-hung :q)))
+ (u/advance! (dec (:hung-backoff-time yq/default-opts)))
+ (is (nil? (u/get-hung :q)))
+ (u/advance! 1)
+ (is (some? (u/get-hung :q)))))
+
+
+(deftest basic-locking
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :cas-failures (atom 0)
+ :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
+ (u/put-transact! :q {:work 123})
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (some? (u/get-init :q)))
+
+ (let [job (u/get-init :q)]
+ (is (= :processing (some->> job (u/take!) :com.github.ivarref.yoltq/status)))
+ (u/take! job)
+ (is (= 1 @(:cas-failures @yq/*config*))))))
+
+
+(deftest retry-test
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :handlers {:q {:f
+ (let [c (atom 0)]
+ (fn [_]
+ (if (<= (swap! c inc) 2)
+ (throw (ex-info "janei" {}))
+ ::ok)))}}})
+ (u/put-transact! :q {:work 123})
+
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (some? (some->> (u/get-init :q) (u/take!) (u/execute!) :exception)))
+
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (is (some? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception)))
+
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (is (nil? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception)))))
+
+
+(deftest max-retries-test
+ (let [conn (u/empty-conn)
+ call-count (atom 0)]
+ (yq/init! {:conn conn
+ :error-backoff-time 0})
+ (yq/add-consumer! :q (fn [_]
+ (swap! call-count inc)
+ (throw (ex-info "janei" {})))
+ {:max-retries 1})
+ (tq/put! :q {:work 123})
+ (is (some? (:exception (tq/run-one-report-queue!))))
+
+ (dotimes [_ 10]
+ (tq/run-queue-once! :q :error))
+ (is (= 2 @call-count))))
+
+
+(deftest max-retries-test-two
+ (let [conn (u/empty-conn)
+ call-count (atom 0)]
+ (yq/init! {:conn conn
+ :error-backoff-time 0})
+ (yq/add-consumer! :q (fn [_]
+ (swap! call-count inc)
+ (throw (ex-info "janei" {})))
+ {:max-retries 3})
+ (tq/put! :q {:work 123})
+ (is (some? (:exception (tq/run-one-report-queue!))))
+
+ (timbre/with-level :fatal
+ (dotimes [_ 20]
+ (tq/run-queue-once! :q :error)))
+ (is (= 4 @call-count))))
+
+
+(deftest hung-to-error
+ (let [conn (u/empty-conn)
+ call-count (atom 0)
+ missed-mark-status (atom nil)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q
+ (fn [_]
+ (if (= 1 (swap! call-count inc))
+ (throw (ex-info "error" {}))
+ (log/info "return OK"))))
+ (tq/put! :q {:id "demo"})
+ (tq/run-one-report-queue!) ; now in status :error
+
+
+ (swap! yq/*config* assoc :mark-status-fn! (fn [_ _ new-status]
+ (reset! missed-mark-status new-status)
+ (log/info "mark-status! doing nothing for new status" new-status)))
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (tq/run-queue-once! :q :error)
+ (swap! yq/*config* dissoc :mark-status-fn!)
+ (is (= :done @missed-mark-status))
+
+ (is (nil? (uu/get-hung @yq/*config* :q)))
+ (u/advance! (:hung-backoff-time @yq/*config*))
+
+ (is (some? (uu/get-hung @yq/*config* :q)))
+
+ (is (= 2 @call-count))
+
+ (is (true? (some->> (uu/get-hung (assoc-in @yq/*config* [:handlers :q :max-retries] 1) :q)
+ (i/take! @yq/*config*)
+ (i/execute! @yq/*config*)
+ :failed?)))
+
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (is (some? (uu/get-error @yq/*config* :q)))
+ (is (nil? (uu/get-error (assoc-in @yq/*config* [:handlers :q :max-retries] 1) :q)))))
+
+
+(deftest consume-expect-test
+ (let [conn (u/empty-conn)
+ seen (atom #{})]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q (fn [payload]
+ (when (= #{1 2} (swap! seen conj payload))
+ (throw (ex-info "oops" {})))
+ payload))
+
+ @(d/transact conn [(yq/put :q 1)])
+ @(d/transact conn [(yq/put :q 2)])
+
+ (is (= 1 (tq/consume-expect! :q :done)))
+ (tq/consume-expect! :q :error)))
+
+
+(def ^:dynamic *some-binding* nil)
+
+
+(deftest binding-test
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn
+ :capture-bindings [#'*some-binding* #'timbre/*context*]})
+ (yq/add-consumer! :q (fn [_] *some-binding*))
+ (binding [timbre/*context* {:x-request-id "wooho"}]
+ (binding [*some-binding* 1]
+ @(d/transact conn [(yq/put :q nil)]))
+ (binding [*some-binding* 2]
+ @(d/transact conn [(yq/put :q nil)]))
+ @(d/transact conn [(yq/put :q nil)]))
+
+ (is (= 1 (tq/consume-expect! :q :done)))
+ (is (= 2 (tq/consume-expect! :q :done)))
+ (is (nil? (tq/consume-expect! :q :done)))))
+
+
+(deftest default-binding-test
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q (fn [_] (:x-request-id timbre/*context*)))
+ (binding [timbre/*context* {:x-request-id "123"}]
+ @(d/transact conn [(yq/put :q nil)]))
+ (is (= "123" (tq/consume-expect! :q :done)))))
+
+
+(deftest force-retry-test
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q (let [cnt (atom 0)]
+ (fn [_] (swap! cnt inc))))
+ @(d/transact conn [(yq/put :q nil)])
+ (is (= 1 (tq/consume! :q)))
+ (is (= 2 (tq/force-retry! :q)))
+ (is (= 3 (tq/force-retry! :q)))))
+
+
+(deftest ext-id-no-duplicates
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q identity)
+ @(d/transact conn [(yq/put :q nil {:id "123"})])
+ (is (thrown? Exception @(d/transact conn [(yq/put :q nil {:id "123"})])))))
+
+
+(deftest depends-on
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :a identity)
+ (yq/add-consumer! :b identity)
+ @(d/transact conn [(yq/put :a {:id "a1"} {:id "a1"})])
+ @(d/transact conn [(yq/put :b {:id "b1"} {:depends-on [:a "a1"]})])
+
+ ; can't consume :b yet:
+ (is (= {:depends-on [:a "a1"]} (tq/consume! :b)))
+
+ (is (= {:id "a1"} (tq/consume! :a)))
+ (is (= {:id "b1"} (tq/consume! :b)))))
+
+
+(deftest depends-on-queue-level
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :a identity)
+ (yq/add-consumer! :b identity {:depends-on (fn [{:keys [id]}] [:a id])})
+ @(d/transact conn [(yq/put :a {:id "1"} {:id "1"})])
+ @(d/transact conn [(yq/put :b {:id "1"})])
+
+ ; can't consume :b yet:
+ (is (= {:depends-on [:a "1"]} (tq/consume! :b)))
+
+ (is (= {:id "1"} (tq/consume! :a)))
+ (is (= {:id "1"} (tq/consume! :b)))))
+
+
+(deftest verify-can-read-string
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :a identity)
+ (timbre/with-level :fatal
+ (is (thrown? Exception @(d/transact conn [(yq/put :a {:broken #'=})]))))))
+
+
+(deftest payload-verifier
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q identity
+ {:valid-payload? (fn [{:keys [id]}]
+ (some? id))})
+ @(d/transact conn [(yq/put :q {:id "a"})])
+ (timbre/with-level :fatal
+ (is (thrown? Exception @(d/transact conn [(yq/put :q {})]))))))
+
+
+(defn my-consumer
+ {:yoltq/queue-id :some-q}
+ [state payload]
+ (swap! state conj payload))
+
+(deftest queue-id-can-be-var
+ (let [conn (u/empty-conn)
+ received (atom #{})]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! #'my-consumer (partial my-consumer received))
+ @(d/transact conn [(yq/put #'my-consumer {:id "a"})])
+ (tq/consume! :some-q)
+ (is (= #{{:id "a"}} @received))
+ #_(timbre/with-level :fatal
+ (is (thrown? Exception @(d/transact conn [(yq/put :q {})]))))))
+
+(deftest healthy-allowed-error-time-test
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q (fn [_] (throw (ex-info "" {}))))
+ @(d/transact conn [(yq/put :q {:work 123})])
+ (tq/consume-expect! :q :error)
+ (is (= 0 (error-poller/do-poll-errors @yq/*config* (ext/now-ms))))
+ (is (= 0 (error-poller/do-poll-errors @yq/*config* (+ (dec (.toMillis (Duration/ofMinutes 15))) (ext/now-ms)))))
+ (is (= 1 (error-poller/do-poll-errors @yq/*config* (+ (.toMillis (Duration/ofMinutes 15)) (ext/now-ms)))))))
+
+(deftest global-encode-decode
+ (let [conn (u/empty-conn)
+ ldt (LocalDateTime/now)
+ got-work (atom nil)]
+ (yq/init! {:conn conn
+ :encode nippy/freeze
+ :decode nippy/thaw})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work)))
+ @(d/transact conn [(yq/put :q {:work ldt})])
+ (tq/consume! :q)
+ (is (= @got-work {:work ldt}))))
+
+(deftest queue-encode-decode
+ (let [conn (u/empty-conn)
+ ldt (LocalDateTime/now)
+ got-work (atom nil)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work))
+ {:encode nippy/freeze
+ :decode nippy/thaw})
+ @(d/transact conn [(yq/put :q {:work ldt})])
+ (tq/consume! :q)
+ (is (= @got-work {:work ldt}))))
+
+(deftest global-partition
+ (let [conn (u/empty-conn)
+ got-work (atom nil)]
+ (yq/init! {:conn conn
+ :partition-fn (fn [_queue-name] :my-part)})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work)))
+ @(d/transact conn [(yq/put :q {:work 123})])
+ (tq/consume! :q)
+ (is (some? (d/q '[:find ?e .
+ :in $ ?part
+ :where
+ [?e :db/ident ?part]]
+ (d/db conn)
+ :my-part)))
+ (is (= @got-work {:work 123}))))
+
+(deftest partition-per-queue
+ (let [conn (u/empty-conn)
+ got-work (atom nil)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work))
+ {:partition-fn (fn [_queue-name] :my-part)})
+ @(d/transact conn [(yq/put :q {:work 123})])
+ (tq/consume! :q)
+ (is (some? (d/q '[:find ?e .
+ :in $ ?part
+ :where
+ [?e :db/ident ?part]]
+ (d/db conn)
+ :my-part)))
+ (is (= @got-work {:work 123}))))
+
+(deftest string-encode-decode
+ (let [conn (u/empty-conn)
+ got-work (atom nil)]
+ (yq/init! {:conn conn
+ :encode (fn [x] (str/join (reverse x)))
+ :decode (fn [x] (str/join (reverse x)))})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work)))
+ @(d/transact conn [(yq/put :q "asdf")])
+ (tq/consume! :q)
+ (is (= @got-work "asdf"))))
+
+(deftest job-group-test
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q1 identity)
+ (yq/add-consumer! :q2 identity)
+ @(d/transact conn [(yq/put :q1 {:work 123} {:job-group :group1})
+ (yq/put :q1 {:work 456} {:job-group :group2})
+ (yq/put :q2 {:work 789} {:job-group :group1})])
+ (is (= [{:qname :q1
+ :job-group :group1
+ :status :init
+ :count 1}]
+ (yq/job-group-progress :q1 :group1)))
+
+ (is (= {:work 123} (tq/consume! :q1)))
+
+ (is (= [{:qname :q1
+ :job-group :group1
+ :status :done
+ :count 1}]
+ (yq/job-group-progress :q1 :group1)))))