From ea40c5dbc2b47d6fd2a23236828dc9e4ab1f77dc Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Sat, 4 Sep 2021 13:23:07 +0200 Subject: Initial commit Add release script Release 0.1.3 Use com.github.ivarref.yoltq namespace Use com.github.ivarref.yoltq namespace --- .../com/github/ivarref/yoltq/error_poller_test.clj | 35 ++++ test/com/github/ivarref/yoltq/http_hang_demo.clj | 45 ++++ test/com/github/ivarref/yoltq/log_init.clj | 61 ++++++ test/com/github/ivarref/yoltq/readme_demo.clj | 48 +++++ test/com/github/ivarref/yoltq/test_utils.clj | 74 +++++++ test/com/github/ivarref/yoltq/virtual_test.clj | 232 +++++++++++++++++++++ 6 files changed, 495 insertions(+) create mode 100644 test/com/github/ivarref/yoltq/error_poller_test.clj create mode 100644 test/com/github/ivarref/yoltq/http_hang_demo.clj create mode 100644 test/com/github/ivarref/yoltq/log_init.clj create mode 100644 test/com/github/ivarref/yoltq/readme_demo.clj create mode 100644 test/com/github/ivarref/yoltq/test_utils.clj create mode 100644 test/com/github/ivarref/yoltq/virtual_test.clj (limited to 'test/com/github/ivarref/yoltq') diff --git a/test/com/github/ivarref/yoltq/error_poller_test.clj b/test/com/github/ivarref/yoltq/error_poller_test.clj new file mode 100644 index 0000000..2e0873e --- /dev/null +++ b/test/com/github/ivarref/yoltq/error_poller_test.clj @@ -0,0 +1,35 @@ +(ns com.github.ivarref.yoltq.error-poller-test + (:require [clojure.test :refer :all] + [com.github.ivarref.yoltq.error-poller :as ep] + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.log-init :as logconfig] + [clojure.edn :as edn])) + + +(deftest error-poller + (logconfig/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"*"} (edn/read-string + (System/getProperty "TAOENSSO_TIMBRE_MIN_LEVEL_EDN" ":info"))]]) + (let [cfg {:system-error-callback-backoff 100} + time (atom 0) + tick! (fn [& [amount]] + (swap! time + (or amount 1))) + verify (fn [state now-ns error-count expected-callback] + (let [{:keys [errors state run-callback] :as res} (ep/handle-error-count state cfg now-ns error-count)] + (log/info errors "=>" state "::" run-callback) + (is (= expected-callback run-callback)) + res))] + (-> {} + (verify (tick!) 0 nil) + (verify (tick!) 1 nil) + (verify (tick!) 1 nil) + (verify (tick!) 1 :error) + (verify (tick! 100) 0 nil) + (verify (tick!) 0 :error) + (verify (tick!) 0 :recovery) + (verify (tick!) 1 nil) + (verify (tick!) 1 nil) + (verify (tick!) 1 :error) + (verify (tick! 100) 1 nil) + (verify (tick!) 1 :error)))) diff --git a/test/com/github/ivarref/yoltq/http_hang_demo.clj b/test/com/github/ivarref/yoltq/http_hang_demo.clj new file mode 100644 index 0000000..06d877b --- /dev/null +++ b/test/com/github/ivarref/yoltq/http_hang_demo.clj @@ -0,0 +1,45 @@ +(ns com.github.ivarref.yoltq.http-hang-demo + (:require [datomic.api :as d] + [com.github.ivarref.yoltq :as yq] + [com.github.ivarref.yoltq.log-init]) + (:import (java.net.http HttpClient HttpRequest HttpResponse$BodyHandlers))) + +(comment + (do + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq*"} :debug] + [#{"*"} :info]]) + (yq/stop!) + (let [received (atom []) + uri (str "datomic:mem://hello-world-" (java.util.UUID/randomUUID))] + (d/delete-database uri) + (d/create-database uri) + (let [conn (d/connect uri)] + (init! {:conn conn + :error-backoff-time (Duration/ofSeconds 5) + :poll-delay 5 + :system-error-poll-interval [1 TimeUnit/MINUTES] + :system-error-callback-backoff (Duration/ofHours 1) + :max-execute-time (Duration/ofSeconds 3) + :on-system-error (fn [] (log/error "system in error state")) + :on-system-recovery (fn [] (log/info "system recovered"))}) + (yq/add-consumer! :slow (fn [_payload] + (log/info "start slow handler...") + ; sudo tc qdisc del dev wlp3s0 root netem + ; sudo tc qdisc add dev wlp3s0 root netem delay 10000ms + ; https://jvns.ca/blog/2017/04/01/slow-down-your-internet-with-tc/ + (let [request (-> (HttpRequest/newBuilder) + (.uri (java.net.URI. "https://postman-echo.com/get")) + (.timeout (Duration/ofSeconds 10)) + (.GET) + (.build))] + (log/info "body is:" (-> (.send (HttpClient/newHttpClient) request (HttpResponse$BodyHandlers/ofString)) + (.body)))) + (log/info "slow handler is done!"))) + (yq/start!) + @(d/transact conn [(put :slow {:work 123})]) + #_(dotimes [x 1] @(d/transact conn [(put :q {:work x})])) + nil)))) \ No newline at end of file diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj new file mode 100644 index 0000000..cf69e55 --- /dev/null +++ b/test/com/github/ivarref/yoltq/log_init.clj @@ -0,0 +1,61 @@ +(ns com.github.ivarref.yoltq.log-init + (:require [clojure.term.colors :as colors] + [taoensso.timbre :as timbre] + [clojure.string :as str])) + +(def level-colors + {;:warn colors/red + :error colors/red}) + +(def ^:dynamic *override-color* nil) + +(defn min-length [n s] + (loop [s s] + (if (>= (count s) n) + s + (recur (str s " "))))) + +(defn local-console-format-fn + "A simpler log format, suitable for readable logs during development. colorized stacktraces" + [data] + (try + (let [{:keys [level ?err msg_ ?ns-str ?file hostname_ + timestamp_ ?line context]} data + ts (force timestamp_)] + (let [color-f (if (nil? *override-color*) + (get level-colors level str) + colors/green) + maybe-stacktrace (when ?err + (str "\n" (timbre/stacktrace ?err)))] + (cond-> (str #_(str ?ns-str ":" ?line) + #_(min-length (count "[yoltq:326] ") + (str + "[" + (some-> ?ns-str + (str/split #"\.") + (last)) + ":" ?line)) + ts + " " + (color-f (min-length 5 (str/upper-case (name level)))) + " " + #_(.getName ^Thread (Thread/currentThread)) + + (color-f (force msg_)) + + #_maybe-stacktrace)))) + + + (catch Throwable t + (println "error in local-console-format-fn:" (ex-message t)) + nil))) + + +(defn init-logging! [min-levels] + (timbre/merge-config! + {:min-level min-levels + :timestamp-opts {:pattern "HH:mm:ss.SSS" + :timezone :jvm-default} + :output-fn (fn [data] (local-console-format-fn data)) + :appenders {:println (timbre/println-appender {:stream :std-out})}})) + diff --git a/test/com/github/ivarref/yoltq/readme_demo.clj b/test/com/github/ivarref/yoltq/readme_demo.clj new file mode 100644 index 0000000..eae0a3e --- /dev/null +++ b/test/com/github/ivarref/yoltq/readme_demo.clj @@ -0,0 +1,48 @@ +(ns com.github.ivarref.yoltq.readme-demo + (:require [clojure.tools.logging :as log] + [datomic.api :as d] + [com.github.ivarref.yoltq :as yq]) + (:import (java.util UUID))) + + +(defonce conn + (let [uri (str "datomic:mem://hello-world-" (UUID/randomUUID))] + (d/delete-database uri) + (d/create-database uri) + (d/connect uri))) + + +(com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq*"} :debug] + [#{"*"} :info]]) + + +(yq/stop!) + +(yq/init! {:conn conn}) + + +(yq/add-consumer! :q + (let [cnt (atom 0)] + (fn [payload] + (when (= 1 (swap! cnt inc)) + (throw (ex-info "failed" {}))) + (log/info "got payload" payload)))) + +(yq/start!) + +@(d/transact conn [(yq/put :q {:work 123})]) + +(comment + (yq/add-consumer! :q (fn [_] (throw (ex-info "always fail" {}))))) + +(comment + @(d/transact conn [(yq/put :q {:work 123})])) + +(comment + (do + (yq/add-consumer! :q (fn [_] :ok)) + nil)) diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj new file mode 100644 index 0000000..dacba68 --- /dev/null +++ b/test/com/github/ivarref/yoltq/test_utils.clj @@ -0,0 +1,74 @@ +(ns com.github.ivarref.yoltq.test-utils + (:require [com.github.ivarref.yoltq.log-init :as logconfig] + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.utils :as u] + [com.github.ivarref.yoltq :as dq] + [datomic.api :as d] + [clojure.string :as str] + [com.github.ivarref.yoltq.impl :as i] + [clojure.edn :as edn] + [com.github.ivarref.yoltq.ext-sys :as ext]) + (:import (java.util UUID))) + + +(logconfig/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"*"} (edn/read-string + (System/getProperty "TAOENSSO_TIMBRE_MIN_LEVEL_EDN" ":info"))]]) + + +(defn empty-conn [] + (let [uri (str "datomic:mem://hello-world-" (UUID/randomUUID))] + (d/delete-database uri) + (d/create-database uri) + (d/connect uri))) + + +(defn break [] + (log/info (str/join "*" (repeat 60 "")))) + + +(defn clear [] + (.print System/out "\033[H\033[2J") + (.flush System/out) + (break)) + + +(defn put-transact! [id payload] + @(d/transact (:conn @dq/*config*) [(i/put @dq/*config* id payload)])) + + +(defn advance! [tp] + (assert (some? ext/*now-ns-atom*) "Expected to be running in test-mode!") + (swap! ext/*now-ns-atom* + (if (number? tp) + tp + (.toNanos tp)))) + + +(defn done-count [] + (d/q '[:find (count ?e) . + :where + [?e :com.github.ivarref.yoltq/id _] + [?e :com.github.ivarref.yoltq/status :done]] + (d/db (:conn @dq/*config*)))) + + +(defn get-init [& args] + (apply u/get-init @dq/*config* args)) + + +(defn get-error [& args] + (apply u/get-error @dq/*config* args)) + + +(defn get-hung [& args] + (apply u/get-hung @dq/*config* args)) + + +(defn take! [& args] + (apply i/take! @dq/*config* args)) + + +(defn execute! [& args] + (apply i/execute! @dq/*config* args)) + diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj new file mode 100644 index 0000000..41d2461 --- /dev/null +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -0,0 +1,232 @@ +(ns com.github.ivarref.yoltq.virtual-test + (:require [datomic-schema.core] + [clojure.test :refer :all] + [com.github.ivarref.yoltq.virtual-queue :as vq] + [com.github.ivarref.yoltq :as dq] + [com.github.ivarref.yoltq.test-utils :as u] + [datomic.api :as d] + [com.github.ivarref.yoltq.utils :as uu] + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.impl :as i] + [com.github.ivarref.yoltq :as yq])) + + +(use-fixtures :each vq/call-with-virtual-queue!) + + +(deftest happy-case-1 + (let [conn (u/empty-conn)] + (dq/init! {:conn conn}) + (dq/add-consumer! :q identity) + @(d/transact conn [(dq/put :q {:work 123})]) + (is (= {:work 123} (:retval (vq/run-queue-once! :q :init)))))) + + +(deftest happy-case-tx-report-q + (let [conn (u/empty-conn)] + (dq/init! {:conn conn}) + (dq/add-consumer! :q identity) + @(d/transact conn [(dq/put :q {:work 123})]) + (is (= {:work 123} (:retval (vq/run-one-report-queue!)))) + (is (= 1 (u/done-count))))) + + +(deftest happy-case-poller + (let [conn (u/empty-conn)] + (dq/init! {:conn conn + :handlers {:q {:f (fn [payload] payload)}}}) + (u/put-transact! :q {:work 123}) + (u/advance! (:init-backoff-time yq/default-opts)) + (is (= {:work 123} (some->> (u/get-init :q) + (u/take!) + (u/execute!) + :retval))))) + + +(deftest happy-case-queue-fn-allow-cas-failure + (let [conn (u/empty-conn) + invoke-count (atom 0)] + (dq/init! {:conn conn}) + (dq/add-consumer! :q + (fn [{:keys [id]}] + (swap! invoke-count inc) + @(d/transact conn [[:db/cas [:e/id id] :e/val nil "janei"]])) + {:allow-cas-failure? #{:e/val}}) + @(d/transact conn #d/schema [[:e/id :one :string :id] + [:e/val :one :string]]) + @(d/transact conn [{:e/id "demo"} + (dq/put :q {:id "demo"})]) + (u/advance! (:init-backoff-time yq/default-opts)) + (swap! dq/*config* assoc :mark-status-fn! (fn [_ _ new-status] + (log/info "mark-status! doing nothing for new status" new-status))) + (is (nil? (some->> (u/get-init :q) + (u/take!) + (u/execute!)))) + (swap! dq/*config* dissoc :mark-status-fn!) + + ; (mark-status! :done) failed but f succeeded. + (is (nil? (u/get-hung :q))) + (u/advance! (:hung-backoff-time @yq/*config*)) + (is (some? (u/get-hung :q))) + (is (true? (some->> (u/get-hung :q) + (u/take!) + (u/execute!) + :allow-cas-failure?))) + (is (= 2 @invoke-count)))) + + +(deftest backoff-test + (let [conn (u/empty-conn)] + (dq/init! {:conn conn + :init-backoff-time (:init-backoff-time yq/default-opts) + :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}}) + (u/put-transact! :q {:work 123}) + (is (nil? (u/get-init :q))) + + (u/advance! (dec (:init-backoff-time yq/default-opts))) + (is (nil? (u/get-init :q))) + (u/advance! 1) + (is (some? (u/get-init :q))) + + (is (some? (some->> (u/get-init :q) + (u/take!) + (u/execute!) + :exception))) + + (u/advance! (dec (:error-backoff-time @yq/*config*))) + (is (nil? (u/get-error :q))) + (u/advance! 1) + (is (some? (u/get-error :q))))) + + +(deftest get-hung-test + (let [conn (u/empty-conn)] + (dq/init! {:conn conn + :init-backoff-time (:init-backoff-time yq/default-opts) + :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}}) + (u/put-transact! :q {:work 123}) + (u/advance! (:init-backoff-time yq/default-opts)) + (is (some? (u/get-init :q))) + + (is (= :processing (some->> (u/get-init :q) + (u/take!) + :com.github.ivarref.yoltq/status))) + + (is (nil? (u/get-hung :q))) + (u/advance! (dec (:hung-backoff-time yq/default-opts))) + (is (nil? (u/get-hung :q))) + (u/advance! 1) + (is (some? (u/get-hung :q))))) + + +(deftest basic-locking + (let [conn (u/empty-conn)] + (dq/init! {:conn conn + :init-backoff-time (:init-backoff-time yq/default-opts) + :cas-failures (atom 0) + :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}}) + (u/put-transact! :q {:work 123}) + (u/advance! (:init-backoff-time yq/default-opts)) + (is (some? (u/get-init :q))) + + (let [job (u/get-init :q)] + (is (= :processing (some->> job (u/take!) :com.github.ivarref.yoltq/status))) + (u/take! job) + (is (= 1 @(:cas-failures @dq/*config*)))))) + + +(deftest retry-test + (let [conn (u/empty-conn)] + (dq/init! {:conn conn + :init-backoff-time (:init-backoff-time yq/default-opts) + :handlers {:q {:f + (let [c (atom 0)] + (fn [_] + (if (<= (swap! c inc) 2) + (throw (ex-info "janei" {})) + ::ok)))}}}) + (u/put-transact! :q {:work 123}) + + (u/advance! (:init-backoff-time yq/default-opts)) + (is (some? (some->> (u/get-init :q) (u/take!) (u/execute!) :exception))) + + (u/advance! (:error-backoff-time @yq/*config*)) + (is (some? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception))) + + (u/advance! (:error-backoff-time @yq/*config*)) + (is (nil? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception))))) + + +(deftest max-retries-test + (let [conn (u/empty-conn) + call-count (atom 0)] + (dq/init! {:conn conn + :error-backoff-time 0}) + (dq/add-consumer! :q (fn [_] + (swap! call-count inc) + (throw (ex-info "janei" {}))) + {:max-retries 1}) + (vq/put! :q {:work 123}) + (is (some? (:exception (vq/run-one-report-queue!)))) + + (dotimes [_ 10] + (vq/run-queue-once! :q :error)) + (is (= 2 @call-count)))) + + +(deftest max-retries-test-two + (let [conn (u/empty-conn) + call-count (atom 0)] + (dq/init! {:conn conn + :error-backoff-time 0}) + (dq/add-consumer! :q (fn [_] + (swap! call-count inc) + (throw (ex-info "janei" {}))) + {:max-retries 3}) + (vq/put! :q {:work 123}) + (is (some? (:exception (vq/run-one-report-queue!)))) + + (dotimes [_ 20] + (vq/run-queue-once! :q :error)) + (is (= 4 @call-count)))) + + +(deftest hung-to-error + (let [conn (u/empty-conn) + call-count (atom 0) + missed-mark-status (atom nil)] + (dq/init! {:conn conn}) + (dq/add-consumer! :q + (fn [_] + (if (= 1 (swap! call-count inc)) + (throw (ex-info "error" {})) + (log/info "return OK")))) + (vq/put! :q {:id "demo"}) + (vq/run-one-report-queue!) ; now in status :error + + + (swap! dq/*config* assoc :mark-status-fn! (fn [_ _ new-status] + (reset! missed-mark-status new-status) + (log/info "mark-status! doing nothing for new status" new-status))) + (u/advance! (:error-backoff-time @yq/*config*)) + (vq/run-queue-once! :q :error) + (swap! dq/*config* dissoc :mark-status-fn!) + (is (= :done @missed-mark-status)) + + (is (nil? (uu/get-hung @dq/*config* :q))) + (u/advance! (:hung-backoff-time @yq/*config*)) + + (is (some? (uu/get-hung @dq/*config* :q))) + + (is (= 2 @call-count)) + + (is (true? (some->> (uu/get-hung (assoc-in @dq/*config* [:handlers :q :max-retries] 1) :q) + (i/take! @dq/*config*) + (i/execute! @dq/*config*) + :failed?))) + + (u/advance! (:error-backoff-time @yq/*config*)) + (is (some? (uu/get-error @dq/*config* :q))) + (is (nil? (uu/get-error (assoc-in @dq/*config* [:handlers :q :max-retries] 1) :q))))) + + -- cgit v1.2.3 From ad8a41bd7d9e6fed77f633a75ef36410b7afbef1 Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Fri, 17 Sep 2021 14:25:08 +0200 Subject: Start add bindings ... --- src/com/github/ivarref/yoltq.clj | 10 +++-- src/com/github/ivarref/yoltq/poller.clj | 12 ++++-- src/com/github/ivarref/yoltq/report_queue.clj | 11 ++--- src/com/github/ivarref/yoltq/utils.clj | 58 ++++++++++++++++---------- src/com/github/ivarref/yoltq/virtual_queue.clj | 23 +++++----- test/com/github/ivarref/yoltq/virtual_test.clj | 32 +++++++++++++- 6 files changed, 98 insertions(+), 48 deletions(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index d3eefef..6341e41 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -58,6 +58,8 @@ ; contain the stacktrace of the stuck threads. :pool-size 4 + :capture-bindings [] + ; How often should the system be polled for failed queue jobs :system-error-poll-delay (Duration/ofMinutes 1) @@ -159,17 +161,17 @@ (d/create-database uri) (let [ok-items (atom []) conn (d/connect uri) - n 100] + n 1] (init! {:conn conn :error-backoff-time (Duration/ofSeconds 1) :poll-delay (Duration/ofSeconds 1)}) (add-consumer! :q (fn [payload] - (when (> (Math/random) 0.5) - (throw (ex-info "oops" {}))) + #_(when (> (Math/random) 0.5) + (throw (ex-info "oops" {}))) (if (= n (count (swap! received conj (:work payload)))) (log/info "... and we are done!") (log/info "got payload" payload "total ok:" (count @received))))) (start!) (dotimes [x n] - @(d/transact conn [(put :q {:work x})])) + @(d/transact conn [(put :q {:work 123})])) nil)))) \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/poller.clj b/src/com/github/ivarref/yoltq/poller.clj index ad9d32a..1f4e65d 100644 --- a/src/com/github/ivarref/yoltq/poller.clj +++ b/src/com/github/ivarref/yoltq/poller.clj @@ -5,10 +5,14 @@ (defn poll-once! [cfg q status] - (case status - :init (some->> (u/get-init cfg q) (i/take! cfg) (i/execute! cfg)) - :error (some->> (u/get-error cfg q) (i/take! cfg) (i/execute! cfg)) - :hung (some->> (u/get-hung cfg q) (i/take! cfg) (i/execute! cfg)))) + (when-let [item (case status + :init (u/get-init cfg q) + :error (u/get-error cfg q) + :hung (u/get-hung cfg q))] + (with-bindings (get item :bindings {}) + (some->> item + (i/take! cfg) + (i/execute! cfg))))) (defn poll-queue! [running? diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index a40d29a..c6559bf 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -19,11 +19,12 @@ (doseq [id new-ids] (consumer (fn [] (try - (let [{:com.github.ivarref.yoltq/keys [lock id status queue-name]} (u/get-queue-item db-after id)] - (some->> - (u/prepare-processing id queue-name lock status) - (i/take! cfg) - (i/execute! cfg))) + (let [{:com.github.ivarref.yoltq/keys [lock id status queue-name bindings]} (u/get-queue-item db-after id)] + (with-bindings (or bindings {}) + (some->> + (u/prepare-processing db-after id queue-name lock status) + (i/take! cfg) + (i/execute! cfg)))) (catch Throwable t (log/error t "unexpected error in process-poll-result!"))))))))) diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj index c96d1dc..9501343 100644 --- a/src/com/github/ivarref/yoltq/utils.clj +++ b/src/com/github/ivarref/yoltq/utils.clj @@ -56,14 +56,22 @@ (-> (d/pull db '[:*] [:com.github.ivarref.yoltq/id id]) (dissoc :db/id) (update :com.github.ivarref.yoltq/payload edn/read-string) - (update :com.github.ivarref.yoltq/bindings edn/read-string))) + (update :com.github.ivarref.yoltq/bindings + (fn [s] + (when s + (->> s + (edn/read-string) + (reduce-kv (fn [o k v] + (assoc o (resolve k) v)) + {}))))))) -(defn prepare-processing [id queue-name old-lock old-status] +(defn prepare-processing [db id queue-name old-lock old-status] (let [new-lock (random-uuid)] {:id id :lock new-lock :queue-name queue-name + :bindings (get (get-queue-item db id) :com.github.ivarref.yoltq/bindings {}) :tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock] [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status old-status status-processing] {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ns)}]})) @@ -73,29 +81,31 @@ (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: " (str (if (nil? conn) "nil" conn)) "\nConfig was: " (str cfg))) - (if-let [ids (->> (d/q '[:find ?id ?lock - :in $ ?queue-name ?backoff - :where - [?e :com.github.ivarref.yoltq/status :init] - [?e :com.github.ivarref.yoltq/queue-name ?queue-name] - [?e :com.github.ivarref.yoltq/init-time ?init-time] - [(>= ?backoff ?init-time)] - [?e :com.github.ivarref.yoltq/id ?id] - [?e :com.github.ivarref.yoltq/lock ?lock]] - (or db (d/db conn)) - queue-name - (- (now-ns) init-backoff-time)) - (not-empty))] - (let [[id old-lock] (rand-nth (into [] ids))] - (prepare-processing id queue-name old-lock :init)) - (log/trace "no new-items in :init status for queue" queue-name))) + (let [db (or db (d/db conn))] + (if-let [ids (->> (d/q '[:find ?id ?lock + :in $ ?queue-name ?backoff + :where + [?e :com.github.ivarref.yoltq/status :init] + [?e :com.github.ivarref.yoltq/queue-name ?queue-name] + [?e :com.github.ivarref.yoltq/init-time ?init-time] + [(>= ?backoff ?init-time)] + [?e :com.github.ivarref.yoltq/id ?id] + [?e :com.github.ivarref.yoltq/lock ?lock]] + db + queue-name + (- (now-ns) init-backoff-time)) + (not-empty))] + (let [[id old-lock] (rand-nth (into [] ids))] + (prepare-processing db id queue-name old-lock :init)) + (log/trace "no new-items in :init status for queue" queue-name)))) (defn get-error [{:keys [conn db error-backoff-time max-retries] :as cfg} queue-name] (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: " (str (if (nil? conn) "nil" conn)) "\nConfig was: " (str cfg))) - (let [max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)] + (let [db (or db (d/db conn)) + max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)] (when-let [ids (->> (d/q '[:find ?id ?lock :in $ ?queue-name ?backoff ?max-tries :where @@ -107,13 +117,13 @@ [(> ?max-tries ?tries)] [?e :com.github.ivarref.yoltq/id ?id] [?e :com.github.ivarref.yoltq/lock ?lock]] - (or db (d/db conn)) + db queue-name (- (now-ns) error-backoff-time) (inc max-retries)) (not-empty))] (let [[id old-lock] (rand-nth (into [] ids))] - (prepare-processing id queue-name old-lock :error))))) + (prepare-processing db id queue-name old-lock :error))))) (defn get-hung [{:keys [conn db now hung-backoff-time max-retries] :as cfg} queue-name] @@ -121,7 +131,8 @@ (str (if (nil? conn) "nil" conn)) "\nConfig was: " (str cfg))) (let [now (or now (now-ns)) - max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)] + max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries) + db (or db (d/db conn))] (when-let [ids (->> (d/q '[:find ?id ?lock ?tries :in $ ?qname ?backoff :where @@ -132,7 +143,7 @@ [?e :com.github.ivarref.yoltq/tries ?tries] [?e :com.github.ivarref.yoltq/id ?id] [?e :com.github.ivarref.yoltq/lock ?lock]] - (or db (d/db conn)) + db queue-name (- now hung-backoff-time)) (not-empty))] @@ -144,6 +155,7 @@ :queue-name queue-name :was-hung? true :to-error? to-error? + :bindings (get (get-queue-item db id) :com.github.ivarref.yoltq/bindings {}) :tx (if (not to-error?) [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock] [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] diff --git a/src/com/github/ivarref/yoltq/virtual_queue.clj b/src/com/github/ivarref/yoltq/virtual_queue.clj index f133bde..71c7b6d 100644 --- a/src/com/github/ivarref/yoltq/virtual_queue.clj +++ b/src/com/github/ivarref/yoltq/virtual_queue.clj @@ -146,17 +146,18 @@ (defmacro consume-expect! [queue-name expected-status] `(if-let [job# (get-tx-q-job ~queue-name)] (try - (let [res# (some->> - (u/prepare-processing (:com.github.ivarref.yoltq/id job#) - ~queue-name - (:com.github.ivarref.yoltq/lock job#) - (:com.github.ivarref.yoltq/status job#)) - (i/take! @yq/*config*) - (i/execute! @yq/*config*))] - (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#))) - (if (:retval res#) - (:retval res#) - (:exception res#))) + (with-bindings (:com.github.ivarref.yoltq/bindings job#) + (let [res# (some->> (u/prepare-processing (d/db (:conn @yq/*config*)) + (:com.github.ivarref.yoltq/id job#) + ~queue-name + (:com.github.ivarref.yoltq/lock job#) + (:com.github.ivarref.yoltq/status job#)) + (i/take! @yq/*config*) + (i/execute! @yq/*config*))] + (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#))) + (if (:retval res#) + (:retval res#) + (:exception res#)))) (catch Throwable t# (log/error t# "unexpected error in consume-expect:" (ex-message t#)))) (test/is nil (str "No job found for queue " ~queue-name)))) \ No newline at end of file diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 41d2461..575dc1b 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -8,7 +8,9 @@ [com.github.ivarref.yoltq.utils :as uu] [clojure.tools.logging :as log] [com.github.ivarref.yoltq.impl :as i] - [com.github.ivarref.yoltq :as yq])) + [com.github.ivarref.yoltq :as yq] + [clojure.pprint :as pprint] + [clojure.edn :as edn])) (use-fixtures :each vq/call-with-virtual-queue!) @@ -230,3 +232,31 @@ (is (nil? (uu/get-error (assoc-in @dq/*config* [:handlers :q :max-retries] 1) :q))))) +(deftest consume-expect-test + (let [conn (u/empty-conn) + seen (atom #{})] + (dq/init! {:conn conn}) + (dq/add-consumer! :q (fn [payload] + (when (= #{1 2} (swap! seen conj payload)) + (throw (ex-info "oops" {}))) + payload)) + + @(d/transact conn [(dq/put :q 1)]) + @(d/transact conn [(dq/put :q 2)]) + + (is (= 1 (vq/consume-expect! :q :done))) + (vq/consume-expect! :q :error))) + + +(def ^:dynamic *some-binding* nil) + + +(deftest binding-test + (let [conn (u/empty-conn)] + (dq/init! {:conn conn + :bindings [#'*some-binding*]}) + (dq/add-consumer! :q (fn [_] *some-binding*)) + (binding [*some-binding* 1] @(d/transact conn [(dq/put :q nil)])) + #_(binding [*some-binding* 2] @(d/transact conn [(dq/put :q nil)])) + #_@(d/transact conn [(dq/put :q nil)]) + (is (= 1 (vq/consume-expect! :q :done))))) -- cgit v1.2.3 From 7c3f605d9420ce8ff1d8e8b226a63d7ee1dacf1f Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Fri, 17 Sep 2021 14:28:15 +0200 Subject: Capture-bindings works --- test/com/github/ivarref/yoltq/virtual_test.clj | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 575dc1b..e2ea19b 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -254,9 +254,11 @@ (deftest binding-test (let [conn (u/empty-conn)] (dq/init! {:conn conn - :bindings [#'*some-binding*]}) + :capture-bindings [#'*some-binding*]}) (dq/add-consumer! :q (fn [_] *some-binding*)) (binding [*some-binding* 1] @(d/transact conn [(dq/put :q nil)])) - #_(binding [*some-binding* 2] @(d/transact conn [(dq/put :q nil)])) - #_@(d/transact conn [(dq/put :q nil)]) - (is (= 1 (vq/consume-expect! :q :done))))) + (binding [*some-binding* 2] @(d/transact conn [(dq/put :q nil)])) + @(d/transact conn [(dq/put :q nil)]) + (is (= 1 (vq/consume-expect! :q :done))) + (is (= 2 (vq/consume-expect! :q :done))) + (is (nil? (vq/consume-expect! :q :done))))) -- cgit v1.2.3 From d13b0cb0b72a9cef9f8e9bd82616899796a4853f Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Fri, 17 Sep 2021 14:51:01 +0200 Subject: Use [#'taoensso.timbre/*context*] as default :capture-bindings if present --- README.md | 10 +++++++--- deps.edn | 3 ++- src/com/github/ivarref/yoltq.clj | 7 ++++--- test/com/github/ivarref/yoltq/log_init.clj | 3 +++ test/com/github/ivarref/yoltq/virtual_test.clj | 24 ++++++++++++++++++------ 5 files changed, 34 insertions(+), 13 deletions(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/README.md b/README.md index 514c4a3..902be2f 100644 --- a/README.md +++ b/README.md @@ -115,7 +115,7 @@ Inspecting `(yq/put :q {:work 123})]` you will see something like this: :queue-name :q, ; Destination queue :status :init, ; Status :payload "{:work 123}", ; Payload persisted to the database with pr-str - :bindings "{}", + :bindings "{}", ; Bindings that will be applied before executing consumer function :lock #uuid"037d7da1-5158-4243-8f72-feb1e47e15ca", ; Lock to protect from multiple consumers :tries 0, ; How many times the job has been executed :init-time 4305758012289 ; Time of initialization (System/nanoTime) @@ -153,7 +153,7 @@ The `payload` will be deserialized from the database using `clojure.edn/read-str you will get back what you put into `yq/put`. The yoltq system treats a queue consumer function invocation as successful if it does not throw an exception. -Thus any regular return value, be it `nil`, `false`, `true`, etc. is considered a success. +Any return value, be it `nil`, `false`, `true`, etc. is considered a success. ### Listening for queue jobs @@ -166,7 +166,7 @@ and process newly created queue jobs fairly quickly. This also means that queue jobs in status `:init` will almost always be processed without any type of backoff*. -This pool also schedules polling jobs that will regularly check for various statuses: +The threadpool also schedules polling jobs that will check for various statuses regularly: * Jobs in status `:error` that have waited for at least `:error-backoff-time` (default: 5 seconds) will be retried. * Jobs that have been in `:processing` for at least `:hung-backoff-time` (default: 30 minutes) will be considered hung and retried. @@ -212,6 +212,10 @@ A queue job will remain in status `:error` once `:max-retries` (default: 100) ha Ideally this will not happen. +### Logging + + + ### Total health and system sanity diff --git a/deps.edn b/deps.edn index cf8297c..a457628 100644 --- a/deps.edn +++ b/deps.edn @@ -3,7 +3,8 @@ :paths ["src"] - :aliases {:test {:extra-paths ["test"] + :aliases {:datomic {:extra-deps {com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]}}} + :test {:extra-paths ["test"] :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} com.taoensso/timbre {:mvn/version "5.1.2"} com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 6341e41..58efca1 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -1,6 +1,5 @@ (ns com.github.ivarref.yoltq - (:require [datomic-schema.core] - [datomic.api :as d] + (:require [datomic.api :as d] [clojure.tools.logging :as log] [com.github.ivarref.yoltq.impl :as i] [com.github.ivarref.yoltq.report-queue :as rq] @@ -58,7 +57,9 @@ ; contain the stacktrace of the stuck threads. :pool-size 4 - :capture-bindings [] + :capture-bindings (if-let [s (resolve (symbol "taoensso.timbre/*context*"))] + [s] + []) ; How often should the system be polled for failed queue jobs :system-error-poll-delay (Duration/ofMinutes 1) diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj index cf69e55..1aa6c02 100644 --- a/test/com/github/ivarref/yoltq/log_init.clj +++ b/test/com/github/ivarref/yoltq/log_init.clj @@ -39,6 +39,9 @@ " " (color-f (min-length 5 (str/upper-case (name level)))) " " + + (when-let [x-req-id (:x-request-id context)] + (str "[" x-req-id "] ")) #_(.getName ^Thread (Thread/currentThread)) (color-f (force msg_)) diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index e2ea19b..3c7c5b4 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -9,8 +9,7 @@ [clojure.tools.logging :as log] [com.github.ivarref.yoltq.impl :as i] [com.github.ivarref.yoltq :as yq] - [clojure.pprint :as pprint] - [clojure.edn :as edn])) + [taoensso.timbre :as timbre])) (use-fixtures :each vq/call-with-virtual-queue!) @@ -254,11 +253,24 @@ (deftest binding-test (let [conn (u/empty-conn)] (dq/init! {:conn conn - :capture-bindings [#'*some-binding*]}) + :capture-bindings [#'*some-binding* #'timbre/*context*]}) (dq/add-consumer! :q (fn [_] *some-binding*)) - (binding [*some-binding* 1] @(d/transact conn [(dq/put :q nil)])) - (binding [*some-binding* 2] @(d/transact conn [(dq/put :q nil)])) - @(d/transact conn [(dq/put :q nil)]) + (binding [timbre/*context* {:x-request-id "wooho"}] + (binding [*some-binding* 1] + @(d/transact conn [(dq/put :q nil)])) + (binding [*some-binding* 2] + @(d/transact conn [(dq/put :q nil)])) + @(d/transact conn [(dq/put :q nil)])) + (is (= 1 (vq/consume-expect! :q :done))) (is (= 2 (vq/consume-expect! :q :done))) (is (nil? (vq/consume-expect! :q :done))))) + + +(deftest default-binding-test + (let [conn (u/empty-conn)] + (dq/init! {:conn conn}) + (dq/add-consumer! :q (fn [_] (:x-request-id timbre/*context*))) + (binding [timbre/*context* {:x-request-id "123"}] + @(d/transact conn [(dq/put :q nil)])) + (is (= "123" (vq/consume-expect! :q :done))))) -- cgit v1.2.3 From 2a236e6d90410821370761434fad45b13621fbdf Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Fri, 17 Sep 2021 22:16:14 +0200 Subject: Add consume-twice! test function for verifying idempotence --- README.md | 76 ++++++++++++++++++++++++-- src/com/github/ivarref/yoltq/virtual_queue.clj | 37 ++++++++++++- test/com/github/ivarref/yoltq/virtual_test.clj | 11 +++- 3 files changed, 117 insertions(+), 7 deletions(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/README.md b/README.md index 902be2f..29b6a62 100644 --- a/README.md +++ b/README.md @@ -164,13 +164,13 @@ One thread is permanently allocated for listening to the and responding to changes. This means that yoltq will respond and process newly created queue jobs fairly quickly. This also means that queue jobs in status `:init` will almost always be processed without -any type of backoff*. +any type of backoff. The threadpool also schedules polling jobs that will check for various statuses regularly: * Jobs in status `:error` that have waited for at least `:error-backoff-time` (default: 5 seconds) will be retried. * Jobs that have been in `:processing` for at least `:hung-backoff-time` (default: 30 minutes) will be considered hung and retried. -* Old `:init-backoff-time` (default: 1 minute) `:init` jobs that have not been processed. *Queue jobs can be left in status `:init` during application restart/upgrade, and thus the need for this strategy. +* Old `:init-backoff-time` (default: 1 minute) `:init` jobs that have not been processed. Queue jobs can be left in status `:init` during application restart/upgrade, and thus the need for this strategy. ### Retry and backoff strategy @@ -212,15 +212,66 @@ A queue job will remain in status `:error` once `:max-retries` (default: 100) ha Ideally this will not happen. -### Logging +# Regular and REPL usage +For a regular system and/or REPL session you'll want to do: +```clojure +(require '[com.github.ivarref.yoltq :as yq]) -### Total health and system sanity +(yq/init! {:conn conn}) + +(yq/add-consumer! :q-one ...) +(yq/add-consumer! :q-two ...) + +; Start yoltq system +(yq/start!) + +; Oops I need another consumer. This works fine: +(yq/add-consumer! :q-three ...) +``` + +You may invoke `yq/add-consumer!` and `yq/init!` on a live system as you like. +If you change `:pool-size` or `:poll-delay` you will have to `(yq/stop!)` and +`(yq/start!)` to make changes take effect. +# Testing +For testing you will probably want determinism over an extra threadpool +by enabling the virtual queue: + +```clojure +... +(:require [clojure.test :refer :all] + [com.github.ivarref.yoltq :as yq] + [com.github.ivarref.yoltq.virtual-queue :as vq]) + +; Enables the virtual queue and disables the threadpool for each test. +; yq/start! and yq/stop! becomes a no-op. +(use-fixtures :each vq/call-with-virtual-queue!) + +(deftest demo + (let [conn ...] + (dq/init! {:conn conn}) ; Setup + (dq/add-consumer! :q identity) + + @(d/transact conn [(yq/put :q {:work 123})]) ; Add work + + ; vq/consume! consumes one job and asserts that it succeeds. + ; It returns the return value of the consumer function + (is (= {:work 123} (vq/consume! :q))))) +``` + + +## Other features and notes + + +### Logging + + + +### Total health and system sanity -## Misc ### Ordering @@ -228,3 +279,18 @@ There is no attempt at ordering the execution of queue jobs. In fact the opposite is done to guard against the case that a single failing queue job could effectively take down the entire retry polling job. + +## License + +Copyright © 2021 Ivar Refsdal + +This program and the accompanying materials are made available under the +terms of the Eclipse Public License 2.0 which is available at +http://www.eclipse.org/legal/epl-2.0. + +This Source Code may also be made available under the following Secondary +Licenses when the conditions for such availability set forth in the Eclipse +Public License, v. 2.0 are satisfied: GNU General Public License as published by +the Free Software Foundation, either version 2 of the License, or (at your +option) any later version, with the GNU Classpath Exception which is available +at https://www.gnu.org/software/classpath/license.html. \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/virtual_queue.clj b/src/com/github/ivarref/yoltq/virtual_queue.clj index 71c7b6d..db429a8 100644 --- a/src/com/github/ivarref/yoltq/virtual_queue.clj +++ b/src/com/github/ivarref/yoltq/virtual_queue.clj @@ -160,4 +160,39 @@ (:exception res#)))) (catch Throwable t# (log/error t# "unexpected error in consume-expect:" (ex-message t#)))) - (test/is nil (str "No job found for queue " ~queue-name)))) \ No newline at end of file + (test/is false (str "No job found for queue " ~queue-name)))) +tx-spent-time! + +(defmacro consume! [queue-name] + `(consume-expect! ~queue-name :done)) + + +(defn mark-fails! [{:keys [conn]} + {:com.github.ivarref.yoltq/keys [id lock tries]} + _] + (try + (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing :init]]] + @(d/transact conn tx) + nil) + (catch Throwable t + (log/error t "unexpected error in mark-status!: " (ex-message t)) + nil))) + + +(defmacro consume-twice! [queue-name] + `(if-let [job# (get-tx-q-job ~queue-name)] + (try + (with-bindings (:com.github.ivarref.yoltq/bindings job#) + (some->> (u/prepare-processing (d/db (:conn @yq/*config*)) + (:com.github.ivarref.yoltq/id job#) + ~queue-name + (:com.github.ivarref.yoltq/lock job#) + (:com.github.ivarref.yoltq/status job#)) + (i/take! @yq/*config*) + (i/execute! (assoc @yq/*config* :mark-status-fn! mark-fails!))) + (consume! ~queue-name)) + (catch Throwable t# + (log/error t# "unexpected error in consume-twice!:" (ex-message t#)))) + (test/is false (str "No job found for queue " ~queue-name)))) \ No newline at end of file diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 3c7c5b4..5e5fc92 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -20,7 +20,7 @@ (dq/init! {:conn conn}) (dq/add-consumer! :q identity) @(d/transact conn [(dq/put :q {:work 123})]) - (is (= {:work 123} (:retval (vq/run-queue-once! :q :init)))))) + (is (= {:work 123} (vq/consume! :q))))) (deftest happy-case-tx-report-q @@ -274,3 +274,12 @@ (binding [timbre/*context* {:x-request-id "123"}] @(d/transact conn [(dq/put :q nil)])) (is (= "123" (vq/consume-expect! :q :done))))) + + +(deftest consume-twice + (let [conn (u/empty-conn) + cnt (atom 0)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [_] (swap! cnt inc))) + @(d/transact conn [(dq/put :q nil)]) + (is (= 2 (vq/consume-twice! :q))))) -- cgit v1.2.3 From 60f7371f4d2dd43c5b177039406eeaab00ba27cc Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Thu, 23 Sep 2021 11:06:33 +0200 Subject: To test-queue namespace --- README.md | 24 +-- src/com/github/ivarref/yoltq/test_queue.clj | 197 ++++++++++++++++++++++++ src/com/github/ivarref/yoltq/virtual_queue.clj | 198 ------------------------- test/com/github/ivarref/yoltq/test_utils.clj | 16 +- test/com/github/ivarref/yoltq/virtual_test.clj | 140 ++++++++--------- 5 files changed, 287 insertions(+), 288 deletions(-) create mode 100644 src/com/github/ivarref/yoltq/test_queue.clj delete mode 100644 src/com/github/ivarref/yoltq/virtual_queue.clj (limited to 'test/com/github/ivarref/yoltq') diff --git a/README.md b/README.md index 29b6a62..84dc91b 100644 --- a/README.md +++ b/README.md @@ -243,23 +243,23 @@ by enabling the virtual queue: ```clojure ... (:require [clojure.test :refer :all] - [com.github.ivarref.yoltq :as yq] - [com.github.ivarref.yoltq.virtual-queue :as vq]) + [com.github.ivarref.yoltq :as yq] + [com.github.ivarref.yoltq.test-queue :as tq]) ; Enables the virtual queue and disables the threadpool for each test. ; yq/start! and yq/stop! becomes a no-op. -(use-fixtures :each vq/call-with-virtual-queue!) +(use-fixtures :each tq/call-with-virtual-queue!) (deftest demo - (let [conn ...] - (dq/init! {:conn conn}) ; Setup - (dq/add-consumer! :q identity) - - @(d/transact conn [(yq/put :q {:work 123})]) ; Add work - - ; vq/consume! consumes one job and asserts that it succeeds. - ; It returns the return value of the consumer function - (is (= {:work 123} (vq/consume! :q))))) + (let [conn ...] + (yq/init! {:conn conn}) ; Setup + (yq/add-consumer! :q identity) + + @(d/transact conn [(yq/put :q {:work 123})]) ; Add work + + ; tq/consume! consumes one job and asserts that it succeeds. + ; It returns the return value of the consumer function + (is (= {:work 123} (tq/consume! :q))))) ``` diff --git a/src/com/github/ivarref/yoltq/test_queue.clj b/src/com/github/ivarref/yoltq/test_queue.clj new file mode 100644 index 0000000..6aeb959 --- /dev/null +++ b/src/com/github/ivarref/yoltq/test_queue.clj @@ -0,0 +1,197 @@ +(ns com.github.ivarref.yoltq.test-queue + (:require [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.report-queue :as rq] + [com.github.ivarref.yoltq.ext-sys :as ext] + [com.github.ivarref.yoltq :as yq] + [datomic.api :as d] + [com.github.ivarref.yoltq.poller :as poller] + [clojure.test :as test] + [com.github.ivarref.yoltq.utils :as u] + [com.github.ivarref.yoltq.impl :as i]) + (:import (java.util.concurrent BlockingQueue TimeUnit) + (datomic Datom))) + + +(defn bootstrap-poller! [txq running? poller-exited? bootstrapped? conn] + (let [ready? (promise)] + (future + (let [q (d/tx-report-queue conn)] + (try + (while @running? + (when-let [poll-result (.poll ^BlockingQueue q 10 TimeUnit/MILLISECONDS)] + (swap! txq conj poll-result)) + (deliver ready? true) + (reset! bootstrapped? true)) + (catch Throwable t + (log/error t "test-poller crashed: " (ex-message t))) + (finally + (try + (d/remove-tx-report-queue conn) + (catch Throwable t + (log/warn t "remove-tx-report-queue failed:" (ex-message t)))) + (deliver poller-exited? true))))) + @ready?)) + + +(defmacro with-virtual-queue! + [& body] + `(let [txq# (atom []) + poller-exited?# (promise) + bootstrapped?# (atom false) + running?# (atom true) + config# (atom {:bootstrap-poller! (partial bootstrap-poller! txq# running?# poller-exited?# bootstrapped?#) + :init-backoff-time 0 + :hung-log-level :warn + :tx-queue txq#})] + (with-bindings {#'yq/*config* config# + #'yq/*running?* (atom false) + #'yq/*test-mode* true + #'ext/*now-ns-atom* (atom 0) + #'ext/*random-atom* (atom 0) + #'ext/*squuid-atom* (atom 0)} + (try + ~@body + (finally + (reset! running?# false) + (when @bootstrapped?# + @poller-exited?#)))))) + + +(defn call-with-virtual-queue! + [f] + (with-virtual-queue! + (f))) + + +(defn run-report-queue! [min-items] + (let [{:keys [tx-queue conn]} @yq/*config* + id-ident (d/q '[:find ?e . + :where [?e :db/ident :com.github.ivarref.yoltq/id]] + (d/db conn))] + (let [timeout (+ 3000 (System/currentTimeMillis))] + (while (and (< (System/currentTimeMillis) timeout) + (< (count @tx-queue) min-items)) + (Thread/sleep 10))) + (when (< (count @tx-queue) min-items) + (let [msg (str "run-report-queue: timeout waiting for " min-items " items")] + (log/error msg) + (throw (ex-info msg {})))) + (let [res (atom [])] + (doseq [itm (first (swap-vals! tx-queue (constantly [])))] + (rq/process-poll-result! + @yq/*config* + id-ident + itm + (fn [f] (swap! res conj (f))))) + @res))) + + +(defn run-one-report-queue! [] + (first (run-report-queue! 1))) + + +(defn run-queue-once! [q status] + (poller/poll-once! @yq/*config* q status)) + + +(defn put! [q payload] + @(d/transact (:conn @yq/*config*) [(yq/put q payload)])) + + +(defn transact-result->maps [{:keys [tx-data db-after]}] + (let [m (->> tx-data + (group-by (fn [^Datom d] (.e d))) + (vals) + (mapv (fn [datoms] + (reduce (fn [o ^Datom d] + (if (.added d) + (assoc o (d/q '[:find ?r . + :in $ ?e + :where [?e :db/ident ?r]] + db-after + (.a d)) + (.v d)) + o)) + {} + datoms))))] + m)) + +(defn contains-queue-job? + [queue-id conn {::yq/keys [id queue-name status] :as m}] + (when (and (= queue-id queue-name) + (= status :init) + (d/q '[:find ?e . + :in $ ?id + :where + [?e ::yq/id ?id] + [?e ::yq/status :init]] + (d/db conn) + id)) + m)) + + +(defn get-tx-q-job [q-id] + (let [{:keys [tx-queue conn]} @yq/*config*] + (loop [timeout (+ 3000 (System/currentTimeMillis))] + (if-let [job (->> @tx-queue + (mapcat transact-result->maps) + (filter (partial contains-queue-job? q-id conn)) + (first))] + (u/get-queue-item (d/db conn) (::yq/id job)) + (if (< (System/currentTimeMillis) timeout) + (do (Thread/sleep 10) + (recur timeout)) + nil))))) + +(defmacro consume-expect! [queue-name expected-status] + `(if-let [job# (get-tx-q-job ~queue-name)] + (try + (with-bindings (:com.github.ivarref.yoltq/bindings job#) + (let [res# (some->> (u/prepare-processing (d/db (:conn @yq/*config*)) + (:com.github.ivarref.yoltq/id job#) + ~queue-name + (:com.github.ivarref.yoltq/lock job#) + (:com.github.ivarref.yoltq/status job#)) + (i/take! @yq/*config*) + (i/execute! @yq/*config*))] + (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#))) + (if (:retval res#) + (:retval res#) + (:exception res#)))) + (catch Throwable t# + (log/error t# "unexpected error in consume-expect:" (ex-message t#)))) + (test/is false (str "No job found for queue " ~queue-name)))) + +(defmacro consume! [queue-name] + `(consume-expect! ~queue-name :done)) + + +(defn mark-fails! [{:keys [conn]} + {:com.github.ivarref.yoltq/keys [id lock tries]} + _] + (try + (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing :init]]] + @(d/transact conn tx) + nil) + (catch Throwable t + (log/error t "unexpected error in mark-status!: " (ex-message t)) + nil))) + + +(defmacro force-retry! [queue-name] + `(if-let [job# (get-tx-q-job ~queue-name)] + (try + (with-bindings (:com.github.ivarref.yoltq/bindings job#) + (some->> (u/prepare-processing (d/db (:conn @yq/*config*)) + (:com.github.ivarref.yoltq/id job#) + ~queue-name + (:com.github.ivarref.yoltq/lock job#) + (:com.github.ivarref.yoltq/status job#)) + (i/take! @yq/*config*) + (i/execute! (assoc @yq/*config* :mark-status-fn! mark-fails!))) + (consume! ~queue-name)) + (catch Throwable t# + (log/error t# "unexpected error in consume-twice!:" (ex-message t#)))) + (test/is false (str "No job found for queue " ~queue-name)))) \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/virtual_queue.clj b/src/com/github/ivarref/yoltq/virtual_queue.clj deleted file mode 100644 index db429a8..0000000 --- a/src/com/github/ivarref/yoltq/virtual_queue.clj +++ /dev/null @@ -1,198 +0,0 @@ -(ns com.github.ivarref.yoltq.virtual-queue - (:require [clojure.tools.logging :as log] - [com.github.ivarref.yoltq.report-queue :as rq] - [com.github.ivarref.yoltq.ext-sys :as ext] - [com.github.ivarref.yoltq :as yq] - [datomic.api :as d] - [com.github.ivarref.yoltq.poller :as poller] - [clojure.test :as test] - [com.github.ivarref.yoltq.utils :as u] - [com.github.ivarref.yoltq.impl :as i]) - (:import (java.util.concurrent BlockingQueue TimeUnit) - (datomic Datom))) - - -(defn bootstrap-poller! [txq running? poller-exited? bootstrapped? conn] - (let [ready? (promise)] - (future - (let [q (d/tx-report-queue conn)] - (try - (while @running? - (when-let [poll-result (.poll ^BlockingQueue q 10 TimeUnit/MILLISECONDS)] - (swap! txq conj poll-result)) - (deliver ready? true) - (reset! bootstrapped? true)) - (catch Throwable t - (log/error t "test-poller crashed: " (ex-message t))) - (finally - (try - (d/remove-tx-report-queue conn) - (catch Throwable t - (log/warn t "remove-tx-report-queue failed:" (ex-message t)))) - (deliver poller-exited? true))))) - @ready?)) - - -(defmacro with-virtual-queue! - [& body] - `(let [txq# (atom []) - poller-exited?# (promise) - bootstrapped?# (atom false) - running?# (atom true) - config# (atom {:bootstrap-poller! (partial bootstrap-poller! txq# running?# poller-exited?# bootstrapped?#) - :init-backoff-time 0 - :hung-log-level :warn - :tx-queue txq#})] - (with-bindings {#'yq/*config* config# - #'yq/*running?* (atom false) - #'yq/*test-mode* true - #'ext/*now-ns-atom* (atom 0) - #'ext/*random-atom* (atom 0) - #'ext/*squuid-atom* (atom 0)} - (try - ~@body - (finally - (reset! running?# false) - (when @bootstrapped?# - @poller-exited?#)))))) - - -(defn call-with-virtual-queue! - [f] - (with-virtual-queue! - (f))) - - -(defn run-report-queue! [min-items] - (let [{:keys [tx-queue conn]} @yq/*config* - id-ident (d/q '[:find ?e . - :where [?e :db/ident :com.github.ivarref.yoltq/id]] - (d/db conn))] - (let [timeout (+ 3000 (System/currentTimeMillis))] - (while (and (< (System/currentTimeMillis) timeout) - (< (count @tx-queue) min-items)) - (Thread/sleep 10))) - (when (< (count @tx-queue) min-items) - (let [msg (str "run-report-queue: timeout waiting for " min-items " items")] - (log/error msg) - (throw (ex-info msg {})))) - (let [res (atom [])] - (doseq [itm (first (swap-vals! tx-queue (constantly [])))] - (rq/process-poll-result! - @yq/*config* - id-ident - itm - (fn [f] (swap! res conj (f))))) - @res))) - - -(defn run-one-report-queue! [] - (first (run-report-queue! 1))) - - -(defn run-queue-once! [q status] - (poller/poll-once! @yq/*config* q status)) - - -(defn put! [q payload] - @(d/transact (:conn @yq/*config*) [(yq/put q payload)])) - - -(defn transact-result->maps [{:keys [tx-data db-after]}] - (let [m (->> tx-data - (group-by (fn [^Datom d] (.e d))) - (vals) - (mapv (fn [datoms] - (reduce (fn [o ^Datom d] - (if (.added d) - (assoc o (d/q '[:find ?r . - :in $ ?e - :where [?e :db/ident ?r]] - db-after - (.a d)) - (.v d)) - o)) - {} - datoms))))] - m)) - -(defn contains-queue-job? - [queue-id conn {::yq/keys [id queue-name status] :as m}] - (when (and (= queue-id queue-name) - (= status :init) - (d/q '[:find ?e . - :in $ ?id - :where - [?e ::yq/id ?id] - [?e ::yq/status :init]] - (d/db conn) - id)) - m)) - - -(defn get-tx-q-job [q-id] - (let [{:keys [tx-queue conn]} @yq/*config*] - (loop [timeout (+ 3000 (System/currentTimeMillis))] - (if-let [job (->> @tx-queue - (mapcat transact-result->maps) - (filter (partial contains-queue-job? q-id conn)) - (first))] - (u/get-queue-item (d/db conn) (::yq/id job)) - (if (< (System/currentTimeMillis) timeout) - (do (Thread/sleep 10) - (recur timeout)) - nil))))) - -(defmacro consume-expect! [queue-name expected-status] - `(if-let [job# (get-tx-q-job ~queue-name)] - (try - (with-bindings (:com.github.ivarref.yoltq/bindings job#) - (let [res# (some->> (u/prepare-processing (d/db (:conn @yq/*config*)) - (:com.github.ivarref.yoltq/id job#) - ~queue-name - (:com.github.ivarref.yoltq/lock job#) - (:com.github.ivarref.yoltq/status job#)) - (i/take! @yq/*config*) - (i/execute! @yq/*config*))] - (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#))) - (if (:retval res#) - (:retval res#) - (:exception res#)))) - (catch Throwable t# - (log/error t# "unexpected error in consume-expect:" (ex-message t#)))) - (test/is false (str "No job found for queue " ~queue-name)))) -tx-spent-time! - -(defmacro consume! [queue-name] - `(consume-expect! ~queue-name :done)) - - -(defn mark-fails! [{:keys [conn]} - {:com.github.ivarref.yoltq/keys [id lock tries]} - _] - (try - (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)] - [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] - [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing :init]]] - @(d/transact conn tx) - nil) - (catch Throwable t - (log/error t "unexpected error in mark-status!: " (ex-message t)) - nil))) - - -(defmacro consume-twice! [queue-name] - `(if-let [job# (get-tx-q-job ~queue-name)] - (try - (with-bindings (:com.github.ivarref.yoltq/bindings job#) - (some->> (u/prepare-processing (d/db (:conn @yq/*config*)) - (:com.github.ivarref.yoltq/id job#) - ~queue-name - (:com.github.ivarref.yoltq/lock job#) - (:com.github.ivarref.yoltq/status job#)) - (i/take! @yq/*config*) - (i/execute! (assoc @yq/*config* :mark-status-fn! mark-fails!))) - (consume! ~queue-name)) - (catch Throwable t# - (log/error t# "unexpected error in consume-twice!:" (ex-message t#)))) - (test/is false (str "No job found for queue " ~queue-name)))) \ No newline at end of file diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj index dacba68..df56460 100644 --- a/test/com/github/ivarref/yoltq/test_utils.clj +++ b/test/com/github/ivarref/yoltq/test_utils.clj @@ -2,7 +2,7 @@ (:require [com.github.ivarref.yoltq.log-init :as logconfig] [clojure.tools.logging :as log] [com.github.ivarref.yoltq.utils :as u] - [com.github.ivarref.yoltq :as dq] + [com.github.ivarref.yoltq :as yq] [datomic.api :as d] [clojure.string :as str] [com.github.ivarref.yoltq.impl :as i] @@ -35,7 +35,7 @@ (defn put-transact! [id payload] - @(d/transact (:conn @dq/*config*) [(i/put @dq/*config* id payload)])) + @(d/transact (:conn @yq/*config*) [(i/put @yq/*config* id payload)])) (defn advance! [tp] @@ -50,25 +50,25 @@ :where [?e :com.github.ivarref.yoltq/id _] [?e :com.github.ivarref.yoltq/status :done]] - (d/db (:conn @dq/*config*)))) + (d/db (:conn @yq/*config*)))) (defn get-init [& args] - (apply u/get-init @dq/*config* args)) + (apply u/get-init @yq/*config* args)) (defn get-error [& args] - (apply u/get-error @dq/*config* args)) + (apply u/get-error @yq/*config* args)) (defn get-hung [& args] - (apply u/get-hung @dq/*config* args)) + (apply u/get-hung @yq/*config* args)) (defn take! [& args] - (apply i/take! @dq/*config* args)) + (apply i/take! @yq/*config* args)) (defn execute! [& args] - (apply i/execute! @dq/*config* args)) + (apply i/execute! @yq/*config* args)) diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 5e5fc92..442acac 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -1,8 +1,7 @@ (ns com.github.ivarref.yoltq.virtual-test (:require [datomic-schema.core] [clojure.test :refer :all] - [com.github.ivarref.yoltq.virtual-queue :as vq] - [com.github.ivarref.yoltq :as dq] + [com.github.ivarref.yoltq.test-queue :as tq] [com.github.ivarref.yoltq.test-utils :as u] [datomic.api :as d] [com.github.ivarref.yoltq.utils :as uu] @@ -12,29 +11,29 @@ [taoensso.timbre :as timbre])) -(use-fixtures :each vq/call-with-virtual-queue!) +(use-fixtures :each tq/call-with-virtual-queue!) (deftest happy-case-1 (let [conn (u/empty-conn)] - (dq/init! {:conn conn}) - (dq/add-consumer! :q identity) - @(d/transact conn [(dq/put :q {:work 123})]) - (is (= {:work 123} (vq/consume! :q))))) + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q {:work 123})]) + (is (= {:work 123} (tq/consume! :q))))) (deftest happy-case-tx-report-q (let [conn (u/empty-conn)] - (dq/init! {:conn conn}) - (dq/add-consumer! :q identity) - @(d/transact conn [(dq/put :q {:work 123})]) - (is (= {:work 123} (:retval (vq/run-one-report-queue!)))) + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q {:work 123})]) + (is (= {:work 123} (:retval (tq/run-one-report-queue!)))) (is (= 1 (u/done-count))))) (deftest happy-case-poller (let [conn (u/empty-conn)] - (dq/init! {:conn conn + (yq/init! {:conn conn :handlers {:q {:f (fn [payload] payload)}}}) (u/put-transact! :q {:work 123}) (u/advance! (:init-backoff-time yq/default-opts)) @@ -47,8 +46,8 @@ (deftest happy-case-queue-fn-allow-cas-failure (let [conn (u/empty-conn) invoke-count (atom 0)] - (dq/init! {:conn conn}) - (dq/add-consumer! :q + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [{:keys [id]}] (swap! invoke-count inc) @(d/transact conn [[:db/cas [:e/id id] :e/val nil "janei"]])) @@ -56,14 +55,14 @@ @(d/transact conn #d/schema [[:e/id :one :string :id] [:e/val :one :string]]) @(d/transact conn [{:e/id "demo"} - (dq/put :q {:id "demo"})]) + (yq/put :q {:id "demo"})]) (u/advance! (:init-backoff-time yq/default-opts)) - (swap! dq/*config* assoc :mark-status-fn! (fn [_ _ new-status] + (swap! yq/*config* assoc :mark-status-fn! (fn [_ _ new-status] (log/info "mark-status! doing nothing for new status" new-status))) (is (nil? (some->> (u/get-init :q) (u/take!) (u/execute!)))) - (swap! dq/*config* dissoc :mark-status-fn!) + (swap! yq/*config* dissoc :mark-status-fn!) ; (mark-status! :done) failed but f succeeded. (is (nil? (u/get-hung :q))) @@ -78,7 +77,7 @@ (deftest backoff-test (let [conn (u/empty-conn)] - (dq/init! {:conn conn + (yq/init! {:conn conn :init-backoff-time (:init-backoff-time yq/default-opts) :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}}) (u/put-transact! :q {:work 123}) @@ -102,7 +101,7 @@ (deftest get-hung-test (let [conn (u/empty-conn)] - (dq/init! {:conn conn + (yq/init! {:conn conn :init-backoff-time (:init-backoff-time yq/default-opts) :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}}) (u/put-transact! :q {:work 123}) @@ -122,7 +121,7 @@ (deftest basic-locking (let [conn (u/empty-conn)] - (dq/init! {:conn conn + (yq/init! {:conn conn :init-backoff-time (:init-backoff-time yq/default-opts) :cas-failures (atom 0) :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}}) @@ -133,12 +132,12 @@ (let [job (u/get-init :q)] (is (= :processing (some->> job (u/take!) :com.github.ivarref.yoltq/status))) (u/take! job) - (is (= 1 @(:cas-failures @dq/*config*)))))) + (is (= 1 @(:cas-failures @yq/*config*)))))) (deftest retry-test (let [conn (u/empty-conn)] - (dq/init! {:conn conn + (yq/init! {:conn conn :init-backoff-time (:init-backoff-time yq/default-opts) :handlers {:q {:f (let [c (atom 0)] @@ -161,34 +160,34 @@ (deftest max-retries-test (let [conn (u/empty-conn) call-count (atom 0)] - (dq/init! {:conn conn + (yq/init! {:conn conn :error-backoff-time 0}) - (dq/add-consumer! :q (fn [_] + (yq/add-consumer! :q (fn [_] (swap! call-count inc) (throw (ex-info "janei" {}))) {:max-retries 1}) - (vq/put! :q {:work 123}) - (is (some? (:exception (vq/run-one-report-queue!)))) + (tq/put! :q {:work 123}) + (is (some? (:exception (tq/run-one-report-queue!)))) (dotimes [_ 10] - (vq/run-queue-once! :q :error)) + (tq/run-queue-once! :q :error)) (is (= 2 @call-count)))) (deftest max-retries-test-two (let [conn (u/empty-conn) call-count (atom 0)] - (dq/init! {:conn conn + (yq/init! {:conn conn :error-backoff-time 0}) - (dq/add-consumer! :q (fn [_] + (yq/add-consumer! :q (fn [_] (swap! call-count inc) (throw (ex-info "janei" {}))) {:max-retries 3}) - (vq/put! :q {:work 123}) - (is (some? (:exception (vq/run-one-report-queue!)))) + (tq/put! :q {:work 123}) + (is (some? (:exception (tq/run-one-report-queue!)))) (dotimes [_ 20] - (vq/run-queue-once! :q :error)) + (tq/run-queue-once! :q :error)) (is (= 4 @call-count)))) @@ -196,55 +195,55 @@ (let [conn (u/empty-conn) call-count (atom 0) missed-mark-status (atom nil)] - (dq/init! {:conn conn}) - (dq/add-consumer! :q + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [_] (if (= 1 (swap! call-count inc)) (throw (ex-info "error" {})) (log/info "return OK")))) - (vq/put! :q {:id "demo"}) - (vq/run-one-report-queue!) ; now in status :error + (tq/put! :q {:id "demo"}) + (tq/run-one-report-queue!) ; now in status :error - (swap! dq/*config* assoc :mark-status-fn! (fn [_ _ new-status] + (swap! yq/*config* assoc :mark-status-fn! (fn [_ _ new-status] (reset! missed-mark-status new-status) (log/info "mark-status! doing nothing for new status" new-status))) (u/advance! (:error-backoff-time @yq/*config*)) - (vq/run-queue-once! :q :error) - (swap! dq/*config* dissoc :mark-status-fn!) + (tq/run-queue-once! :q :error) + (swap! yq/*config* dissoc :mark-status-fn!) (is (= :done @missed-mark-status)) - (is (nil? (uu/get-hung @dq/*config* :q))) + (is (nil? (uu/get-hung @yq/*config* :q))) (u/advance! (:hung-backoff-time @yq/*config*)) - (is (some? (uu/get-hung @dq/*config* :q))) + (is (some? (uu/get-hung @yq/*config* :q))) (is (= 2 @call-count)) - (is (true? (some->> (uu/get-hung (assoc-in @dq/*config* [:handlers :q :max-retries] 1) :q) - (i/take! @dq/*config*) - (i/execute! @dq/*config*) + (is (true? (some->> (uu/get-hung (assoc-in @yq/*config* [:handlers :q :max-retries] 1) :q) + (i/take! @yq/*config*) + (i/execute! @yq/*config*) :failed?))) (u/advance! (:error-backoff-time @yq/*config*)) - (is (some? (uu/get-error @dq/*config* :q))) - (is (nil? (uu/get-error (assoc-in @dq/*config* [:handlers :q :max-retries] 1) :q))))) + (is (some? (uu/get-error @yq/*config* :q))) + (is (nil? (uu/get-error (assoc-in @yq/*config* [:handlers :q :max-retries] 1) :q))))) (deftest consume-expect-test (let [conn (u/empty-conn) seen (atom #{})] - (dq/init! {:conn conn}) - (dq/add-consumer! :q (fn [payload] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [payload] (when (= #{1 2} (swap! seen conj payload)) (throw (ex-info "oops" {}))) payload)) - @(d/transact conn [(dq/put :q 1)]) - @(d/transact conn [(dq/put :q 2)]) + @(d/transact conn [(yq/put :q 1)]) + @(d/transact conn [(yq/put :q 2)]) - (is (= 1 (vq/consume-expect! :q :done))) - (vq/consume-expect! :q :error))) + (is (= 1 (tq/consume-expect! :q :done))) + (tq/consume-expect! :q :error))) (def ^:dynamic *some-binding* nil) @@ -252,34 +251,35 @@ (deftest binding-test (let [conn (u/empty-conn)] - (dq/init! {:conn conn + (yq/init! {:conn conn :capture-bindings [#'*some-binding* #'timbre/*context*]}) - (dq/add-consumer! :q (fn [_] *some-binding*)) + (yq/add-consumer! :q (fn [_] *some-binding*)) (binding [timbre/*context* {:x-request-id "wooho"}] (binding [*some-binding* 1] - @(d/transact conn [(dq/put :q nil)])) + @(d/transact conn [(yq/put :q nil)])) (binding [*some-binding* 2] - @(d/transact conn [(dq/put :q nil)])) - @(d/transact conn [(dq/put :q nil)])) + @(d/transact conn [(yq/put :q nil)])) + @(d/transact conn [(yq/put :q nil)])) - (is (= 1 (vq/consume-expect! :q :done))) - (is (= 2 (vq/consume-expect! :q :done))) - (is (nil? (vq/consume-expect! :q :done))))) + (is (= 1 (tq/consume-expect! :q :done))) + (is (= 2 (tq/consume-expect! :q :done))) + (is (nil? (tq/consume-expect! :q :done))))) (deftest default-binding-test (let [conn (u/empty-conn)] - (dq/init! {:conn conn}) - (dq/add-consumer! :q (fn [_] (:x-request-id timbre/*context*))) + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [_] (:x-request-id timbre/*context*))) (binding [timbre/*context* {:x-request-id "123"}] - @(d/transact conn [(dq/put :q nil)])) - (is (= "123" (vq/consume-expect! :q :done))))) + @(d/transact conn [(yq/put :q nil)])) + (is (= "123" (tq/consume-expect! :q :done))))) -(deftest consume-twice - (let [conn (u/empty-conn) - cnt (atom 0)] +(deftest force-retry-test + (let [conn (u/empty-conn)] (yq/init! {:conn conn}) - (yq/add-consumer! :q (fn [_] (swap! cnt inc))) - @(d/transact conn [(dq/put :q nil)]) - (is (= 2 (vq/consume-twice! :q))))) + (yq/add-consumer! :q (let [cnt (atom 0)] + (fn [_] (swap! cnt inc)))) + @(d/transact conn [(yq/put :q nil)]) + (is (= 1 (tq/consume! :q))) + #_(is (= 2 (tq/force-retry! :q))))) -- cgit v1.2.3 From 79cc3f448949d755c59265e2316408d037be20cb Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Thu, 23 Sep 2021 11:30:03 +0200 Subject: Add force-retry! --- src/com/github/ivarref/yoltq/impl.clj | 2 +- src/com/github/ivarref/yoltq/test_queue.clj | 49 +++++++++++--------------- test/com/github/ivarref/yoltq/virtual_test.clj | 3 +- 3 files changed, 23 insertions(+), 31 deletions(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index cb99b08..9c95cff 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -67,7 +67,7 @@ (cond (= :db.error/cas-failed error) (do - (log/info ":db.error/cas-failed for queue item" (str id) "and attribute" (:a m)) + (log/info "take! :db.error/cas-failed for queue item" (str id) "and attribute" (:a m)) (when cas-failures (swap! cas-failures inc)) nil) diff --git a/src/com/github/ivarref/yoltq/test_queue.clj b/src/com/github/ivarref/yoltq/test_queue.clj index 6aeb959..4c4f903 100644 --- a/src/com/github/ivarref/yoltq/test_queue.clj +++ b/src/com/github/ivarref/yoltq/test_queue.clj @@ -42,6 +42,7 @@ config# (atom {:bootstrap-poller! (partial bootstrap-poller! txq# running?# poller-exited?# bootstrapped?#) :init-backoff-time 0 :hung-log-level :warn + :prev-consumed (atom {}) :tx-queue txq#})] (with-bindings {#'yq/*config* config# #'yq/*running?* (atom false) @@ -154,44 +155,34 @@ (:com.github.ivarref.yoltq/status job#)) (i/take! @yq/*config*) (i/execute! @yq/*config*))] + (swap! (:prev-consumed @yq/*config*) assoc ~queue-name res#) (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#))) (if (:retval res#) (:retval res#) (:exception res#)))) (catch Throwable t# - (log/error t# "unexpected error in consume-expect:" (ex-message t#)))) + (log/error t# "unexpected error in consume-expect:" (ex-message t#)) + (throw t#))) (test/is false (str "No job found for queue " ~queue-name)))) (defmacro consume! [queue-name] `(consume-expect! ~queue-name :done)) -(defn mark-fails! [{:keys [conn]} - {:com.github.ivarref.yoltq/keys [id lock tries]} - _] - (try - (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)] - [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] - [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing :init]]] - @(d/transact conn tx) - nil) - (catch Throwable t - (log/error t "unexpected error in mark-status!: " (ex-message t)) - nil))) - - (defmacro force-retry! [queue-name] - `(if-let [job# (get-tx-q-job ~queue-name)] - (try - (with-bindings (:com.github.ivarref.yoltq/bindings job#) - (some->> (u/prepare-processing (d/db (:conn @yq/*config*)) - (:com.github.ivarref.yoltq/id job#) - ~queue-name - (:com.github.ivarref.yoltq/lock job#) - (:com.github.ivarref.yoltq/status job#)) - (i/take! @yq/*config*) - (i/execute! (assoc @yq/*config* :mark-status-fn! mark-fails!))) - (consume! ~queue-name)) - (catch Throwable t# - (log/error t# "unexpected error in consume-twice!:" (ex-message t#)))) - (test/is false (str "No job found for queue " ~queue-name)))) \ No newline at end of file + `(if-let [job# (some-> @yq/*config* :prev-consumed deref (get ~queue-name))] + (let [db-res# @(d/transact (:conn @yq/*config*) [{:com.github.ivarref.yoltq/id (:com.github.ivarref.yoltq/id job#) + :com.github.ivarref.yoltq/status :init}]) + res# (some->> (u/prepare-processing (:db-after db-res#) + (:com.github.ivarref.yoltq/id job#) + ~queue-name + (:com.github.ivarref.yoltq/lock job#) + :init) + (i/take! @yq/*config*) + (i/execute! @yq/*config*))] + (swap! (:prev-consumed @yq/*config*) assoc ~queue-name res#) + (test/is (= :done (:com.github.ivarref.yoltq/status res#))) + (if (:retval res#) + (:retval res#) + (:exception res#))) + (test/is false "Expected to have previously consumed something. Was nil."))) diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 442acac..2b67e5e 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -282,4 +282,5 @@ (fn [_] (swap! cnt inc)))) @(d/transact conn [(yq/put :q nil)]) (is (= 1 (tq/consume! :q))) - #_(is (= 2 (tq/force-retry! :q))))) + (is (= 2 (tq/force-retry! :q))) + (is (= 3 (tq/force-retry! :q))))) -- cgit v1.2.3 From dc2e14b4e1e91e6fefecc01c312a44c0033640c9 Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Thu, 23 Sep 2021 13:01:23 +0200 Subject: Basic depends-on works for test queue --- src/com/github/ivarref/yoltq.clj | 12 +++--- src/com/github/ivarref/yoltq/impl.clj | 51 +++++++++++++++++++++----- src/com/github/ivarref/yoltq/test_queue.clj | 27 ++++++++------ src/com/github/ivarref/yoltq/utils.clj | 1 + test/com/github/ivarref/yoltq/test_utils.clj | 2 +- test/com/github/ivarref/yoltq/virtual_test.clj | 27 ++++++++++++++ 6 files changed, 92 insertions(+), 28 deletions(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 58efca1..3164020 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -94,11 +94,13 @@ (swap! *config* (fn [old-config] (assoc-in old-config [:handlers queue-id] (merge opts {:f f})))))) -(defn put [queue-id payload] - (let [{:keys [bootstrap-poller! conn] :as cfg} @*config*] - (when (and *test-mode* bootstrap-poller!) - (bootstrap-poller! conn)) - (i/put cfg queue-id payload))) +(defn put + ([queue-id payload] (put queue-id payload {})) + ([queue-id payload opts] + (let [{:keys [bootstrap-poller! conn] :as cfg} @*config*] + (when (and *test-mode* bootstrap-poller!) + (bootstrap-poller! conn)) + (i/put cfg queue-id payload opts)))) (defn- do-start! [] diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index 9c95cff..9811c93 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -8,9 +8,11 @@ (def schema [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity} + #:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value} #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string} + #:db{:ident :com.github.ivarref.yoltq/opts, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true} #:db{:ident :com.github.ivarref.yoltq/lock, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :noHistory true} @@ -20,8 +22,10 @@ #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}]) -(defn put [{:keys [capture-bindings] :as config} - queue-name payload] +(defn put [{:keys [capture-bindings conn] :as config} + queue-name + payload + opts] (if-let [_ (get-in config [:handlers queue-name])] (let [id (u/squuid) str-bindings (->> (reduce (fn [o k] @@ -30,19 +34,46 @@ (or capture-bindings [])) (pr-str))] (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init) - {:com.github.ivarref.yoltq/id id - :com.github.ivarref.yoltq/queue-name queue-name - :com.github.ivarref.yoltq/status u/status-init - :com.github.ivarref.yoltq/payload (pr-str payload) - :com.github.ivarref.yoltq/bindings str-bindings - :com.github.ivarref.yoltq/lock (u/random-uuid) - :com.github.ivarref.yoltq/tries 0 - :com.github.ivarref.yoltq/init-time (u/now-ns)}) + (merge + {:com.github.ivarref.yoltq/id id + :com.github.ivarref.yoltq/queue-name queue-name + :com.github.ivarref.yoltq/status u/status-init + :com.github.ivarref.yoltq/payload (pr-str payload) + :com.github.ivarref.yoltq/bindings str-bindings + :com.github.ivarref.yoltq/opts (pr-str (or opts {})) + :com.github.ivarref.yoltq/lock (u/random-uuid) + :com.github.ivarref.yoltq/tries 0 + :com.github.ivarref.yoltq/init-time (u/now-ns)} + (when-let [[q ext-id] (:depends-on opts)] + (when-not (d/q '[:find ?e . + :in $ ?ext-id + :where + [?e :com.github.ivarref.yoltq/ext-id ?ext-id]] + (d/db conn) + (pr-str [q ext-id])) + (throw (ex-info ":depends-on not found in database" opts)))) + (when-let [ext-id (:id opts)] + {:com.github.ivarref.yoltq/ext-id (pr-str [queue-name ext-id])}))) (do (log/error "Did not find registered handler for queue" queue-name) (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) +(defn depends-on-waiting? [{:keys [conn]} + {:keys [id]}] + (let [db (d/db conn)] + (when-let [{:com.github.ivarref.yoltq/keys [opts]} (u/get-queue-item db id)] + (when-let [[q id :as depends-on] (:depends-on opts)] + (when-not (d/q '[:find ?e . + :in $ ?ext-id + :where + [?e :com.github.ivarref.yoltq/ext-id ?ext-id] + [?e :com.github.ivarref.yoltq/status :done]] + db + (pr-str [q id])) + {:depends-on depends-on}))))) + + (defn take! [{:keys [conn cas-failures hung-log-level tx-spent-time!] :or {hung-log-level :error}} {:keys [tx id queue-name was-hung? to-error?] :as queue-item-info}] diff --git a/src/com/github/ivarref/yoltq/test_queue.clj b/src/com/github/ivarref/yoltq/test_queue.clj index 4c4f903..6183216 100644 --- a/src/com/github/ivarref/yoltq/test_queue.clj +++ b/src/com/github/ivarref/yoltq/test_queue.clj @@ -148,18 +148,21 @@ `(if-let [job# (get-tx-q-job ~queue-name)] (try (with-bindings (:com.github.ivarref.yoltq/bindings job#) - (let [res# (some->> (u/prepare-processing (d/db (:conn @yq/*config*)) - (:com.github.ivarref.yoltq/id job#) - ~queue-name - (:com.github.ivarref.yoltq/lock job#) - (:com.github.ivarref.yoltq/status job#)) - (i/take! @yq/*config*) - (i/execute! @yq/*config*))] - (swap! (:prev-consumed @yq/*config*) assoc ~queue-name res#) - (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#))) - (if (:retval res#) - (:retval res#) - (:exception res#)))) + (let [prep# (u/prepare-processing (d/db (:conn @yq/*config*)) + (:com.github.ivarref.yoltq/id job#) + ~queue-name + (:com.github.ivarref.yoltq/lock job#) + (:com.github.ivarref.yoltq/status job#))] + (if-let [depends-on# (i/depends-on-waiting? @yq/*config* prep#)] + depends-on# + (let [res# (some->> prep# + (i/take! @yq/*config*) + (i/execute! @yq/*config*))] + (swap! (:prev-consumed @yq/*config*) assoc ~queue-name res#) + (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#))) + (if (:retval res#) + (:retval res#) + (:exception res#)))))) (catch Throwable t# (log/error t# "unexpected error in consume-expect:" (ex-message t#)) (throw t#))) diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj index 9501343..d551510 100644 --- a/src/com/github/ivarref/yoltq/utils.clj +++ b/src/com/github/ivarref/yoltq/utils.clj @@ -56,6 +56,7 @@ (-> (d/pull db '[:*] [:com.github.ivarref.yoltq/id id]) (dissoc :db/id) (update :com.github.ivarref.yoltq/payload edn/read-string) + (update :com.github.ivarref.yoltq/opts (fn [s] (or (when s (edn/read-string s)) {}))) (update :com.github.ivarref.yoltq/bindings (fn [s] (when s diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj index df56460..5427ff5 100644 --- a/test/com/github/ivarref/yoltq/test_utils.clj +++ b/test/com/github/ivarref/yoltq/test_utils.clj @@ -35,7 +35,7 @@ (defn put-transact! [id payload] - @(d/transact (:conn @yq/*config*) [(i/put @yq/*config* id payload)])) + @(d/transact (:conn @yq/*config*) [(i/put @yq/*config* id payload {})])) (defn advance! [tp] diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 2b67e5e..789e5b4 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -284,3 +284,30 @@ (is (= 1 (tq/consume! :q))) (is (= 2 (tq/force-retry! :q))) (is (= 3 (tq/force-retry! :q))))) + + +(deftest ext-id-no-duplicates + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q nil {:id "123"})]) + (is (thrown? Exception @(d/transact conn [(yq/put :q nil {:id "123"})]))))) + + +(deftest depends-on + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :a identity) + (yq/add-consumer! :b identity) + @(d/transact conn [(yq/put :a "a" {:id "1"})]) + (is (thrown? Exception @(d/transact conn [(yq/put :b "b" {:depends-on [:a "0"]})]))) + @(d/transact conn [(yq/put :b "b" {:depends-on [:a "1"]})]) + + ; can't consume :b yet: + (is (= {:depends-on [:a "1"]} (tq/consume! :b))) + (is (= {:depends-on [:a "1"]} (tq/consume! :b))) + + (is (= "a" (tq/consume! :a))) + (is (= "b" (tq/consume! :b))) + (is (= "b" (tq/force-retry! :b))))) + -- cgit v1.2.3 From 74ef39824b6eb0f8f69720e2f6209ec3de0cefbe Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Thu, 23 Sep 2021 14:33:09 +0200 Subject: Document ordering and depends on as well as add license --- LICENSE | 277 +++++++++++++++++++++++++ README.md | 38 +++- test/com/github/ivarref/yoltq/virtual_test.clj | 14 +- 3 files changed, 313 insertions(+), 16 deletions(-) create mode 100644 LICENSE (limited to 'test/com/github/ivarref/yoltq') diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d3087e4 --- /dev/null +++ b/LICENSE @@ -0,0 +1,277 @@ +Eclipse Public License - v 2.0 + + THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE + PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION + OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT. + +1. DEFINITIONS + +"Contribution" means: + + a) in the case of the initial Contributor, the initial content + Distributed under this Agreement, and + + b) in the case of each subsequent Contributor: + i) changes to the Program, and + ii) additions to the Program; + where such changes and/or additions to the Program originate from + and are Distributed by that particular Contributor. A Contribution + "originates" from a Contributor if it was added to the Program by + such Contributor itself or anyone acting on such Contributor's behalf. + Contributions do not include changes or additions to the Program that + are not Modified Works. + +"Contributor" means any person or entity that Distributes the Program. + +"Licensed Patents" mean patent claims licensable by a Contributor which +are necessarily infringed by the use or sale of its Contribution alone +or when combined with the Program. + +"Program" means the Contributions Distributed in accordance with this +Agreement. + +"Recipient" means anyone who receives the Program under this Agreement +or any Secondary License (as applicable), including Contributors. + +"Derivative Works" shall mean any work, whether in Source Code or other +form, that is based on (or derived from) the Program and for which the +editorial revisions, annotations, elaborations, or other modifications +represent, as a whole, an original work of authorship. + +"Modified Works" shall mean any work in Source Code or other form that +results from an addition to, deletion from, or modification of the +contents of the Program, including, for purposes of clarity any new file +in Source Code form that contains any contents of the Program. Modified +Works shall not include works that contain only declarations, +interfaces, types, classes, structures, or files of the Program solely +in each case in order to link to, bind by name, or subclass the Program +or Modified Works thereof. + +"Distribute" means the acts of a) distributing or b) making available +in any manner that enables the transfer of a copy. + +"Source Code" means the form of a Program preferred for making +modifications, including but not limited to software source code, +documentation source, and configuration files. + +"Secondary License" means either the GNU General Public License, +Version 2.0, or any later versions of that license, including any +exceptions or additional permissions as identified by the initial +Contributor. + +2. GRANT OF RIGHTS + + a) Subject to the terms of this Agreement, each Contributor hereby + grants Recipient a non-exclusive, worldwide, royalty-free copyright + license to reproduce, prepare Derivative Works of, publicly display, + publicly perform, Distribute and sublicense the Contribution of such + Contributor, if any, and such Derivative Works. + + b) Subject to the terms of this Agreement, each Contributor hereby + grants Recipient a non-exclusive, worldwide, royalty-free patent + license under Licensed Patents to make, use, sell, offer to sell, + import and otherwise transfer the Contribution of such Contributor, + if any, in Source Code or other form. This patent license shall + apply to the combination of the Contribution and the Program if, at + the time the Contribution is added by the Contributor, such addition + of the Contribution causes such combination to be covered by the + Licensed Patents. The patent license shall not apply to any other + combinations which include the Contribution. No hardware per se is + licensed hereunder. + + c) Recipient understands that although each Contributor grants the + licenses to its Contributions set forth herein, no assurances are + provided by any Contributor that the Program does not infringe the + patent or other intellectual property rights of any other entity. + Each Contributor disclaims any liability to Recipient for claims + brought by any other entity based on infringement of intellectual + property rights or otherwise. As a condition to exercising the + rights and licenses granted hereunder, each Recipient hereby + assumes sole responsibility to secure any other intellectual + property rights needed, if any. For example, if a third party + patent license is required to allow Recipient to Distribute the + Program, it is Recipient's responsibility to acquire that license + before distributing the Program. + + d) Each Contributor represents that to its knowledge it has + sufficient copyright rights in its Contribution, if any, to grant + the copyright license set forth in this Agreement. + + e) Notwithstanding the terms of any Secondary License, no + Contributor makes additional grants to any Recipient (other than + those set forth in this Agreement) as a result of such Recipient's + receipt of the Program under the terms of a Secondary License + (if permitted under the terms of Section 3). + +3. REQUIREMENTS + +3.1 If a Contributor Distributes the Program in any form, then: + + a) the Program must also be made available as Source Code, in + accordance with section 3.2, and the Contributor must accompany + the Program with a statement that the Source Code for the Program + is available under this Agreement, and informs Recipients how to + obtain it in a reasonable manner on or through a medium customarily + used for software exchange; and + + b) the Contributor may Distribute the Program under a license + different than this Agreement, provided that such license: + i) effectively disclaims on behalf of all other Contributors all + warranties and conditions, express and implied, including + warranties or conditions of title and non-infringement, and + implied warranties or conditions of merchantability and fitness + for a particular purpose; + + ii) effectively excludes on behalf of all other Contributors all + liability for damages, including direct, indirect, special, + incidental and consequential damages, such as lost profits; + + iii) does not attempt to limit or alter the recipients' rights + in the Source Code under section 3.2; and + + iv) requires any subsequent distribution of the Program by any + party to be under a license that satisfies the requirements + of this section 3. + +3.2 When the Program is Distributed as Source Code: + + a) it must be made available under this Agreement, or if the + Program (i) is combined with other material in a separate file or + files made available under a Secondary License, and (ii) the initial + Contributor attached to the Source Code the notice described in + Exhibit A of this Agreement, then the Program may be made available + under the terms of such Secondary Licenses, and + + b) a copy of this Agreement must be included with each copy of + the Program. + +3.3 Contributors may not remove or alter any copyright, patent, +trademark, attribution notices, disclaimers of warranty, or limitations +of liability ("notices") contained within the Program from any copy of +the Program which they Distribute, provided that Contributors may add +their own appropriate notices. + +4. COMMERCIAL DISTRIBUTION + +Commercial distributors of software may accept certain responsibilities +with respect to end users, business partners and the like. While this +license is intended to facilitate the commercial use of the Program, +the Contributor who includes the Program in a commercial product +offering should do so in a manner which does not create potential +liability for other Contributors. Therefore, if a Contributor includes +the Program in a commercial product offering, such Contributor +("Commercial Contributor") hereby agrees to defend and indemnify every +other Contributor ("Indemnified Contributor") against any losses, +damages and costs (collectively "Losses") arising from claims, lawsuits +and other legal actions brought by a third party against the Indemnified +Contributor to the extent caused by the acts or omissions of such +Commercial Contributor in connection with its distribution of the Program +in a commercial product offering. The obligations in this section do not +apply to any claims or Losses relating to any actual or alleged +intellectual property infringement. In order to qualify, an Indemnified +Contributor must: a) promptly notify the Commercial Contributor in +writing of such claim, and b) allow the Commercial Contributor to control, +and cooperate with the Commercial Contributor in, the defense and any +related settlement negotiations. The Indemnified Contributor may +participate in any such claim at its own expense. + +For example, a Contributor might include the Program in a commercial +product offering, Product X. That Contributor is then a Commercial +Contributor. If that Commercial Contributor then makes performance +claims, or offers warranties related to Product X, those performance +claims and warranties are such Commercial Contributor's responsibility +alone. Under this section, the Commercial Contributor would have to +defend claims against the other Contributors related to those performance +claims and warranties, and if a court requires any other Contributor to +pay any damages as a result, the Commercial Contributor must pay +those damages. + +5. NO WARRANTY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT +PERMITTED BY APPLICABLE LAW, THE PROGRAM IS PROVIDED ON AN "AS IS" +BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR +IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF +TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR +PURPOSE. Each Recipient is solely responsible for determining the +appropriateness of using and distributing the Program and assumes all +risks associated with its exercise of rights under this Agreement, +including but not limited to the risks and costs of program errors, +compliance with applicable laws, damage to or loss of data, programs +or equipment, and unavailability or interruption of operations. + +6. DISCLAIMER OF LIABILITY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT +PERMITTED BY APPLICABLE LAW, NEITHER RECIPIENT NOR ANY CONTRIBUTORS +SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST +PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE +EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + +7. GENERAL + +If any provision of this Agreement is invalid or unenforceable under +applicable law, it shall not affect the validity or enforceability of +the remainder of the terms of this Agreement, and without further +action by the parties hereto, such provision shall be reformed to the +minimum extent necessary to make such provision valid and enforceable. + +If Recipient institutes patent litigation against any entity +(including a cross-claim or counterclaim in a lawsuit) alleging that the +Program itself (excluding combinations of the Program with other software +or hardware) infringes such Recipient's patent(s), then such Recipient's +rights granted under Section 2(b) shall terminate as of the date such +litigation is filed. + +All Recipient's rights under this Agreement shall terminate if it +fails to comply with any of the material terms or conditions of this +Agreement and does not cure such failure in a reasonable period of +time after becoming aware of such noncompliance. If all Recipient's +rights under this Agreement terminate, Recipient agrees to cease use +and distribution of the Program as soon as reasonably practicable. +However, Recipient's obligations under this Agreement and any licenses +granted by Recipient relating to the Program shall continue and survive. + +Everyone is permitted to copy and distribute copies of this Agreement, +but in order to avoid inconsistency the Agreement is copyrighted and +may only be modified in the following manner. The Agreement Steward +reserves the right to publish new versions (including revisions) of +this Agreement from time to time. No one other than the Agreement +Steward has the right to modify this Agreement. The Eclipse Foundation +is the initial Agreement Steward. The Eclipse Foundation may assign the +responsibility to serve as the Agreement Steward to a suitable separate +entity. Each new version of the Agreement will be given a distinguishing +version number. The Program (including Contributions) may always be +Distributed subject to the version of the Agreement under which it was +received. In addition, after a new version of the Agreement is published, +Contributor may elect to Distribute the Program (including its +Contributions) under the new version. + +Except as expressly stated in Sections 2(a) and 2(b) above, Recipient +receives no rights or licenses to the intellectual property of any +Contributor under this Agreement, whether expressly, by implication, +estoppel or otherwise. All rights in the Program not expressly granted +under this Agreement are reserved. Nothing in this Agreement is intended +to be enforceable by any entity that is not a Contributor or Recipient. +No third-party beneficiary rights are created under this Agreement. + +Exhibit A - Form of Secondary Licenses Notice + +"This Source Code may also be made available under the following +Secondary Licenses when the conditions for such availability set forth +in the Eclipse Public License, v. 2.0 are satisfied: {name license(s), +version(s), and exceptions or additional permissions here}." + + Simply including a copy of this Agreement, including this Exhibit A + is not sufficient to license the Source Code under Secondary Licenses. + + If it is not possible or desirable to put the notice in a particular + file, then You may include the notice in a location (such as a LICENSE + file in a relevant directory) where a recipient would be likely to + look for such a notice. + + You may add additional accurate notices of copyright ownership. diff --git a/README.md b/README.md index dab1cef..8078866 100644 --- a/README.md +++ b/README.md @@ -239,6 +239,37 @@ You may invoke `yq/add-consumer!` and `yq/init!` on a live system as you like. If you change `:pool-size` or `:poll-delay` you will have to `(yq/stop!)` and `(yq/start!)` to make changes take effect. +## Queue job dependencies and ordering + +It is possible to specify that one queue job must wait for another queue +job to complete before it will be executed: + +```clojure +@(d/transact conn [(yq/put :a + ; Payload: + {:id "a1"} + ; Job options: + {:id "a1"})]) + +@(d/transact conn [(yq/put :b + ; Payload: + {:id "b1"} + ; Jobs options: + {:depends-on [:a "a1"]})]) +``` + +Here queue job `b1` will not execute before `:a1` is `:done`. + +Note that queue-name plus `:id` in job options must be an unique value. +In the example above that means `:a` plus `a1` must be unique. + +When specifying `:depends-on`, the job must at least exist in the database, +otherwise `yq/put` will throw an exception. + +Other than this there is no attempt at ordering the execution of queue jobs. +In fact the opposite is done in the poller to guard against the case that a single failing queue job +could effectively take down the entire retry polling job. + # Testing For testing you will probably want determinism over an extra threadpool @@ -273,13 +304,6 @@ by using the test queue: ``` -### Ordering - -There is no attempt at ordering the execution of queue jobs. -In fact the opposite is done to guard against the case that a single failing queue job -could effectively take down the entire retry polling job. - - ## License Copyright © 2021 Ivar Refsdal diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 789e5b4..93ad0b6 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -299,15 +299,11 @@ (yq/init! {:conn conn}) (yq/add-consumer! :a identity) (yq/add-consumer! :b identity) - @(d/transact conn [(yq/put :a "a" {:id "1"})]) - (is (thrown? Exception @(d/transact conn [(yq/put :b "b" {:depends-on [:a "0"]})]))) - @(d/transact conn [(yq/put :b "b" {:depends-on [:a "1"]})]) + @(d/transact conn [(yq/put :a {:id "a1"} {:id "a1"})]) + @(d/transact conn [(yq/put :b {:id "b1"} {:depends-on [:a "a1"]})]) ; can't consume :b yet: - (is (= {:depends-on [:a "1"]} (tq/consume! :b))) - (is (= {:depends-on [:a "1"]} (tq/consume! :b))) - - (is (= "a" (tq/consume! :a))) - (is (= "b" (tq/consume! :b))) - (is (= "b" (tq/force-retry! :b))))) + (is (= {:depends-on [:a "a1"]} (tq/consume! :b))) + (is (= {:id "a1"} (tq/consume! :a))) + (is (= {:id "b1"} (tq/consume! :b))))) -- cgit v1.2.3 From f2bc137283616b46aad9519cacade93969af3fdb Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Fri, 24 Sep 2021 10:42:56 +0200 Subject: Be paranoid when persisting with pr-str --- src/com/github/ivarref/yoltq/impl.clj | 23 +++++++++++++++++------ test/com/github/ivarref/yoltq/virtual_test.clj | 8 ++++++++ 2 files changed, 25 insertions(+), 6 deletions(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index a315545..adc169d 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -3,7 +3,8 @@ [clojure.tools.logging :as log] [clojure.string :as str] [com.github.ivarref.yoltq.utils :as u] - [com.github.ivarref.yoltq.ext-sys :as ext])) + [com.github.ivarref.yoltq.ext-sys :as ext] + [clojure.edn :as edn])) (def schema @@ -22,6 +23,16 @@ #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}]) +(defn pr-str-safe [what x] + (try + (if (= x (edn/read-string (pr-str x))) + (pr-str x) + (throw (ex-info (str "Could not read-string " what) {:input x}))) + (catch Exception e + (log/error "could not read-string" what ":" (ex-message e)) + (throw e)))) + + (defn put [{:keys [capture-bindings conn] :as config} queue-name payload @@ -32,15 +43,15 @@ (assoc o (symbol k) (deref k))) {} (or capture-bindings [])) - (pr-str))] + (pr-str-safe :capture-bindings))] (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init) (merge {:com.github.ivarref.yoltq/id id :com.github.ivarref.yoltq/queue-name queue-name :com.github.ivarref.yoltq/status u/status-init - :com.github.ivarref.yoltq/payload (pr-str payload) + :com.github.ivarref.yoltq/payload (pr-str-safe :payload payload) :com.github.ivarref.yoltq/bindings str-bindings - :com.github.ivarref.yoltq/opts (pr-str (or opts {})) + :com.github.ivarref.yoltq/opts (pr-str-safe :opts (or opts {})) :com.github.ivarref.yoltq/lock (u/random-uuid) :com.github.ivarref.yoltq/tries 0 :com.github.ivarref.yoltq/init-time (u/now-ns)} @@ -50,10 +61,10 @@ :where [?e :com.github.ivarref.yoltq/ext-id ?ext-id]] (d/db conn) - (pr-str [q ext-id])) + (pr-str-safe :depends-on [q ext-id])) (throw (ex-info ":depends-on not found in database" opts)))) (when-let [ext-id (:id opts)] - {:com.github.ivarref.yoltq/ext-id (pr-str [queue-name ext-id])}))) + {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}))) (do (log/error "Did not find registered handler for queue" queue-name) (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 93ad0b6..fdbf6b3 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -307,3 +307,11 @@ (is (= {:id "a1"} (tq/consume! :a))) (is (= {:id "b1"} (tq/consume! :b))))) + + +(deftest verify-can-read-string + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :a identity) + (timbre/with-level :fatal + (is (thrown? Exception @(d/transact conn [(yq/put :a {:broken #'=})])))))) -- cgit v1.2.3 From e142149a4282a669f3f95cb52f708d234a8ded23 Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Fri, 24 Sep 2021 10:59:06 +0200 Subject: Support :depends-on on queue level --- src/com/github/ivarref/yoltq/impl.clj | 11 ++++++++--- test/com/github/ivarref/yoltq/virtual_test.clj | 15 +++++++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index adc169d..50441ff 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -37,8 +37,9 @@ queue-name payload opts] - (if-let [_ (get-in config [:handlers queue-name])] + (if-let [q-config (get-in config [:handlers queue-name])] (let [id (u/squuid) + depends-on (get q-config :depends-on (fn [_] nil)) str-bindings (->> (reduce (fn [o k] (assoc o (symbol k) (deref k))) {} @@ -51,11 +52,15 @@ :com.github.ivarref.yoltq/status u/status-init :com.github.ivarref.yoltq/payload (pr-str-safe :payload payload) :com.github.ivarref.yoltq/bindings str-bindings - :com.github.ivarref.yoltq/opts (pr-str-safe :opts (or opts {})) + :com.github.ivarref.yoltq/opts (pr-str-safe :opts + (merge + (when-let [deps (depends-on payload)] + {:depends-on deps}) + (or opts {}))) :com.github.ivarref.yoltq/lock (u/random-uuid) :com.github.ivarref.yoltq/tries 0 :com.github.ivarref.yoltq/init-time (u/now-ns)} - (when-let [[q ext-id] (:depends-on opts)] + (when-let [[q ext-id] (or (:depends-on opts) (depends-on payload))] (when-not (d/q '[:find ?e . :in $ ?ext-id :where diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index fdbf6b3..3f7365f 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -309,6 +309,21 @@ (is (= {:id "b1"} (tq/consume! :b))))) +(deftest depends-on-queue-level + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :a identity) + (yq/add-consumer! :b identity {:depends-on (fn [{:keys [id]}] [:a id])}) + @(d/transact conn [(yq/put :a {:id "1"} {:id "1"})]) + @(d/transact conn [(yq/put :b {:id "1"})]) + + ; can't consume :b yet: + (is (= {:depends-on [:a "1"]} (tq/consume! :b))) + + (is (= {:id "1"} (tq/consume! :a))) + (is (= {:id "1"} (tq/consume! :b))))) + + (deftest verify-can-read-string (let [conn (u/empty-conn)] (yq/init! {:conn conn}) -- cgit v1.2.3 From 951831f14b4a14e3e7b36dd18f118d6e7404d72e Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Fri, 24 Sep 2021 11:01:48 +0200 Subject: Clean test output --- test/com/github/ivarref/yoltq/virtual_test.clj | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 3f7365f..8f7b454 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -186,8 +186,9 @@ (tq/put! :q {:work 123}) (is (some? (:exception (tq/run-one-report-queue!)))) - (dotimes [_ 20] - (tq/run-queue-once! :q :error)) + (timbre/with-level :fatal + (dotimes [_ 20] + (tq/run-queue-once! :q :error))) (is (= 4 @call-count)))) -- cgit v1.2.3 From 79acba1b716685bb601e05a2e9824eefd19d1f5d Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Mon, 27 Sep 2021 14:36:24 +0200 Subject: Add :valid-payload? function --- README.md | 2 ++ src/com/github/ivarref/yoltq/impl.clj | 4 ++++ test/com/github/ivarref/yoltq/virtual_test.clj | 11 +++++++++++ 3 files changed, 17 insertions(+) (limited to 'test/com/github/ivarref/yoltq') diff --git a/README.md b/README.md index 314c779..7e49431 100644 --- a/README.md +++ b/README.md @@ -146,6 +146,8 @@ the payload. It can be added like this: ; An optional map of queue opts {:allow-cas-failure? true ; Treat [:db.cas ...] failures as success. This is one way for the ; consumer function to ensure idempotence. + :valid-payload? (fn [payload] (some? (:id payload))) ; Function that verifies payload. Should return truthy for valid payloads. + ; The default function always returns true. :max-retries 10}) ; Specify maximum number of times an item will be retried. Default: 100 ``` diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index 02cc102..8b75fc3 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -40,6 +40,7 @@ (if-let [q-config (get-in config [:handlers queue-name])] (let [id (u/squuid) depends-on (get q-config :depends-on (fn [_] nil)) + valid-payload? (get q-config :valid-payload? (fn [_] true)) opts (merge (when-let [deps (depends-on payload)] {:depends-on deps}) @@ -49,6 +50,9 @@ {} (or capture-bindings [])) (pr-str-safe :capture-bindings))] + (when-not (valid-payload? payload) + (log/error "Payload was not valid. Payload was:" payload) + (throw (ex-info (str "Payload was not valid: " payload) {:payload payload}))) (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init) (merge {:com.github.ivarref.yoltq/id id diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 8f7b454..acd3eb7 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -331,3 +331,14 @@ (yq/add-consumer! :a identity) (timbre/with-level :fatal (is (thrown? Exception @(d/transact conn [(yq/put :a {:broken #'=})])))))) + + +(deftest payload-verifier + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity + {:valid-payload? (fn [{:keys [id]}] + (some? id))}) + @(d/transact conn [(yq/put :q {:id "a"})]) + (timbre/with-level :fatal + (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) \ No newline at end of file -- cgit v1.2.3 From b28837ea804fbc6abd14fae23a92933b9406d5e1 Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Sun, 27 Mar 2022 13:49:50 +0200 Subject: Add healthy?, queue-stats functions and default functions for :on-system-error and :on-system-recovery --- README.md | 30 +++++++++++++++++++- deps.edn | 8 ++++-- pom.xml | 4 +-- release.sh | 4 +-- src/com/github/ivarref/yoltq.clj | 32 ++++++++++++++++++++-- src/com/github/ivarref/yoltq/error_poller.clj | 19 +++++++++---- .../com/github/ivarref/yoltq/error_poller_test.clj | 2 +- 7 files changed, 80 insertions(+), 19 deletions(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/README.md b/README.md index 7e49431..f62d46c 100644 --- a/README.md +++ b/README.md @@ -331,6 +331,34 @@ These dynamic bindings will be in place when yoltq logs errors, warnings etc. about failing consumer functions, possibly making troubleshooting easier. +## Change log + +### 2022-03-27 v0.2.41 +``` + Added function `healthy?` that returns: + true if no errors + false if one or more errors + nil if error-poller is yet to be executed. + + Added default functions for `:on-system-error` and `:on-system-recovery` + that simply logs that the system is in error (ERROR level) or has + recovered (INFO level). + + Added function `queue-stats` that returns a nicely "formatted" + vector of queue stats, for example: + (queue-stats) + => + [{:qname :add-message-thread, :status :done, :count 10274} + {:qname :add-message-thread, :status :init, :count 30} + {:qname :add-message-thread, :status :processing, :count 1} + {:qname :send-message, :status :done, :count 21106} + {:qname :send-message, :status :init, :count 56}] +``` + +### 2021-09-27 v0.2.39: ? +### 2021-09-27 v0.2.37: ? + +### 2021-09-24 v0.2.33: First publicly announced release. ## License @@ -345,4 +373,4 @@ Licenses when the conditions for such availability set forth in the Eclipse Public License, v. 2.0 are satisfied: GNU General Public License as published by the Free Software Foundation, either version 2 of the License, or (at your option) any later version, with the GNU Classpath Exception which is available -at https://www.gnu.org/software/classpath/license.html. \ No newline at end of file +at https://www.gnu.org/software/classpath/license.html. diff --git a/deps.edn b/deps.edn index a457628..d0f0a26 100644 --- a/deps.edn +++ b/deps.edn @@ -22,8 +22,10 @@ :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} - :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.1.3"}} - :main-opts ["-m" "deps-deploy.deps-deploy" "deploy" - "target/out.jar" "true"]}} + :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}} + :exec-fn deps-deploy.deps-deploy/deploy + :exec-args {:installer :remote + :sign-releases? false + :artifact "target/out.jar"}}} :mvn/repos {"my.datomic.com" {:url "https://my.datomic.com/repo"}}} diff --git a/pom.xml b/pom.xml index 9784836..e486fb1 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ jar com.github.ivarref yoltq - 0.2.39 + 0.2.40 yoltq @@ -30,7 +30,7 @@ scm:git:git://github.com/ivarref/yoltq.git scm:git:ssh://git@github.com/ivarref/yoltq.git - v0.2.39 + v0.2.40 https://github.com/ivarref/yoltq \ No newline at end of file diff --git a/release.sh b/release.sh index 70f67b5..dec59a2 100755 --- a/release.sh +++ b/release.sh @@ -13,8 +13,6 @@ git commit -m "Release $VERSION" git tag -a v$VERSION -m "Release v$VERSION" git push --follow-tags -clojure -M:deploy +clojure -X:deploy echo "Released $VERSION" - -rm *.pom.asc \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 3164020..03a364f 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -17,7 +17,6 @@ (defonce ^:dynamic *running?* (atom false)) (defonce ^:dynamic *test-mode* false) - (def default-opts (-> {; Default number of times a queue job will be retried before giving up ; Can be overridden on a per consumer basis with @@ -79,7 +78,8 @@ (-> (merge-with (fn [a b] (or b a)) {:running-queues (atom #{}) :start-execute-time (atom {}) - :system-error (atom {})} + :system-error (atom {}) + :healthy? (atom nil)} default-opts (if *test-mode* old-conf (select-keys old-conf [:handlers])) cfg) @@ -148,6 +148,32 @@ (reset! threadpool nil)))))) +(defn healthy? [] + (some->> @*config* + :healthy? + (deref))) + +(defn queue-stats [] + (let [{:keys [conn]} @*config* + db (d/db conn)] + (->> (d/q '[:find ?e ?qname ?status + :in $ + :where + [?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/status ?status]] + db) + (mapv (partial zipmap [:e :qname :status])) + (mapv #(select-keys % [:qname :status])) + (mapv (fn [qitem] {qitem 1})) + (reduce (partial merge-with +) {}) + (mapv (fn [[{:keys [qname status]} v]] + (array-map + :qname qname + :status status + :count v))) + (sort-by (juxt :qname :status)) + (vec)))) + (comment (do (require 'com.github.ivarref.yoltq.log-init) @@ -177,4 +203,4 @@ (start!) (dotimes [x n] @(d/transact conn [(put :q {:work 123})])) - nil)))) \ No newline at end of file + nil)))) diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj index 77339f7..1268482 100644 --- a/src/com/github/ivarref/yoltq/error_poller.clj +++ b/src/com/github/ivarref/yoltq/error_poller.clj @@ -66,9 +66,13 @@ (defn do-poll-errors [{:keys [conn system-error on-system-error - on-system-recovery] - :or {on-system-error (fn [] nil) - on-system-recovery (fn [] nil)} + on-system-recovery + healthy?] + :or {on-system-error (fn [] + (log/error "There are yoltq queues which have errors") + nil) + on-system-recovery (fn [] + (log/info "Yoltq recovered"))} :as config}] (assert (some? conn) "expected :conn to be present") (assert (some? system-error) "expected :system-error to be present") @@ -79,8 +83,11 @@ (d/db conn) u/status-error) 0)] - (when (pos-int? error-count) - (log/debug "poll-errors found" error-count "errors in system")) + (if (pos-int? error-count) + (do + (log/debug "poll-errors found" error-count "errors in system") + (reset! healthy? false)) + (reset! healthy? true)) (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ns) error-count)] (when run-callback (cond (= run-callback :error) @@ -100,7 +107,7 @@ (when @running? (do-poll-errors @config-atom)) (catch Throwable t - (log/error t "unexpected error in poll-erros:" (ex-message t)) + (log/error t "unexpected error in poll-errors:" (ex-message t)) nil))) diff --git a/test/com/github/ivarref/yoltq/error_poller_test.clj b/test/com/github/ivarref/yoltq/error_poller_test.clj index 2e0873e..18f0aa7 100644 --- a/test/com/github/ivarref/yoltq/error_poller_test.clj +++ b/test/com/github/ivarref/yoltq/error_poller_test.clj @@ -1,5 +1,5 @@ (ns com.github.ivarref.yoltq.error-poller-test - (:require [clojure.test :refer :all] + (:require [clojure.test :refer [deftest is]] [com.github.ivarref.yoltq.error-poller :as ep] [clojure.tools.logging :as log] [com.github.ivarref.yoltq.log-init :as logconfig] -- cgit v1.2.3 From 6c26a3b6871286510bb8e9770ee7f7e3abf97abe Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Sun, 27 Mar 2022 18:39:44 +0200 Subject: Start use current millis in the database, not nano offset --- .gitignore | 3 +- README.md | 38 ++++++++++++++-------- deps.edn | 2 +- src/com/github/ivarref/yoltq.clj | 8 ++--- src/com/github/ivarref/yoltq/error_poller.clj | 10 +++--- src/com/github/ivarref/yoltq/ext_sys.clj | 13 ++++---- src/com/github/ivarref/yoltq/impl.clj | 14 ++++---- src/com/github/ivarref/yoltq/poller.clj | 19 +++++++---- .../ivarref/yoltq/slow_executor_detector.clj | 4 +-- src/com/github/ivarref/yoltq/test_queue.clj | 2 +- src/com/github/ivarref/yoltq/utils.clj | 19 ++++++----- test/com/github/ivarref/yoltq/test_utils.clj | 9 ++--- 12 files changed, 81 insertions(+), 60 deletions(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/.gitignore b/.gitignore index cb9a7ca..c82fdd7 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ tree.txt .stage-url.txt *.pom.asc *.pom -temp/ \ No newline at end of file +temp/ +.clj-kondo/ diff --git a/README.md b/README.md index 45ba8c4..9c5669c 100644 --- a/README.md +++ b/README.md @@ -333,12 +333,21 @@ easier. ## Change log -### 2022-03-27 [v0.2.41](https://github.com/ivarref/yoltq/compare/v0.2.39...v0.2.41) +### 20..-..-.. vHEAD [diff](https://github.com/ivarref/yoltq/compare/v0.2.41...HEAD) +* Critical bugfix. +``` +Started using (System/currentTimeMillis) and not (System/nanoTime) +when storing time in the database. +``` + +* Bump Clojure to `1.11.0`. + +### 2022-03-27 v0.2.41 [diff](https://github.com/ivarref/yoltq/compare/v0.2.39...v0.2.41) * Added function `healthy?` that returns: ``` - true if no errors - false if one or more errors - nil if error-poller is yet to be executed. + true if no errors + false if one or more errors + nil if error-poller is yet to be executed. ``` * Added default functions for `:on-system-error` and `:on-system-recovery` @@ -348,22 +357,23 @@ easier. * Added function `queue-stats` that returns a nicely "formatted" vector of queue stats, for example: ``` -(queue-stats) -=> -[{:qname :add-message-thread, :status :done, :count 10274} - {:qname :add-message-thread, :status :init, :count 30} - {:qname :add-message-thread, :status :processing, :count 1} - {:qname :send-message, :status :done, :count 21106} - {:qname :send-message, :status :init, :count 56}] + (queue-stats) + => + [{:qname :add-message-thread, :status :done, :count 10274} + {:qname :add-message-thread, :status :init, :count 30} + {:qname :add-message-thread, :status :processing, :count 1} + {:qname :send-message, :status :done, :count 21106} + {:qname :send-message, :status :init, :count 56}] ``` -### 2021-09-27 [v0.2.39](https://github.com/ivarref/yoltq/compare/v0.2.37...v0.2.39) +### 2021-09-27 v0.2.39 [diff](https://github.com/ivarref/yoltq/compare/v0.2.37...v0.2.39) Added `:valid-payload?` option for queue consumers. -### 2021-09-27 [v0.2.37](https://github.com/ivarref/yoltq/compare/v0.2.33...v0.2.37) +### 2021-09-27 v0.2.37 [diff](https://github.com/ivarref/yoltq/compare/v0.2.33...v0.2.37) Improved error reporting. -### 2021-09-24 v0.2.33: First publicly announced release. +### 2021-09-24 v0.2.33 +First publicly announced release. ## License diff --git a/deps.edn b/deps.edn index d0f0a26..8e769e1 100644 --- a/deps.edn +++ b/deps.edn @@ -1,5 +1,5 @@ {:deps {org.clojure/tools.logging {:mvn/version "1.1.0"} - org.clojure/clojure {:mvn/version "1.10.3"}} + org.clojure/clojure {:mvn/version "1.11.0"}} :paths ["src"] diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 03a364f..17aa40a 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -19,7 +19,7 @@ (def default-opts (-> {; Default number of times a queue job will be retried before giving up - ; Can be overridden on a per consumer basis with + ; Can be overridden on a per-consumer basis with ; (yq/add-consumer! :q (fn [payload] ...) {:max-retries 200}) :max-retries 100 @@ -34,7 +34,7 @@ :hung-backoff-time (Duration/ofMinutes 30) ; Most queue jobs in init state will be consumed by the tx-report-queue listener. - ; However in the case where a init job was added right before the application + ; However, in the case where an init job was added right before the application ; was shut down and did not have time to be processed by the tx-report-queue listener, ; it will be consumer by the init poller. This init poller backs off by ; :init-backoff-time to avoid unnecessary compare-and-swap lock failures that could @@ -66,7 +66,7 @@ ; How often should the system invoke :system-error-callback-backoff (Duration/ofHours 1)} - u/duration->nanos)) + u/duration->millis)) (defn init! [{:keys [conn] :as cfg}] @@ -83,7 +83,7 @@ default-opts (if *test-mode* old-conf (select-keys old-conf [:handlers])) cfg) - u/duration->nanos)))] + u/duration->millis)))] new-cfg))) diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj index 1268482..ee6359e 100644 --- a/src/com/github/ivarref/yoltq/error_poller.clj +++ b/src/com/github/ivarref/yoltq/error_poller.clj @@ -22,7 +22,7 @@ state :recovery}} {:keys [system-error-min-count system-error-callback-backoff] :or {system-error-min-count 3}} - now-ns + now-ms error-count] (let [new-errors (->> (conj errors error-count) (take-last system-error-min-count) @@ -50,14 +50,14 @@ (when (and (= old-state :recovery) (= new-state :error)) {:run-callback :error - :last-notify now-ns}) + :last-notify now-ms}) (when (and (= new-state :error) (= old-state :error) - (> now-ns + (> now-ms (+ last-notify system-error-callback-backoff))) {:run-callback :error - :last-notify now-ns}) + :last-notify now-ms}) (when (and (= new-state :recovery) (= old-state :error)) @@ -88,7 +88,7 @@ (log/debug "poll-errors found" error-count "errors in system") (reset! healthy? false)) (reset! healthy? true)) - (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ns) error-count)] + (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ms) error-count)] (when run-callback (cond (= run-callback :error) (on-system-error) diff --git a/src/com/github/ivarref/yoltq/ext_sys.clj b/src/com/github/ivarref/yoltq/ext_sys.clj index 3480475..692b934 100644 --- a/src/com/github/ivarref/yoltq/ext_sys.clj +++ b/src/com/github/ivarref/yoltq/ext_sys.clj @@ -1,17 +1,18 @@ (ns com.github.ivarref.yoltq.ext-sys (:require [datomic.api :as d]) + (:refer-clojure :exclude [random-uuid]) (:import (java.util UUID))) -(def ^:dynamic *now-ns-atom* nil) +(def ^:dynamic *now-ms-atom* nil) (def ^:dynamic *squuid-atom* nil) (def ^:dynamic *random-atom* nil) -(defn now-ns [] - (if *now-ns-atom* - @*now-ns-atom* - (System/nanoTime))) +(defn now-ms [] + (if *now-ms-atom* + @*now-ms-atom* + (System/currentTimeMillis))) (defn squuid [] @@ -23,4 +24,4 @@ (defn random-uuid [] (if *random-atom* (UUID/fromString (str "00000000-0000-0000-0000-" (format "%012d" (swap! *random-atom* inc)))) - (UUID/randomUUID))) \ No newline at end of file + (UUID/randomUUID))) diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index 8b75fc3..b4eef8d 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -63,7 +63,7 @@ :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts) :com.github.ivarref.yoltq/lock (u/random-uuid) :com.github.ivarref.yoltq/tries 0 - :com.github.ivarref.yoltq/init-time (u/now-ns)} + :com.github.ivarref.yoltq/init-time (u/now-ms)} (when-let [[q ext-id] (:depends-on opts)] (when-not (d/q '[:find ?e . :in $ ?ext-id @@ -138,8 +138,8 @@ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing new-status] (if (= new-status u/status-done) - {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ns)} - {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ns)})] + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ms)} + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ms)})] start-time (System/nanoTime) {:keys [db-after]} @(d/transact conn tx)] (when tx-spent-time! (tx-spent-time! (- (System/nanoTime) start-time))) @@ -171,7 +171,7 @@ (log/debug "queue item" (str id) "for queue" queue-name "is now processing") (let [{:keys [retval exception]} (try - (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ns) id queue-name]) + (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ms) id queue-name]) (let [v (f payload)] {:retval v}) (catch Throwable t @@ -188,7 +188,7 @@ (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)] (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item] (log/info (fmt id queue-name u/status-done tries (- done-time init-time))) - (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time))) + (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time))) (assoc q-item :retval retval :success? true :allow-cas-failure? true))) (some? exception) @@ -198,14 +198,14 @@ (log/logp level exception (fmt id queue-name u/status-error tries (- error-time init-time))) (log/logp level exception "error message was:" (str \" (ex-message exception) \") "for queue-item" (str id)) (log/logp level exception "ex-data was:" (ex-data exception) "for queue-item" (str id)) - (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time))) + (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time))) (assoc q-item :exception exception))) :else (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)] (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item] (log/info (fmt id queue-name u/status-done tries (- done-time init-time))) - (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time))) + (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time))) (assoc q-item :retval retval :success? true)))))) (do (log/error "no handler for queue" queue-name) diff --git a/src/com/github/ivarref/yoltq/poller.clj b/src/com/github/ivarref/yoltq/poller.clj index 28b158f..9cf81c7 100644 --- a/src/com/github/ivarref/yoltq/poller.clj +++ b/src/com/github/ivarref/yoltq/poller.clj @@ -25,17 +25,16 @@ (if-not (contains? old q) (try (log/debug "polling queue" queue-name "for status" status) - (let [start-time (u/now-ns) + (let [start-time (u/now-ms) last-res (loop [prev-res nil] (when @running? (let [res (poll-once! cfg queue-name status)] + (log/debug "poll-once! returned" res) (if (and res (:success? res)) (recur res) prev-res))))] - (let [spent-ns (- (u/now-ns) start-time)] - (log/trace "done polling queue" q "in" - (format "%.1f" (double (/ spent-ns 1e6))) - "ms")) + (let [spent-ms (- (u/now-ms) start-time)] + (log/trace "done polling queue" q "in" spent-ms "ms")) last-res) (finally (swap! running-queues disj q))) @@ -44,6 +43,14 @@ (log/error t "poll-queue! crashed:" (ex-message t))) (finally))) +(comment + (def cfg @com.github.ivarref.yoltq/*config*)) + +(comment + (poll-queue! + (atom true) + @com.github.ivarref.yoltq/*config* + [:add-message-thread :init])) (defn poll-all-queues! [running? config-atom pool] (try @@ -54,4 +61,4 @@ [q-name status])))] (.execute pool (fn [] (poll-queue! running? @config-atom q)))))) (catch Throwable t - (log/error t "poll-all-queues! crashed:" (ex-message t))))) \ No newline at end of file + (log/error t "poll-all-queues! crashed:" (ex-message t))))) diff --git a/src/com/github/ivarref/yoltq/slow_executor_detector.clj b/src/com/github/ivarref/yoltq/slow_executor_detector.clj index f15ef7d..80d3718 100644 --- a/src/com/github/ivarref/yoltq/slow_executor_detector.clj +++ b/src/com/github/ivarref/yoltq/slow_executor_detector.clj @@ -7,7 +7,7 @@ (defn- do-show-slow-threads [{:keys [start-execute-time max-execute-time]}] (doseq [[^Thread thread [start-time queue-id queue-name]] @start-execute-time] - (when (> (ext/now-ns) (+ start-time max-execute-time)) + (when (> (ext/now-ms) (+ start-time max-execute-time)) (log/error "thread" (.getName thread) "spent too much time on" "queue item" (str queue-id) "for queue" queue-name @@ -25,4 +25,4 @@ (dotimes [_ 3] (when @running? (Thread/sleep 1000)))) (catch Throwable t - (log/error t "reap! crashed:" (ex-message t))))) \ No newline at end of file + (log/error t "reap! crashed:" (ex-message t))))) diff --git a/src/com/github/ivarref/yoltq/test_queue.clj b/src/com/github/ivarref/yoltq/test_queue.clj index 6183216..ee9cd54 100644 --- a/src/com/github/ivarref/yoltq/test_queue.clj +++ b/src/com/github/ivarref/yoltq/test_queue.clj @@ -47,7 +47,7 @@ (with-bindings {#'yq/*config* config# #'yq/*running?* (atom false) #'yq/*test-mode* true - #'ext/*now-ns-atom* (atom 0) + #'ext/*now-ms-atom* (atom 0) #'ext/*random-atom* (atom 0) #'ext/*squuid-atom* (atom 0)} (try diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj index d551510..ad2444a 100644 --- a/src/com/github/ivarref/yoltq/utils.clj +++ b/src/com/github/ivarref/yoltq/utils.clj @@ -3,6 +3,7 @@ [clojure.edn :as edn] [com.github.ivarref.yoltq.ext-sys :as ext] [clojure.tools.logging :as log]) + (:refer-clojure :exclude [random-uuid]) (:import (datomic Connection) (java.time Duration))) @@ -13,10 +14,10 @@ (def status-error :error) -(defn duration->nanos [m] +(defn duration->millis [m] (reduce-kv (fn [o k v] (if (instance? Duration v) - (assoc o k (.toNanos v)) + (assoc o k (.toMillis v)) (assoc o k v))) {} m)) @@ -30,8 +31,8 @@ (ext/random-uuid)) -(defn now-ns [] - (ext/now-ns)) +(defn now-ms [] + (ext/now-ms)) (defn root-cause [e] @@ -75,7 +76,7 @@ :bindings (get (get-queue-item db id) :com.github.ivarref.yoltq/bindings {}) :tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock] [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status old-status status-processing] - {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ns)}]})) + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ms)}]})) (defn get-init [{:keys [conn db init-backoff-time] :as cfg} queue-name] @@ -94,11 +95,11 @@ [?e :com.github.ivarref.yoltq/lock ?lock]] db queue-name - (- (now-ns) init-backoff-time)) + (- (now-ms) init-backoff-time)) (not-empty))] (let [[id old-lock] (rand-nth (into [] ids))] (prepare-processing db id queue-name old-lock :init)) - (log/trace "no new-items in :init status for queue" queue-name)))) + (log/debug "no new-items in :init status for queue" queue-name)))) (defn get-error [{:keys [conn db error-backoff-time max-retries] :as cfg} queue-name] @@ -120,7 +121,7 @@ [?e :com.github.ivarref.yoltq/lock ?lock]] db queue-name - (- (now-ns) error-backoff-time) + (- (now-ms) error-backoff-time) (inc max-retries)) (not-empty))] (let [[id old-lock] (rand-nth (into [] ids))] @@ -131,7 +132,7 @@ (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: " (str (if (nil? conn) "nil" conn)) "\nConfig was: " (str cfg))) - (let [now (or now (now-ns)) + (let [now (or now (now-ms)) max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries) db (or db (d/db conn))] (when-let [ids (->> (d/q '[:find ?id ?lock ?tries diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj index 5427ff5..e4151c2 100644 --- a/test/com/github/ivarref/yoltq/test_utils.clj +++ b/test/com/github/ivarref/yoltq/test_utils.clj @@ -8,7 +8,8 @@ [com.github.ivarref.yoltq.impl :as i] [clojure.edn :as edn] [com.github.ivarref.yoltq.ext-sys :as ext]) - (:import (java.util UUID))) + (:import (java.util UUID) + (java.time Duration))) (logconfig/init-logging! @@ -39,10 +40,10 @@ (defn advance! [tp] - (assert (some? ext/*now-ns-atom*) "Expected to be running in test-mode!") - (swap! ext/*now-ns-atom* + (if (number? tp) + (assert (some? ext/*now-ms-atom*) "Expected to be running in test-mode!") + (swap! ext/*now-ms-atom* + (if (number? tp) tp - (.toNanos tp)))) + (.toMillis ^Duration tp)))) (defn done-count [] -- cgit v1.2.3 From 41c9e08d63176cf7c239574d1d07f2b302a2d3ec Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Sun, 27 Mar 2022 21:33:00 +0200 Subject: Fix use current millis in the database, not nano offset --- src/com/github/ivarref/yoltq.clj | 11 ++- src/com/github/ivarref/yoltq/impl.clj | 6 +- src/com/github/ivarref/yoltq/migrate.clj | 58 ++++++++++++++++ test/com/github/ivarref/yoltq/migrate_test.clj | 92 ++++++++++++++++++++++++++ test/com/github/ivarref/yoltq/test_utils.clj | 7 +- test/com/github/ivarref/yoltq/virtual_test.clj | 14 +++- 6 files changed, 180 insertions(+), 8 deletions(-) create mode 100644 src/com/github/ivarref/yoltq/migrate.clj create mode 100644 test/com/github/ivarref/yoltq/migrate_test.clj (limited to 'test/com/github/ivarref/yoltq') diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 17aa40a..1a60a45 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -6,6 +6,7 @@ [com.github.ivarref.yoltq.poller :as poller] [com.github.ivarref.yoltq.error-poller :as errpoller] [com.github.ivarref.yoltq.slow-executor-detector :as slow-executor] + [com.github.ivarref.yoltq.migrate :as migrate] [com.github.ivarref.yoltq.utils :as u]) (:import (datomic Connection) (java.util.concurrent Executors TimeUnit ExecutorService) @@ -64,7 +65,11 @@ :system-error-poll-delay (Duration/ofMinutes 1) ; How often should the system invoke - :system-error-callback-backoff (Duration/ofHours 1)} + :system-error-callback-backoff (Duration/ofHours 1) + + ; Should old, possibly stalled jobs be automatically be migrated + ; as part of `start!`? + :auto-migrate? true} u/duration->millis)) @@ -104,7 +109,9 @@ (defn- do-start! [] - (let [{:keys [poll-delay pool-size system-error-poll-delay]} @*config*] + (let [{:keys [poll-delay pool-size system-error-poll-delay auto-migrate?] :as cfg} @*config*] + (when auto-migrate? + (migrate/migrate! cfg)) (reset! threadpool (Executors/newScheduledThreadPool (+ 2 pool-size))) (let [pool @threadpool queue-listener-ready (promise)] diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index b4eef8d..6b14ffc 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -20,7 +20,8 @@ #:db{:ident :com.github.ivarref.yoltq/init-time, :cardinality :db.cardinality/one, :valueType :db.type/long} #:db{:ident :com.github.ivarref.yoltq/processing-time, :cardinality :db.cardinality/one, :valueType :db.type/long} #:db{:ident :com.github.ivarref.yoltq/done-time, :cardinality :db.cardinality/one, :valueType :db.type/long} - #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}]) + #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long} + #:db{:ident :com.github.ivarref.yoltq/version, :cardinality :db.cardinality/one, :valueType :db.type/string, :index true}]) (defn pr-str-safe [what x] @@ -63,7 +64,8 @@ :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts) :com.github.ivarref.yoltq/lock (u/random-uuid) :com.github.ivarref.yoltq/tries 0 - :com.github.ivarref.yoltq/init-time (u/now-ms)} + :com.github.ivarref.yoltq/init-time (u/now-ms) + :com.github.ivarref.yoltq/version "2"} (when-let [[q ext-id] (:depends-on opts)] (when-not (d/q '[:find ?e . :in $ ?ext-id diff --git a/src/com/github/ivarref/yoltq/migrate.clj b/src/com/github/ivarref/yoltq/migrate.clj new file mode 100644 index 0000000..89fc286 --- /dev/null +++ b/src/com/github/ivarref/yoltq/migrate.clj @@ -0,0 +1,58 @@ +(ns com.github.ivarref.yoltq.migrate + (:require [datomic.api :as d] + [clojure.tools.logging :as log])) + +(defn to->v2-ent [{:keys [conn]} now-ms id] + (log/info "Migrating id" id) + (let [attr-val (fn [attr] + (when-let [old (d/q '[:find ?time . + :in $ ?e ?a + :where + [?e ?a ?time]] + (d/db conn) + [:com.github.ivarref.yoltq/id id] + attr)] + (let [now-ms (or now-ms + (.getTime (d/q '[:find (max ?txinst) . + :in $ ?e ?a + :where + [?e ?a _ ?tx true] + [?tx :db/txInstant ?txinst]] + (d/history (d/db conn)) + [:com.github.ivarref.yoltq/id id] + attr)))] + (log/info "Updating" id attr "to" now-ms) + [[:db/cas [:com.github.ivarref.yoltq/id id] + attr old now-ms]])))] + (vec (concat [[:db/cas [:com.github.ivarref.yoltq/id id] + :com.github.ivarref.yoltq/version nil "2"]] + (mapcat attr-val [:com.github.ivarref.yoltq/init-time + :com.github.ivarref.yoltq/processing-time + :com.github.ivarref.yoltq/done-time + :com.github.ivarref.yoltq/error-time]))))) + +(defn to->v2 [{:keys [conn loop? now-ms] + :or {loop? true} + :as cfg}] + (loop [tx-vec []] + (if-let [id (some->> (d/q '[:find [?id ...] + :in $ + :where + [?e :com.github.ivarref.yoltq/id ?id] + [(missing? $ ?e :com.github.ivarref.yoltq/version)]] + (d/db conn)) + (sort) + (not-empty) + (first))] + (let [tx (to->v2-ent cfg now-ms id)] + @(d/transact conn tx) + (if loop? + (recur (conj tx-vec tx)) + tx)) + (do + (log/info "No items left to migrate") + tx-vec)))) + + +(defn migrate! [cfg] + (to->v2 cfg)) diff --git a/test/com/github/ivarref/yoltq/migrate_test.clj b/test/com/github/ivarref/yoltq/migrate_test.clj new file mode 100644 index 0000000..0063631 --- /dev/null +++ b/test/com/github/ivarref/yoltq/migrate_test.clj @@ -0,0 +1,92 @@ +(ns com.github.ivarref.yoltq.migrate-test + (:require [clojure.test :refer [deftest is]] + [com.github.ivarref.yoltq.ext-sys :as ext] + [com.github.ivarref.yoltq.migrate :as m] + [com.github.ivarref.yoltq.impl :as impl] + [com.github.ivarref.yoltq.test-utils :as tu] + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d])) + + +(deftest to-v2-migration + (with-bindings {#'ext/*squuid-atom* (atom 0)} + (let [conn (tu/empty-conn)] + @(d/transact conn impl/schema) + @(d/transact conn [{:com.github.ivarref.yoltq/id (u/squuid) + :com.github.ivarref.yoltq/queue-name :dummy + :com.github.ivarref.yoltq/status u/status-processing + :com.github.ivarref.yoltq/init-time 1 + :com.github.ivarref.yoltq/processing-time 2}]) + @(d/transact conn [{:com.github.ivarref.yoltq/id (u/squuid) + :com.github.ivarref.yoltq/queue-name :dummy + :com.github.ivarref.yoltq/status u/status-init + :com.github.ivarref.yoltq/init-time 3}]) + (is (= [[[:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000001"] + :com.github.ivarref.yoltq/version + nil + "2"] + [:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000001"] + :com.github.ivarref.yoltq/init-time + 1 + 1000] + [:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000001"] + :com.github.ivarref.yoltq/processing-time + 2 + 1000]] + [[:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000002"] + :com.github.ivarref.yoltq/version + nil + "2"] + [:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000002"] + :com.github.ivarref.yoltq/init-time + 3 + 1000]]] + (m/migrate! {:conn conn + :now-ms 1000 + :loop? true}))) + (is (= [] + (m/migrate! {:conn conn + :now-ms 1000 + :loop? true})))))) + + +(deftest to-v2-migration-real-time + (with-bindings {#'ext/*squuid-atom* (atom 0)} + (let [conn (tu/empty-conn) + id (u/squuid)] + @(d/transact conn impl/schema) + @(d/transact conn [{:com.github.ivarref.yoltq/id id + :com.github.ivarref.yoltq/queue-name :dummy + :com.github.ivarref.yoltq/status u/status-init + :com.github.ivarref.yoltq/init-time 1}]) + (Thread/sleep 100) + @(d/transact conn [{:com.github.ivarref.yoltq/id id + :com.github.ivarref.yoltq/init-time 2}]) + (let [tx-times (->> (d/q '[:find [?txinst ...] + :in $ ?e + :where + [?e :com.github.ivarref.yoltq/init-time _ ?tx true] + [?tx :db/txInstant ?txinst]] + (d/history (d/db conn)) + [:com.github.ivarref.yoltq/id id]) + (sort) + (vec))] + (is (= 2 (count tx-times))) + (m/migrate! {:conn conn}) + (is (= (.getTime (last tx-times)) + (d/q '[:find ?init-time . + :in $ ?e + :where + [?e :com.github.ivarref.yoltq/init-time ?init-time]] + (d/db conn) + [:com.github.ivarref.yoltq/id id]))))))) diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj index e4151c2..0c1b2f0 100644 --- a/test/com/github/ivarref/yoltq/test_utils.clj +++ b/test/com/github/ivarref/yoltq/test_utils.clj @@ -7,7 +7,8 @@ [clojure.string :as str] [com.github.ivarref.yoltq.impl :as i] [clojure.edn :as edn] - [com.github.ivarref.yoltq.ext-sys :as ext]) + [com.github.ivarref.yoltq.ext-sys :as ext] + [clojure.pprint :as pp]) (:import (java.util UUID) (java.time Duration))) @@ -54,6 +55,10 @@ (d/db (:conn @yq/*config*)))) +(defn pp [x] + (pp/pprint x) + x) + (defn get-init [& args] (apply u/get-init @yq/*config* args)) diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index acd3eb7..34c9026 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -1,6 +1,6 @@ (ns com.github.ivarref.yoltq.virtual-test (:require [datomic-schema.core] - [clojure.test :refer :all] + [clojure.test :refer [use-fixtures deftest is] :refer-macros [thrown?]] [com.github.ivarref.yoltq.test-queue :as tq] [com.github.ivarref.yoltq.test-utils :as u] [datomic.api :as d] @@ -8,7 +8,8 @@ [clojure.tools.logging :as log] [com.github.ivarref.yoltq.impl :as i] [com.github.ivarref.yoltq :as yq] - [taoensso.timbre :as timbre])) + [taoensso.timbre :as timbre] + [com.github.ivarref.yoltq.migrate :as migrate])) (use-fixtures :each tq/call-with-virtual-queue!) @@ -21,6 +22,13 @@ @(d/transact conn [(yq/put :q {:work 123})]) (is (= {:work 123} (tq/consume! :q))))) +(deftest happy-case-no-migration-for-new-entities + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q {:work 123})]) + (is (= {:work 123} (tq/consume! :q))) + (is (= [] (migrate/migrate! @yq/*config*))))) (deftest happy-case-tx-report-q (let [conn (u/empty-conn)] @@ -341,4 +349,4 @@ (some? id))}) @(d/transact conn [(yq/put :q {:id "a"})]) (timbre/with-level :fatal - (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) \ No newline at end of file + (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) -- cgit v1.2.3 From 7cf016c691fc08c81138fc592a7657087151c3ca Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Wed, 22 Jun 2022 10:26:16 +0200 Subject: Release 0.2.56 Fix line break issue? Added support for `:yoltq/queue-id` metadata on functions --- README.md | 37 +++++++++++++++++++------- deps.edn | 8 +++--- pom.xml | 8 +++--- release.sh | 6 +++-- src/com/github/ivarref/yoltq.clj | 32 +++++++++++++++------- test/com/github/ivarref/yoltq/virtual_test.clj | 33 +++++++++++++++++------ 6 files changed, 87 insertions(+), 37 deletions(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/README.md b/README.md index e5b2059..8ead585 100644 --- a/README.md +++ b/README.md @@ -333,22 +333,41 @@ easier. ## Change log -### 2022-03-29 v0.2.55 [diff](https://github.com/ivarref/yoltq/compare/v0.2.54...v0.2.55) +#### 2022-06-22 v0.2.56 [diff](https://github.com/ivarref/yoltq/compare/v0.2.55...v0.2.56) +Added support for `:yoltq/queue-id` metadata on functions. I.e. it's possible to write +the following: +```clojure +(defn my-consumer + {:yoltq/queue-id :some-queue} + [payload] + :work-work-work) + +(yq/add-consumer! #'my-consumer ; <-- will resolve to :some-queue + my-consumer) + +@(d/transact conn [(yq/put #'my-consumer ; <-- will resolve to :some-queue + {:id "a"})]) +``` + +The idea here is that it is simpler to jump to var definitions than going via keywords, +which essentially refers to a var/function anyway. + +#### 2022-03-29 v0.2.55 [diff](https://github.com/ivarref/yoltq/compare/v0.2.54...v0.2.55) Added: `unhealthy?` function which returns `true` if there are queues in error, or `false` otherwise. -### 2022-03-28 v0.2.54 [diff](https://github.com/ivarref/yoltq/compare/v0.2.51...v0.2.54) +#### 2022-03-28 v0.2.54 [diff](https://github.com/ivarref/yoltq/compare/v0.2.51...v0.2.54) Fixed: Schedules should now be using milliseconds and not nanoseconds. -### 2022-03-28 v0.2.51 [diff](https://github.com/ivarref/yoltq/compare/v0.2.48...v0.2.51) +#### 2022-03-28 v0.2.51 [diff](https://github.com/ivarref/yoltq/compare/v0.2.48...v0.2.51) * Don't OOM on migrating large amounts of data. * Respect `:auto-migrate? false`. -### 2022-03-27 v0.2.48 [diff](https://github.com/ivarref/yoltq/compare/v0.2.46...v0.2.48) +#### 2022-03-27 v0.2.48 [diff](https://github.com/ivarref/yoltq/compare/v0.2.46...v0.2.48) * Auto migration is done in the background. * Only poll for current version of jobs, thus no races for auto migration. -### 2022-03-27 v0.2.46 [diff](https://github.com/ivarref/yoltq/compare/v0.2.41...v0.2.46) +#### 2022-03-27 v0.2.46 [diff](https://github.com/ivarref/yoltq/compare/v0.2.41...v0.2.46) * Critical bugfix that in some cases can lead to stalled jobs. ``` Started using (System/currentTimeMillis) and not (System/nanoTime) @@ -357,7 +376,7 @@ when storing time in the database. * Bump Clojure to `1.11.0`. -### 2022-03-27 v0.2.41 [diff](https://github.com/ivarref/yoltq/compare/v0.2.39...v0.2.41) +#### 2022-03-27 v0.2.41 [diff](https://github.com/ivarref/yoltq/compare/v0.2.39...v0.2.41) * Added function `healthy?` that returns: ``` true if no errors @@ -381,13 +400,13 @@ when storing time in the database. {:qname :send-message, :status :init, :count 56}] ``` -### 2021-09-27 v0.2.39 [diff](https://github.com/ivarref/yoltq/compare/v0.2.37...v0.2.39) +#### 2021-09-27 v0.2.39 [diff](https://github.com/ivarref/yoltq/compare/v0.2.37...v0.2.39) Added `:valid-payload?` option for queue consumers. -### 2021-09-27 v0.2.37 [diff](https://github.com/ivarref/yoltq/compare/v0.2.33...v0.2.37) +#### 2021-09-27 v0.2.37 [diff](https://github.com/ivarref/yoltq/compare/v0.2.33...v0.2.37) Improved error reporting. -### 2021-09-24 v0.2.33 +#### 2021-09-24 v0.2.33 First publicly announced release. ## License diff --git a/deps.edn b/deps.edn index 8e769e1..6923881 100644 --- a/deps.edn +++ b/deps.edn @@ -1,12 +1,12 @@ -{:deps {org.clojure/tools.logging {:mvn/version "1.1.0"} - org.clojure/clojure {:mvn/version "1.11.0"}} +{:deps {org.clojure/tools.logging {:mvn/version "1.2.4"} + org.clojure/clojure {:mvn/version "1.11.1"}} :paths ["src"] - :aliases {:datomic {:extra-deps {com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]}}} + :aliases {:datomic {:extra-deps {com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]}}} :test {:extra-paths ["test"] :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} - com.taoensso/timbre {:mvn/version "5.1.2"} + com.taoensso/timbre {:mvn/version "5.2.1"} com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]} diff --git a/pom.xml b/pom.xml index 9f591b9..c45ccd9 100644 --- a/pom.xml +++ b/pom.xml @@ -4,18 +4,18 @@ jar com.github.ivarref yoltq - 0.2.55 + 0.2.56 yoltq org.clojure clojure - 1.11.0 + 1.11.1 org.clojure tools.logging - 1.1.0 + 1.2.4 @@ -30,7 +30,7 @@ scm:git:git://github.com/ivarref/yoltq.git scm:git:ssh://git@github.com/ivarref/yoltq.git - v0.2.55 + v0.2.56 https://github.com/ivarref/yoltq \ No newline at end of file diff --git a/release.sh b/release.sh index cf0f09f..d27d125 100755 --- a/release.sh +++ b/release.sh @@ -23,9 +23,11 @@ sed -i "s/HEAD/v$VERSION/g" ./README.md git add pom.xml README.md git commit -m "Release $VERSION" git reset --soft HEAD~2 -git commit -m"Release $VERSION\n$MSG" +git commit -m"Release $VERSION +$MSG" -git tag -a v"$VERSION" -m "Release v$VERSION\n$MSG" +git tag -a v"$VERSION" -m "Release v$VERSION +$MSG" git push --follow-tags --force clojure -X:deploy diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index bb7a43e..ba27d2c 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -1,16 +1,16 @@ (ns com.github.ivarref.yoltq - (:require [datomic.api :as d] - [clojure.tools.logging :as log] + (:require [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.error-poller :as errpoller] [com.github.ivarref.yoltq.impl :as i] - [com.github.ivarref.yoltq.report-queue :as rq] + [com.github.ivarref.yoltq.migrate :as migrate] [com.github.ivarref.yoltq.poller :as poller] - [com.github.ivarref.yoltq.error-poller :as errpoller] + [com.github.ivarref.yoltq.report-queue :as rq] [com.github.ivarref.yoltq.slow-executor-detector :as slow-executor] - [com.github.ivarref.yoltq.migrate :as migrate] - [com.github.ivarref.yoltq.utils :as u]) + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d]) (:import (datomic Connection) - (java.util.concurrent Executors TimeUnit ExecutorService) - (java.time Duration))) + (java.time Duration) + (java.util.concurrent ExecutorService Executors TimeUnit))) (defonce ^:dynamic *config* (atom nil)) @@ -92,11 +92,23 @@ new-cfg))) +(defn get-queue-id + [queue-id-or-var] + (cond (and (var? queue-id-or-var) + (keyword? (:yoltq/queue-id (meta queue-id-or-var)))) + (:yoltq/queue-id (meta queue-id-or-var)) + + (keyword? queue-id-or-var) + queue-id-or-var + + :else + (throw (ex-info (str "Could not get queue-id for " queue-id-or-var) {:queue-id queue-id-or-var})))) + (defn add-consumer! ([queue-id f] (add-consumer! queue-id f {})) ([queue-id f opts] - (swap! *config* (fn [old-config] (assoc-in old-config [:handlers queue-id] (merge opts {:f f})))))) + (swap! *config* (fn [old-config] (assoc-in old-config [:handlers (get-queue-id queue-id)] (merge opts {:f f})))))) (defn put @@ -105,7 +117,7 @@ (let [{:keys [bootstrap-poller! conn] :as cfg} @*config*] (when (and *test-mode* bootstrap-poller!) (bootstrap-poller! conn)) - (i/put cfg queue-id payload opts)))) + (i/put cfg (get-queue-id queue-id) payload opts)))) (defn- do-start! [] diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 34c9026..e077517 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -1,15 +1,15 @@ (ns com.github.ivarref.yoltq.virtual-test - (:require [datomic-schema.core] - [clojure.test :refer [use-fixtures deftest is] :refer-macros [thrown?]] + (:require [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]] + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq :as yq] + [com.github.ivarref.yoltq.impl :as i] + [com.github.ivarref.yoltq.migrate :as migrate] [com.github.ivarref.yoltq.test-queue :as tq] [com.github.ivarref.yoltq.test-utils :as u] - [datomic.api :as d] [com.github.ivarref.yoltq.utils :as uu] - [clojure.tools.logging :as log] - [com.github.ivarref.yoltq.impl :as i] - [com.github.ivarref.yoltq :as yq] - [taoensso.timbre :as timbre] - [com.github.ivarref.yoltq.migrate :as migrate])) + [datomic-schema.core] + [datomic.api :as d] + [taoensso.timbre :as timbre])) (use-fixtures :each tq/call-with-virtual-queue!) @@ -350,3 +350,20 @@ @(d/transact conn [(yq/put :q {:id "a"})]) (timbre/with-level :fatal (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) + + +(defn my-consumer + {:yoltq/queue-id :some-q} + [state payload] + (swap! state conj payload)) + +(deftest queue-id-can-be-var + (let [conn (u/empty-conn) + received (atom #{})] + (yq/init! {:conn conn}) + (yq/add-consumer! #'my-consumer (partial my-consumer received)) + @(d/transact conn [(yq/put #'my-consumer {:id "a"})]) + (tq/consume! :some-q) + (is (= #{{:id "a"}} @received)) + #_(timbre/with-level :fatal + (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) -- cgit v1.2.3 From 812a07b3b9f2d212f80499433b638fb5b4a78f70 Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Thu, 18 Aug 2022 13:00:02 +0200 Subject: Release 0.2.60 Warn about not setting connection/socket-timeout when using clj-http https://github.com/ivarref/yoltq/issues/2 Add :healthy-allowed-error-time configuration option, default is 15 minutes --- README.md | 28 +++++++++++++++-- pom.xml | 4 +-- src/com/github/ivarref/yoltq.clj | 14 ++++++--- src/com/github/ivarref/yoltq/error_poller.clj | 36 +++++++++++++--------- .../com/github/ivarref/yoltq/error_poller_test.clj | 8 ++--- test/com/github/ivarref/yoltq/virtual_test.clj | 15 ++++++++- 6 files changed, 77 insertions(+), 28 deletions(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/README.md b/README.md index ade8650..05e7033 100644 --- a/README.md +++ b/README.md @@ -62,18 +62,25 @@ Imagine the following code: ```clojure (defn post-handler [user-input] (let [db-item (process user-input) - ext-ref (clj-http.client/post ext-service {...})] ; may throw exception + ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; timeout in milliseconds + :socket-timeout 10000 ; timeout in milliseconds + ...})] ; may throw exception @(d/transact conn [(assoc db-item :some/ext-ref ext-ref)]))) ``` What if the POST request fails? Should it be retried? For how long? Should it be allowed to fail? How do you then process failures later? +PS: If you do not set connection/socket-timeout, there is a chance that +clj-http/client will wait for all eternity in the case of a dropped TCP connection. + The queue way to solve this would be: ```clojure (defn get-ext-ref [{:keys [id]}] - (let [ext-ref (clj-http.client/post ext-service {...})] ; may throw exception + (let [ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; timeout in milliseconds + :socket-timeout 10000 ; timeout in milliseconds + ...})] ; may throw exception @(d/transact conn [[:db/cas [:some/id id] :some/ext-ref nil @@ -82,7 +89,7 @@ The queue way to solve this would be: (yq/add-consumer! :get-ext-ref get-ext-ref {:allow-cas-failure? true}) (defn post-handler [user-input] - (let [{:some/keys [id] :as db-item} (process user-input) + (let [{:some/keys [id] :as db-item} (process user-input)] @(d/transact conn [db-item (yq/put :get-ext-ref {:id id})]))) ``` @@ -371,6 +378,21 @@ Note: I have not tried these libraries myself. ## Change log +#### 2022-08-18 v0.2.60 [diff](https://github.com/ivarref/yoltq/compare/v0.2.59...v0.2.60) +Improved: Added config option `:healthy-allowed-error-time`: +``` + ; If you are dealing with a flaky downstream service, you may not want + ; yoltq to mark itself as unhealthy on the first failure encounter with + ; the downstream service. Change this setting to let yoltq mark itself + ; as healthy even though a queue item has been failing for some time. + :healthy-allowed-error-time (Duration/ofMinutes 15) +``` + +#### 2022-08-15 v0.2.59 [diff](https://github.com/ivarref/yoltq/compare/v0.2.58...v0.2.59) +Fixed: +* Race condition that made the following possible: `stop!` would terminate the slow thread +watcher, and a stuck thread could keep `stop!` from completing! + #### 2022-06-30 v0.2.58 [diff](https://github.com/ivarref/yoltq/compare/v0.2.57...v0.2.58) Slightly more safe EDN printing and parsing. Recommended reading: diff --git a/pom.xml b/pom.xml index 187b8ad..719b0e7 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ jar com.github.ivarref yoltq - 0.2.59 + 0.2.60 yoltq @@ -30,7 +30,7 @@ scm:git:git://github.com/ivarref/yoltq.git scm:git:ssh://git@github.com/ivarref/yoltq.git - v0.2.59 + v0.2.60 https://github.com/ivarref/yoltq \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 32693c3..89112a6 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -44,6 +44,12 @@ ; otherwise occur if competing with the tx-report-queue listener. :init-backoff-time (Duration/ofSeconds 60) + ; If you are dealing with a flaky downstream service, you may not want + ; yoltq to mark itself as unhealthy on the first failure encounter with + ; the downstream service. Change this setting to let yoltq mark itself + ; as healthy even though a queue item has been failing for some time. + :healthy-allowed-error-time (Duration/ofMinutes 15) + ; How frequent polling for init, error and hung jobs should be done. :poll-delay (Duration/ofSeconds 10) @@ -259,10 +265,10 @@ (let [conn (d/connect uri) started-consuming? (promise) n 1] - (init! {:conn conn - :error-backoff-time (Duration/ofSeconds 1) - :poll-delay (Duration/ofSeconds 1) - :max-execute-time (Duration/ofSeconds 3) + (init! {:conn conn + :error-backoff-time (Duration/ofSeconds 1) + :poll-delay (Duration/ofSeconds 1) + :max-execute-time (Duration/ofSeconds 3) :slow-thread-show-stacktrace? false}) (add-consumer! :q (fn [_] (deliver started-consuming? true) diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj index ee6359e..dffff28 100644 --- a/src/com/github/ivarref/yoltq/error_poller.clj +++ b/src/com/github/ivarref/yoltq/error_poller.clj @@ -1,8 +1,8 @@ (ns com.github.ivarref.yoltq.error-poller - (:require [datomic.api :as d] - [com.github.ivarref.yoltq.utils :as u] + (:require [clojure.tools.logging :as log] [com.github.ivarref.yoltq.ext-sys :as ext] - [clojure.tools.logging :as log])) + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d])) (defn get-state [v] @@ -64,31 +64,39 @@ {:run-callback :recovery})))))) -(defn do-poll-errors [{:keys [conn system-error +(defn do-poll-errors [{:keys [conn + system-error on-system-error on-system-recovery - healthy?] + healthy? + healthy-allowed-error-time] :or {on-system-error (fn [] (log/error "There are yoltq queues which have errors") nil) on-system-recovery (fn [] (log/info "Yoltq recovered"))} - :as config}] + :as config} + now-ms] (assert (some? conn) "expected :conn to be present") (assert (some? system-error) "expected :system-error to be present") - (let [error-count (or (d/q '[:find (count ?e) . - :in $ ?status + (assert (nat-int? healthy-allowed-error-time) "expected :healthy-allowed-error-time to be present") + (let [max-init-time (- now-ms healthy-allowed-error-time) + error-count (or (d/q '[:find (count ?e) . + :in $ ?status ?max-init-time :where - [?e :com.github.ivarref.yoltq/status ?status]] + [?e :com.github.ivarref.yoltq/status ?status] + [?e :com.github.ivarref.yoltq/init-time ?init-time] + [(<= ?init-time ?max-init-time)]] (d/db conn) - u/status-error) + u/status-error + max-init-time) 0)] (if (pos-int? error-count) (do (log/debug "poll-errors found" error-count "errors in system") (reset! healthy? false)) (reset! healthy? true)) - (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ms) error-count)] + (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config now-ms error-count)] (when run-callback (cond (= run-callback :error) (on-system-error) @@ -99,18 +107,18 @@ :else (log/error "unhandled callback-type" run-callback)) (log/debug "run-callback is" run-callback)) - new-state))) + error-count))) (defn poll-errors [running? config-atom] (try (when @running? - (do-poll-errors @config-atom)) + (do-poll-errors @config-atom (ext/now-ms))) (catch Throwable t (log/error t "unexpected error in poll-errors:" (ex-message t)) nil))) (comment - (do-poll-errors @com.github.ivarref.yoltq/*config*)) + (do-poll-errors @com.github.ivarref.yoltq/*config* (ext/now-ms))) diff --git a/test/com/github/ivarref/yoltq/error_poller_test.clj b/test/com/github/ivarref/yoltq/error_poller_test.clj index 18f0aa7..4d92b81 100644 --- a/test/com/github/ivarref/yoltq/error_poller_test.clj +++ b/test/com/github/ivarref/yoltq/error_poller_test.clj @@ -1,9 +1,9 @@ (ns com.github.ivarref.yoltq.error-poller-test - (:require [clojure.test :refer [deftest is]] - [com.github.ivarref.yoltq.error-poller :as ep] + (:require [clojure.edn :as edn] + [clojure.test :refer [deftest is]] [clojure.tools.logging :as log] - [com.github.ivarref.yoltq.log-init :as logconfig] - [clojure.edn :as edn])) + [com.github.ivarref.yoltq.error-poller :as ep] + [com.github.ivarref.yoltq.log-init :as logconfig])) (deftest error-poller diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index e077517..996792e 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -2,6 +2,8 @@ (:require [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]] [clojure.tools.logging :as log] [com.github.ivarref.yoltq :as yq] + [com.github.ivarref.yoltq.error-poller :as error-poller] + [com.github.ivarref.yoltq.ext-sys :as ext] [com.github.ivarref.yoltq.impl :as i] [com.github.ivarref.yoltq.migrate :as migrate] [com.github.ivarref.yoltq.test-queue :as tq] @@ -9,7 +11,8 @@ [com.github.ivarref.yoltq.utils :as uu] [datomic-schema.core] [datomic.api :as d] - [taoensso.timbre :as timbre])) + [taoensso.timbre :as timbre]) + (:import (java.time Duration))) (use-fixtures :each tq/call-with-virtual-queue!) @@ -367,3 +370,13 @@ (is (= #{{:id "a"}} @received)) #_(timbre/with-level :fatal (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) + +(deftest healthy-allowed-error-time-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [_] (throw (ex-info "" {})))) + @(d/transact conn [(yq/put :q {:work 123})]) + (tq/consume-expect! :q :error) + (is (= 0 (error-poller/do-poll-errors @yq/*config* (ext/now-ms)))) + (is (= 0 (error-poller/do-poll-errors @yq/*config* (+ (dec (.toMillis (Duration/ofMinutes 15))) (ext/now-ms))))) + (is (= 1 (error-poller/do-poll-errors @yq/*config* (+ (.toMillis (Duration/ofMinutes 15)) (ext/now-ms))))))) -- cgit v1.2.3 From 8f945d8c0189ad73d862c988faa511e0a7b017df Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Fri, 18 Nov 2022 14:12:50 +0100 Subject: Release 0.2.63: Add support for :encode and :decode function. Add :partition-fn. Fixes #1 --- README.md | 65 ++++++++++++++++- deps.edn | 6 +- pom.xml | 9 ++- src/com/github/ivarref/yoltq/impl.clj | 95 +++++++++++++++---------- src/com/github/ivarref/yoltq/utils.clj | 1 - test/com/github/ivarref/yoltq/virtual_test.clj | 98 ++++++++++++++++++++++---- 6 files changed, 218 insertions(+), 56 deletions(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/README.md b/README.md index 63b9ad3..a914cc7 100644 --- a/README.md +++ b/README.md @@ -131,7 +131,8 @@ Inspecting `(yq/put :q {:work 123})]` you will see something like this: This is the queue job as it will be stored into the database. You can see that the payload, i.e. the second argument of `yq/put`, -is persisted into the database. Thus the payload must be `pr-str`-able. +is persisted into the database. Thus the payload must be `pr-str`-able (unless you have specified +custom `:encode` and `:decode` functions that override this). A queue job will initially have status `:init`. @@ -220,6 +221,47 @@ is shut down abruptly during processing of queue jobs. A queue job will remain in status `:error` once `:max-retries` (default: 100) have been reached. Ideally this will not happen. ¯\\\_(ツ)\_/¯ +### Custom encoding and decoding + +Yoltq will use `pr-str` and `clojure.edn/read-string` by default to encode and decode data. +You may specify `:encode` and `:decode` either globally or per queue to override this behaviour. +The `:encode` function must return a byte array or a string. + +For example if you want to use [nippy](https://github.com/ptaoussanis/nippy): +```clojure +(require '[taoensso.nippy :as nippy]) + +; Globally for all queues: +(yq/init! + {:conn conn + :encode nippy/freeze + :decode nippy/thaw}) + +; Or per queue: +(yq/add-consumer! + :q ; Queue to consume + (fn [payload] (println "got payload:" payload)) ; Queue consumer function + {:encode nippy/freeze + :decode nippy/thaw}) ; Queue options, here with :encode and :decode +``` + +### Partitions + +Yoltq supports specifying which [partition](https://docs.datomic.com/on-prem/schema/schema.html#partitions) +queue entities should belong to. +The default function is: +```clojure +(defn default-partition-fn [_queue-name] + (keyword "yoltq" (str "queue_" (.getValue (java.time.Year/now))))) +``` +This is to say that there will be a single partition per year for yoltq. +Yoltq will take care of creating the partition if it does not exist. + +You may override this function, either globally or per queue, with the keyword `:partition-fn`. +E.g.: +```clojure +(yq/init! {:conn conn :partition-fn (fn [_queue-name] :my-partition)}) +``` ### All configuration options @@ -376,8 +418,29 @@ For Redis there is [carmine](https://github.com/ptaoussanis/carmine). Note: I have not tried these libraries myself. +## Other stuff + +If you liked this library, you may also like: + +* [conformity](https://github.com/avescodes/conformity): A Clojure/Datomic library for idempotently transacting norms into your database – be they schema, data, or otherwise. +* [datomic-schema](https://github.com/ivarref/datomic-schema): Simplified writing of Datomic schemas (works with conformity). +* [double-trouble](https://github.com/ivarref/double-trouble): Handle duplicate Datomic transactions with ease. +* [gen-fn](https://github.com/ivarref/gen-fn): Generate Datomic function literals from regular Clojure namespaces. +* [rewriting-history](https://github.com/ivarref/rewriting-history): A library to rewrite Datomic history. + ## Change log +#### 2022-11-18 v0.2.63 [diff](https://github.com/ivarref/yoltq/compare/v0.2.62...v0.2.63) +Added custom `:encode` and `:decode` support. + +Added support for specifying `:partifion-fn` to specify which partition a queue item should belong to. +It defaults to: +```clojure +(defn default-partition-fn [_queue-name] + (keyword "yoltq" (str "queue_" (.getValue (Year/now))))) +``` +Yoltq takes care of creating the partition if it does not exist. + #### 2022-11-15 v0.2.62 [diff](https://github.com/ivarref/yoltq/compare/v0.2.61...v0.2.62) Added function `processing-time-stats`: diff --git a/deps.edn b/deps.edn index 6923881..e36885e 100644 --- a/deps.edn +++ b/deps.edn @@ -1,5 +1,6 @@ -{:deps {org.clojure/tools.logging {:mvn/version "1.2.4"} - org.clojure/clojure {:mvn/version "1.11.1"}} +{:deps {com.github.ivarref/double-trouble {:mvn/version "0.1.102"} + org.clojure/tools.logging {:mvn/version "1.2.4"} + org.clojure/clojure {:mvn/version "1.11.1"}} :paths ["src"] @@ -11,6 +12,7 @@ clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]} org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"} io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} :jvm-opts ["-DDISABLE_SPY=true" "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] diff --git a/pom.xml b/pom.xml index 2c11984..463899d 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ jar com.github.ivarref yoltq - 0.2.62 + 0.2.63 yoltq @@ -12,6 +12,11 @@ clojure 1.11.1 + + com.github.ivarref + double-trouble + 0.1.102 + org.clojure tools.logging @@ -30,7 +35,7 @@ scm:git:git://github.com/ivarref/yoltq.git scm:git:ssh://git@github.com/ivarref/yoltq.git - v0.2.62 + v0.2.63 https://github.com/ivarref/yoltq \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index c37b0e6..ac573d1 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -1,11 +1,12 @@ (ns com.github.ivarref.yoltq.impl - (:require [datomic.api :as d] - [clojure.tools.logging :as log] + (:require [clojure.edn :as edn] [clojure.string :as str] - [com.github.ivarref.yoltq.utils :as u] + [clojure.tools.logging :as log] + [com.github.ivarref.double-trouble :as dt] [com.github.ivarref.yoltq.ext-sys :as ext] - [clojure.edn :as edn])) - + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d]) + (:import (java.time Year))) (def schema [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity} @@ -13,6 +14,7 @@ #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string} + #:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes} #:db{:ident :com.github.ivarref.yoltq/opts, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true} @@ -41,13 +43,22 @@ (log/error "could not read-string" what ":" (ex-message e)) (throw e)))) +(defn default-partition-fn [_queue-keyword] + (keyword "yoltq" (str "queue_" (.getValue (Year/now))))) -(defn put [{:keys [capture-bindings conn] :as config} +(defn put [{:keys [capture-bindings conn encode partition-fn] + :or {partition-fn default-partition-fn + encode (partial pr-str-safe :payload)} + :as config} queue-name payload opts] (if-let [q-config (get-in config [:handlers queue-name])] (let [id (u/squuid) + encode (get q-config :encode encode) + partition-fn (get q-config :partition-fn partition-fn) + partition (partition-fn queue-name) + _ (assert (keyword? partition) "Partition must be a keyword") depends-on (get q-config :depends-on (fn [_] nil)) valid-payload? (get q-config :valid-payload? (fn [_] true)) opts (merge @@ -58,32 +69,41 @@ (assoc o (symbol k) (deref k))) {} (or capture-bindings [])) - (pr-str-safe :capture-bindings))] - (when-not (valid-payload? payload) - (log/error "Payload was not valid. Payload was:" payload) - (throw (ex-info (str "Payload was not valid: " payload) {:payload payload}))) + (pr-str-safe :capture-bindings)) + _ (when-not (valid-payload? payload) + (log/error "Payload was not valid. Payload was:" payload) + (throw (ex-info (str "Payload was not valid: " payload) {:payload payload}))) + encoded (encode payload) + _ (when (not (or (bytes? encoded) (string? encoded))) + (log/error "Payload must be encoded to either a string or a byte array") + (throw (ex-info (str "Payload must be encoded to a string or a byte array. Payload: " payload) {:payload payload})))] (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init) - (merge - {:com.github.ivarref.yoltq/id id - :com.github.ivarref.yoltq/queue-name queue-name - :com.github.ivarref.yoltq/status u/status-init - :com.github.ivarref.yoltq/payload (pr-str-safe :payload payload) - :com.github.ivarref.yoltq/bindings str-bindings - :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts) - :com.github.ivarref.yoltq/lock (u/random-uuid) - :com.github.ivarref.yoltq/tries 0 - :com.github.ivarref.yoltq/init-time (u/now-ms) - :com.github.ivarref.yoltq/version "2"} - (when-let [[q ext-id] (:depends-on opts)] - (when-not (d/q '[:find ?e . - :in $ ?ext-id - :where - [?e :com.github.ivarref.yoltq/ext-id ?ext-id]] - (d/db conn) - (pr-str-safe :depends-on [q ext-id])) - (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts)))) - (when-let [ext-id (:id opts)] - {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}))) + (do + (dt/ensure-partition! conn partition) + (merge + (if (bytes? encoded) + {:com.github.ivarref.yoltq/payload-bytes encoded} + {:com.github.ivarref.yoltq/payload encoded}) + {:db/id (d/tempid partition) + :com.github.ivarref.yoltq/id id + :com.github.ivarref.yoltq/queue-name queue-name + :com.github.ivarref.yoltq/status u/status-init + :com.github.ivarref.yoltq/bindings str-bindings + :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts) + :com.github.ivarref.yoltq/lock (u/random-uuid) + :com.github.ivarref.yoltq/tries 0 + :com.github.ivarref.yoltq/init-time (u/now-ms) + :com.github.ivarref.yoltq/version "2"} + (when-let [[q ext-id] (:depends-on opts)] + (when-not (d/q '[:find ?e . + :in $ ?ext-id + :where + [?e :com.github.ivarref.yoltq/ext-id ?ext-id]] + (d/db conn) + (pr-str-safe :depends-on [q ext-id])) + (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts)))) + (when-let [ext-id (:id opts)] + {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])})))) (do (log/error "Did not find registered handler for queue" queue-name) (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) @@ -169,20 +189,23 @@ "in" (format "%.1f" (double (/ spent-ns 1e6))) "ms"])) -(defn execute! [{:keys [handlers mark-status-fn! start-execute-time collect-spent-time!] - :or {mark-status-fn! mark-status!} +(defn execute! [{:keys [decode handlers mark-status-fn! start-execute-time collect-spent-time!] + :or {mark-status-fn! mark-status! + decode edn/read-string} :as cfg} - {:com.github.ivarref.yoltq/keys [status id queue-name payload] :as queue-item}] + {:com.github.ivarref.yoltq/keys [status id queue-name payload payload-bytes] :as queue-item}] (when queue-item (if (= :error status) (assoc queue-item :failed? true) (if-let [queue (get handlers queue-name)] - (let [{:keys [f allow-cas-failure?]} queue] + (let [{:keys [f allow-cas-failure?]} queue + decode (get queue :decode decode)] (log/debug "queue item" (str id) "for queue" queue-name "is now processing") (let [{:keys [retval exception]} (try (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ms) id queue-name]) - (let [v (f payload)] + (let [payload (decode (or payload payload-bytes)) + v (f payload)] {:retval v}) (catch Throwable t {:exception t}) diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj index 39572a9..7665b6d 100644 --- a/src/com/github/ivarref/yoltq/utils.clj +++ b/src/com/github/ivarref/yoltq/utils.clj @@ -57,7 +57,6 @@ (defn get-queue-item [db id] (-> (d/pull db '[:*] [:com.github.ivarref.yoltq/id id]) (dissoc :db/id) - (update :com.github.ivarref.yoltq/payload edn/read-string) (update :com.github.ivarref.yoltq/opts (fn [s] (or (when s (edn/read-string s)) {}))) (update :com.github.ivarref.yoltq/bindings (fn [s] diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 996792e..2800c21 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -1,18 +1,21 @@ (ns com.github.ivarref.yoltq.virtual-test - (:require [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]] - [clojure.tools.logging :as log] - [com.github.ivarref.yoltq :as yq] - [com.github.ivarref.yoltq.error-poller :as error-poller] - [com.github.ivarref.yoltq.ext-sys :as ext] - [com.github.ivarref.yoltq.impl :as i] - [com.github.ivarref.yoltq.migrate :as migrate] - [com.github.ivarref.yoltq.test-queue :as tq] - [com.github.ivarref.yoltq.test-utils :as u] - [com.github.ivarref.yoltq.utils :as uu] - [datomic-schema.core] - [datomic.api :as d] - [taoensso.timbre :as timbre]) - (:import (java.time Duration))) + (:require + [clojure.string :as str] + [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]] + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq :as yq] + [com.github.ivarref.yoltq.error-poller :as error-poller] + [com.github.ivarref.yoltq.ext-sys :as ext] + [com.github.ivarref.yoltq.impl :as i] + [com.github.ivarref.yoltq.migrate :as migrate] + [com.github.ivarref.yoltq.test-queue :as tq] + [com.github.ivarref.yoltq.test-utils :as u] + [com.github.ivarref.yoltq.utils :as uu] + [datomic-schema.core] + [datomic.api :as d] + [taoensso.nippy :as nippy] + [taoensso.timbre :as timbre]) + (:import (java.time Duration LocalDateTime))) (use-fixtures :each tq/call-with-virtual-queue!) @@ -380,3 +383,70 @@ (is (= 0 (error-poller/do-poll-errors @yq/*config* (ext/now-ms)))) (is (= 0 (error-poller/do-poll-errors @yq/*config* (+ (dec (.toMillis (Duration/ofMinutes 15))) (ext/now-ms))))) (is (= 1 (error-poller/do-poll-errors @yq/*config* (+ (.toMillis (Duration/ofMinutes 15)) (ext/now-ms))))))) + +(deftest global-encode-decode + (let [conn (u/empty-conn) + ldt (LocalDateTime/now) + got-work (atom nil)] + (yq/init! {:conn conn + :encode nippy/freeze + :decode nippy/thaw}) + (yq/add-consumer! :q (fn [work] (reset! got-work work))) + @(d/transact conn [(yq/put :q {:work ldt})]) + (tq/consume! :q) + (is (= @got-work {:work ldt})))) + +(deftest queue-encode-decode + (let [conn (u/empty-conn) + ldt (LocalDateTime/now) + got-work (atom nil)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [work] (reset! got-work work)) + {:encode nippy/freeze + :decode nippy/thaw}) + @(d/transact conn [(yq/put :q {:work ldt})]) + (tq/consume! :q) + (is (= @got-work {:work ldt})))) + +(deftest global-partition + (let [conn (u/empty-conn) + got-work (atom nil)] + (yq/init! {:conn conn + :partition-fn (fn [_queue-name] :my-part)}) + (yq/add-consumer! :q (fn [work] (reset! got-work work))) + @(d/transact conn [(yq/put :q {:work 123})]) + (tq/consume! :q) + (is (some? (d/q '[:find ?e . + :in $ ?part + :where + [?e :db/ident ?part]] + (d/db conn) + :my-part))) + (is (= @got-work {:work 123})))) + +(deftest partition-per-queue + (let [conn (u/empty-conn) + got-work (atom nil)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [work] (reset! got-work work)) + {:partition-fn (fn [_queue-name] :my-part)}) + @(d/transact conn [(yq/put :q {:work 123})]) + (tq/consume! :q) + (is (some? (d/q '[:find ?e . + :in $ ?part + :where + [?e :db/ident ?part]] + (d/db conn) + :my-part))) + (is (= @got-work {:work 123})))) + +(deftest string-encode-decode + (let [conn (u/empty-conn) + got-work (atom nil)] + (yq/init! {:conn conn + :encode (fn [x] (str/join (reverse x))) + :decode (fn [x] (str/join (reverse x)))}) + (yq/add-consumer! :q (fn [work] (reset! got-work work))) + @(d/transact conn [(yq/put :q "asdf")]) + (tq/consume! :q) + (is (= @got-work "asdf")))) -- cgit v1.2.3 From 85d13545275678a1077b9600fce136ae10dcb809 Mon Sep 17 00:00:00 2001 From: Stefan van den Oord Date: Fri, 14 Jun 2024 16:08:59 +0200 Subject: #3 Add optional batch name to queue jobs --- src/com/github/ivarref/yoltq.clj | 23 +++++++++++++++++++++++ src/com/github/ivarref/yoltq/impl.clj | 5 ++++- test/com/github/ivarref/yoltq/virtual_test.clj | 22 ++++++++++++++++++++++ 3 files changed, 49 insertions(+), 1 deletion(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 379d701..1ba286e 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -230,6 +230,29 @@ (sort-by (juxt :qname :status)) (vec)))) +(defn batch-progress [queue-name batch-name] + (let [{:keys [conn]} @*config* + db (d/db conn)] + (->> (d/q '[:find ?e ?qname ?bname ?status + :keys :e :qname :bname :status + :in $ ?qname ?bname + :where + [?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/batch-name ?bname] + [?e :com.github.ivarref.yoltq/status ?status]] + db queue-name batch-name) + (mapv #(select-keys % [:qname :bname :status])) + (mapv (fn [qitem] {qitem 1})) + (reduce (partial merge-with +) {}) + (mapv (fn [[{:keys [qname bname status]} v]] + (array-map + :qname qname + :batch-name bname + :status status + :count v))) + (sort-by (juxt :qname :batch-name :status)) + (vec)))) + (defn get-errors [qname] (let [{:keys [conn]} @*config* db (d/db conn)] diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index ac573d1..6d2aa3d 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -12,6 +12,7 @@ [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity} #:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value} #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} + #:db{:ident :com.github.ivarref.yoltq/batch-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes} @@ -103,7 +104,9 @@ (pr-str-safe :depends-on [q ext-id])) (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts)))) (when-let [ext-id (:id opts)] - {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])})))) + {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}) + (when-let [batch-name (:batch-name opts)] + {:com.github.ivarref.yoltq/batch-name batch-name})))) (do (log/error "Did not find registered handler for queue" queue-name) (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 2800c21..7621b13 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -450,3 +450,25 @@ @(d/transact conn [(yq/put :q "asdf")]) (tq/consume! :q) (is (= @got-work "asdf")))) + +(deftest batch-of-jobs-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q1 identity) + (yq/add-consumer! :q2 identity) + @(d/transact conn [(yq/put :q1 {:work 123} {:batch-name :b1}) + (yq/put :q1 {:work 456} {:batch-name :b2}) + (yq/put :q2 {:work 789} {:batch-name :b1})]) + (is (= [{:qname :q1 + :batch-name :b1 + :status :init + :count 1}] + (yq/batch-progress :q1 :b1))) + + (is (= {:work 123} (tq/consume! :q1))) + + (is (= [{:qname :q1 + :batch-name :b1 + :status :done + :count 1}] + (yq/batch-progress :q1 :b1))))) -- cgit v1.2.3 From ae49a7ec82ecd3988e0f7825b0adead1dc77c911 Mon Sep 17 00:00:00 2001 From: ire Date: Tue, 13 May 2025 21:39:07 +0200 Subject: Fix tx-report-queue sharing #7 --- README.md | 34 +++++++ deps.edn | 54 +++++------ src/com/github/ivarref/yoltq.clj | 86 ++++++++++++++--- src/com/github/ivarref/yoltq/report_queue.clj | 133 ++++++++++++++++++++++++-- test/com/github/ivarref/yoltq/log_init.clj | 2 + 5 files changed, 258 insertions(+), 51 deletions(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/README.md b/README.md index c5f2bdb..f84a336 100644 --- a/README.md +++ b/README.md @@ -434,6 +434,40 @@ If you liked this library, you may also like: ## Change log +#### 2025-05-13 v0.2.?? [diff](https://github.com/ivarref/yoltq/compare/v0.2.64...HEAD) +Added support for specifying `tx-report-queue` as a keyword in `init!`. Yoltq will +then not grab the datomic report queue, but use the one provided: + +```clojure +(require '[com.github.ivarref.yoltq :as yq]) +(yq/init! {:conn conn + :tx-report-queue (yq/get-tx-report-queue-multicast! my-conn :yoltq) + ; ^^ can be any `java.util.concurrent.BlockingQueue` value + }) + +(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! my-conn :another-consumer-id)) + +``` + +Added multicast support for `datomic.api/tx-report-queue`: +```clojure +(def my-q1 (yq/get-tx-report-queue-multicast! my-conn :q-id-1)) +; ^^ consume my-q1 just like you would do `datomic.api/tx-report-queue` + +(def my-q2 (yq/get-tx-report-queue-multicast! my-conn :q-id-2)) +; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue` +``` + +`yq/get-tx-report-queue-multicast!` returns, like +`datomic.api/tx-report-queue`, +`java.util.concurrent.BlockingQueue` and starts a background thread that does +the multicasting as needed. Identical calls to `yq/get-tx-report-queue-multicast!` +returns the same `BlockingQueue`. + +Changed the default for `max-retries` from `10000` to `9223372036854775807`. + +Fixed reflection warnings. + #### 2023-03-20 v0.2.64 [diff](https://github.com/ivarref/yoltq/compare/v0.2.63...v0.2.64) Added support for `max-retries` being `0`, meaning the job should be retried forever (or at least 9223372036854775807 times). diff --git a/deps.edn b/deps.edn index e36885e..1e3fa9d 100644 --- a/deps.edn +++ b/deps.edn @@ -1,33 +1,31 @@ -{:deps {com.github.ivarref/double-trouble {:mvn/version "0.1.102"} - org.clojure/tools.logging {:mvn/version "1.2.4"} - org.clojure/clojure {:mvn/version "1.11.1"}} +{:deps {com.github.ivarref/double-trouble {:mvn/version "0.1.102"} + org.clojure/tools.logging {:mvn/version "1.2.4"} + org.clojure/clojure {:mvn/version "1.11.1"} + com.datomic/peer {:mvn/version "1.0.7364"}} - :paths ["src"] + :paths ["src"] - :aliases {:datomic {:extra-deps {com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]}}} - :test {:extra-paths ["test"] - :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} - com.taoensso/timbre {:mvn/version "5.2.1"} - com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} - clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} - com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]} - org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} - com.taoensso/nippy {:mvn/version "3.2.0"} - io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} - :jvm-opts ["-DDISABLE_SPY=true" - "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] - :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]} + :aliases {:test {:extra-paths ["test"] + :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} + com.taoensso/timbre {:mvn/version "5.2.1"} + com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} + clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} + org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"} + io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} + :exec-fn cognitect.test-runner.api/test + :jvm-opts ["-DDISABLE_SPY=true" + "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] + :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]} - :jar {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git" - :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}} - :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]} + :jar {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git" + :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}} + :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]} - :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} + :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} - :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}} - :exec-fn deps-deploy.deps-deploy/deploy - :exec-args {:installer :remote - :sign-releases? false - :artifact "target/out.jar"}}} - - :mvn/repos {"my.datomic.com" {:url "https://my.datomic.com/repo"}}} + :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}} + :exec-fn deps-deploy.deps-deploy/deploy + :exec-args {:installer :remote + :sign-releases? false + :artifact "target/out.jar"}}}} \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index a7dcddf..32298b7 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -12,7 +12,7 @@ (:import (datomic Connection) (java.lang.management ManagementFactory) (java.time Duration Instant ZoneOffset ZonedDateTime) - (java.util.concurrent ExecutorService Executors ScheduledExecutorService TimeUnit))) + (java.util.concurrent BlockingQueue ExecutorService Executors ScheduledExecutorService TimeUnit))) (defonce ^:dynamic *config* (atom nil)) (defonce threadpool (atom nil)) @@ -26,7 +26,7 @@ ; If you want no limit on the number of retries, specify ; the value `0`. That will set the effective retry limit to ; 9223372036854775807 times. - :max-retries 10000 + :max-retries 9223372036854775807 ; Minimum amount of time to wait before a failed queue job is retried :error-backoff-time (Duration/ofSeconds 5) @@ -86,6 +86,9 @@ (defn init! [{:keys [conn tx-report-queue] :as cfg}] (assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil"))) + (when (some? tx-report-queue) + (assert (instance? BlockingQueue tx-report-queue) + (str "Expected :tx-report-queue to be of type java.util.concurrent.BlockingQueue"))) (locking threadpool @(d/transact conn i/schema) (let [new-cfg (swap! *config* @@ -96,9 +99,6 @@ :system-error (atom {}) :healthy? (atom nil) :slow? (atom nil) - :get-tx-report-queue (fn [] - (or tx-report-queue - (d/tx-report-queue conn))) :slow-thread-watcher-done? (promise)} default-opts (if *test-mode* old-conf (select-keys old-conf [:handlers])) @@ -144,12 +144,37 @@ (reset! *running?* true) (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS) (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS) - (.execute ^ScheduledExecutorService pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*))) + (.execute ^ScheduledExecutorService pool + (fn [] + (try + (log/debug "report-queue-listener starting") + (rq/report-queue-listener *running?* queue-listener-ready pool *config*) + (finally + (log/debug "report-queue-listener exiting") + (deliver queue-listener-ready :finally))))) (future (try (slow-executor/show-slow-threads pool *config*) (finally (deliver slow-thread-watcher-done? :done)))) - @queue-listener-ready))) + (let [q-listener-retval (deref queue-listener-ready 30000 :timeout)] + (cond (= :timeout q-listener-retval) + (do + (log/error "Timed out waiting for report-queue-listener to start") + (throw (IllegalStateException. "Timed out waiting for report-queue-listener to start"))) + + (= :finally q-listener-retval) + (do + (log/error "report-queue-listener did not start") + (throw (IllegalStateException. "report-queue-listener did not start"))) + + (= :ready q-listener-retval) + (do + (log/debug "report-queue-listener is ready")) + + :else + (do + (log/error (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval))) + (throw (IllegalStateException. (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval)))))))))) (defn start! [] @@ -359,14 +384,13 @@ :min (apply min values))}))) (into (sorted-map))))) +(defn get-tx-report-queue-multicast! + "Multicast the datomic.api/tx-report-queue to different consumers. + The multicaster is started on demand. `conn` and `id` identifies the consumer. - -(defn add-tx-report-queue! - ([conn] - (add-tx-report-queue! conn :default)) - ([conn id] - (if @*config* - :...))) + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + [conn id] + (rq/get-tx-report-queue-multicast! conn id)) (comment (do @@ -401,3 +425,37 @@ @started-consuming? (stop!) nil))))) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq.migrate"} :warn] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (stop!) + (let [received (atom []) + uri (str "datomic:mem://demo")] + (d/delete-database uri) + (d/create-database uri) + (let [conn (d/connect uri) + started-consuming? (promise) + n 1] + (init! {:conn conn + :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq) + :slow-thread-show-stacktrace? false}) + (add-consumer! :q (fn [_] + (deliver started-consuming? true))) + (log/info "begin start! ...") + (start!) + (log/info "begin start! ... Done") + (Thread/sleep 2000) + @(d/transact conn [(put :q {:work 123})]) + @started-consuming? + (stop!) + (log/info "stop! done") + nil)))) \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index 20e0a93..9cddc93 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -3,8 +3,8 @@ [com.github.ivarref.yoltq.impl :as i] [datomic.api :as d] [clojure.tools.logging :as log]) - (:import (datomic Datom) - (java.util.concurrent ScheduledExecutorService BlockingQueue TimeUnit))) + (:import (datomic Connection Datom) + (java.util.concurrent LinkedBlockingQueue ScheduledExecutorService BlockingQueue TimeUnit))) (defn process-poll-result! [cfg id-ident poll-result consumer] @@ -28,18 +28,24 @@ (i/take! cfg) (i/execute! cfg))))) (catch Throwable t - (log/error t "unexpected error in process-poll-result!"))))))))) + (log/error t "Unexpected error in process-poll-result!"))))))))) (defn report-queue-listener [running? ready? ^ScheduledExecutorService pool config-atom] - (let [conn (:conn @config-atom) - ^BlockingQueue q (d/tx-report-queue conn) + (let [cfg @config-atom + conn (:conn cfg) + tx-report-queue-given (contains? cfg :tx-report-queue) + ^BlockingQueue q (if tx-report-queue-given + (get cfg :tx-report-queue) + (d/tx-report-queue conn)) id-ident (d/q '[:find ?e . :where [?e :db/ident :com.github.ivarref.yoltq/id]] (d/db conn))] + (assert (instance? BlockingQueue q)) + (log/debug "tx-report-queue-given:" tx-report-queue-given) (try (while @running? (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] @@ -49,9 +55,118 @@ (fn [f] (when @running? (.execute ^ScheduledExecutorService pool f))))) - (deliver ready? true)) + (deliver ready? :ready)) (catch Throwable t - (log/error t "unexpected error in report-queue-listener")) + (log/error t "Unexpected error in report-queue-listener:" (.getMessage t))) (finally - (log/debug "remove tx-report-queue") - (d/remove-tx-report-queue conn))))) \ No newline at end of file + (if tx-report-queue-given + (log/debug "Remove tx-report-queue handled elsewhere") + (do + (log/debug "Remove tx-report-queue") + (d/remove-tx-report-queue conn))))))) + +(defonce ^:private multicast-state-lock (Object.)) + +(defonce ^:private multicast-state (atom {})) + +(defn- start-multicaster! [conn] + (let [multicaster-ready? (promise)] + (future + (log/debug "Multicaster starting for conn" conn) + (try + (let [input-queue (d/tx-report-queue conn)] + (loop [] + (when-let [mcast-state (get @multicast-state conn)] + (when-let [dest-queues (vals mcast-state)] + (let [element (.poll ^BlockingQueue input-queue 1 TimeUnit/SECONDS)] + (deliver multicaster-ready? :ready) + (when (some? element) + (doseq [q dest-queues] + (let [ok-offer (.offer ^BlockingQueue q element 30 TimeUnit/MINUTES)] + (when (false? ok-offer) + (log/error "Failed to offer item in multicaster for connection" conn)))))) + (recur))))) + (catch Throwable t + (deliver multicaster-ready? :error) + (log/error t "Unexpected error in multicaster:" (.getMessage t))) + (finally + (d/remove-tx-report-queue conn) + (log/debug "Multicaster exiting for conn" conn)))) + multicaster-ready?)) + +(defn get-tx-report-queue-multicast! + "Multicast the datomic.api/tx-report-queue to different consumers. + The multicaster is started on demand. `conn` and `id` identifies the consumer. + + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + [conn id] + (assert (instance? Connection conn)) + (assert (keyword? id)) + (locking multicast-state-lock + (assert (map? @multicast-state)) + (if-let [existing-q (get-in @multicast-state [conn id])] + (do + (log/debug "returning existing queue for id" id) + (assert (instance? BlockingQueue existing-q)) + existing-q) + (let [needs-multicaster? (not (contains? @multicast-state conn)) + new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [conn id] (LinkedBlockingQueue.))))] + (when needs-multicaster? + (let [multicaster-promise (start-multicaster! conn) + multicaster-result (deref multicaster-promise (* 30 60000) :timeout)] + (cond (= multicaster-result :timeout) + (do + (log/error "Timeout waiting for multicaster to start") + (throw (RuntimeException. "Timeout waiting for multicaster to start"))) + (= multicaster-result :error) + (do + (log/error "Multicaster failed to start") + (throw (RuntimeException. "Multicaster failed to start"))) + (= multicaster-result :ready) + (log/debug "Multicaster is ready") + + :else + (do + (log/error "Unexpected state from multicaster:" multicaster-result) + (throw (RuntimeException. (str "Unexpected state from multicaster: " multicaster-result))))))) + (let [new-q (get-in new-state [conn id])] + (assert (instance? BlockingQueue new-q)) + new-q))))) + +(defn stop-all-multicasters! [] + (reset! multicast-state {})) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (defonce conn (let [uri (str "datomic:mem://demo") + _ (d/delete-database uri) + _ (d/create-database uri) + conn (d/connect uri)] + conn)))) + +(comment + (defn drain! [^BlockingQueue q] + (loop [cnt 0] + (if (nil? (.poll q 1 TimeUnit/SECONDS)) + cnt + (recur (inc cnt)))))) + +(comment + (let [q-1 (get-tx-report-queue-multicast! conn :q1) + q-2 (get-tx-report-queue-multicast! conn :q2)])) + +(comment + (drain! (get-tx-report-queue-multicast! conn :q1))) + +(comment + (do + @(d/transact conn [{:db/doc "demo"}]) + :yay)) \ No newline at end of file diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj index 1aa6c02..f3fb6dc 100644 --- a/test/com/github/ivarref/yoltq/log_init.clj +++ b/test/com/github/ivarref/yoltq/log_init.clj @@ -3,6 +3,8 @@ [taoensso.timbre :as timbre] [clojure.string :as str])) +(set! *warn-on-reflection* true) + (def level-colors {;:warn colors/red :error colors/red}) -- cgit v1.2.3 From 4797e559410bce644c40b05fa9a321171a781e78 Mon Sep 17 00:00:00 2001 From: ire Date: Tue, 20 May 2025 22:43:39 +0200 Subject: Improve tx-report-queue sharing #7 --- src/com/github/ivarref/yoltq/report_queue.clj | 342 +++++++++++++++++++++----- test/com/github/ivarref/yoltq/log_init.clj | 2 +- 2 files changed, 283 insertions(+), 61 deletions(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index 9cddc93..239de12 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -6,7 +6,6 @@ (:import (datomic Connection Datom) (java.util.concurrent LinkedBlockingQueue ScheduledExecutorService BlockingQueue TimeUnit))) - (defn process-poll-result! [cfg id-ident poll-result consumer] (let [{:keys [tx-data db-after]} poll-result] (when-let [new-ids (->> tx-data @@ -30,7 +29,6 @@ (catch Throwable t (log/error t "Unexpected error in process-poll-result!"))))))))) - (defn report-queue-listener [running? ready? ^ScheduledExecutorService pool @@ -65,80 +63,253 @@ (log/debug "Remove tx-report-queue") (d/remove-tx-report-queue conn))))))) -(defonce ^:private multicast-state-lock (Object.)) +; https://stackoverflow.com/a/14488425 +(defn- dissoc-in + "Dissociates an entry from a nested associative structure returning a new + nested structure. keys is a sequence of keys. Any empty maps that result + will not be present in the new structure." + [m [k & ks :as keys]] + (if ks + (if-let [nextmap (get m k)] + (let [newmap (dissoc-in nextmap ks)] + (if (seq newmap) + (assoc m k newmap) + (dissoc m k))) + m) + (dissoc m k))) + +(defn- queues-to-shutdown [old-state new-state] + (assert (map? old-state)) + (assert (map? new-state)) + (doseq [x (vals new-state)] + (assert (vector? x))) + (doseq [x (vals old-state)] + (assert (vector? x))) + (let [new-qs (into #{} (mapv second (vals new-state)))] + (reduce + (fn [o [send-end-token? old-q]] + ;(assert (boolean? send-end-token?)) + ;(assert (instance? BlockingQueue old-q)) + (if (contains? new-qs old-q) + o + (conj o [send-end-token? old-q]))) + [] + (vals old-state)))) + +(comment + (queues-to-shutdown {:a [true 999] :b [false 777]} + {:a [true 123] :b [true 777]})) +(defn- multicast-once [conn work-item old-state new-state] + (assert (map? old-state)) + (assert (map? new-state)) + (doseq [[send-end-token? q-to-shutdown] (queues-to-shutdown old-state new-state)] + (if send-end-token? + (do + #_(log/debug "offering :end token") + (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS)) + (do + #_(log/debug "not offering :end token")))) + (when (seq new-state) + (if (some? work-item) + (reduce-kv + (fn [m id [send-end-token? q]] + (let [ok-offer (.offer ^BlockingQueue q work-item 1 TimeUnit/MICROSECONDS)] + (if (true? ok-offer) + (assoc m id [send-end-token? q]) + (log/warn "Failed to offer item in multicaster for connection" conn "and queue id" id)))) + {} + new-state) + new-state))) + +(defonce ^:private multicast-state-lock (Object.)) +(defonce ^:private consumer-state-lock (Object.)) (defonce ^:private multicast-state (atom {})) +(defonce ^:private thread-count (atom 0)) + +(defn- multicaster-loop [init-state conn ready?] + (let [input-queue (d/tx-report-queue conn)] + (deliver ready? true) + (loop [old-state init-state] + (let [work-item (.poll ^BlockingQueue input-queue 16 TimeUnit/MILLISECONDS) + new-state (locking multicast-state-lock + ; writer to `multicast-state` must be protected by `multicast-state-lock` + ; it should block minimally / spend minimum amount of time + (swap! multicast-state (fn [old-state] (update-in old-state [:iter-count conn] (fnil inc 0)))) + (if-let [new-state (multicast-once conn work-item old-state (get-in @multicast-state [:queues conn] {}))] + new-state + (do (swap! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn]))) + (swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] dec))) + (d/remove-tx-report-queue conn) + nil)))] + (if new-state + (recur new-state) + nil))))) (defn- start-multicaster! [conn] - (let [multicaster-ready? (promise)] + (let [ready? (promise)] (future (log/debug "Multicaster starting for conn" conn) (try - (let [input-queue (d/tx-report-queue conn)] - (loop [] - (when-let [mcast-state (get @multicast-state conn)] - (when-let [dest-queues (vals mcast-state)] - (let [element (.poll ^BlockingQueue input-queue 1 TimeUnit/SECONDS)] - (deliver multicaster-ready? :ready) - (when (some? element) - (doseq [q dest-queues] - (let [ok-offer (.offer ^BlockingQueue q element 30 TimeUnit/MINUTES)] - (when (false? ok-offer) - (log/error "Failed to offer item in multicaster for connection" conn)))))) - (recur))))) + (swap! thread-count inc) + (let [new-state (swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] (fnil inc 0))))] + (assert (= 1 (get-in new-state [:thread-count conn]))) + ; "parent" thread holds `multicast-state-lock` and + ; waits for `ready?` promise, so effectively this new thread also holds + ; the lock until `ready?` is delivered. That is: it is safe + ; for this thread to modify multicast-state regardless of what other threads are doing + (multicaster-loop (get-in new-state [:queues conn]) conn ready?)) (catch Throwable t - (deliver multicaster-ready? :error) - (log/error t "Unexpected error in multicaster:" (.getMessage t))) + (log/error t "Unexpected error in multicaster:" (.getMessage t)) + (log/error "Multicaster exiting for conn")) (finally - (d/remove-tx-report-queue conn) + (swap! thread-count dec) (log/debug "Multicaster exiting for conn" conn)))) - multicaster-ready?)) + @ready?)) + +(defn- wait-multicast-thread-step [conn] + ; `get-tx-report-queue-multicast!` should return only when the multicaster thread + ; has picked up the new queue. + ; + ; Otherwise the following could happen: + ; 1. multicast thread is sleeping + ; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true` + ; 3: user-thread (or somebody else) calls `stop-multicaster`. + ; The multicast-state atom is now identical as it was in 1 + ; 4: multicast thread is scheduled and does _not_ detect any state change. + ; And therefore the multicast thread does _not_ send out an :end token as one would expect. + ; + ; Once [:iter-count conn] has changed, we know that the multicaster thread + ; will see the new queue. + ; We are still holding the consumer-state-lock, so no other thread + ; can do any stop-multicasting that would/could corrupt the state. + ; We can then be sure that the queue will receive the `:end` token when/if + ; the queue is stopped. + (let [start-ms (System/currentTimeMillis) + iter-count (get-in @multicast-state [:iter-count conn] -1)] + (loop [spin-count 0] + (if (not= iter-count (get-in @multicast-state [:iter-count conn])) + nil + (do + (let [spent-ms (- (System/currentTimeMillis) start-ms)] + (if (> spent-ms 30000) + (throw (RuntimeException. "Timed out waiting for multicaster thread")) + (do + (Thread/sleep 16) + (recur (inc spin-count)))))))))) (defn get-tx-report-queue-multicast! "Multicast the datomic.api/tx-report-queue to different consumers. The multicaster is started on demand. `conn` and `id` identifies the consumer. Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." - [conn id] - (assert (instance? Connection conn)) - (assert (keyword? id)) - (locking multicast-state-lock - (assert (map? @multicast-state)) - (if-let [existing-q (get-in @multicast-state [conn id])] - (do - (log/debug "returning existing queue for id" id) - (assert (instance? BlockingQueue existing-q)) - existing-q) - (let [needs-multicaster? (not (contains? @multicast-state conn)) - new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [conn id] (LinkedBlockingQueue.))))] - (when needs-multicaster? - (let [multicaster-promise (start-multicaster! conn) - multicaster-result (deref multicaster-promise (* 30 60000) :timeout)] - (cond (= multicaster-result :timeout) - (do - (log/error "Timeout waiting for multicaster to start") - (throw (RuntimeException. "Timeout waiting for multicaster to start"))) - (= multicaster-result :error) - (do - (log/error "Multicaster failed to start") - (throw (RuntimeException. "Multicaster failed to start"))) - (= multicaster-result :ready) - (log/debug "Multicaster is ready") + ([conn id] + (get-tx-report-queue-multicast! conn id false)) + ([conn id send-end-token?] + (assert (instance? Connection conn)) + (assert (keyword? id)) + (locking consumer-state-lock + (let [the-q + (locking multicast-state-lock + (assert (map? @multicast-state)) + (if-let [existing-q (get-in @multicast-state [:queues conn id])] + (do + (swap! multicast-state + (fn [old-state] + (update-in old-state [:queues conn id] (fn [[end-token? q]] + (if (not= end-token? send-end-token?) + (log/debug "flipped `send-end-token?`") + (log/debug "identical `send-end-token?`")) + [send-end-token? q])))) + (log/debug "Returning existing queue for id" id) + (assert (instance? BlockingQueue (second existing-q))) + (second existing-q)) + (let [needs-multicaster? (nil? (get-in @multicast-state [:queues conn])) + new-q (LinkedBlockingQueue.) + new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [:queues conn id] [send-end-token? new-q])))] + (if needs-multicaster? + (do + (start-multicaster! conn) + (log/debug "Multicaster thread started. Returning new queue for id" id) + new-q) + (do + (log/debug "Multicaster thread already exists. Returning new queue for id" id) + new-q)))))] + ; wait for multicaster thread to pick up current Queue + (wait-multicast-thread-step conn) + the-q)))) - :else +(defn wait-multicast-threads-exit [[old-state new-state]] + (assert (map? old-state)) + (assert (map? new-state)) + (assert (map? (get old-state :queues {}))) + (assert (map? (get new-state :queues {}))) + (assert (map? (get old-state :thread-count {}))) + (assert (map? (get new-state :thread-count {}))) + (locking consumer-state-lock + ; No new multicast threads will be launched inside this block. + ; The lock is already held by parent function. + ; + ; Why do we need to _wait_ for multicaster thread(s) to exit after + ; removing all queue ids for a given connection? + ; Otherwise the following could happen: + ; 1. multicaster thread is sleeping + ; 2. user calls stop-multicaster! + ; One would expect that multicaster thread would exit, but it is still sleeping + ; 3. user calls get-tx-report-queue-multicast! with the same conn + ; The state is now empty, so a new multicaster thread is spawned. + ; 4. Now there is two multicaster threads for the same connection! + ; ... and since the datomic report queue can be shared between threads + ; it will seemingly work, but when the end event is sent, it will be + ; sent by multiple threads. + (let [old-conns (into #{} (keys (get old-state :queues {}))) + new-conns (into #{} (keys (get new-state :queues {})))] + (doseq [old-conn old-conns] + (when-not (contains? new-conns old-conn) + (let [old-threadcount (get-in old-state [:thread-count old-conn] nil)] + (assert (= 1 old-threadcount)) + (let [start-ms (System/currentTimeMillis)] + (loop [] + (if (= 0 (get-in @multicast-state [:thread-count old-conn])) + :ok (do - (log/error "Unexpected state from multicaster:" multicaster-result) - (throw (RuntimeException. (str "Unexpected state from multicaster: " multicaster-result))))))) - (let [new-q (get-in new-state [conn id])] - (assert (instance? BlockingQueue new-q)) - new-q))))) + (let [spent-ms (- (System/currentTimeMillis) start-ms)] + (if (> spent-ms 30000) + (throw (RuntimeException. "Timed out waiting for multicaster thread to exit")) + (do + (Thread/sleep 16) + (recur)))))))))))))) + +(defn stop-multicaster-id! [conn id] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (swap-vals! multicast-state (fn [old-state] + (let [new-state (dissoc-in old-state [:queues conn id])] + (if (= {} (get-in new-state [:queues conn])) + (dissoc-in old-state [:queues conn]) + new-state)))))))) + +(defn stop-multicaster! [conn] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn]))))))) (defn stop-all-multicasters! [] - (reset! multicast-state {})) + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {}))))))) (comment (do (require 'com.github.ivarref.yoltq.log-init) + (defn drain! [^BlockingQueue q] + (loop [items []] + (if-let [elem (.poll q 100 TimeUnit/MILLISECONDS)] + (recur (conj items elem)) + items))) (com.github.ivarref.yoltq.log-init/init-logging! [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] [#{"com.github.ivarref.yoltq.report-queue"} :debug] @@ -153,20 +324,71 @@ conn)))) (comment - (defn drain! [^BlockingQueue q] - (loop [cnt 0] - (if (nil? (.poll q 1 TimeUnit/SECONDS)) - cnt - (recur (inc cnt)))))) + (do + (require 'com.github.ivarref.yoltq.log-init) + (defn drain! [^BlockingQueue q] + (loop [items []] + (if-let [elem (.poll q 100 TimeUnit/MILLISECONDS)] + (recur (conj items elem)) + items))) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (log/info "********************************") + (defonce conn (let [uri (str "datomic:mem://demo") + _ (d/delete-database uri) + _ (d/create-database uri) + conn (d/connect uri)] + conn)) + (log/info "stop-all!") + (stop-all-multicasters!) + (assert (= 0 @thread-count)) + (let [q1 (get-tx-report-queue-multicast! conn :q1 false) + q2 (get-tx-report-queue-multicast! conn :q2 false) + _ (get-tx-report-queue-multicast! conn :q1 true)] + @(d/transact conn [{:db/doc "demo"}]) + @(d/transact conn [{:db/doc "demo"}]) + @(d/transact conn [{:db/doc "demo"}]) + (log/info "begin drain q1") + (stop-multicaster-id! conn :q1) + (println "thread count" @thread-count) + (let [qitems-2 (drain! q2) + qitems-1 (drain! q1)] + (assert (= :end (last qitems-1))) + (println "drain count q1:" (count qitems-1)) + (println "drain count q2:" (count qitems-2)))))) + +(comment + (do + (let [q (get-tx-report-queue-multicast! conn :q1 true)] + (log/debug "stopping id :q1") + (stop-multicaster-id! conn :q1) + (let [drained (drain! q)] + (println "drained:" drained) + (assert (= [:end] drained))) + @multicast-state))) (comment - (let [q-1 (get-tx-report-queue-multicast! conn :q1) - q-2 (get-tx-report-queue-multicast! conn :q2)])) + (stop-all-multicasters!)) (comment - (drain! (get-tx-report-queue-multicast! conn :q1))) + (do + (let [q (get-tx-report-queue-multicast! conn :q2 false)] + (println "drain count:" (count (drain! q))) + @multicast-state + nil))) + +(comment + (get-tx-report-queue-multicast! conn :q1 false) + (get-tx-report-queue-multicast! conn :q1 true)) (comment (do + @(d/transact conn [{:db/doc "demo"}]) + @(d/transact conn [{:db/doc "demo"}]) @(d/transact conn [{:db/doc "demo"}]) :yay)) \ No newline at end of file diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj index f3fb6dc..7eae557 100644 --- a/test/com/github/ivarref/yoltq/log_init.clj +++ b/test/com/github/ivarref/yoltq/log_init.clj @@ -48,7 +48,7 @@ (color-f (force msg_)) - #_maybe-stacktrace)))) + maybe-stacktrace)))) (catch Throwable t -- cgit v1.2.3 From 8b46092126baea5cd73465f5d544cdb0f75547b6 Mon Sep 17 00:00:00 2001 From: Stefan van den Oord Date: Tue, 16 Sep 2025 10:36:07 +0200 Subject: Rename batch -> job-group --- src/com/github/ivarref/yoltq.clj | 14 +++++++------- src/com/github/ivarref/yoltq/impl.clj | 6 +++--- test/com/github/ivarref/yoltq/virtual_test.clj | 16 ++++++++-------- 3 files changed, 18 insertions(+), 18 deletions(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index ccd9062..88a7c31 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -257,27 +257,27 @@ (sort-by (juxt :qname :status)) (vec)))) -(defn batch-progress [queue-name batch-name] +(defn job-group-progress [queue-name job-group-name] (let [{:keys [conn]} @*config* db (d/db conn)] - (->> (d/q '[:find ?e ?qname ?bname ?status + (->> (d/q '[:find ?e ?qname ?jgname ?status :keys :e :qname :bname :status - :in $ ?qname ?bname + :in $ ?qname ?jgname :where [?e :com.github.ivarref.yoltq/queue-name ?qname] - [?e :com.github.ivarref.yoltq/batch-name ?bname] + [?e :com.github.ivarref.yoltq/job-group-name ?jgname] [?e :com.github.ivarref.yoltq/status ?status]] - db queue-name batch-name) + db queue-name job-group-name) (mapv #(select-keys % [:qname :bname :status])) (mapv (fn [qitem] {qitem 1})) (reduce (partial merge-with +) {}) (mapv (fn [[{:keys [qname bname status]} v]] (array-map :qname qname - :batch-name bname + :job-group-name bname :status status :count v))) - (sort-by (juxt :qname :batch-name :status)) + (sort-by (juxt :qname :job-group-name :status)) (vec)))) (defn get-errors [qname] diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index 6d2aa3d..e77655b 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -12,7 +12,7 @@ [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity} #:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value} #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} - #:db{:ident :com.github.ivarref.yoltq/batch-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} + #:db{:ident :com.github.ivarref.yoltq/job-group-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes} @@ -105,8 +105,8 @@ (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts)))) (when-let [ext-id (:id opts)] {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}) - (when-let [batch-name (:batch-name opts)] - {:com.github.ivarref.yoltq/batch-name batch-name})))) + (when-let [job-group-name (:job-group-name opts)] + {:com.github.ivarref.yoltq/job-group-name job-group-name})))) (do (log/error "Did not find registered handler for queue" queue-name) (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 7621b13..d245aaa 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -451,24 +451,24 @@ (tq/consume! :q) (is (= @got-work "asdf")))) -(deftest batch-of-jobs-test +(deftest job-group-test (let [conn (u/empty-conn)] (yq/init! {:conn conn}) (yq/add-consumer! :q1 identity) (yq/add-consumer! :q2 identity) - @(d/transact conn [(yq/put :q1 {:work 123} {:batch-name :b1}) - (yq/put :q1 {:work 456} {:batch-name :b2}) - (yq/put :q2 {:work 789} {:batch-name :b1})]) + @(d/transact conn [(yq/put :q1 {:work 123} {:job-group-name :b1}) + (yq/put :q1 {:work 456} {:job-group-name :b2}) + (yq/put :q2 {:work 789} {:job-group-name :b1})]) (is (= [{:qname :q1 - :batch-name :b1 + :job-group-name :b1 :status :init :count 1}] - (yq/batch-progress :q1 :b1))) + (yq/job-group-progress :q1 :b1))) (is (= {:work 123} (tq/consume! :q1))) (is (= [{:qname :q1 - :batch-name :b1 + :job-group-name :b1 :status :done :count 1}] - (yq/batch-progress :q1 :b1))))) + (yq/job-group-progress :q1 :b1))))) -- cgit v1.2.3 From 698ab89d3a48fd6c42f0abbb1fb6b6c9e8d4d53a Mon Sep 17 00:00:00 2001 From: Stefan van den Oord Date: Tue, 16 Sep 2025 11:10:02 +0200 Subject: Improve naming: job-group is a keyword, so don't include "-name" --- src/com/github/ivarref/yoltq.clj | 20 ++++++++++---------- src/com/github/ivarref/yoltq/impl.clj | 6 +++--- test/com/github/ivarref/yoltq/virtual_test.clj | 14 +++++++------- 3 files changed, 20 insertions(+), 20 deletions(-) (limited to 'test/com/github/ivarref/yoltq') diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 88a7c31..8c8ca7a 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -257,27 +257,27 @@ (sort-by (juxt :qname :status)) (vec)))) -(defn job-group-progress [queue-name job-group-name] +(defn job-group-progress [queue-name job-group] (let [{:keys [conn]} @*config* db (d/db conn)] - (->> (d/q '[:find ?e ?qname ?jgname ?status - :keys :e :qname :bname :status - :in $ ?qname ?jgname + (->> (d/q '[:find ?e ?qname ?job-group ?status + :keys :e :qname :job-group :status + :in $ ?qname ?job-group :where [?e :com.github.ivarref.yoltq/queue-name ?qname] - [?e :com.github.ivarref.yoltq/job-group-name ?jgname] + [?e :com.github.ivarref.yoltq/job-group ?job-group] [?e :com.github.ivarref.yoltq/status ?status]] - db queue-name job-group-name) - (mapv #(select-keys % [:qname :bname :status])) + db queue-name job-group) + (mapv #(select-keys % [:qname :job-group :status])) (mapv (fn [qitem] {qitem 1})) (reduce (partial merge-with +) {}) - (mapv (fn [[{:keys [qname bname status]} v]] + (mapv (fn [[{:keys [qname job-group status]} v]] (array-map :qname qname - :job-group-name bname + :job-group job-group :status status :count v))) - (sort-by (juxt :qname :job-group-name :status)) + (sort-by (juxt :qname :job-group :status)) (vec)))) (defn get-errors [qname] diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index e77655b..ffb1ad8 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -12,7 +12,7 @@ [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity} #:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value} #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} - #:db{:ident :com.github.ivarref.yoltq/job-group-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} + #:db{:ident :com.github.ivarref.yoltq/job-group, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes} @@ -105,8 +105,8 @@ (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts)))) (when-let [ext-id (:id opts)] {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}) - (when-let [job-group-name (:job-group-name opts)] - {:com.github.ivarref.yoltq/job-group-name job-group-name})))) + (when-let [job-group (:job-group opts)] + {:com.github.ivarref.yoltq/job-group job-group})))) (do (log/error "Did not find registered handler for queue" queue-name) (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index d245aaa..a2ed269 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -456,19 +456,19 @@ (yq/init! {:conn conn}) (yq/add-consumer! :q1 identity) (yq/add-consumer! :q2 identity) - @(d/transact conn [(yq/put :q1 {:work 123} {:job-group-name :b1}) - (yq/put :q1 {:work 456} {:job-group-name :b2}) - (yq/put :q2 {:work 789} {:job-group-name :b1})]) + @(d/transact conn [(yq/put :q1 {:work 123} {:job-group :group1}) + (yq/put :q1 {:work 456} {:job-group :group2}) + (yq/put :q2 {:work 789} {:job-group :group1})]) (is (= [{:qname :q1 - :job-group-name :b1 + :job-group :group1 :status :init :count 1}] - (yq/job-group-progress :q1 :b1))) + (yq/job-group-progress :q1 :group1))) (is (= {:work 123} (tq/consume! :q1))) (is (= [{:qname :q1 - :job-group-name :b1 + :job-group :group1 :status :done :count 1}] - (yq/job-group-progress :q1 :b1))))) + (yq/job-group-progress :q1 :group1))))) -- cgit v1.2.3