From ea40c5dbc2b47d6fd2a23236828dc9e4ab1f77dc Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Sat, 4 Sep 2021 13:23:07 +0200 Subject: Initial commit Add release script Release 0.1.3 Use com.github.ivarref.yoltq namespace Use com.github.ivarref.yoltq namespace --- src/com/github/ivarref/yoltq/impl.clj | 147 ++++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 src/com/github/ivarref/yoltq/impl.clj (limited to 'src/com/github/ivarref/yoltq/impl.clj') diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj new file mode 100644 index 0000000..2acc83d --- /dev/null +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -0,0 +1,147 @@ +(ns com.github.ivarref.yoltq.impl + (:require [datomic.api :as d] + [clojure.tools.logging :as log] + [clojure.string :as str] + [com.github.ivarref.yoltq.utils :as u] + [com.github.ivarref.yoltq.ext-sys :as ext])) + + +(def schema + [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity} + #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} + #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} + #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string} + #:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string} + #:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true} + #:db{:ident :com.github.ivarref.yoltq/lock, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :noHistory true} + #:db{:ident :com.github.ivarref.yoltq/init-time, :cardinality :db.cardinality/one, :valueType :db.type/long} + #:db{:ident :com.github.ivarref.yoltq/processing-time, :cardinality :db.cardinality/one, :valueType :db.type/long} + #:db{:ident :com.github.ivarref.yoltq/done-time, :cardinality :db.cardinality/one, :valueType :db.type/long} + #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}]) + + +(defn put [config queue-name payload] + (if-let [_ (get-in config [:handlers queue-name])] + (let [id (u/squuid)] + (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init) + {:com.github.ivarref.yoltq/id id + :com.github.ivarref.yoltq/queue-name queue-name + :com.github.ivarref.yoltq/status u/status-init + :com.github.ivarref.yoltq/payload (pr-str payload) + :com.github.ivarref.yoltq/bindings (pr-str {}) + :com.github.ivarref.yoltq/lock (u/random-uuid) + :com.github.ivarref.yoltq/tries 0 + :com.github.ivarref.yoltq/init-time (u/now-ns)}) + (do + (log/error "Did not find registered handler for queue" queue-name) + (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) + + +(defn take! [{:keys [conn cas-failures hung-log-level] + :or {hung-log-level :error}} + {:keys [tx id queue-name was-hung? to-error?] :as queue-item-info}] + (when queue-item-info + (try + (cond to-error? + (log/logp hung-log-level "queue-item" (str id) "was hung and retried too many times. Giving up!") + + was-hung? + (log/logp hung-log-level "queue-item" (str id) "was hung, retrying ...") + + :else + nil) + (let [{:keys [db-after]} @(d/transact conn tx) + {:com.github.ivarref.yoltq/keys [status] :as q-item} (u/get-queue-item db-after id)] + (log/debug "queue item" (str id) "for queue" queue-name "now has status" status) + q-item) + (catch Throwable t + (let [{:db/keys [error] :as m} (u/db-error-map t)] + (cond + (= :db.error/cas-failed error) + (do + (log/info ":db.error/cas-failed for queue item" (str id) "and attribute" (:a m)) + (when cas-failures + (swap! cas-failures inc)) + nil) + + :else + (do + (log/error t "Unexpected failure for queue item" (str id) ":" (ex-message t)) + nil))))))) + + +(defn mark-status! [{:keys [conn]} + {:com.github.ivarref.yoltq/keys [id lock tries]} + new-status] + (try + (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing new-status] + (if (= new-status u/status-done) + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ns)} + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ns)})] + {:keys [db-after]} @(d/transact conn tx)] + (u/get-queue-item db-after id)) + (catch Throwable t + (log/error t "unexpected error in mark-status!: " (ex-message t)) + nil))) + + +(defn fmt [id queue-name new-status tries spent-ns] + (str/join " " ["queue-item" (str id) + "for queue" queue-name + "now has status" new-status + "after" tries (if (= 1 tries) + "try" + "tries") + "in" (format "%.1f" (double (/ spent-ns 1e6))) "ms"])) + + +(defn execute! [{:keys [handlers mark-status-fn! start-execute-time] + :or {mark-status-fn! mark-status!} + :as cfg} + {:com.github.ivarref.yoltq/keys [status id queue-name payload] :as queue-item}] + (when queue-item + (if (= :error status) + (assoc queue-item :failed? true) + (if-let [queue (get handlers queue-name)] + (let [{:keys [f allow-cas-failure?]} queue] + (log/debug "queue item" (str id) "for queue" queue-name "is now processing") + (let [{:keys [retval exception]} + (try + (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ns) id queue-name]) + (let [v (f payload)] + {:retval v}) + (catch Throwable t + {:exception t}) + (finally + (swap! start-execute-time dissoc (Thread/currentThread)))) + {:db/keys [error] :as m} (u/db-error-map exception)] + (cond + (and (some? exception) + allow-cas-failure? + (= :db.error/cas-failed error) + (or (true? allow-cas-failure?) + (allow-cas-failure? (:a m)))) + (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)] + (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item] + (log/info (fmt id queue-name u/status-done tries (- done-time init-time))) + (assoc q-item :retval retval :success? true :allow-cas-failure? true))) + + (some? exception) + (when-let [q-item (mark-status-fn! cfg queue-item u/status-error)] + (let [{:com.github.ivarref.yoltq/keys [init-time error-time tries]} q-item + level (if (>= tries 3) :error :warn)] + (log/logp level exception (fmt id queue-name u/status-error tries (- error-time init-time))) + (log/logp level exception "error message was:" (str \" (ex-message exception) \") "for queue-item" (str id)) + (log/logp level exception "ex-data was:" (ex-data exception) "for queue-item" (str id)) + (assoc q-item :exception exception))) + + :else + (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)] + (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item] + (log/info (fmt id queue-name u/status-done tries (- done-time init-time))) + (assoc q-item :retval retval :success? true)))))) + (do + (log/error "no handler for queue" queue-name) + nil))))) -- cgit v1.2.3 From f2b96daef274415c8e3ba74ce492ef9c9d183711 Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Tue, 14 Sep 2021 19:16:26 +0200 Subject: Add ability to measure time spent on transacting vs. total time spent. Using transactor with a real postgres database, one CPU and an no-op identity queue consumer, transacting accounts for about 99.5% of the total time used. --- src/com/github/ivarref/yoltq.clj | 4 ++-- src/com/github/ivarref/yoltq/impl.clj | 15 +++++++++++---- 2 files changed, 13 insertions(+), 6 deletions(-) (limited to 'src/com/github/ivarref/yoltq/impl.clj') diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 565c01d..2eb39e8 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -91,11 +91,11 @@ (swap! *config* (fn [old-config] (assoc-in old-config [:handlers queue-id] (merge opts {:f f})))))) -(defn put [id payload] +(defn put [queue-id payload] (let [{:keys [bootstrap-poller! conn] :as cfg} @*config*] (when (and *test-mode* bootstrap-poller!) (bootstrap-poller! conn)) - (i/put cfg id payload))) + (i/put cfg queue-id payload))) (defn- do-start! [] diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index 2acc83d..6a4f105 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -37,7 +37,7 @@ (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) -(defn take! [{:keys [conn cas-failures hung-log-level] +(defn take! [{:keys [conn cas-failures hung-log-level tx-spent-time!] :or {hung-log-level :error}} {:keys [tx id queue-name was-hung? to-error?] :as queue-item-info}] (when queue-item-info @@ -50,7 +50,9 @@ :else nil) - (let [{:keys [db-after]} @(d/transact conn tx) + (let [start-time (System/nanoTime) + {:keys [db-after]} @(d/transact conn tx) + _ (when tx-spent-time! (tx-spent-time! (- (System/nanoTime) start-time))) {:com.github.ivarref.yoltq/keys [status] :as q-item} (u/get-queue-item db-after id)] (log/debug "queue item" (str id) "for queue" queue-name "now has status" status) q-item) @@ -70,7 +72,7 @@ nil))))))) -(defn mark-status! [{:keys [conn]} +(defn mark-status! [{:keys [conn tx-spent-time!]} {:com.github.ivarref.yoltq/keys [id lock tries]} new-status] (try @@ -80,7 +82,9 @@ (if (= new-status u/status-done) {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ns)} {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ns)})] + start-time (System/nanoTime) {:keys [db-after]} @(d/transact conn tx)] + (when tx-spent-time! (tx-spent-time! (- (System/nanoTime) start-time))) (u/get-queue-item db-after id)) (catch Throwable t (log/error t "unexpected error in mark-status!: " (ex-message t)) @@ -97,7 +101,7 @@ "in" (format "%.1f" (double (/ spent-ns 1e6))) "ms"])) -(defn execute! [{:keys [handlers mark-status-fn! start-execute-time] +(defn execute! [{:keys [handlers mark-status-fn! start-execute-time collect-spent-time!] :or {mark-status-fn! mark-status!} :as cfg} {:com.github.ivarref.yoltq/keys [status id queue-name payload] :as queue-item}] @@ -126,6 +130,7 @@ (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)] (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item] (log/info (fmt id queue-name u/status-done tries (- done-time init-time))) + (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time))) (assoc q-item :retval retval :success? true :allow-cas-failure? true))) (some? exception) @@ -135,12 +140,14 @@ (log/logp level exception (fmt id queue-name u/status-error tries (- error-time init-time))) (log/logp level exception "error message was:" (str \" (ex-message exception) \") "for queue-item" (str id)) (log/logp level exception "ex-data was:" (ex-data exception) "for queue-item" (str id)) + (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time))) (assoc q-item :exception exception))) :else (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)] (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item] (log/info (fmt id queue-name u/status-done tries (- done-time init-time))) + (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time))) (assoc q-item :retval retval :success? true)))))) (do (log/error "no handler for queue" queue-name) -- cgit v1.2.3 From 538f0111dfb02da0f875b5777c97684d451be73a Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Fri, 17 Sep 2021 14:09:10 +0200 Subject: Save bindings on put --- src/com/github/ivarref/yoltq/impl.clj | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'src/com/github/ivarref/yoltq/impl.clj') diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index 6a4f105..cb99b08 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -20,15 +20,21 @@ #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}]) -(defn put [config queue-name payload] +(defn put [{:keys [capture-bindings] :as config} + queue-name payload] (if-let [_ (get-in config [:handlers queue-name])] - (let [id (u/squuid)] + (let [id (u/squuid) + str-bindings (->> (reduce (fn [o k] + (assoc o (symbol k) (deref k))) + {} + (or capture-bindings [])) + (pr-str))] (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init) {:com.github.ivarref.yoltq/id id :com.github.ivarref.yoltq/queue-name queue-name :com.github.ivarref.yoltq/status u/status-init :com.github.ivarref.yoltq/payload (pr-str payload) - :com.github.ivarref.yoltq/bindings (pr-str {}) + :com.github.ivarref.yoltq/bindings str-bindings :com.github.ivarref.yoltq/lock (u/random-uuid) :com.github.ivarref.yoltq/tries 0 :com.github.ivarref.yoltq/init-time (u/now-ns)}) -- cgit v1.2.3 From 79cc3f448949d755c59265e2316408d037be20cb Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Thu, 23 Sep 2021 11:30:03 +0200 Subject: Add force-retry! --- src/com/github/ivarref/yoltq/impl.clj | 2 +- src/com/github/ivarref/yoltq/test_queue.clj | 49 +++++++++++--------------- test/com/github/ivarref/yoltq/virtual_test.clj | 3 +- 3 files changed, 23 insertions(+), 31 deletions(-) (limited to 'src/com/github/ivarref/yoltq/impl.clj') diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index cb99b08..9c95cff 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -67,7 +67,7 @@ (cond (= :db.error/cas-failed error) (do - (log/info ":db.error/cas-failed for queue item" (str id) "and attribute" (:a m)) + (log/info "take! :db.error/cas-failed for queue item" (str id) "and attribute" (:a m)) (when cas-failures (swap! cas-failures inc)) nil) diff --git a/src/com/github/ivarref/yoltq/test_queue.clj b/src/com/github/ivarref/yoltq/test_queue.clj index 6aeb959..4c4f903 100644 --- a/src/com/github/ivarref/yoltq/test_queue.clj +++ b/src/com/github/ivarref/yoltq/test_queue.clj @@ -42,6 +42,7 @@ config# (atom {:bootstrap-poller! (partial bootstrap-poller! txq# running?# poller-exited?# bootstrapped?#) :init-backoff-time 0 :hung-log-level :warn + :prev-consumed (atom {}) :tx-queue txq#})] (with-bindings {#'yq/*config* config# #'yq/*running?* (atom false) @@ -154,44 +155,34 @@ (:com.github.ivarref.yoltq/status job#)) (i/take! @yq/*config*) (i/execute! @yq/*config*))] + (swap! (:prev-consumed @yq/*config*) assoc ~queue-name res#) (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#))) (if (:retval res#) (:retval res#) (:exception res#)))) (catch Throwable t# - (log/error t# "unexpected error in consume-expect:" (ex-message t#)))) + (log/error t# "unexpected error in consume-expect:" (ex-message t#)) + (throw t#))) (test/is false (str "No job found for queue " ~queue-name)))) (defmacro consume! [queue-name] `(consume-expect! ~queue-name :done)) -(defn mark-fails! [{:keys [conn]} - {:com.github.ivarref.yoltq/keys [id lock tries]} - _] - (try - (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)] - [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] - [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing :init]]] - @(d/transact conn tx) - nil) - (catch Throwable t - (log/error t "unexpected error in mark-status!: " (ex-message t)) - nil))) - - (defmacro force-retry! [queue-name] - `(if-let [job# (get-tx-q-job ~queue-name)] - (try - (with-bindings (:com.github.ivarref.yoltq/bindings job#) - (some->> (u/prepare-processing (d/db (:conn @yq/*config*)) - (:com.github.ivarref.yoltq/id job#) - ~queue-name - (:com.github.ivarref.yoltq/lock job#) - (:com.github.ivarref.yoltq/status job#)) - (i/take! @yq/*config*) - (i/execute! (assoc @yq/*config* :mark-status-fn! mark-fails!))) - (consume! ~queue-name)) - (catch Throwable t# - (log/error t# "unexpected error in consume-twice!:" (ex-message t#)))) - (test/is false (str "No job found for queue " ~queue-name)))) \ No newline at end of file + `(if-let [job# (some-> @yq/*config* :prev-consumed deref (get ~queue-name))] + (let [db-res# @(d/transact (:conn @yq/*config*) [{:com.github.ivarref.yoltq/id (:com.github.ivarref.yoltq/id job#) + :com.github.ivarref.yoltq/status :init}]) + res# (some->> (u/prepare-processing (:db-after db-res#) + (:com.github.ivarref.yoltq/id job#) + ~queue-name + (:com.github.ivarref.yoltq/lock job#) + :init) + (i/take! @yq/*config*) + (i/execute! @yq/*config*))] + (swap! (:prev-consumed @yq/*config*) assoc ~queue-name res#) + (test/is (= :done (:com.github.ivarref.yoltq/status res#))) + (if (:retval res#) + (:retval res#) + (:exception res#))) + (test/is false "Expected to have previously consumed something. Was nil."))) diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 442acac..2b67e5e 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -282,4 +282,5 @@ (fn [_] (swap! cnt inc)))) @(d/transact conn [(yq/put :q nil)]) (is (= 1 (tq/consume! :q))) - #_(is (= 2 (tq/force-retry! :q))))) + (is (= 2 (tq/force-retry! :q))) + (is (= 3 (tq/force-retry! :q))))) -- cgit v1.2.3 From dc2e14b4e1e91e6fefecc01c312a44c0033640c9 Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Thu, 23 Sep 2021 13:01:23 +0200 Subject: Basic depends-on works for test queue --- src/com/github/ivarref/yoltq.clj | 12 +++--- src/com/github/ivarref/yoltq/impl.clj | 51 +++++++++++++++++++++----- src/com/github/ivarref/yoltq/test_queue.clj | 27 ++++++++------ src/com/github/ivarref/yoltq/utils.clj | 1 + test/com/github/ivarref/yoltq/test_utils.clj | 2 +- test/com/github/ivarref/yoltq/virtual_test.clj | 27 ++++++++++++++ 6 files changed, 92 insertions(+), 28 deletions(-) (limited to 'src/com/github/ivarref/yoltq/impl.clj') diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 58efca1..3164020 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -94,11 +94,13 @@ (swap! *config* (fn [old-config] (assoc-in old-config [:handlers queue-id] (merge opts {:f f})))))) -(defn put [queue-id payload] - (let [{:keys [bootstrap-poller! conn] :as cfg} @*config*] - (when (and *test-mode* bootstrap-poller!) - (bootstrap-poller! conn)) - (i/put cfg queue-id payload))) +(defn put + ([queue-id payload] (put queue-id payload {})) + ([queue-id payload opts] + (let [{:keys [bootstrap-poller! conn] :as cfg} @*config*] + (when (and *test-mode* bootstrap-poller!) + (bootstrap-poller! conn)) + (i/put cfg queue-id payload opts)))) (defn- do-start! [] diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index 9c95cff..9811c93 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -8,9 +8,11 @@ (def schema [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity} + #:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value} #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string} + #:db{:ident :com.github.ivarref.yoltq/opts, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true} #:db{:ident :com.github.ivarref.yoltq/lock, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :noHistory true} @@ -20,8 +22,10 @@ #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}]) -(defn put [{:keys [capture-bindings] :as config} - queue-name payload] +(defn put [{:keys [capture-bindings conn] :as config} + queue-name + payload + opts] (if-let [_ (get-in config [:handlers queue-name])] (let [id (u/squuid) str-bindings (->> (reduce (fn [o k] @@ -30,19 +34,46 @@ (or capture-bindings [])) (pr-str))] (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init) - {:com.github.ivarref.yoltq/id id - :com.github.ivarref.yoltq/queue-name queue-name - :com.github.ivarref.yoltq/status u/status-init - :com.github.ivarref.yoltq/payload (pr-str payload) - :com.github.ivarref.yoltq/bindings str-bindings - :com.github.ivarref.yoltq/lock (u/random-uuid) - :com.github.ivarref.yoltq/tries 0 - :com.github.ivarref.yoltq/init-time (u/now-ns)}) + (merge + {:com.github.ivarref.yoltq/id id + :com.github.ivarref.yoltq/queue-name queue-name + :com.github.ivarref.yoltq/status u/status-init + :com.github.ivarref.yoltq/payload (pr-str payload) + :com.github.ivarref.yoltq/bindings str-bindings + :com.github.ivarref.yoltq/opts (pr-str (or opts {})) + :com.github.ivarref.yoltq/lock (u/random-uuid) + :com.github.ivarref.yoltq/tries 0 + :com.github.ivarref.yoltq/init-time (u/now-ns)} + (when-let [[q ext-id] (:depends-on opts)] + (when-not (d/q '[:find ?e . + :in $ ?ext-id + :where + [?e :com.github.ivarref.yoltq/ext-id ?ext-id]] + (d/db conn) + (pr-str [q ext-id])) + (throw (ex-info ":depends-on not found in database" opts)))) + (when-let [ext-id (:id opts)] + {:com.github.ivarref.yoltq/ext-id (pr-str [queue-name ext-id])}))) (do (log/error "Did not find registered handler for queue" queue-name) (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) +(defn depends-on-waiting? [{:keys [conn]} + {:keys [id]}] + (let [db (d/db conn)] + (when-let [{:com.github.ivarref.yoltq/keys [opts]} (u/get-queue-item db id)] + (when-let [[q id :as depends-on] (:depends-on opts)] + (when-not (d/q '[:find ?e . + :in $ ?ext-id + :where + [?e :com.github.ivarref.yoltq/ext-id ?ext-id] + [?e :com.github.ivarref.yoltq/status :done]] + db + (pr-str [q id])) + {:depends-on depends-on}))))) + + (defn take! [{:keys [conn cas-failures hung-log-level tx-spent-time!] :or {hung-log-level :error}} {:keys [tx id queue-name was-hung? to-error?] :as queue-item-info}] diff --git a/src/com/github/ivarref/yoltq/test_queue.clj b/src/com/github/ivarref/yoltq/test_queue.clj index 4c4f903..6183216 100644 --- a/src/com/github/ivarref/yoltq/test_queue.clj +++ b/src/com/github/ivarref/yoltq/test_queue.clj @@ -148,18 +148,21 @@ `(if-let [job# (get-tx-q-job ~queue-name)] (try (with-bindings (:com.github.ivarref.yoltq/bindings job#) - (let [res# (some->> (u/prepare-processing (d/db (:conn @yq/*config*)) - (:com.github.ivarref.yoltq/id job#) - ~queue-name - (:com.github.ivarref.yoltq/lock job#) - (:com.github.ivarref.yoltq/status job#)) - (i/take! @yq/*config*) - (i/execute! @yq/*config*))] - (swap! (:prev-consumed @yq/*config*) assoc ~queue-name res#) - (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#))) - (if (:retval res#) - (:retval res#) - (:exception res#)))) + (let [prep# (u/prepare-processing (d/db (:conn @yq/*config*)) + (:com.github.ivarref.yoltq/id job#) + ~queue-name + (:com.github.ivarref.yoltq/lock job#) + (:com.github.ivarref.yoltq/status job#))] + (if-let [depends-on# (i/depends-on-waiting? @yq/*config* prep#)] + depends-on# + (let [res# (some->> prep# + (i/take! @yq/*config*) + (i/execute! @yq/*config*))] + (swap! (:prev-consumed @yq/*config*) assoc ~queue-name res#) + (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#))) + (if (:retval res#) + (:retval res#) + (:exception res#)))))) (catch Throwable t# (log/error t# "unexpected error in consume-expect:" (ex-message t#)) (throw t#))) diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj index 9501343..d551510 100644 --- a/src/com/github/ivarref/yoltq/utils.clj +++ b/src/com/github/ivarref/yoltq/utils.clj @@ -56,6 +56,7 @@ (-> (d/pull db '[:*] [:com.github.ivarref.yoltq/id id]) (dissoc :db/id) (update :com.github.ivarref.yoltq/payload edn/read-string) + (update :com.github.ivarref.yoltq/opts (fn [s] (or (when s (edn/read-string s)) {}))) (update :com.github.ivarref.yoltq/bindings (fn [s] (when s diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj index df56460..5427ff5 100644 --- a/test/com/github/ivarref/yoltq/test_utils.clj +++ b/test/com/github/ivarref/yoltq/test_utils.clj @@ -35,7 +35,7 @@ (defn put-transact! [id payload] - @(d/transact (:conn @yq/*config*) [(i/put @yq/*config* id payload)])) + @(d/transact (:conn @yq/*config*) [(i/put @yq/*config* id payload {})])) (defn advance! [tp] diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 2b67e5e..789e5b4 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -284,3 +284,30 @@ (is (= 1 (tq/consume! :q))) (is (= 2 (tq/force-retry! :q))) (is (= 3 (tq/force-retry! :q))))) + + +(deftest ext-id-no-duplicates + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q nil {:id "123"})]) + (is (thrown? Exception @(d/transact conn [(yq/put :q nil {:id "123"})]))))) + + +(deftest depends-on + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :a identity) + (yq/add-consumer! :b identity) + @(d/transact conn [(yq/put :a "a" {:id "1"})]) + (is (thrown? Exception @(d/transact conn [(yq/put :b "b" {:depends-on [:a "0"]})]))) + @(d/transact conn [(yq/put :b "b" {:depends-on [:a "1"]})]) + + ; can't consume :b yet: + (is (= {:depends-on [:a "1"]} (tq/consume! :b))) + (is (= {:depends-on [:a "1"]} (tq/consume! :b))) + + (is (= "a" (tq/consume! :a))) + (is (= "b" (tq/consume! :b))) + (is (= "b" (tq/force-retry! :b))))) + -- cgit v1.2.3 From cc9cc0ed52ca2d4fa82f2fe7dc5f17e61ced26f4 Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Thu, 23 Sep 2021 13:12:12 +0200 Subject: Basic depends-on seems to work --- src/com/github/ivarref/yoltq/impl.clj | 5 +++-- src/com/github/ivarref/yoltq/poller.clj | 8 +++++--- src/com/github/ivarref/yoltq/report_queue.clj | 10 ++++++---- 3 files changed, 14 insertions(+), 9 deletions(-) (limited to 'src/com/github/ivarref/yoltq/impl.clj') diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index 9811c93..a315545 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -60,9 +60,9 @@ (defn depends-on-waiting? [{:keys [conn]} - {:keys [id]}] + q-item] (let [db (d/db conn)] - (when-let [{:com.github.ivarref.yoltq/keys [opts]} (u/get-queue-item db id)] + (when-let [{:com.github.ivarref.yoltq/keys [opts]} (u/get-queue-item db (:id q-item))] (when-let [[q id :as depends-on] (:depends-on opts)] (when-not (d/q '[:find ?e . :in $ ?ext-id @@ -71,6 +71,7 @@ [?e :com.github.ivarref.yoltq/status :done]] db (pr-str [q id])) + (log/info "queue item" (str (:id q-item)) "is waiting on" depends-on) {:depends-on depends-on}))))) diff --git a/src/com/github/ivarref/yoltq/poller.clj b/src/com/github/ivarref/yoltq/poller.clj index 1f4e65d..28b158f 100644 --- a/src/com/github/ivarref/yoltq/poller.clj +++ b/src/com/github/ivarref/yoltq/poller.clj @@ -10,9 +10,11 @@ :error (u/get-error cfg q) :hung (u/get-hung cfg q))] (with-bindings (get item :bindings {}) - (some->> item - (i/take! cfg) - (i/execute! cfg))))) + (if (i/depends-on-waiting? cfg item) + nil + (some->> item + (i/take! cfg) + (i/execute! cfg)))))) (defn poll-queue! [running? diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index c6559bf..20e0a93 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -21,10 +21,12 @@ (try (let [{:com.github.ivarref.yoltq/keys [lock id status queue-name bindings]} (u/get-queue-item db-after id)] (with-bindings (or bindings {}) - (some->> - (u/prepare-processing db-after id queue-name lock status) - (i/take! cfg) - (i/execute! cfg)))) + (if (i/depends-on-waiting? cfg {:id id}) + nil + (some->> + (u/prepare-processing db-after id queue-name lock status) + (i/take! cfg) + (i/execute! cfg))))) (catch Throwable t (log/error t "unexpected error in process-poll-result!"))))))))) -- cgit v1.2.3 From f2bc137283616b46aad9519cacade93969af3fdb Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Fri, 24 Sep 2021 10:42:56 +0200 Subject: Be paranoid when persisting with pr-str --- src/com/github/ivarref/yoltq/impl.clj | 23 +++++++++++++++++------ test/com/github/ivarref/yoltq/virtual_test.clj | 8 ++++++++ 2 files changed, 25 insertions(+), 6 deletions(-) (limited to 'src/com/github/ivarref/yoltq/impl.clj') diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index a315545..adc169d 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -3,7 +3,8 @@ [clojure.tools.logging :as log] [clojure.string :as str] [com.github.ivarref.yoltq.utils :as u] - [com.github.ivarref.yoltq.ext-sys :as ext])) + [com.github.ivarref.yoltq.ext-sys :as ext] + [clojure.edn :as edn])) (def schema @@ -22,6 +23,16 @@ #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}]) +(defn pr-str-safe [what x] + (try + (if (= x (edn/read-string (pr-str x))) + (pr-str x) + (throw (ex-info (str "Could not read-string " what) {:input x}))) + (catch Exception e + (log/error "could not read-string" what ":" (ex-message e)) + (throw e)))) + + (defn put [{:keys [capture-bindings conn] :as config} queue-name payload @@ -32,15 +43,15 @@ (assoc o (symbol k) (deref k))) {} (or capture-bindings [])) - (pr-str))] + (pr-str-safe :capture-bindings))] (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init) (merge {:com.github.ivarref.yoltq/id id :com.github.ivarref.yoltq/queue-name queue-name :com.github.ivarref.yoltq/status u/status-init - :com.github.ivarref.yoltq/payload (pr-str payload) + :com.github.ivarref.yoltq/payload (pr-str-safe :payload payload) :com.github.ivarref.yoltq/bindings str-bindings - :com.github.ivarref.yoltq/opts (pr-str (or opts {})) + :com.github.ivarref.yoltq/opts (pr-str-safe :opts (or opts {})) :com.github.ivarref.yoltq/lock (u/random-uuid) :com.github.ivarref.yoltq/tries 0 :com.github.ivarref.yoltq/init-time (u/now-ns)} @@ -50,10 +61,10 @@ :where [?e :com.github.ivarref.yoltq/ext-id ?ext-id]] (d/db conn) - (pr-str [q ext-id])) + (pr-str-safe :depends-on [q ext-id])) (throw (ex-info ":depends-on not found in database" opts)))) (when-let [ext-id (:id opts)] - {:com.github.ivarref.yoltq/ext-id (pr-str [queue-name ext-id])}))) + {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}))) (do (log/error "Did not find registered handler for queue" queue-name) (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 93ad0b6..fdbf6b3 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -307,3 +307,11 @@ (is (= {:id "a1"} (tq/consume! :a))) (is (= {:id "b1"} (tq/consume! :b))))) + + +(deftest verify-can-read-string + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :a identity) + (timbre/with-level :fatal + (is (thrown? Exception @(d/transact conn [(yq/put :a {:broken #'=})])))))) -- cgit v1.2.3 From e142149a4282a669f3f95cb52f708d234a8ded23 Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Fri, 24 Sep 2021 10:59:06 +0200 Subject: Support :depends-on on queue level --- src/com/github/ivarref/yoltq/impl.clj | 11 ++++++++--- test/com/github/ivarref/yoltq/virtual_test.clj | 15 +++++++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) (limited to 'src/com/github/ivarref/yoltq/impl.clj') diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index adc169d..50441ff 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -37,8 +37,9 @@ queue-name payload opts] - (if-let [_ (get-in config [:handlers queue-name])] + (if-let [q-config (get-in config [:handlers queue-name])] (let [id (u/squuid) + depends-on (get q-config :depends-on (fn [_] nil)) str-bindings (->> (reduce (fn [o k] (assoc o (symbol k) (deref k))) {} @@ -51,11 +52,15 @@ :com.github.ivarref.yoltq/status u/status-init :com.github.ivarref.yoltq/payload (pr-str-safe :payload payload) :com.github.ivarref.yoltq/bindings str-bindings - :com.github.ivarref.yoltq/opts (pr-str-safe :opts (or opts {})) + :com.github.ivarref.yoltq/opts (pr-str-safe :opts + (merge + (when-let [deps (depends-on payload)] + {:depends-on deps}) + (or opts {}))) :com.github.ivarref.yoltq/lock (u/random-uuid) :com.github.ivarref.yoltq/tries 0 :com.github.ivarref.yoltq/init-time (u/now-ns)} - (when-let [[q ext-id] (:depends-on opts)] + (when-let [[q ext-id] (or (:depends-on opts) (depends-on payload))] (when-not (d/q '[:find ?e . :in $ ?ext-id :where diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index fdbf6b3..3f7365f 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -309,6 +309,21 @@ (is (= {:id "b1"} (tq/consume! :b))))) +(deftest depends-on-queue-level + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :a identity) + (yq/add-consumer! :b identity {:depends-on (fn [{:keys [id]}] [:a id])}) + @(d/transact conn [(yq/put :a {:id "1"} {:id "1"})]) + @(d/transact conn [(yq/put :b {:id "1"})]) + + ; can't consume :b yet: + (is (= {:depends-on [:a "1"]} (tq/consume! :b))) + + (is (= {:id "1"} (tq/consume! :a))) + (is (= {:id "1"} (tq/consume! :b))))) + + (deftest verify-can-read-string (let [conn (u/empty-conn)] (yq/init! {:conn conn}) -- cgit v1.2.3 From 384a3b72eeb6f4b00a70b8eeeeeab1934288485e Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Mon, 27 Sep 2021 08:35:27 +0200 Subject: Better error reporting --- src/com/github/ivarref/yoltq/impl.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/com/github/ivarref/yoltq/impl.clj') diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index 50441ff..f97dcc4 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -67,7 +67,7 @@ [?e :com.github.ivarref.yoltq/ext-id ?ext-id]] (d/db conn) (pr-str-safe :depends-on [q ext-id])) - (throw (ex-info ":depends-on not found in database" opts)))) + (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts)))) (when-let [ext-id (:id opts)] {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}))) (do -- cgit v1.2.3 From c62632771b11736eac616e00d576c349e54b6a73 Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Mon, 27 Sep 2021 09:39:54 +0200 Subject: Simplify --- src/com/github/ivarref/yoltq/impl.clj | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'src/com/github/ivarref/yoltq/impl.clj') diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index f97dcc4..02cc102 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -40,6 +40,10 @@ (if-let [q-config (get-in config [:handlers queue-name])] (let [id (u/squuid) depends-on (get q-config :depends-on (fn [_] nil)) + opts (merge + (when-let [deps (depends-on payload)] + {:depends-on deps}) + (or opts {})) str-bindings (->> (reduce (fn [o k] (assoc o (symbol k) (deref k))) {} @@ -52,15 +56,11 @@ :com.github.ivarref.yoltq/status u/status-init :com.github.ivarref.yoltq/payload (pr-str-safe :payload payload) :com.github.ivarref.yoltq/bindings str-bindings - :com.github.ivarref.yoltq/opts (pr-str-safe :opts - (merge - (when-let [deps (depends-on payload)] - {:depends-on deps}) - (or opts {}))) + :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts) :com.github.ivarref.yoltq/lock (u/random-uuid) :com.github.ivarref.yoltq/tries 0 :com.github.ivarref.yoltq/init-time (u/now-ns)} - (when-let [[q ext-id] (or (:depends-on opts) (depends-on payload))] + (when-let [[q ext-id] (:depends-on opts)] (when-not (d/q '[:find ?e . :in $ ?ext-id :where -- cgit v1.2.3 From 79acba1b716685bb601e05a2e9824eefd19d1f5d Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Mon, 27 Sep 2021 14:36:24 +0200 Subject: Add :valid-payload? function --- README.md | 2 ++ src/com/github/ivarref/yoltq/impl.clj | 4 ++++ test/com/github/ivarref/yoltq/virtual_test.clj | 11 +++++++++++ 3 files changed, 17 insertions(+) (limited to 'src/com/github/ivarref/yoltq/impl.clj') diff --git a/README.md b/README.md index 314c779..7e49431 100644 --- a/README.md +++ b/README.md @@ -146,6 +146,8 @@ the payload. It can be added like this: ; An optional map of queue opts {:allow-cas-failure? true ; Treat [:db.cas ...] failures as success. This is one way for the ; consumer function to ensure idempotence. + :valid-payload? (fn [payload] (some? (:id payload))) ; Function that verifies payload. Should return truthy for valid payloads. + ; The default function always returns true. :max-retries 10}) ; Specify maximum number of times an item will be retried. Default: 100 ``` diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index 02cc102..8b75fc3 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -40,6 +40,7 @@ (if-let [q-config (get-in config [:handlers queue-name])] (let [id (u/squuid) depends-on (get q-config :depends-on (fn [_] nil)) + valid-payload? (get q-config :valid-payload? (fn [_] true)) opts (merge (when-let [deps (depends-on payload)] {:depends-on deps}) @@ -49,6 +50,9 @@ {} (or capture-bindings [])) (pr-str-safe :capture-bindings))] + (when-not (valid-payload? payload) + (log/error "Payload was not valid. Payload was:" payload) + (throw (ex-info (str "Payload was not valid: " payload) {:payload payload}))) (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init) (merge {:com.github.ivarref.yoltq/id id diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 8f7b454..acd3eb7 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -331,3 +331,14 @@ (yq/add-consumer! :a identity) (timbre/with-level :fatal (is (thrown? Exception @(d/transact conn [(yq/put :a {:broken #'=})])))))) + + +(deftest payload-verifier + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity + {:valid-payload? (fn [{:keys [id]}] + (some? id))}) + @(d/transact conn [(yq/put :q {:id "a"})]) + (timbre/with-level :fatal + (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) \ No newline at end of file -- cgit v1.2.3 From 6c26a3b6871286510bb8e9770ee7f7e3abf97abe Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Sun, 27 Mar 2022 18:39:44 +0200 Subject: Start use current millis in the database, not nano offset --- .gitignore | 3 +- README.md | 38 ++++++++++++++-------- deps.edn | 2 +- src/com/github/ivarref/yoltq.clj | 8 ++--- src/com/github/ivarref/yoltq/error_poller.clj | 10 +++--- src/com/github/ivarref/yoltq/ext_sys.clj | 13 ++++---- src/com/github/ivarref/yoltq/impl.clj | 14 ++++---- src/com/github/ivarref/yoltq/poller.clj | 19 +++++++---- .../ivarref/yoltq/slow_executor_detector.clj | 4 +-- src/com/github/ivarref/yoltq/test_queue.clj | 2 +- src/com/github/ivarref/yoltq/utils.clj | 19 ++++++----- test/com/github/ivarref/yoltq/test_utils.clj | 9 ++--- 12 files changed, 81 insertions(+), 60 deletions(-) (limited to 'src/com/github/ivarref/yoltq/impl.clj') diff --git a/.gitignore b/.gitignore index cb9a7ca..c82fdd7 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ tree.txt .stage-url.txt *.pom.asc *.pom -temp/ \ No newline at end of file +temp/ +.clj-kondo/ diff --git a/README.md b/README.md index 45ba8c4..9c5669c 100644 --- a/README.md +++ b/README.md @@ -333,12 +333,21 @@ easier. ## Change log -### 2022-03-27 [v0.2.41](https://github.com/ivarref/yoltq/compare/v0.2.39...v0.2.41) +### 20..-..-.. vHEAD [diff](https://github.com/ivarref/yoltq/compare/v0.2.41...HEAD) +* Critical bugfix. +``` +Started using (System/currentTimeMillis) and not (System/nanoTime) +when storing time in the database. +``` + +* Bump Clojure to `1.11.0`. + +### 2022-03-27 v0.2.41 [diff](https://github.com/ivarref/yoltq/compare/v0.2.39...v0.2.41) * Added function `healthy?` that returns: ``` - true if no errors - false if one or more errors - nil if error-poller is yet to be executed. + true if no errors + false if one or more errors + nil if error-poller is yet to be executed. ``` * Added default functions for `:on-system-error` and `:on-system-recovery` @@ -348,22 +357,23 @@ easier. * Added function `queue-stats` that returns a nicely "formatted" vector of queue stats, for example: ``` -(queue-stats) -=> -[{:qname :add-message-thread, :status :done, :count 10274} - {:qname :add-message-thread, :status :init, :count 30} - {:qname :add-message-thread, :status :processing, :count 1} - {:qname :send-message, :status :done, :count 21106} - {:qname :send-message, :status :init, :count 56}] + (queue-stats) + => + [{:qname :add-message-thread, :status :done, :count 10274} + {:qname :add-message-thread, :status :init, :count 30} + {:qname :add-message-thread, :status :processing, :count 1} + {:qname :send-message, :status :done, :count 21106} + {:qname :send-message, :status :init, :count 56}] ``` -### 2021-09-27 [v0.2.39](https://github.com/ivarref/yoltq/compare/v0.2.37...v0.2.39) +### 2021-09-27 v0.2.39 [diff](https://github.com/ivarref/yoltq/compare/v0.2.37...v0.2.39) Added `:valid-payload?` option for queue consumers. -### 2021-09-27 [v0.2.37](https://github.com/ivarref/yoltq/compare/v0.2.33...v0.2.37) +### 2021-09-27 v0.2.37 [diff](https://github.com/ivarref/yoltq/compare/v0.2.33...v0.2.37) Improved error reporting. -### 2021-09-24 v0.2.33: First publicly announced release. +### 2021-09-24 v0.2.33 +First publicly announced release. ## License diff --git a/deps.edn b/deps.edn index d0f0a26..8e769e1 100644 --- a/deps.edn +++ b/deps.edn @@ -1,5 +1,5 @@ {:deps {org.clojure/tools.logging {:mvn/version "1.1.0"} - org.clojure/clojure {:mvn/version "1.10.3"}} + org.clojure/clojure {:mvn/version "1.11.0"}} :paths ["src"] diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 03a364f..17aa40a 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -19,7 +19,7 @@ (def default-opts (-> {; Default number of times a queue job will be retried before giving up - ; Can be overridden on a per consumer basis with + ; Can be overridden on a per-consumer basis with ; (yq/add-consumer! :q (fn [payload] ...) {:max-retries 200}) :max-retries 100 @@ -34,7 +34,7 @@ :hung-backoff-time (Duration/ofMinutes 30) ; Most queue jobs in init state will be consumed by the tx-report-queue listener. - ; However in the case where a init job was added right before the application + ; However, in the case where an init job was added right before the application ; was shut down and did not have time to be processed by the tx-report-queue listener, ; it will be consumer by the init poller. This init poller backs off by ; :init-backoff-time to avoid unnecessary compare-and-swap lock failures that could @@ -66,7 +66,7 @@ ; How often should the system invoke :system-error-callback-backoff (Duration/ofHours 1)} - u/duration->nanos)) + u/duration->millis)) (defn init! [{:keys [conn] :as cfg}] @@ -83,7 +83,7 @@ default-opts (if *test-mode* old-conf (select-keys old-conf [:handlers])) cfg) - u/duration->nanos)))] + u/duration->millis)))] new-cfg))) diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj index 1268482..ee6359e 100644 --- a/src/com/github/ivarref/yoltq/error_poller.clj +++ b/src/com/github/ivarref/yoltq/error_poller.clj @@ -22,7 +22,7 @@ state :recovery}} {:keys [system-error-min-count system-error-callback-backoff] :or {system-error-min-count 3}} - now-ns + now-ms error-count] (let [new-errors (->> (conj errors error-count) (take-last system-error-min-count) @@ -50,14 +50,14 @@ (when (and (= old-state :recovery) (= new-state :error)) {:run-callback :error - :last-notify now-ns}) + :last-notify now-ms}) (when (and (= new-state :error) (= old-state :error) - (> now-ns + (> now-ms (+ last-notify system-error-callback-backoff))) {:run-callback :error - :last-notify now-ns}) + :last-notify now-ms}) (when (and (= new-state :recovery) (= old-state :error)) @@ -88,7 +88,7 @@ (log/debug "poll-errors found" error-count "errors in system") (reset! healthy? false)) (reset! healthy? true)) - (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ns) error-count)] + (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ms) error-count)] (when run-callback (cond (= run-callback :error) (on-system-error) diff --git a/src/com/github/ivarref/yoltq/ext_sys.clj b/src/com/github/ivarref/yoltq/ext_sys.clj index 3480475..692b934 100644 --- a/src/com/github/ivarref/yoltq/ext_sys.clj +++ b/src/com/github/ivarref/yoltq/ext_sys.clj @@ -1,17 +1,18 @@ (ns com.github.ivarref.yoltq.ext-sys (:require [datomic.api :as d]) + (:refer-clojure :exclude [random-uuid]) (:import (java.util UUID))) -(def ^:dynamic *now-ns-atom* nil) +(def ^:dynamic *now-ms-atom* nil) (def ^:dynamic *squuid-atom* nil) (def ^:dynamic *random-atom* nil) -(defn now-ns [] - (if *now-ns-atom* - @*now-ns-atom* - (System/nanoTime))) +(defn now-ms [] + (if *now-ms-atom* + @*now-ms-atom* + (System/currentTimeMillis))) (defn squuid [] @@ -23,4 +24,4 @@ (defn random-uuid [] (if *random-atom* (UUID/fromString (str "00000000-0000-0000-0000-" (format "%012d" (swap! *random-atom* inc)))) - (UUID/randomUUID))) \ No newline at end of file + (UUID/randomUUID))) diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index 8b75fc3..b4eef8d 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -63,7 +63,7 @@ :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts) :com.github.ivarref.yoltq/lock (u/random-uuid) :com.github.ivarref.yoltq/tries 0 - :com.github.ivarref.yoltq/init-time (u/now-ns)} + :com.github.ivarref.yoltq/init-time (u/now-ms)} (when-let [[q ext-id] (:depends-on opts)] (when-not (d/q '[:find ?e . :in $ ?ext-id @@ -138,8 +138,8 @@ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing new-status] (if (= new-status u/status-done) - {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ns)} - {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ns)})] + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ms)} + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ms)})] start-time (System/nanoTime) {:keys [db-after]} @(d/transact conn tx)] (when tx-spent-time! (tx-spent-time! (- (System/nanoTime) start-time))) @@ -171,7 +171,7 @@ (log/debug "queue item" (str id) "for queue" queue-name "is now processing") (let [{:keys [retval exception]} (try - (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ns) id queue-name]) + (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ms) id queue-name]) (let [v (f payload)] {:retval v}) (catch Throwable t @@ -188,7 +188,7 @@ (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)] (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item] (log/info (fmt id queue-name u/status-done tries (- done-time init-time))) - (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time))) + (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time))) (assoc q-item :retval retval :success? true :allow-cas-failure? true))) (some? exception) @@ -198,14 +198,14 @@ (log/logp level exception (fmt id queue-name u/status-error tries (- error-time init-time))) (log/logp level exception "error message was:" (str \" (ex-message exception) \") "for queue-item" (str id)) (log/logp level exception "ex-data was:" (ex-data exception) "for queue-item" (str id)) - (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time))) + (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time))) (assoc q-item :exception exception))) :else (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)] (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item] (log/info (fmt id queue-name u/status-done tries (- done-time init-time))) - (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time))) + (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time))) (assoc q-item :retval retval :success? true)))))) (do (log/error "no handler for queue" queue-name) diff --git a/src/com/github/ivarref/yoltq/poller.clj b/src/com/github/ivarref/yoltq/poller.clj index 28b158f..9cf81c7 100644 --- a/src/com/github/ivarref/yoltq/poller.clj +++ b/src/com/github/ivarref/yoltq/poller.clj @@ -25,17 +25,16 @@ (if-not (contains? old q) (try (log/debug "polling queue" queue-name "for status" status) - (let [start-time (u/now-ns) + (let [start-time (u/now-ms) last-res (loop [prev-res nil] (when @running? (let [res (poll-once! cfg queue-name status)] + (log/debug "poll-once! returned" res) (if (and res (:success? res)) (recur res) prev-res))))] - (let [spent-ns (- (u/now-ns) start-time)] - (log/trace "done polling queue" q "in" - (format "%.1f" (double (/ spent-ns 1e6))) - "ms")) + (let [spent-ms (- (u/now-ms) start-time)] + (log/trace "done polling queue" q "in" spent-ms "ms")) last-res) (finally (swap! running-queues disj q))) @@ -44,6 +43,14 @@ (log/error t "poll-queue! crashed:" (ex-message t))) (finally))) +(comment + (def cfg @com.github.ivarref.yoltq/*config*)) + +(comment + (poll-queue! + (atom true) + @com.github.ivarref.yoltq/*config* + [:add-message-thread :init])) (defn poll-all-queues! [running? config-atom pool] (try @@ -54,4 +61,4 @@ [q-name status])))] (.execute pool (fn [] (poll-queue! running? @config-atom q)))))) (catch Throwable t - (log/error t "poll-all-queues! crashed:" (ex-message t))))) \ No newline at end of file + (log/error t "poll-all-queues! crashed:" (ex-message t))))) diff --git a/src/com/github/ivarref/yoltq/slow_executor_detector.clj b/src/com/github/ivarref/yoltq/slow_executor_detector.clj index f15ef7d..80d3718 100644 --- a/src/com/github/ivarref/yoltq/slow_executor_detector.clj +++ b/src/com/github/ivarref/yoltq/slow_executor_detector.clj @@ -7,7 +7,7 @@ (defn- do-show-slow-threads [{:keys [start-execute-time max-execute-time]}] (doseq [[^Thread thread [start-time queue-id queue-name]] @start-execute-time] - (when (> (ext/now-ns) (+ start-time max-execute-time)) + (when (> (ext/now-ms) (+ start-time max-execute-time)) (log/error "thread" (.getName thread) "spent too much time on" "queue item" (str queue-id) "for queue" queue-name @@ -25,4 +25,4 @@ (dotimes [_ 3] (when @running? (Thread/sleep 1000)))) (catch Throwable t - (log/error t "reap! crashed:" (ex-message t))))) \ No newline at end of file + (log/error t "reap! crashed:" (ex-message t))))) diff --git a/src/com/github/ivarref/yoltq/test_queue.clj b/src/com/github/ivarref/yoltq/test_queue.clj index 6183216..ee9cd54 100644 --- a/src/com/github/ivarref/yoltq/test_queue.clj +++ b/src/com/github/ivarref/yoltq/test_queue.clj @@ -47,7 +47,7 @@ (with-bindings {#'yq/*config* config# #'yq/*running?* (atom false) #'yq/*test-mode* true - #'ext/*now-ns-atom* (atom 0) + #'ext/*now-ms-atom* (atom 0) #'ext/*random-atom* (atom 0) #'ext/*squuid-atom* (atom 0)} (try diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj index d551510..ad2444a 100644 --- a/src/com/github/ivarref/yoltq/utils.clj +++ b/src/com/github/ivarref/yoltq/utils.clj @@ -3,6 +3,7 @@ [clojure.edn :as edn] [com.github.ivarref.yoltq.ext-sys :as ext] [clojure.tools.logging :as log]) + (:refer-clojure :exclude [random-uuid]) (:import (datomic Connection) (java.time Duration))) @@ -13,10 +14,10 @@ (def status-error :error) -(defn duration->nanos [m] +(defn duration->millis [m] (reduce-kv (fn [o k v] (if (instance? Duration v) - (assoc o k (.toNanos v)) + (assoc o k (.toMillis v)) (assoc o k v))) {} m)) @@ -30,8 +31,8 @@ (ext/random-uuid)) -(defn now-ns [] - (ext/now-ns)) +(defn now-ms [] + (ext/now-ms)) (defn root-cause [e] @@ -75,7 +76,7 @@ :bindings (get (get-queue-item db id) :com.github.ivarref.yoltq/bindings {}) :tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock] [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status old-status status-processing] - {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ns)}]})) + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ms)}]})) (defn get-init [{:keys [conn db init-backoff-time] :as cfg} queue-name] @@ -94,11 +95,11 @@ [?e :com.github.ivarref.yoltq/lock ?lock]] db queue-name - (- (now-ns) init-backoff-time)) + (- (now-ms) init-backoff-time)) (not-empty))] (let [[id old-lock] (rand-nth (into [] ids))] (prepare-processing db id queue-name old-lock :init)) - (log/trace "no new-items in :init status for queue" queue-name)))) + (log/debug "no new-items in :init status for queue" queue-name)))) (defn get-error [{:keys [conn db error-backoff-time max-retries] :as cfg} queue-name] @@ -120,7 +121,7 @@ [?e :com.github.ivarref.yoltq/lock ?lock]] db queue-name - (- (now-ns) error-backoff-time) + (- (now-ms) error-backoff-time) (inc max-retries)) (not-empty))] (let [[id old-lock] (rand-nth (into [] ids))] @@ -131,7 +132,7 @@ (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: " (str (if (nil? conn) "nil" conn)) "\nConfig was: " (str cfg))) - (let [now (or now (now-ns)) + (let [now (or now (now-ms)) max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries) db (or db (d/db conn))] (when-let [ids (->> (d/q '[:find ?id ?lock ?tries diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj index 5427ff5..e4151c2 100644 --- a/test/com/github/ivarref/yoltq/test_utils.clj +++ b/test/com/github/ivarref/yoltq/test_utils.clj @@ -8,7 +8,8 @@ [com.github.ivarref.yoltq.impl :as i] [clojure.edn :as edn] [com.github.ivarref.yoltq.ext-sys :as ext]) - (:import (java.util UUID))) + (:import (java.util UUID) + (java.time Duration))) (logconfig/init-logging! @@ -39,10 +40,10 @@ (defn advance! [tp] - (assert (some? ext/*now-ns-atom*) "Expected to be running in test-mode!") - (swap! ext/*now-ns-atom* + (if (number? tp) + (assert (some? ext/*now-ms-atom*) "Expected to be running in test-mode!") + (swap! ext/*now-ms-atom* + (if (number? tp) tp - (.toNanos tp)))) + (.toMillis ^Duration tp)))) (defn done-count [] -- cgit v1.2.3 From 41c9e08d63176cf7c239574d1d07f2b302a2d3ec Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Sun, 27 Mar 2022 21:33:00 +0200 Subject: Fix use current millis in the database, not nano offset --- src/com/github/ivarref/yoltq.clj | 11 ++- src/com/github/ivarref/yoltq/impl.clj | 6 +- src/com/github/ivarref/yoltq/migrate.clj | 58 ++++++++++++++++ test/com/github/ivarref/yoltq/migrate_test.clj | 92 ++++++++++++++++++++++++++ test/com/github/ivarref/yoltq/test_utils.clj | 7 +- test/com/github/ivarref/yoltq/virtual_test.clj | 14 +++- 6 files changed, 180 insertions(+), 8 deletions(-) create mode 100644 src/com/github/ivarref/yoltq/migrate.clj create mode 100644 test/com/github/ivarref/yoltq/migrate_test.clj (limited to 'src/com/github/ivarref/yoltq/impl.clj') diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 17aa40a..1a60a45 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -6,6 +6,7 @@ [com.github.ivarref.yoltq.poller :as poller] [com.github.ivarref.yoltq.error-poller :as errpoller] [com.github.ivarref.yoltq.slow-executor-detector :as slow-executor] + [com.github.ivarref.yoltq.migrate :as migrate] [com.github.ivarref.yoltq.utils :as u]) (:import (datomic Connection) (java.util.concurrent Executors TimeUnit ExecutorService) @@ -64,7 +65,11 @@ :system-error-poll-delay (Duration/ofMinutes 1) ; How often should the system invoke - :system-error-callback-backoff (Duration/ofHours 1)} + :system-error-callback-backoff (Duration/ofHours 1) + + ; Should old, possibly stalled jobs be automatically be migrated + ; as part of `start!`? + :auto-migrate? true} u/duration->millis)) @@ -104,7 +109,9 @@ (defn- do-start! [] - (let [{:keys [poll-delay pool-size system-error-poll-delay]} @*config*] + (let [{:keys [poll-delay pool-size system-error-poll-delay auto-migrate?] :as cfg} @*config*] + (when auto-migrate? + (migrate/migrate! cfg)) (reset! threadpool (Executors/newScheduledThreadPool (+ 2 pool-size))) (let [pool @threadpool queue-listener-ready (promise)] diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index b4eef8d..6b14ffc 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -20,7 +20,8 @@ #:db{:ident :com.github.ivarref.yoltq/init-time, :cardinality :db.cardinality/one, :valueType :db.type/long} #:db{:ident :com.github.ivarref.yoltq/processing-time, :cardinality :db.cardinality/one, :valueType :db.type/long} #:db{:ident :com.github.ivarref.yoltq/done-time, :cardinality :db.cardinality/one, :valueType :db.type/long} - #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}]) + #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long} + #:db{:ident :com.github.ivarref.yoltq/version, :cardinality :db.cardinality/one, :valueType :db.type/string, :index true}]) (defn pr-str-safe [what x] @@ -63,7 +64,8 @@ :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts) :com.github.ivarref.yoltq/lock (u/random-uuid) :com.github.ivarref.yoltq/tries 0 - :com.github.ivarref.yoltq/init-time (u/now-ms)} + :com.github.ivarref.yoltq/init-time (u/now-ms) + :com.github.ivarref.yoltq/version "2"} (when-let [[q ext-id] (:depends-on opts)] (when-not (d/q '[:find ?e . :in $ ?ext-id diff --git a/src/com/github/ivarref/yoltq/migrate.clj b/src/com/github/ivarref/yoltq/migrate.clj new file mode 100644 index 0000000..89fc286 --- /dev/null +++ b/src/com/github/ivarref/yoltq/migrate.clj @@ -0,0 +1,58 @@ +(ns com.github.ivarref.yoltq.migrate + (:require [datomic.api :as d] + [clojure.tools.logging :as log])) + +(defn to->v2-ent [{:keys [conn]} now-ms id] + (log/info "Migrating id" id) + (let [attr-val (fn [attr] + (when-let [old (d/q '[:find ?time . + :in $ ?e ?a + :where + [?e ?a ?time]] + (d/db conn) + [:com.github.ivarref.yoltq/id id] + attr)] + (let [now-ms (or now-ms + (.getTime (d/q '[:find (max ?txinst) . + :in $ ?e ?a + :where + [?e ?a _ ?tx true] + [?tx :db/txInstant ?txinst]] + (d/history (d/db conn)) + [:com.github.ivarref.yoltq/id id] + attr)))] + (log/info "Updating" id attr "to" now-ms) + [[:db/cas [:com.github.ivarref.yoltq/id id] + attr old now-ms]])))] + (vec (concat [[:db/cas [:com.github.ivarref.yoltq/id id] + :com.github.ivarref.yoltq/version nil "2"]] + (mapcat attr-val [:com.github.ivarref.yoltq/init-time + :com.github.ivarref.yoltq/processing-time + :com.github.ivarref.yoltq/done-time + :com.github.ivarref.yoltq/error-time]))))) + +(defn to->v2 [{:keys [conn loop? now-ms] + :or {loop? true} + :as cfg}] + (loop [tx-vec []] + (if-let [id (some->> (d/q '[:find [?id ...] + :in $ + :where + [?e :com.github.ivarref.yoltq/id ?id] + [(missing? $ ?e :com.github.ivarref.yoltq/version)]] + (d/db conn)) + (sort) + (not-empty) + (first))] + (let [tx (to->v2-ent cfg now-ms id)] + @(d/transact conn tx) + (if loop? + (recur (conj tx-vec tx)) + tx)) + (do + (log/info "No items left to migrate") + tx-vec)))) + + +(defn migrate! [cfg] + (to->v2 cfg)) diff --git a/test/com/github/ivarref/yoltq/migrate_test.clj b/test/com/github/ivarref/yoltq/migrate_test.clj new file mode 100644 index 0000000..0063631 --- /dev/null +++ b/test/com/github/ivarref/yoltq/migrate_test.clj @@ -0,0 +1,92 @@ +(ns com.github.ivarref.yoltq.migrate-test + (:require [clojure.test :refer [deftest is]] + [com.github.ivarref.yoltq.ext-sys :as ext] + [com.github.ivarref.yoltq.migrate :as m] + [com.github.ivarref.yoltq.impl :as impl] + [com.github.ivarref.yoltq.test-utils :as tu] + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d])) + + +(deftest to-v2-migration + (with-bindings {#'ext/*squuid-atom* (atom 0)} + (let [conn (tu/empty-conn)] + @(d/transact conn impl/schema) + @(d/transact conn [{:com.github.ivarref.yoltq/id (u/squuid) + :com.github.ivarref.yoltq/queue-name :dummy + :com.github.ivarref.yoltq/status u/status-processing + :com.github.ivarref.yoltq/init-time 1 + :com.github.ivarref.yoltq/processing-time 2}]) + @(d/transact conn [{:com.github.ivarref.yoltq/id (u/squuid) + :com.github.ivarref.yoltq/queue-name :dummy + :com.github.ivarref.yoltq/status u/status-init + :com.github.ivarref.yoltq/init-time 3}]) + (is (= [[[:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000001"] + :com.github.ivarref.yoltq/version + nil + "2"] + [:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000001"] + :com.github.ivarref.yoltq/init-time + 1 + 1000] + [:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000001"] + :com.github.ivarref.yoltq/processing-time + 2 + 1000]] + [[:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000002"] + :com.github.ivarref.yoltq/version + nil + "2"] + [:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000002"] + :com.github.ivarref.yoltq/init-time + 3 + 1000]]] + (m/migrate! {:conn conn + :now-ms 1000 + :loop? true}))) + (is (= [] + (m/migrate! {:conn conn + :now-ms 1000 + :loop? true})))))) + + +(deftest to-v2-migration-real-time + (with-bindings {#'ext/*squuid-atom* (atom 0)} + (let [conn (tu/empty-conn) + id (u/squuid)] + @(d/transact conn impl/schema) + @(d/transact conn [{:com.github.ivarref.yoltq/id id + :com.github.ivarref.yoltq/queue-name :dummy + :com.github.ivarref.yoltq/status u/status-init + :com.github.ivarref.yoltq/init-time 1}]) + (Thread/sleep 100) + @(d/transact conn [{:com.github.ivarref.yoltq/id id + :com.github.ivarref.yoltq/init-time 2}]) + (let [tx-times (->> (d/q '[:find [?txinst ...] + :in $ ?e + :where + [?e :com.github.ivarref.yoltq/init-time _ ?tx true] + [?tx :db/txInstant ?txinst]] + (d/history (d/db conn)) + [:com.github.ivarref.yoltq/id id]) + (sort) + (vec))] + (is (= 2 (count tx-times))) + (m/migrate! {:conn conn}) + (is (= (.getTime (last tx-times)) + (d/q '[:find ?init-time . + :in $ ?e + :where + [?e :com.github.ivarref.yoltq/init-time ?init-time]] + (d/db conn) + [:com.github.ivarref.yoltq/id id]))))))) diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj index e4151c2..0c1b2f0 100644 --- a/test/com/github/ivarref/yoltq/test_utils.clj +++ b/test/com/github/ivarref/yoltq/test_utils.clj @@ -7,7 +7,8 @@ [clojure.string :as str] [com.github.ivarref.yoltq.impl :as i] [clojure.edn :as edn] - [com.github.ivarref.yoltq.ext-sys :as ext]) + [com.github.ivarref.yoltq.ext-sys :as ext] + [clojure.pprint :as pp]) (:import (java.util UUID) (java.time Duration))) @@ -54,6 +55,10 @@ (d/db (:conn @yq/*config*)))) +(defn pp [x] + (pp/pprint x) + x) + (defn get-init [& args] (apply u/get-init @yq/*config* args)) diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index acd3eb7..34c9026 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -1,6 +1,6 @@ (ns com.github.ivarref.yoltq.virtual-test (:require [datomic-schema.core] - [clojure.test :refer :all] + [clojure.test :refer [use-fixtures deftest is] :refer-macros [thrown?]] [com.github.ivarref.yoltq.test-queue :as tq] [com.github.ivarref.yoltq.test-utils :as u] [datomic.api :as d] @@ -8,7 +8,8 @@ [clojure.tools.logging :as log] [com.github.ivarref.yoltq.impl :as i] [com.github.ivarref.yoltq :as yq] - [taoensso.timbre :as timbre])) + [taoensso.timbre :as timbre] + [com.github.ivarref.yoltq.migrate :as migrate])) (use-fixtures :each tq/call-with-virtual-queue!) @@ -21,6 +22,13 @@ @(d/transact conn [(yq/put :q {:work 123})]) (is (= {:work 123} (tq/consume! :q))))) +(deftest happy-case-no-migration-for-new-entities + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q {:work 123})]) + (is (= {:work 123} (tq/consume! :q))) + (is (= [] (migrate/migrate! @yq/*config*))))) (deftest happy-case-tx-report-q (let [conn (u/empty-conn)] @@ -341,4 +349,4 @@ (some? id))}) @(d/transact conn [(yq/put :q {:id "a"})]) (timbre/with-level :fatal - (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) \ No newline at end of file + (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) -- cgit v1.2.3 From 4fd3c882e5dbe905711d4aaf8f0e4fe52369cef7 Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Thu, 30 Jun 2022 09:52:49 +0200 Subject: Release 0.2.58 Document limitations, alternatives. Improve pr-str usage --- README.md | 48 +++++++++++++++++++++++++++++++++-- pom.xml | 4 +-- src/com/github/ivarref/yoltq/impl.clj | 12 +++++++-- 3 files changed, 58 insertions(+), 6 deletions(-) (limited to 'src/com/github/ivarref/yoltq/impl.clj') diff --git a/README.md b/README.md index c62328c..ade8650 100644 --- a/README.md +++ b/README.md @@ -211,7 +211,7 @@ is shut down abruptly during processing of queue jobs. ### Giving up A queue job will remain in status `:error` once `:max-retries` (default: 100) have been reached. -Ideally this will not happen. +Ideally this will not happen. ¯\\\_(ツ)\_/¯ ### All configuration options @@ -285,6 +285,22 @@ Other than this there is no attempt at ordering the execution of queue jobs. In fact the opposite is done in the poller to guard against the case that a single failing queue job could effectively take down the entire retry polling job. +## Retrying jobs in the REPL + +```clojure +(require '[com.github.ivarref.yoltq :as yq]) + +; List jobs that are in state error: +(yq/get-errors :q) + +; This will retry a single job that is in error, regardless +; of how many times it has been retried earlier. +; If the job fails, you will get the full stacktrace on the REPL. +(yq/retry-one-error! :q) +; Returns a map containing the new state of the job. +; Returns nil if there are no (more) jobs in state error for this queue. +``` + # Testing For testing you will probably want determinism over an extra threadpool @@ -331,8 +347,35 @@ These dynamic bindings will be in place when yoltq logs errors, warnings etc. about failing consumer functions, possibly making troubleshooting easier. +## Limitations + +Datomic does not have anything like `for update skip locked`. +Thus consuming a queue should be limited to a single JVM process. +This library will take queue jobs by compare-and-swapping a lock+state, +process the item and then compare-and-swapping the lock+new-state. +It does so eagerly, thus if you have multiple JVM consumers you will +most likely get many locking conflicts. It should work, but it's far +from optimal. + +## Alternatives + +I did not find any alternatives for Datomic. + +If I were using PostgreSQL or any other database that supports +`for update skip locked`, I'd use a queue that uses this. +For Clojure there is [proletarian](https://github.com/msolli/proletarian). + +For Redis there is [carmine](https://github.com/ptaoussanis/carmine). + +Note: I have not tried these libraries myself. + ## Change log +#### 2022-06-30 v0.2.58 [diff](https://github.com/ivarref/yoltq/compare/v0.2.57...v0.2.58) +Slightly more safe EDN printing and parsing. +Recommended reading: +[Pitfalls and bumps in Clojure's Extensible Data Notation (EDN)](https://nitor.com/en/articles/pitfalls-and-bumps-clojures-extensible-data-notation-edn) + #### 2022-06-29 v0.2.57 [diff](https://github.com/ivarref/yoltq/compare/v0.2.56...v0.2.57) Added `(get-errors qname)` and `(retry-one-error! qname)`. @@ -341,6 +384,7 @@ Improved: This was done in order to push new code while a queue was in error in an earlier version of the code. In this way rolling upgrades are possible regardless if there are queue errors. +Can you tell that this issue hit me? ¯\\\_(ツ)\_/¯ #### 2022-06-22 v0.2.56 [diff](https://github.com/ivarref/yoltq/compare/v0.2.55...v0.2.56) Added support for `:yoltq/queue-id` metadata on functions. I.e. it's possible to write @@ -420,7 +464,7 @@ First publicly announced release. ## License -Copyright © 2021 Ivar Refsdal +Copyright © 2021-2022 Ivar Refsdal This program and the accompanying materials are made available under the terms of the Eclipse Public License 2.0 which is available at diff --git a/pom.xml b/pom.xml index 2d992c9..cb293b7 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ jar com.github.ivarref yoltq - 0.2.57 + 0.2.58 yoltq @@ -30,7 +30,7 @@ scm:git:git://github.com/ivarref/yoltq.git scm:git:ssh://git@github.com/ivarref/yoltq.git - v0.2.57 + v0.2.58 https://github.com/ivarref/yoltq \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index 6b14ffc..c37b0e6 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -23,11 +23,19 @@ #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long} #:db{:ident :com.github.ivarref.yoltq/version, :cardinality :db.cardinality/one, :valueType :db.type/string, :index true}]) +(defn pr-str-inner [x] + (binding [*print-dup* false + *print-meta* false + *print-readably* true + *print-length* nil + *print-level* nil + *print-namespace-maps* false] + (pr-str x))) (defn pr-str-safe [what x] (try - (if (= x (edn/read-string (pr-str x))) - (pr-str x) + (if (= x (edn/read-string (pr-str-inner x))) + (pr-str-inner x) (throw (ex-info (str "Could not read-string " what) {:input x}))) (catch Exception e (log/error "could not read-string" what ":" (ex-message e)) -- cgit v1.2.3 From 8f945d8c0189ad73d862c988faa511e0a7b017df Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Fri, 18 Nov 2022 14:12:50 +0100 Subject: Release 0.2.63: Add support for :encode and :decode function. Add :partition-fn. Fixes #1 --- README.md | 65 ++++++++++++++++- deps.edn | 6 +- pom.xml | 9 ++- src/com/github/ivarref/yoltq/impl.clj | 95 +++++++++++++++---------- src/com/github/ivarref/yoltq/utils.clj | 1 - test/com/github/ivarref/yoltq/virtual_test.clj | 98 ++++++++++++++++++++++---- 6 files changed, 218 insertions(+), 56 deletions(-) (limited to 'src/com/github/ivarref/yoltq/impl.clj') diff --git a/README.md b/README.md index 63b9ad3..a914cc7 100644 --- a/README.md +++ b/README.md @@ -131,7 +131,8 @@ Inspecting `(yq/put :q {:work 123})]` you will see something like this: This is the queue job as it will be stored into the database. You can see that the payload, i.e. the second argument of `yq/put`, -is persisted into the database. Thus the payload must be `pr-str`-able. +is persisted into the database. Thus the payload must be `pr-str`-able (unless you have specified +custom `:encode` and `:decode` functions that override this). A queue job will initially have status `:init`. @@ -220,6 +221,47 @@ is shut down abruptly during processing of queue jobs. A queue job will remain in status `:error` once `:max-retries` (default: 100) have been reached. Ideally this will not happen. ¯\\\_(ツ)\_/¯ +### Custom encoding and decoding + +Yoltq will use `pr-str` and `clojure.edn/read-string` by default to encode and decode data. +You may specify `:encode` and `:decode` either globally or per queue to override this behaviour. +The `:encode` function must return a byte array or a string. + +For example if you want to use [nippy](https://github.com/ptaoussanis/nippy): +```clojure +(require '[taoensso.nippy :as nippy]) + +; Globally for all queues: +(yq/init! + {:conn conn + :encode nippy/freeze + :decode nippy/thaw}) + +; Or per queue: +(yq/add-consumer! + :q ; Queue to consume + (fn [payload] (println "got payload:" payload)) ; Queue consumer function + {:encode nippy/freeze + :decode nippy/thaw}) ; Queue options, here with :encode and :decode +``` + +### Partitions + +Yoltq supports specifying which [partition](https://docs.datomic.com/on-prem/schema/schema.html#partitions) +queue entities should belong to. +The default function is: +```clojure +(defn default-partition-fn [_queue-name] + (keyword "yoltq" (str "queue_" (.getValue (java.time.Year/now))))) +``` +This is to say that there will be a single partition per year for yoltq. +Yoltq will take care of creating the partition if it does not exist. + +You may override this function, either globally or per queue, with the keyword `:partition-fn`. +E.g.: +```clojure +(yq/init! {:conn conn :partition-fn (fn [_queue-name] :my-partition)}) +``` ### All configuration options @@ -376,8 +418,29 @@ For Redis there is [carmine](https://github.com/ptaoussanis/carmine). Note: I have not tried these libraries myself. +## Other stuff + +If you liked this library, you may also like: + +* [conformity](https://github.com/avescodes/conformity): A Clojure/Datomic library for idempotently transacting norms into your database – be they schema, data, or otherwise. +* [datomic-schema](https://github.com/ivarref/datomic-schema): Simplified writing of Datomic schemas (works with conformity). +* [double-trouble](https://github.com/ivarref/double-trouble): Handle duplicate Datomic transactions with ease. +* [gen-fn](https://github.com/ivarref/gen-fn): Generate Datomic function literals from regular Clojure namespaces. +* [rewriting-history](https://github.com/ivarref/rewriting-history): A library to rewrite Datomic history. + ## Change log +#### 2022-11-18 v0.2.63 [diff](https://github.com/ivarref/yoltq/compare/v0.2.62...v0.2.63) +Added custom `:encode` and `:decode` support. + +Added support for specifying `:partifion-fn` to specify which partition a queue item should belong to. +It defaults to: +```clojure +(defn default-partition-fn [_queue-name] + (keyword "yoltq" (str "queue_" (.getValue (Year/now))))) +``` +Yoltq takes care of creating the partition if it does not exist. + #### 2022-11-15 v0.2.62 [diff](https://github.com/ivarref/yoltq/compare/v0.2.61...v0.2.62) Added function `processing-time-stats`: diff --git a/deps.edn b/deps.edn index 6923881..e36885e 100644 --- a/deps.edn +++ b/deps.edn @@ -1,5 +1,6 @@ -{:deps {org.clojure/tools.logging {:mvn/version "1.2.4"} - org.clojure/clojure {:mvn/version "1.11.1"}} +{:deps {com.github.ivarref/double-trouble {:mvn/version "0.1.102"} + org.clojure/tools.logging {:mvn/version "1.2.4"} + org.clojure/clojure {:mvn/version "1.11.1"}} :paths ["src"] @@ -11,6 +12,7 @@ clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]} org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"} io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} :jvm-opts ["-DDISABLE_SPY=true" "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] diff --git a/pom.xml b/pom.xml index 2c11984..463899d 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ jar com.github.ivarref yoltq - 0.2.62 + 0.2.63 yoltq @@ -12,6 +12,11 @@ clojure 1.11.1 + + com.github.ivarref + double-trouble + 0.1.102 + org.clojure tools.logging @@ -30,7 +35,7 @@ scm:git:git://github.com/ivarref/yoltq.git scm:git:ssh://git@github.com/ivarref/yoltq.git - v0.2.62 + v0.2.63 https://github.com/ivarref/yoltq \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index c37b0e6..ac573d1 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -1,11 +1,12 @@ (ns com.github.ivarref.yoltq.impl - (:require [datomic.api :as d] - [clojure.tools.logging :as log] + (:require [clojure.edn :as edn] [clojure.string :as str] - [com.github.ivarref.yoltq.utils :as u] + [clojure.tools.logging :as log] + [com.github.ivarref.double-trouble :as dt] [com.github.ivarref.yoltq.ext-sys :as ext] - [clojure.edn :as edn])) - + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d]) + (:import (java.time Year))) (def schema [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity} @@ -13,6 +14,7 @@ #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string} + #:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes} #:db{:ident :com.github.ivarref.yoltq/opts, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true} @@ -41,13 +43,22 @@ (log/error "could not read-string" what ":" (ex-message e)) (throw e)))) +(defn default-partition-fn [_queue-keyword] + (keyword "yoltq" (str "queue_" (.getValue (Year/now))))) -(defn put [{:keys [capture-bindings conn] :as config} +(defn put [{:keys [capture-bindings conn encode partition-fn] + :or {partition-fn default-partition-fn + encode (partial pr-str-safe :payload)} + :as config} queue-name payload opts] (if-let [q-config (get-in config [:handlers queue-name])] (let [id (u/squuid) + encode (get q-config :encode encode) + partition-fn (get q-config :partition-fn partition-fn) + partition (partition-fn queue-name) + _ (assert (keyword? partition) "Partition must be a keyword") depends-on (get q-config :depends-on (fn [_] nil)) valid-payload? (get q-config :valid-payload? (fn [_] true)) opts (merge @@ -58,32 +69,41 @@ (assoc o (symbol k) (deref k))) {} (or capture-bindings [])) - (pr-str-safe :capture-bindings))] - (when-not (valid-payload? payload) - (log/error "Payload was not valid. Payload was:" payload) - (throw (ex-info (str "Payload was not valid: " payload) {:payload payload}))) + (pr-str-safe :capture-bindings)) + _ (when-not (valid-payload? payload) + (log/error "Payload was not valid. Payload was:" payload) + (throw (ex-info (str "Payload was not valid: " payload) {:payload payload}))) + encoded (encode payload) + _ (when (not (or (bytes? encoded) (string? encoded))) + (log/error "Payload must be encoded to either a string or a byte array") + (throw (ex-info (str "Payload must be encoded to a string or a byte array. Payload: " payload) {:payload payload})))] (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init) - (merge - {:com.github.ivarref.yoltq/id id - :com.github.ivarref.yoltq/queue-name queue-name - :com.github.ivarref.yoltq/status u/status-init - :com.github.ivarref.yoltq/payload (pr-str-safe :payload payload) - :com.github.ivarref.yoltq/bindings str-bindings - :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts) - :com.github.ivarref.yoltq/lock (u/random-uuid) - :com.github.ivarref.yoltq/tries 0 - :com.github.ivarref.yoltq/init-time (u/now-ms) - :com.github.ivarref.yoltq/version "2"} - (when-let [[q ext-id] (:depends-on opts)] - (when-not (d/q '[:find ?e . - :in $ ?ext-id - :where - [?e :com.github.ivarref.yoltq/ext-id ?ext-id]] - (d/db conn) - (pr-str-safe :depends-on [q ext-id])) - (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts)))) - (when-let [ext-id (:id opts)] - {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}))) + (do + (dt/ensure-partition! conn partition) + (merge + (if (bytes? encoded) + {:com.github.ivarref.yoltq/payload-bytes encoded} + {:com.github.ivarref.yoltq/payload encoded}) + {:db/id (d/tempid partition) + :com.github.ivarref.yoltq/id id + :com.github.ivarref.yoltq/queue-name queue-name + :com.github.ivarref.yoltq/status u/status-init + :com.github.ivarref.yoltq/bindings str-bindings + :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts) + :com.github.ivarref.yoltq/lock (u/random-uuid) + :com.github.ivarref.yoltq/tries 0 + :com.github.ivarref.yoltq/init-time (u/now-ms) + :com.github.ivarref.yoltq/version "2"} + (when-let [[q ext-id] (:depends-on opts)] + (when-not (d/q '[:find ?e . + :in $ ?ext-id + :where + [?e :com.github.ivarref.yoltq/ext-id ?ext-id]] + (d/db conn) + (pr-str-safe :depends-on [q ext-id])) + (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts)))) + (when-let [ext-id (:id opts)] + {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])})))) (do (log/error "Did not find registered handler for queue" queue-name) (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) @@ -169,20 +189,23 @@ "in" (format "%.1f" (double (/ spent-ns 1e6))) "ms"])) -(defn execute! [{:keys [handlers mark-status-fn! start-execute-time collect-spent-time!] - :or {mark-status-fn! mark-status!} +(defn execute! [{:keys [decode handlers mark-status-fn! start-execute-time collect-spent-time!] + :or {mark-status-fn! mark-status! + decode edn/read-string} :as cfg} - {:com.github.ivarref.yoltq/keys [status id queue-name payload] :as queue-item}] + {:com.github.ivarref.yoltq/keys [status id queue-name payload payload-bytes] :as queue-item}] (when queue-item (if (= :error status) (assoc queue-item :failed? true) (if-let [queue (get handlers queue-name)] - (let [{:keys [f allow-cas-failure?]} queue] + (let [{:keys [f allow-cas-failure?]} queue + decode (get queue :decode decode)] (log/debug "queue item" (str id) "for queue" queue-name "is now processing") (let [{:keys [retval exception]} (try (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ms) id queue-name]) - (let [v (f payload)] + (let [payload (decode (or payload payload-bytes)) + v (f payload)] {:retval v}) (catch Throwable t {:exception t}) diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj index 39572a9..7665b6d 100644 --- a/src/com/github/ivarref/yoltq/utils.clj +++ b/src/com/github/ivarref/yoltq/utils.clj @@ -57,7 +57,6 @@ (defn get-queue-item [db id] (-> (d/pull db '[:*] [:com.github.ivarref.yoltq/id id]) (dissoc :db/id) - (update :com.github.ivarref.yoltq/payload edn/read-string) (update :com.github.ivarref.yoltq/opts (fn [s] (or (when s (edn/read-string s)) {}))) (update :com.github.ivarref.yoltq/bindings (fn [s] diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 996792e..2800c21 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -1,18 +1,21 @@ (ns com.github.ivarref.yoltq.virtual-test - (:require [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]] - [clojure.tools.logging :as log] - [com.github.ivarref.yoltq :as yq] - [com.github.ivarref.yoltq.error-poller :as error-poller] - [com.github.ivarref.yoltq.ext-sys :as ext] - [com.github.ivarref.yoltq.impl :as i] - [com.github.ivarref.yoltq.migrate :as migrate] - [com.github.ivarref.yoltq.test-queue :as tq] - [com.github.ivarref.yoltq.test-utils :as u] - [com.github.ivarref.yoltq.utils :as uu] - [datomic-schema.core] - [datomic.api :as d] - [taoensso.timbre :as timbre]) - (:import (java.time Duration))) + (:require + [clojure.string :as str] + [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]] + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq :as yq] + [com.github.ivarref.yoltq.error-poller :as error-poller] + [com.github.ivarref.yoltq.ext-sys :as ext] + [com.github.ivarref.yoltq.impl :as i] + [com.github.ivarref.yoltq.migrate :as migrate] + [com.github.ivarref.yoltq.test-queue :as tq] + [com.github.ivarref.yoltq.test-utils :as u] + [com.github.ivarref.yoltq.utils :as uu] + [datomic-schema.core] + [datomic.api :as d] + [taoensso.nippy :as nippy] + [taoensso.timbre :as timbre]) + (:import (java.time Duration LocalDateTime))) (use-fixtures :each tq/call-with-virtual-queue!) @@ -380,3 +383,70 @@ (is (= 0 (error-poller/do-poll-errors @yq/*config* (ext/now-ms)))) (is (= 0 (error-poller/do-poll-errors @yq/*config* (+ (dec (.toMillis (Duration/ofMinutes 15))) (ext/now-ms))))) (is (= 1 (error-poller/do-poll-errors @yq/*config* (+ (.toMillis (Duration/ofMinutes 15)) (ext/now-ms))))))) + +(deftest global-encode-decode + (let [conn (u/empty-conn) + ldt (LocalDateTime/now) + got-work (atom nil)] + (yq/init! {:conn conn + :encode nippy/freeze + :decode nippy/thaw}) + (yq/add-consumer! :q (fn [work] (reset! got-work work))) + @(d/transact conn [(yq/put :q {:work ldt})]) + (tq/consume! :q) + (is (= @got-work {:work ldt})))) + +(deftest queue-encode-decode + (let [conn (u/empty-conn) + ldt (LocalDateTime/now) + got-work (atom nil)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [work] (reset! got-work work)) + {:encode nippy/freeze + :decode nippy/thaw}) + @(d/transact conn [(yq/put :q {:work ldt})]) + (tq/consume! :q) + (is (= @got-work {:work ldt})))) + +(deftest global-partition + (let [conn (u/empty-conn) + got-work (atom nil)] + (yq/init! {:conn conn + :partition-fn (fn [_queue-name] :my-part)}) + (yq/add-consumer! :q (fn [work] (reset! got-work work))) + @(d/transact conn [(yq/put :q {:work 123})]) + (tq/consume! :q) + (is (some? (d/q '[:find ?e . + :in $ ?part + :where + [?e :db/ident ?part]] + (d/db conn) + :my-part))) + (is (= @got-work {:work 123})))) + +(deftest partition-per-queue + (let [conn (u/empty-conn) + got-work (atom nil)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [work] (reset! got-work work)) + {:partition-fn (fn [_queue-name] :my-part)}) + @(d/transact conn [(yq/put :q {:work 123})]) + (tq/consume! :q) + (is (some? (d/q '[:find ?e . + :in $ ?part + :where + [?e :db/ident ?part]] + (d/db conn) + :my-part))) + (is (= @got-work {:work 123})))) + +(deftest string-encode-decode + (let [conn (u/empty-conn) + got-work (atom nil)] + (yq/init! {:conn conn + :encode (fn [x] (str/join (reverse x))) + :decode (fn [x] (str/join (reverse x)))}) + (yq/add-consumer! :q (fn [work] (reset! got-work work))) + @(d/transact conn [(yq/put :q "asdf")]) + (tq/consume! :q) + (is (= @got-work "asdf")))) -- cgit v1.2.3 From 85d13545275678a1077b9600fce136ae10dcb809 Mon Sep 17 00:00:00 2001 From: Stefan van den Oord Date: Fri, 14 Jun 2024 16:08:59 +0200 Subject: #3 Add optional batch name to queue jobs --- src/com/github/ivarref/yoltq.clj | 23 +++++++++++++++++++++++ src/com/github/ivarref/yoltq/impl.clj | 5 ++++- test/com/github/ivarref/yoltq/virtual_test.clj | 22 ++++++++++++++++++++++ 3 files changed, 49 insertions(+), 1 deletion(-) (limited to 'src/com/github/ivarref/yoltq/impl.clj') diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 379d701..1ba286e 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -230,6 +230,29 @@ (sort-by (juxt :qname :status)) (vec)))) +(defn batch-progress [queue-name batch-name] + (let [{:keys [conn]} @*config* + db (d/db conn)] + (->> (d/q '[:find ?e ?qname ?bname ?status + :keys :e :qname :bname :status + :in $ ?qname ?bname + :where + [?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/batch-name ?bname] + [?e :com.github.ivarref.yoltq/status ?status]] + db queue-name batch-name) + (mapv #(select-keys % [:qname :bname :status])) + (mapv (fn [qitem] {qitem 1})) + (reduce (partial merge-with +) {}) + (mapv (fn [[{:keys [qname bname status]} v]] + (array-map + :qname qname + :batch-name bname + :status status + :count v))) + (sort-by (juxt :qname :batch-name :status)) + (vec)))) + (defn get-errors [qname] (let [{:keys [conn]} @*config* db (d/db conn)] diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index ac573d1..6d2aa3d 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -12,6 +12,7 @@ [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity} #:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value} #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} + #:db{:ident :com.github.ivarref.yoltq/batch-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes} @@ -103,7 +104,9 @@ (pr-str-safe :depends-on [q ext-id])) (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts)))) (when-let [ext-id (:id opts)] - {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])})))) + {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}) + (when-let [batch-name (:batch-name opts)] + {:com.github.ivarref.yoltq/batch-name batch-name})))) (do (log/error "Did not find registered handler for queue" queue-name) (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 2800c21..7621b13 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -450,3 +450,25 @@ @(d/transact conn [(yq/put :q "asdf")]) (tq/consume! :q) (is (= @got-work "asdf")))) + +(deftest batch-of-jobs-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q1 identity) + (yq/add-consumer! :q2 identity) + @(d/transact conn [(yq/put :q1 {:work 123} {:batch-name :b1}) + (yq/put :q1 {:work 456} {:batch-name :b2}) + (yq/put :q2 {:work 789} {:batch-name :b1})]) + (is (= [{:qname :q1 + :batch-name :b1 + :status :init + :count 1}] + (yq/batch-progress :q1 :b1))) + + (is (= {:work 123} (tq/consume! :q1))) + + (is (= [{:qname :q1 + :batch-name :b1 + :status :done + :count 1}] + (yq/batch-progress :q1 :b1))))) -- cgit v1.2.3 From 8b46092126baea5cd73465f5d544cdb0f75547b6 Mon Sep 17 00:00:00 2001 From: Stefan van den Oord Date: Tue, 16 Sep 2025 10:36:07 +0200 Subject: Rename batch -> job-group --- src/com/github/ivarref/yoltq.clj | 14 +++++++------- src/com/github/ivarref/yoltq/impl.clj | 6 +++--- test/com/github/ivarref/yoltq/virtual_test.clj | 16 ++++++++-------- 3 files changed, 18 insertions(+), 18 deletions(-) (limited to 'src/com/github/ivarref/yoltq/impl.clj') diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index ccd9062..88a7c31 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -257,27 +257,27 @@ (sort-by (juxt :qname :status)) (vec)))) -(defn batch-progress [queue-name batch-name] +(defn job-group-progress [queue-name job-group-name] (let [{:keys [conn]} @*config* db (d/db conn)] - (->> (d/q '[:find ?e ?qname ?bname ?status + (->> (d/q '[:find ?e ?qname ?jgname ?status :keys :e :qname :bname :status - :in $ ?qname ?bname + :in $ ?qname ?jgname :where [?e :com.github.ivarref.yoltq/queue-name ?qname] - [?e :com.github.ivarref.yoltq/batch-name ?bname] + [?e :com.github.ivarref.yoltq/job-group-name ?jgname] [?e :com.github.ivarref.yoltq/status ?status]] - db queue-name batch-name) + db queue-name job-group-name) (mapv #(select-keys % [:qname :bname :status])) (mapv (fn [qitem] {qitem 1})) (reduce (partial merge-with +) {}) (mapv (fn [[{:keys [qname bname status]} v]] (array-map :qname qname - :batch-name bname + :job-group-name bname :status status :count v))) - (sort-by (juxt :qname :batch-name :status)) + (sort-by (juxt :qname :job-group-name :status)) (vec)))) (defn get-errors [qname] diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index 6d2aa3d..e77655b 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -12,7 +12,7 @@ [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity} #:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value} #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} - #:db{:ident :com.github.ivarref.yoltq/batch-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} + #:db{:ident :com.github.ivarref.yoltq/job-group-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes} @@ -105,8 +105,8 @@ (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts)))) (when-let [ext-id (:id opts)] {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}) - (when-let [batch-name (:batch-name opts)] - {:com.github.ivarref.yoltq/batch-name batch-name})))) + (when-let [job-group-name (:job-group-name opts)] + {:com.github.ivarref.yoltq/job-group-name job-group-name})))) (do (log/error "Did not find registered handler for queue" queue-name) (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 7621b13..d245aaa 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -451,24 +451,24 @@ (tq/consume! :q) (is (= @got-work "asdf")))) -(deftest batch-of-jobs-test +(deftest job-group-test (let [conn (u/empty-conn)] (yq/init! {:conn conn}) (yq/add-consumer! :q1 identity) (yq/add-consumer! :q2 identity) - @(d/transact conn [(yq/put :q1 {:work 123} {:batch-name :b1}) - (yq/put :q1 {:work 456} {:batch-name :b2}) - (yq/put :q2 {:work 789} {:batch-name :b1})]) + @(d/transact conn [(yq/put :q1 {:work 123} {:job-group-name :b1}) + (yq/put :q1 {:work 456} {:job-group-name :b2}) + (yq/put :q2 {:work 789} {:job-group-name :b1})]) (is (= [{:qname :q1 - :batch-name :b1 + :job-group-name :b1 :status :init :count 1}] - (yq/batch-progress :q1 :b1))) + (yq/job-group-progress :q1 :b1))) (is (= {:work 123} (tq/consume! :q1))) (is (= [{:qname :q1 - :batch-name :b1 + :job-group-name :b1 :status :done :count 1}] - (yq/batch-progress :q1 :b1))))) + (yq/job-group-progress :q1 :b1))))) -- cgit v1.2.3 From 698ab89d3a48fd6c42f0abbb1fb6b6c9e8d4d53a Mon Sep 17 00:00:00 2001 From: Stefan van den Oord Date: Tue, 16 Sep 2025 11:10:02 +0200 Subject: Improve naming: job-group is a keyword, so don't include "-name" --- src/com/github/ivarref/yoltq.clj | 20 ++++++++++---------- src/com/github/ivarref/yoltq/impl.clj | 6 +++--- test/com/github/ivarref/yoltq/virtual_test.clj | 14 +++++++------- 3 files changed, 20 insertions(+), 20 deletions(-) (limited to 'src/com/github/ivarref/yoltq/impl.clj') diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 88a7c31..8c8ca7a 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -257,27 +257,27 @@ (sort-by (juxt :qname :status)) (vec)))) -(defn job-group-progress [queue-name job-group-name] +(defn job-group-progress [queue-name job-group] (let [{:keys [conn]} @*config* db (d/db conn)] - (->> (d/q '[:find ?e ?qname ?jgname ?status - :keys :e :qname :bname :status - :in $ ?qname ?jgname + (->> (d/q '[:find ?e ?qname ?job-group ?status + :keys :e :qname :job-group :status + :in $ ?qname ?job-group :where [?e :com.github.ivarref.yoltq/queue-name ?qname] - [?e :com.github.ivarref.yoltq/job-group-name ?jgname] + [?e :com.github.ivarref.yoltq/job-group ?job-group] [?e :com.github.ivarref.yoltq/status ?status]] - db queue-name job-group-name) - (mapv #(select-keys % [:qname :bname :status])) + db queue-name job-group) + (mapv #(select-keys % [:qname :job-group :status])) (mapv (fn [qitem] {qitem 1})) (reduce (partial merge-with +) {}) - (mapv (fn [[{:keys [qname bname status]} v]] + (mapv (fn [[{:keys [qname job-group status]} v]] (array-map :qname qname - :job-group-name bname + :job-group job-group :status status :count v))) - (sort-by (juxt :qname :job-group-name :status)) + (sort-by (juxt :qname :job-group :status)) (vec)))) (defn get-errors [qname] diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index e77655b..ffb1ad8 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -12,7 +12,7 @@ [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity} #:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value} #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} - #:db{:ident :com.github.ivarref.yoltq/job-group-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} + #:db{:ident :com.github.ivarref.yoltq/job-group, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes} @@ -105,8 +105,8 @@ (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts)))) (when-let [ext-id (:id opts)] {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}) - (when-let [job-group-name (:job-group-name opts)] - {:com.github.ivarref.yoltq/job-group-name job-group-name})))) + (when-let [job-group (:job-group opts)] + {:com.github.ivarref.yoltq/job-group job-group})))) (do (log/error "Did not find registered handler for queue" queue-name) (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index d245aaa..a2ed269 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -456,19 +456,19 @@ (yq/init! {:conn conn}) (yq/add-consumer! :q1 identity) (yq/add-consumer! :q2 identity) - @(d/transact conn [(yq/put :q1 {:work 123} {:job-group-name :b1}) - (yq/put :q1 {:work 456} {:job-group-name :b2}) - (yq/put :q2 {:work 789} {:job-group-name :b1})]) + @(d/transact conn [(yq/put :q1 {:work 123} {:job-group :group1}) + (yq/put :q1 {:work 456} {:job-group :group2}) + (yq/put :q2 {:work 789} {:job-group :group1})]) (is (= [{:qname :q1 - :job-group-name :b1 + :job-group :group1 :status :init :count 1}] - (yq/job-group-progress :q1 :b1))) + (yq/job-group-progress :q1 :group1))) (is (= {:work 123} (tq/consume! :q1))) (is (= [{:qname :q1 - :job-group-name :b1 + :job-group :group1 :status :done :count 1}] - (yq/job-group-progress :q1 :b1))))) + (yq/job-group-progress :q1 :group1))))) -- cgit v1.2.3