aboutsummaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorIvar Refsdal <ivar.refsdal@nsd.no>2021-09-04 13:23:07 +0200
committerIvar Refsdal <ivar.refsdal@nsd.no>2021-09-14 12:52:42 +0200
commitea40c5dbc2b47d6fd2a23236828dc9e4ab1f77dc (patch)
tree38db9a13c41576dd39a18ec4f4b2d498322a30c2 /test
downloadfiinha-ea40c5dbc2b47d6fd2a23236828dc9e4ab1f77dc.tar.gz
fiinha-ea40c5dbc2b47d6fd2a23236828dc9e4ab1f77dc.tar.xz
Initial commit
Add release script Release 0.1.3 Use com.github.ivarref.yoltq namespace Use com.github.ivarref.yoltq namespace
Diffstat (limited to 'test')
-rw-r--r--test/com/github/ivarref/yoltq/error_poller_test.clj35
-rw-r--r--test/com/github/ivarref/yoltq/http_hang_demo.clj45
-rw-r--r--test/com/github/ivarref/yoltq/log_init.clj61
-rw-r--r--test/com/github/ivarref/yoltq/readme_demo.clj48
-rw-r--r--test/com/github/ivarref/yoltq/test_utils.clj74
-rw-r--r--test/com/github/ivarref/yoltq/virtual_test.clj232
6 files changed, 495 insertions, 0 deletions
diff --git a/test/com/github/ivarref/yoltq/error_poller_test.clj b/test/com/github/ivarref/yoltq/error_poller_test.clj
new file mode 100644
index 0000000..2e0873e
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/error_poller_test.clj
@@ -0,0 +1,35 @@
+(ns com.github.ivarref.yoltq.error-poller-test
+ (:require [clojure.test :refer :all]
+ [com.github.ivarref.yoltq.error-poller :as ep]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.log-init :as logconfig]
+ [clojure.edn :as edn]))
+
+
+(deftest error-poller
+ (logconfig/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"*"} (edn/read-string
+ (System/getProperty "TAOENSSO_TIMBRE_MIN_LEVEL_EDN" ":info"))]])
+ (let [cfg {:system-error-callback-backoff 100}
+ time (atom 0)
+ tick! (fn [& [amount]]
+ (swap! time + (or amount 1)))
+ verify (fn [state now-ns error-count expected-callback]
+ (let [{:keys [errors state run-callback] :as res} (ep/handle-error-count state cfg now-ns error-count)]
+ (log/info errors "=>" state "::" run-callback)
+ (is (= expected-callback run-callback))
+ res))]
+ (-> {}
+ (verify (tick!) 0 nil)
+ (verify (tick!) 1 nil)
+ (verify (tick!) 1 nil)
+ (verify (tick!) 1 :error)
+ (verify (tick! 100) 0 nil)
+ (verify (tick!) 0 :error)
+ (verify (tick!) 0 :recovery)
+ (verify (tick!) 1 nil)
+ (verify (tick!) 1 nil)
+ (verify (tick!) 1 :error)
+ (verify (tick! 100) 1 nil)
+ (verify (tick!) 1 :error))))
diff --git a/test/com/github/ivarref/yoltq/http_hang_demo.clj b/test/com/github/ivarref/yoltq/http_hang_demo.clj
new file mode 100644
index 0000000..06d877b
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/http_hang_demo.clj
@@ -0,0 +1,45 @@
+(ns com.github.ivarref.yoltq.http-hang-demo
+ (:require [datomic.api :as d]
+ [com.github.ivarref.yoltq :as yq]
+ [com.github.ivarref.yoltq.log-init])
+ (:import (java.net.http HttpClient HttpRequest HttpResponse$BodyHandlers)))
+
+(comment
+ (do
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq*"} :debug]
+ [#{"*"} :info]])
+ (yq/stop!)
+ (let [received (atom [])
+ uri (str "datomic:mem://hello-world-" (java.util.UUID/randomUUID))]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (let [conn (d/connect uri)]
+ (init! {:conn conn
+ :error-backoff-time (Duration/ofSeconds 5)
+ :poll-delay 5
+ :system-error-poll-interval [1 TimeUnit/MINUTES]
+ :system-error-callback-backoff (Duration/ofHours 1)
+ :max-execute-time (Duration/ofSeconds 3)
+ :on-system-error (fn [] (log/error "system in error state"))
+ :on-system-recovery (fn [] (log/info "system recovered"))})
+ (yq/add-consumer! :slow (fn [_payload]
+ (log/info "start slow handler...")
+ ; sudo tc qdisc del dev wlp3s0 root netem
+ ; sudo tc qdisc add dev wlp3s0 root netem delay 10000ms
+ ; https://jvns.ca/blog/2017/04/01/slow-down-your-internet-with-tc/
+ (let [request (-> (HttpRequest/newBuilder)
+ (.uri (java.net.URI. "https://postman-echo.com/get"))
+ (.timeout (Duration/ofSeconds 10))
+ (.GET)
+ (.build))]
+ (log/info "body is:" (-> (.send (HttpClient/newHttpClient) request (HttpResponse$BodyHandlers/ofString))
+ (.body))))
+ (log/info "slow handler is done!")))
+ (yq/start!)
+ @(d/transact conn [(put :slow {:work 123})])
+ #_(dotimes [x 1] @(d/transact conn [(put :q {:work x})]))
+ nil)))) \ No newline at end of file
diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj
new file mode 100644
index 0000000..cf69e55
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/log_init.clj
@@ -0,0 +1,61 @@
+(ns com.github.ivarref.yoltq.log-init
+ (:require [clojure.term.colors :as colors]
+ [taoensso.timbre :as timbre]
+ [clojure.string :as str]))
+
+(def level-colors
+ {;:warn colors/red
+ :error colors/red})
+
+(def ^:dynamic *override-color* nil)
+
+(defn min-length [n s]
+ (loop [s s]
+ (if (>= (count s) n)
+ s
+ (recur (str s " ")))))
+
+(defn local-console-format-fn
+ "A simpler log format, suitable for readable logs during development. colorized stacktraces"
+ [data]
+ (try
+ (let [{:keys [level ?err msg_ ?ns-str ?file hostname_
+ timestamp_ ?line context]} data
+ ts (force timestamp_)]
+ (let [color-f (if (nil? *override-color*)
+ (get level-colors level str)
+ colors/green)
+ maybe-stacktrace (when ?err
+ (str "\n" (timbre/stacktrace ?err)))]
+ (cond-> (str #_(str ?ns-str ":" ?line)
+ #_(min-length (count "[yoltq:326] ")
+ (str
+ "["
+ (some-> ?ns-str
+ (str/split #"\.")
+ (last))
+ ":" ?line))
+ ts
+ " "
+ (color-f (min-length 5 (str/upper-case (name level))))
+ " "
+ #_(.getName ^Thread (Thread/currentThread))
+
+ (color-f (force msg_))
+
+ #_maybe-stacktrace))))
+
+
+ (catch Throwable t
+ (println "error in local-console-format-fn:" (ex-message t))
+ nil)))
+
+
+(defn init-logging! [min-levels]
+ (timbre/merge-config!
+ {:min-level min-levels
+ :timestamp-opts {:pattern "HH:mm:ss.SSS"
+ :timezone :jvm-default}
+ :output-fn (fn [data] (local-console-format-fn data))
+ :appenders {:println (timbre/println-appender {:stream :std-out})}}))
+
diff --git a/test/com/github/ivarref/yoltq/readme_demo.clj b/test/com/github/ivarref/yoltq/readme_demo.clj
new file mode 100644
index 0000000..eae0a3e
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/readme_demo.clj
@@ -0,0 +1,48 @@
+(ns com.github.ivarref.yoltq.readme-demo
+ (:require [clojure.tools.logging :as log]
+ [datomic.api :as d]
+ [com.github.ivarref.yoltq :as yq])
+ (:import (java.util UUID)))
+
+
+(defonce conn
+ (let [uri (str "datomic:mem://hello-world-" (UUID/randomUUID))]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (d/connect uri)))
+
+
+(com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq*"} :debug]
+ [#{"*"} :info]])
+
+
+(yq/stop!)
+
+(yq/init! {:conn conn})
+
+
+(yq/add-consumer! :q
+ (let [cnt (atom 0)]
+ (fn [payload]
+ (when (= 1 (swap! cnt inc))
+ (throw (ex-info "failed" {})))
+ (log/info "got payload" payload))))
+
+(yq/start!)
+
+@(d/transact conn [(yq/put :q {:work 123})])
+
+(comment
+ (yq/add-consumer! :q (fn [_] (throw (ex-info "always fail" {})))))
+
+(comment
+ @(d/transact conn [(yq/put :q {:work 123})]))
+
+(comment
+ (do
+ (yq/add-consumer! :q (fn [_] :ok))
+ nil))
diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj
new file mode 100644
index 0000000..dacba68
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/test_utils.clj
@@ -0,0 +1,74 @@
+(ns com.github.ivarref.yoltq.test-utils
+ (:require [com.github.ivarref.yoltq.log-init :as logconfig]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.utils :as u]
+ [com.github.ivarref.yoltq :as dq]
+ [datomic.api :as d]
+ [clojure.string :as str]
+ [com.github.ivarref.yoltq.impl :as i]
+ [clojure.edn :as edn]
+ [com.github.ivarref.yoltq.ext-sys :as ext])
+ (:import (java.util UUID)))
+
+
+(logconfig/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"*"} (edn/read-string
+ (System/getProperty "TAOENSSO_TIMBRE_MIN_LEVEL_EDN" ":info"))]])
+
+
+(defn empty-conn []
+ (let [uri (str "datomic:mem://hello-world-" (UUID/randomUUID))]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (d/connect uri)))
+
+
+(defn break []
+ (log/info (str/join "*" (repeat 60 ""))))
+
+
+(defn clear []
+ (.print System/out "\033[H\033[2J")
+ (.flush System/out)
+ (break))
+
+
+(defn put-transact! [id payload]
+ @(d/transact (:conn @dq/*config*) [(i/put @dq/*config* id payload)]))
+
+
+(defn advance! [tp]
+ (assert (some? ext/*now-ns-atom*) "Expected to be running in test-mode!")
+ (swap! ext/*now-ns-atom* + (if (number? tp)
+ tp
+ (.toNanos tp))))
+
+
+(defn done-count []
+ (d/q '[:find (count ?e) .
+ :where
+ [?e :com.github.ivarref.yoltq/id _]
+ [?e :com.github.ivarref.yoltq/status :done]]
+ (d/db (:conn @dq/*config*))))
+
+
+(defn get-init [& args]
+ (apply u/get-init @dq/*config* args))
+
+
+(defn get-error [& args]
+ (apply u/get-error @dq/*config* args))
+
+
+(defn get-hung [& args]
+ (apply u/get-hung @dq/*config* args))
+
+
+(defn take! [& args]
+ (apply i/take! @dq/*config* args))
+
+
+(defn execute! [& args]
+ (apply i/execute! @dq/*config* args))
+
diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj
new file mode 100644
index 0000000..41d2461
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/virtual_test.clj
@@ -0,0 +1,232 @@
+(ns com.github.ivarref.yoltq.virtual-test
+ (:require [datomic-schema.core]
+ [clojure.test :refer :all]
+ [com.github.ivarref.yoltq.virtual-queue :as vq]
+ [com.github.ivarref.yoltq :as dq]
+ [com.github.ivarref.yoltq.test-utils :as u]
+ [datomic.api :as d]
+ [com.github.ivarref.yoltq.utils :as uu]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.impl :as i]
+ [com.github.ivarref.yoltq :as yq]))
+
+
+(use-fixtures :each vq/call-with-virtual-queue!)
+
+
+(deftest happy-case-1
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn})
+ (dq/add-consumer! :q identity)
+ @(d/transact conn [(dq/put :q {:work 123})])
+ (is (= {:work 123} (:retval (vq/run-queue-once! :q :init))))))
+
+
+(deftest happy-case-tx-report-q
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn})
+ (dq/add-consumer! :q identity)
+ @(d/transact conn [(dq/put :q {:work 123})])
+ (is (= {:work 123} (:retval (vq/run-one-report-queue!))))
+ (is (= 1 (u/done-count)))))
+
+
+(deftest happy-case-poller
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn
+ :handlers {:q {:f (fn [payload] payload)}}})
+ (u/put-transact! :q {:work 123})
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (= {:work 123} (some->> (u/get-init :q)
+ (u/take!)
+ (u/execute!)
+ :retval)))))
+
+
+(deftest happy-case-queue-fn-allow-cas-failure
+ (let [conn (u/empty-conn)
+ invoke-count (atom 0)]
+ (dq/init! {:conn conn})
+ (dq/add-consumer! :q
+ (fn [{:keys [id]}]
+ (swap! invoke-count inc)
+ @(d/transact conn [[:db/cas [:e/id id] :e/val nil "janei"]]))
+ {:allow-cas-failure? #{:e/val}})
+ @(d/transact conn #d/schema [[:e/id :one :string :id]
+ [:e/val :one :string]])
+ @(d/transact conn [{:e/id "demo"}
+ (dq/put :q {:id "demo"})])
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (swap! dq/*config* assoc :mark-status-fn! (fn [_ _ new-status]
+ (log/info "mark-status! doing nothing for new status" new-status)))
+ (is (nil? (some->> (u/get-init :q)
+ (u/take!)
+ (u/execute!))))
+ (swap! dq/*config* dissoc :mark-status-fn!)
+
+ ; (mark-status! :done) failed but f succeeded.
+ (is (nil? (u/get-hung :q)))
+ (u/advance! (:hung-backoff-time @yq/*config*))
+ (is (some? (u/get-hung :q)))
+ (is (true? (some->> (u/get-hung :q)
+ (u/take!)
+ (u/execute!)
+ :allow-cas-failure?)))
+ (is (= 2 @invoke-count))))
+
+
+(deftest backoff-test
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
+ (u/put-transact! :q {:work 123})
+ (is (nil? (u/get-init :q)))
+
+ (u/advance! (dec (:init-backoff-time yq/default-opts)))
+ (is (nil? (u/get-init :q)))
+ (u/advance! 1)
+ (is (some? (u/get-init :q)))
+
+ (is (some? (some->> (u/get-init :q)
+ (u/take!)
+ (u/execute!)
+ :exception)))
+
+ (u/advance! (dec (:error-backoff-time @yq/*config*)))
+ (is (nil? (u/get-error :q)))
+ (u/advance! 1)
+ (is (some? (u/get-error :q)))))
+
+
+(deftest get-hung-test
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
+ (u/put-transact! :q {:work 123})
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (some? (u/get-init :q)))
+
+ (is (= :processing (some->> (u/get-init :q)
+ (u/take!)
+ :com.github.ivarref.yoltq/status)))
+
+ (is (nil? (u/get-hung :q)))
+ (u/advance! (dec (:hung-backoff-time yq/default-opts)))
+ (is (nil? (u/get-hung :q)))
+ (u/advance! 1)
+ (is (some? (u/get-hung :q)))))
+
+
+(deftest basic-locking
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :cas-failures (atom 0)
+ :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
+ (u/put-transact! :q {:work 123})
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (some? (u/get-init :q)))
+
+ (let [job (u/get-init :q)]
+ (is (= :processing (some->> job (u/take!) :com.github.ivarref.yoltq/status)))
+ (u/take! job)
+ (is (= 1 @(:cas-failures @dq/*config*))))))
+
+
+(deftest retry-test
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :handlers {:q {:f
+ (let [c (atom 0)]
+ (fn [_]
+ (if (<= (swap! c inc) 2)
+ (throw (ex-info "janei" {}))
+ ::ok)))}}})
+ (u/put-transact! :q {:work 123})
+
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (some? (some->> (u/get-init :q) (u/take!) (u/execute!) :exception)))
+
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (is (some? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception)))
+
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (is (nil? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception)))))
+
+
+(deftest max-retries-test
+ (let [conn (u/empty-conn)
+ call-count (atom 0)]
+ (dq/init! {:conn conn
+ :error-backoff-time 0})
+ (dq/add-consumer! :q (fn [_]
+ (swap! call-count inc)
+ (throw (ex-info "janei" {})))
+ {:max-retries 1})
+ (vq/put! :q {:work 123})
+ (is (some? (:exception (vq/run-one-report-queue!))))
+
+ (dotimes [_ 10]
+ (vq/run-queue-once! :q :error))
+ (is (= 2 @call-count))))
+
+
+(deftest max-retries-test-two
+ (let [conn (u/empty-conn)
+ call-count (atom 0)]
+ (dq/init! {:conn conn
+ :error-backoff-time 0})
+ (dq/add-consumer! :q (fn [_]
+ (swap! call-count inc)
+ (throw (ex-info "janei" {})))
+ {:max-retries 3})
+ (vq/put! :q {:work 123})
+ (is (some? (:exception (vq/run-one-report-queue!))))
+
+ (dotimes [_ 20]
+ (vq/run-queue-once! :q :error))
+ (is (= 4 @call-count))))
+
+
+(deftest hung-to-error
+ (let [conn (u/empty-conn)
+ call-count (atom 0)
+ missed-mark-status (atom nil)]
+ (dq/init! {:conn conn})
+ (dq/add-consumer! :q
+ (fn [_]
+ (if (= 1 (swap! call-count inc))
+ (throw (ex-info "error" {}))
+ (log/info "return OK"))))
+ (vq/put! :q {:id "demo"})
+ (vq/run-one-report-queue!) ; now in status :error
+
+
+ (swap! dq/*config* assoc :mark-status-fn! (fn [_ _ new-status]
+ (reset! missed-mark-status new-status)
+ (log/info "mark-status! doing nothing for new status" new-status)))
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (vq/run-queue-once! :q :error)
+ (swap! dq/*config* dissoc :mark-status-fn!)
+ (is (= :done @missed-mark-status))
+
+ (is (nil? (uu/get-hung @dq/*config* :q)))
+ (u/advance! (:hung-backoff-time @yq/*config*))
+
+ (is (some? (uu/get-hung @dq/*config* :q)))
+
+ (is (= 2 @call-count))
+
+ (is (true? (some->> (uu/get-hung (assoc-in @dq/*config* [:handlers :q :max-retries] 1) :q)
+ (i/take! @dq/*config*)
+ (i/execute! @dq/*config*)
+ :failed?)))
+
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (is (some? (uu/get-error @dq/*config* :q)))
+ (is (nil? (uu/get-error (assoc-in @dq/*config* [:handlers :q :max-retries] 1) :q)))))
+
+