aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvar Refsdal <ivar.refsdal@nsd.no>2021-09-23 11:30:03 +0200
committerIvar Refsdal <ivar.refsdal@nsd.no>2021-09-23 11:30:03 +0200
commit79cc3f448949d755c59265e2316408d037be20cb (patch)
tree19f239c3bbfc63acf2715e8817334b2323880d93
parentTo test-queue namespace (diff)
downloadfiinha-79cc3f448949d755c59265e2316408d037be20cb.tar.gz
fiinha-79cc3f448949d755c59265e2316408d037be20cb.tar.xz
Add force-retry!
-rw-r--r--src/com/github/ivarref/yoltq/impl.clj2
-rw-r--r--src/com/github/ivarref/yoltq/test_queue.clj49
-rw-r--r--test/com/github/ivarref/yoltq/virtual_test.clj3
3 files changed, 23 insertions, 31 deletions
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj
index cb99b08..9c95cff 100644
--- a/src/com/github/ivarref/yoltq/impl.clj
+++ b/src/com/github/ivarref/yoltq/impl.clj
@@ -67,7 +67,7 @@
(cond
(= :db.error/cas-failed error)
(do
- (log/info ":db.error/cas-failed for queue item" (str id) "and attribute" (:a m))
+ (log/info "take! :db.error/cas-failed for queue item" (str id) "and attribute" (:a m))
(when cas-failures
(swap! cas-failures inc))
nil)
diff --git a/src/com/github/ivarref/yoltq/test_queue.clj b/src/com/github/ivarref/yoltq/test_queue.clj
index 6aeb959..4c4f903 100644
--- a/src/com/github/ivarref/yoltq/test_queue.clj
+++ b/src/com/github/ivarref/yoltq/test_queue.clj
@@ -42,6 +42,7 @@
config# (atom {:bootstrap-poller! (partial bootstrap-poller! txq# running?# poller-exited?# bootstrapped?#)
:init-backoff-time 0
:hung-log-level :warn
+ :prev-consumed (atom {})
:tx-queue txq#})]
(with-bindings {#'yq/*config* config#
#'yq/*running?* (atom false)
@@ -154,44 +155,34 @@
(:com.github.ivarref.yoltq/status job#))
(i/take! @yq/*config*)
(i/execute! @yq/*config*))]
+ (swap! (:prev-consumed @yq/*config*) assoc ~queue-name res#)
(test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#)))
(if (:retval res#)
(:retval res#)
(:exception res#))))
(catch Throwable t#
- (log/error t# "unexpected error in consume-expect:" (ex-message t#))))
+ (log/error t# "unexpected error in consume-expect:" (ex-message t#))
+ (throw t#)))
(test/is false (str "No job found for queue " ~queue-name))))
(defmacro consume! [queue-name]
`(consume-expect! ~queue-name :done))
-(defn mark-fails! [{:keys [conn]}
- {:com.github.ivarref.yoltq/keys [id lock tries]}
- _]
- (try
- (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)]
- [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)]
- [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing :init]]]
- @(d/transact conn tx)
- nil)
- (catch Throwable t
- (log/error t "unexpected error in mark-status!: " (ex-message t))
- nil)))
-
-
(defmacro force-retry! [queue-name]
- `(if-let [job# (get-tx-q-job ~queue-name)]
- (try
- (with-bindings (:com.github.ivarref.yoltq/bindings job#)
- (some->> (u/prepare-processing (d/db (:conn @yq/*config*))
- (:com.github.ivarref.yoltq/id job#)
- ~queue-name
- (:com.github.ivarref.yoltq/lock job#)
- (:com.github.ivarref.yoltq/status job#))
- (i/take! @yq/*config*)
- (i/execute! (assoc @yq/*config* :mark-status-fn! mark-fails!)))
- (consume! ~queue-name))
- (catch Throwable t#
- (log/error t# "unexpected error in consume-twice!:" (ex-message t#))))
- (test/is false (str "No job found for queue " ~queue-name)))) \ No newline at end of file
+ `(if-let [job# (some-> @yq/*config* :prev-consumed deref (get ~queue-name))]
+ (let [db-res# @(d/transact (:conn @yq/*config*) [{:com.github.ivarref.yoltq/id (:com.github.ivarref.yoltq/id job#)
+ :com.github.ivarref.yoltq/status :init}])
+ res# (some->> (u/prepare-processing (:db-after db-res#)
+ (:com.github.ivarref.yoltq/id job#)
+ ~queue-name
+ (:com.github.ivarref.yoltq/lock job#)
+ :init)
+ (i/take! @yq/*config*)
+ (i/execute! @yq/*config*))]
+ (swap! (:prev-consumed @yq/*config*) assoc ~queue-name res#)
+ (test/is (= :done (:com.github.ivarref.yoltq/status res#)))
+ (if (:retval res#)
+ (:retval res#)
+ (:exception res#)))
+ (test/is false "Expected to have previously consumed something. Was nil.")))
diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj
index 442acac..2b67e5e 100644
--- a/test/com/github/ivarref/yoltq/virtual_test.clj
+++ b/test/com/github/ivarref/yoltq/virtual_test.clj
@@ -282,4 +282,5 @@
(fn [_] (swap! cnt inc))))
@(d/transact conn [(yq/put :q nil)])
(is (= 1 (tq/consume! :q)))
- #_(is (= 2 (tq/force-retry! :q)))))
+ (is (= 2 (tq/force-retry! :q)))
+ (is (= 3 (tq/force-retry! :q)))))