aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/com/github/ivarref/yoltq.clj10
-rw-r--r--src/com/github/ivarref/yoltq/poller.clj12
-rw-r--r--src/com/github/ivarref/yoltq/report_queue.clj11
-rw-r--r--src/com/github/ivarref/yoltq/utils.clj58
-rw-r--r--src/com/github/ivarref/yoltq/virtual_queue.clj23
-rw-r--r--test/com/github/ivarref/yoltq/virtual_test.clj32
6 files changed, 98 insertions, 48 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index d3eefef..6341e41 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -58,6 +58,8 @@
; contain the stacktrace of the stuck threads.
:pool-size 4
+ :capture-bindings []
+
; How often should the system be polled for failed queue jobs
:system-error-poll-delay (Duration/ofMinutes 1)
@@ -159,17 +161,17 @@
(d/create-database uri)
(let [ok-items (atom [])
conn (d/connect uri)
- n 100]
+ n 1]
(init! {:conn conn
:error-backoff-time (Duration/ofSeconds 1)
:poll-delay (Duration/ofSeconds 1)})
(add-consumer! :q (fn [payload]
- (when (> (Math/random) 0.5)
- (throw (ex-info "oops" {})))
+ #_(when (> (Math/random) 0.5)
+ (throw (ex-info "oops" {})))
(if (= n (count (swap! received conj (:work payload))))
(log/info "... and we are done!")
(log/info "got payload" payload "total ok:" (count @received)))))
(start!)
(dotimes [x n]
- @(d/transact conn [(put :q {:work x})]))
+ @(d/transact conn [(put :q {:work 123})]))
nil)))) \ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq/poller.clj b/src/com/github/ivarref/yoltq/poller.clj
index ad9d32a..1f4e65d 100644
--- a/src/com/github/ivarref/yoltq/poller.clj
+++ b/src/com/github/ivarref/yoltq/poller.clj
@@ -5,10 +5,14 @@
(defn poll-once! [cfg q status]
- (case status
- :init (some->> (u/get-init cfg q) (i/take! cfg) (i/execute! cfg))
- :error (some->> (u/get-error cfg q) (i/take! cfg) (i/execute! cfg))
- :hung (some->> (u/get-hung cfg q) (i/take! cfg) (i/execute! cfg))))
+ (when-let [item (case status
+ :init (u/get-init cfg q)
+ :error (u/get-error cfg q)
+ :hung (u/get-hung cfg q))]
+ (with-bindings (get item :bindings {})
+ (some->> item
+ (i/take! cfg)
+ (i/execute! cfg)))))
(defn poll-queue! [running?
diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj
index a40d29a..c6559bf 100644
--- a/src/com/github/ivarref/yoltq/report_queue.clj
+++ b/src/com/github/ivarref/yoltq/report_queue.clj
@@ -19,11 +19,12 @@
(doseq [id new-ids]
(consumer (fn []
(try
- (let [{:com.github.ivarref.yoltq/keys [lock id status queue-name]} (u/get-queue-item db-after id)]
- (some->>
- (u/prepare-processing id queue-name lock status)
- (i/take! cfg)
- (i/execute! cfg)))
+ (let [{:com.github.ivarref.yoltq/keys [lock id status queue-name bindings]} (u/get-queue-item db-after id)]
+ (with-bindings (or bindings {})
+ (some->>
+ (u/prepare-processing db-after id queue-name lock status)
+ (i/take! cfg)
+ (i/execute! cfg))))
(catch Throwable t
(log/error t "unexpected error in process-poll-result!")))))))))
diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj
index c96d1dc..9501343 100644
--- a/src/com/github/ivarref/yoltq/utils.clj
+++ b/src/com/github/ivarref/yoltq/utils.clj
@@ -56,14 +56,22 @@
(-> (d/pull db '[:*] [:com.github.ivarref.yoltq/id id])
(dissoc :db/id)
(update :com.github.ivarref.yoltq/payload edn/read-string)
- (update :com.github.ivarref.yoltq/bindings edn/read-string)))
+ (update :com.github.ivarref.yoltq/bindings
+ (fn [s]
+ (when s
+ (->> s
+ (edn/read-string)
+ (reduce-kv (fn [o k v]
+ (assoc o (resolve k) v))
+ {})))))))
-(defn prepare-processing [id queue-name old-lock old-status]
+(defn prepare-processing [db id queue-name old-lock old-status]
(let [new-lock (random-uuid)]
{:id id
:lock new-lock
:queue-name queue-name
+ :bindings (get (get-queue-item db id) :com.github.ivarref.yoltq/bindings {})
:tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock]
[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status old-status status-processing]
{:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ns)}]}))
@@ -73,29 +81,31 @@
(assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: "
(str (if (nil? conn) "nil" conn))
"\nConfig was: " (str cfg)))
- (if-let [ids (->> (d/q '[:find ?id ?lock
- :in $ ?queue-name ?backoff
- :where
- [?e :com.github.ivarref.yoltq/status :init]
- [?e :com.github.ivarref.yoltq/queue-name ?queue-name]
- [?e :com.github.ivarref.yoltq/init-time ?init-time]
- [(>= ?backoff ?init-time)]
- [?e :com.github.ivarref.yoltq/id ?id]
- [?e :com.github.ivarref.yoltq/lock ?lock]]
- (or db (d/db conn))
- queue-name
- (- (now-ns) init-backoff-time))
- (not-empty))]
- (let [[id old-lock] (rand-nth (into [] ids))]
- (prepare-processing id queue-name old-lock :init))
- (log/trace "no new-items in :init status for queue" queue-name)))
+ (let [db (or db (d/db conn))]
+ (if-let [ids (->> (d/q '[:find ?id ?lock
+ :in $ ?queue-name ?backoff
+ :where
+ [?e :com.github.ivarref.yoltq/status :init]
+ [?e :com.github.ivarref.yoltq/queue-name ?queue-name]
+ [?e :com.github.ivarref.yoltq/init-time ?init-time]
+ [(>= ?backoff ?init-time)]
+ [?e :com.github.ivarref.yoltq/id ?id]
+ [?e :com.github.ivarref.yoltq/lock ?lock]]
+ db
+ queue-name
+ (- (now-ns) init-backoff-time))
+ (not-empty))]
+ (let [[id old-lock] (rand-nth (into [] ids))]
+ (prepare-processing db id queue-name old-lock :init))
+ (log/trace "no new-items in :init status for queue" queue-name))))
(defn get-error [{:keys [conn db error-backoff-time max-retries] :as cfg} queue-name]
(assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: "
(str (if (nil? conn) "nil" conn))
"\nConfig was: " (str cfg)))
- (let [max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)]
+ (let [db (or db (d/db conn))
+ max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)]
(when-let [ids (->> (d/q '[:find ?id ?lock
:in $ ?queue-name ?backoff ?max-tries
:where
@@ -107,13 +117,13 @@
[(> ?max-tries ?tries)]
[?e :com.github.ivarref.yoltq/id ?id]
[?e :com.github.ivarref.yoltq/lock ?lock]]
- (or db (d/db conn))
+ db
queue-name
(- (now-ns) error-backoff-time)
(inc max-retries))
(not-empty))]
(let [[id old-lock] (rand-nth (into [] ids))]
- (prepare-processing id queue-name old-lock :error)))))
+ (prepare-processing db id queue-name old-lock :error)))))
(defn get-hung [{:keys [conn db now hung-backoff-time max-retries] :as cfg} queue-name]
@@ -121,7 +131,8 @@
(str (if (nil? conn) "nil" conn))
"\nConfig was: " (str cfg)))
(let [now (or now (now-ns))
- max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)]
+ max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)
+ db (or db (d/db conn))]
(when-let [ids (->> (d/q '[:find ?id ?lock ?tries
:in $ ?qname ?backoff
:where
@@ -132,7 +143,7 @@
[?e :com.github.ivarref.yoltq/tries ?tries]
[?e :com.github.ivarref.yoltq/id ?id]
[?e :com.github.ivarref.yoltq/lock ?lock]]
- (or db (d/db conn))
+ db
queue-name
(- now hung-backoff-time))
(not-empty))]
@@ -144,6 +155,7 @@
:queue-name queue-name
:was-hung? true
:to-error? to-error?
+ :bindings (get (get-queue-item db id) :com.github.ivarref.yoltq/bindings {})
:tx (if (not to-error?)
[[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock]
[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)]
diff --git a/src/com/github/ivarref/yoltq/virtual_queue.clj b/src/com/github/ivarref/yoltq/virtual_queue.clj
index f133bde..71c7b6d 100644
--- a/src/com/github/ivarref/yoltq/virtual_queue.clj
+++ b/src/com/github/ivarref/yoltq/virtual_queue.clj
@@ -146,17 +146,18 @@
(defmacro consume-expect! [queue-name expected-status]
`(if-let [job# (get-tx-q-job ~queue-name)]
(try
- (let [res# (some->>
- (u/prepare-processing (:com.github.ivarref.yoltq/id job#)
- ~queue-name
- (:com.github.ivarref.yoltq/lock job#)
- (:com.github.ivarref.yoltq/status job#))
- (i/take! @yq/*config*)
- (i/execute! @yq/*config*))]
- (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#)))
- (if (:retval res#)
- (:retval res#)
- (:exception res#)))
+ (with-bindings (:com.github.ivarref.yoltq/bindings job#)
+ (let [res# (some->> (u/prepare-processing (d/db (:conn @yq/*config*))
+ (:com.github.ivarref.yoltq/id job#)
+ ~queue-name
+ (:com.github.ivarref.yoltq/lock job#)
+ (:com.github.ivarref.yoltq/status job#))
+ (i/take! @yq/*config*)
+ (i/execute! @yq/*config*))]
+ (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#)))
+ (if (:retval res#)
+ (:retval res#)
+ (:exception res#))))
(catch Throwable t#
(log/error t# "unexpected error in consume-expect:" (ex-message t#))))
(test/is nil (str "No job found for queue " ~queue-name)))) \ No newline at end of file
diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj
index 41d2461..575dc1b 100644
--- a/test/com/github/ivarref/yoltq/virtual_test.clj
+++ b/test/com/github/ivarref/yoltq/virtual_test.clj
@@ -8,7 +8,9 @@
[com.github.ivarref.yoltq.utils :as uu]
[clojure.tools.logging :as log]
[com.github.ivarref.yoltq.impl :as i]
- [com.github.ivarref.yoltq :as yq]))
+ [com.github.ivarref.yoltq :as yq]
+ [clojure.pprint :as pprint]
+ [clojure.edn :as edn]))
(use-fixtures :each vq/call-with-virtual-queue!)
@@ -230,3 +232,31 @@
(is (nil? (uu/get-error (assoc-in @dq/*config* [:handlers :q :max-retries] 1) :q)))))
+(deftest consume-expect-test
+ (let [conn (u/empty-conn)
+ seen (atom #{})]
+ (dq/init! {:conn conn})
+ (dq/add-consumer! :q (fn [payload]
+ (when (= #{1 2} (swap! seen conj payload))
+ (throw (ex-info "oops" {})))
+ payload))
+
+ @(d/transact conn [(dq/put :q 1)])
+ @(d/transact conn [(dq/put :q 2)])
+
+ (is (= 1 (vq/consume-expect! :q :done)))
+ (vq/consume-expect! :q :error)))
+
+
+(def ^:dynamic *some-binding* nil)
+
+
+(deftest binding-test
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn
+ :bindings [#'*some-binding*]})
+ (dq/add-consumer! :q (fn [_] *some-binding*))
+ (binding [*some-binding* 1] @(d/transact conn [(dq/put :q nil)]))
+ #_(binding [*some-binding* 2] @(d/transact conn [(dq/put :q nil)]))
+ #_@(d/transact conn [(dq/put :q nil)])
+ (is (= 1 (vq/consume-expect! :q :done)))))