diff options
| author | Ivar Refsdal <ivar.refsdal@nsd.no> | 2021-09-17 14:25:08 +0200 |
|---|---|---|
| committer | Ivar Refsdal <ivar.refsdal@nsd.no> | 2021-09-17 14:25:08 +0200 |
| commit | ad8a41bd7d9e6fed77f633a75ef36410b7afbef1 (patch) | |
| tree | 6ddc2760f1bb6ab3abd88cfef428a161b28da875 | |
| parent | Save bindings on put (diff) | |
| download | fiinha-ad8a41bd7d9e6fed77f633a75ef36410b7afbef1.tar.gz fiinha-ad8a41bd7d9e6fed77f633a75ef36410b7afbef1.tar.xz | |
Start add bindings ...
| -rw-r--r-- | src/com/github/ivarref/yoltq.clj | 10 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/poller.clj | 12 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/report_queue.clj | 11 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/utils.clj | 58 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/virtual_queue.clj | 23 | ||||
| -rw-r--r-- | test/com/github/ivarref/yoltq/virtual_test.clj | 32 |
6 files changed, 98 insertions, 48 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index d3eefef..6341e41 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -58,6 +58,8 @@ ; contain the stacktrace of the stuck threads. :pool-size 4 + :capture-bindings [] + ; How often should the system be polled for failed queue jobs :system-error-poll-delay (Duration/ofMinutes 1) @@ -159,17 +161,17 @@ (d/create-database uri) (let [ok-items (atom []) conn (d/connect uri) - n 100] + n 1] (init! {:conn conn :error-backoff-time (Duration/ofSeconds 1) :poll-delay (Duration/ofSeconds 1)}) (add-consumer! :q (fn [payload] - (when (> (Math/random) 0.5) - (throw (ex-info "oops" {}))) + #_(when (> (Math/random) 0.5) + (throw (ex-info "oops" {}))) (if (= n (count (swap! received conj (:work payload)))) (log/info "... and we are done!") (log/info "got payload" payload "total ok:" (count @received))))) (start!) (dotimes [x n] - @(d/transact conn [(put :q {:work x})])) + @(d/transact conn [(put :q {:work 123})])) nil))))
\ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/poller.clj b/src/com/github/ivarref/yoltq/poller.clj index ad9d32a..1f4e65d 100644 --- a/src/com/github/ivarref/yoltq/poller.clj +++ b/src/com/github/ivarref/yoltq/poller.clj @@ -5,10 +5,14 @@ (defn poll-once! [cfg q status] - (case status - :init (some->> (u/get-init cfg q) (i/take! cfg) (i/execute! cfg)) - :error (some->> (u/get-error cfg q) (i/take! cfg) (i/execute! cfg)) - :hung (some->> (u/get-hung cfg q) (i/take! cfg) (i/execute! cfg)))) + (when-let [item (case status + :init (u/get-init cfg q) + :error (u/get-error cfg q) + :hung (u/get-hung cfg q))] + (with-bindings (get item :bindings {}) + (some->> item + (i/take! cfg) + (i/execute! cfg))))) (defn poll-queue! [running? diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index a40d29a..c6559bf 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -19,11 +19,12 @@ (doseq [id new-ids] (consumer (fn [] (try - (let [{:com.github.ivarref.yoltq/keys [lock id status queue-name]} (u/get-queue-item db-after id)] - (some->> - (u/prepare-processing id queue-name lock status) - (i/take! cfg) - (i/execute! cfg))) + (let [{:com.github.ivarref.yoltq/keys [lock id status queue-name bindings]} (u/get-queue-item db-after id)] + (with-bindings (or bindings {}) + (some->> + (u/prepare-processing db-after id queue-name lock status) + (i/take! cfg) + (i/execute! cfg)))) (catch Throwable t (log/error t "unexpected error in process-poll-result!"))))))))) diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj index c96d1dc..9501343 100644 --- a/src/com/github/ivarref/yoltq/utils.clj +++ b/src/com/github/ivarref/yoltq/utils.clj @@ -56,14 +56,22 @@ (-> (d/pull db '[:*] [:com.github.ivarref.yoltq/id id]) (dissoc :db/id) (update :com.github.ivarref.yoltq/payload edn/read-string) - (update :com.github.ivarref.yoltq/bindings edn/read-string))) + (update :com.github.ivarref.yoltq/bindings + (fn [s] + (when s + (->> s + (edn/read-string) + (reduce-kv (fn [o k v] + (assoc o (resolve k) v)) + {}))))))) -(defn prepare-processing [id queue-name old-lock old-status] +(defn prepare-processing [db id queue-name old-lock old-status] (let [new-lock (random-uuid)] {:id id :lock new-lock :queue-name queue-name + :bindings (get (get-queue-item db id) :com.github.ivarref.yoltq/bindings {}) :tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock] [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status old-status status-processing] {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ns)}]})) @@ -73,29 +81,31 @@ (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: " (str (if (nil? conn) "nil" conn)) "\nConfig was: " (str cfg))) - (if-let [ids (->> (d/q '[:find ?id ?lock - :in $ ?queue-name ?backoff - :where - [?e :com.github.ivarref.yoltq/status :init] - [?e :com.github.ivarref.yoltq/queue-name ?queue-name] - [?e :com.github.ivarref.yoltq/init-time ?init-time] - [(>= ?backoff ?init-time)] - [?e :com.github.ivarref.yoltq/id ?id] - [?e :com.github.ivarref.yoltq/lock ?lock]] - (or db (d/db conn)) - queue-name - (- (now-ns) init-backoff-time)) - (not-empty))] - (let [[id old-lock] (rand-nth (into [] ids))] - (prepare-processing id queue-name old-lock :init)) - (log/trace "no new-items in :init status for queue" queue-name))) + (let [db (or db (d/db conn))] + (if-let [ids (->> (d/q '[:find ?id ?lock + :in $ ?queue-name ?backoff + :where + [?e :com.github.ivarref.yoltq/status :init] + [?e :com.github.ivarref.yoltq/queue-name ?queue-name] + [?e :com.github.ivarref.yoltq/init-time ?init-time] + [(>= ?backoff ?init-time)] + [?e :com.github.ivarref.yoltq/id ?id] + [?e :com.github.ivarref.yoltq/lock ?lock]] + db + queue-name + (- (now-ns) init-backoff-time)) + (not-empty))] + (let [[id old-lock] (rand-nth (into [] ids))] + (prepare-processing db id queue-name old-lock :init)) + (log/trace "no new-items in :init status for queue" queue-name)))) (defn get-error [{:keys [conn db error-backoff-time max-retries] :as cfg} queue-name] (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: " (str (if (nil? conn) "nil" conn)) "\nConfig was: " (str cfg))) - (let [max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)] + (let [db (or db (d/db conn)) + max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)] (when-let [ids (->> (d/q '[:find ?id ?lock :in $ ?queue-name ?backoff ?max-tries :where @@ -107,13 +117,13 @@ [(> ?max-tries ?tries)] [?e :com.github.ivarref.yoltq/id ?id] [?e :com.github.ivarref.yoltq/lock ?lock]] - (or db (d/db conn)) + db queue-name (- (now-ns) error-backoff-time) (inc max-retries)) (not-empty))] (let [[id old-lock] (rand-nth (into [] ids))] - (prepare-processing id queue-name old-lock :error))))) + (prepare-processing db id queue-name old-lock :error))))) (defn get-hung [{:keys [conn db now hung-backoff-time max-retries] :as cfg} queue-name] @@ -121,7 +131,8 @@ (str (if (nil? conn) "nil" conn)) "\nConfig was: " (str cfg))) (let [now (or now (now-ns)) - max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)] + max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries) + db (or db (d/db conn))] (when-let [ids (->> (d/q '[:find ?id ?lock ?tries :in $ ?qname ?backoff :where @@ -132,7 +143,7 @@ [?e :com.github.ivarref.yoltq/tries ?tries] [?e :com.github.ivarref.yoltq/id ?id] [?e :com.github.ivarref.yoltq/lock ?lock]] - (or db (d/db conn)) + db queue-name (- now hung-backoff-time)) (not-empty))] @@ -144,6 +155,7 @@ :queue-name queue-name :was-hung? true :to-error? to-error? + :bindings (get (get-queue-item db id) :com.github.ivarref.yoltq/bindings {}) :tx (if (not to-error?) [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock] [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] diff --git a/src/com/github/ivarref/yoltq/virtual_queue.clj b/src/com/github/ivarref/yoltq/virtual_queue.clj index f133bde..71c7b6d 100644 --- a/src/com/github/ivarref/yoltq/virtual_queue.clj +++ b/src/com/github/ivarref/yoltq/virtual_queue.clj @@ -146,17 +146,18 @@ (defmacro consume-expect! [queue-name expected-status] `(if-let [job# (get-tx-q-job ~queue-name)] (try - (let [res# (some->> - (u/prepare-processing (:com.github.ivarref.yoltq/id job#) - ~queue-name - (:com.github.ivarref.yoltq/lock job#) - (:com.github.ivarref.yoltq/status job#)) - (i/take! @yq/*config*) - (i/execute! @yq/*config*))] - (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#))) - (if (:retval res#) - (:retval res#) - (:exception res#))) + (with-bindings (:com.github.ivarref.yoltq/bindings job#) + (let [res# (some->> (u/prepare-processing (d/db (:conn @yq/*config*)) + (:com.github.ivarref.yoltq/id job#) + ~queue-name + (:com.github.ivarref.yoltq/lock job#) + (:com.github.ivarref.yoltq/status job#)) + (i/take! @yq/*config*) + (i/execute! @yq/*config*))] + (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#))) + (if (:retval res#) + (:retval res#) + (:exception res#)))) (catch Throwable t# (log/error t# "unexpected error in consume-expect:" (ex-message t#)))) (test/is nil (str "No job found for queue " ~queue-name))))
\ No newline at end of file diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 41d2461..575dc1b 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -8,7 +8,9 @@ [com.github.ivarref.yoltq.utils :as uu] [clojure.tools.logging :as log] [com.github.ivarref.yoltq.impl :as i] - [com.github.ivarref.yoltq :as yq])) + [com.github.ivarref.yoltq :as yq] + [clojure.pprint :as pprint] + [clojure.edn :as edn])) (use-fixtures :each vq/call-with-virtual-queue!) @@ -230,3 +232,31 @@ (is (nil? (uu/get-error (assoc-in @dq/*config* [:handlers :q :max-retries] 1) :q))))) +(deftest consume-expect-test + (let [conn (u/empty-conn) + seen (atom #{})] + (dq/init! {:conn conn}) + (dq/add-consumer! :q (fn [payload] + (when (= #{1 2} (swap! seen conj payload)) + (throw (ex-info "oops" {}))) + payload)) + + @(d/transact conn [(dq/put :q 1)]) + @(d/transact conn [(dq/put :q 2)]) + + (is (= 1 (vq/consume-expect! :q :done))) + (vq/consume-expect! :q :error))) + + +(def ^:dynamic *some-binding* nil) + + +(deftest binding-test + (let [conn (u/empty-conn)] + (dq/init! {:conn conn + :bindings [#'*some-binding*]}) + (dq/add-consumer! :q (fn [_] *some-binding*)) + (binding [*some-binding* 1] @(d/transact conn [(dq/put :q nil)])) + #_(binding [*some-binding* 2] @(d/transact conn [(dq/put :q nil)])) + #_@(d/transact conn [(dq/put :q nil)]) + (is (= 1 (vq/consume-expect! :q :done))))) |
