aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorIvar Refsdal <ivar.refsdal@nsd.no>2021-09-17 22:16:14 +0200
committerIvar Refsdal <ivar.refsdal@nsd.no>2021-09-17 22:16:14 +0200
commit2a236e6d90410821370761434fad45b13621fbdf (patch)
tree2950201cc14368bffbd3aedb6fcefb71cc6c90af /src
parentUse [#'taoensso.timbre/*context*] as default :capture-bindings if present (diff)
downloadfiinha-2a236e6d90410821370761434fad45b13621fbdf.tar.gz
fiinha-2a236e6d90410821370761434fad45b13621fbdf.tar.xz
Add consume-twice! test function for verifying idempotence
Diffstat (limited to 'src')
-rw-r--r--src/com/github/ivarref/yoltq/virtual_queue.clj37
1 files changed, 36 insertions, 1 deletions
diff --git a/src/com/github/ivarref/yoltq/virtual_queue.clj b/src/com/github/ivarref/yoltq/virtual_queue.clj
index 71c7b6d..db429a8 100644
--- a/src/com/github/ivarref/yoltq/virtual_queue.clj
+++ b/src/com/github/ivarref/yoltq/virtual_queue.clj
@@ -160,4 +160,39 @@
(:exception res#))))
(catch Throwable t#
(log/error t# "unexpected error in consume-expect:" (ex-message t#))))
- (test/is nil (str "No job found for queue " ~queue-name)))) \ No newline at end of file
+ (test/is false (str "No job found for queue " ~queue-name))))
+tx-spent-time!
+
+(defmacro consume! [queue-name]
+ `(consume-expect! ~queue-name :done))
+
+
+(defn mark-fails! [{:keys [conn]}
+ {:com.github.ivarref.yoltq/keys [id lock tries]}
+ _]
+ (try
+ (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)]
+ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)]
+ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing :init]]]
+ @(d/transact conn tx)
+ nil)
+ (catch Throwable t
+ (log/error t "unexpected error in mark-status!: " (ex-message t))
+ nil)))
+
+
+(defmacro consume-twice! [queue-name]
+ `(if-let [job# (get-tx-q-job ~queue-name)]
+ (try
+ (with-bindings (:com.github.ivarref.yoltq/bindings job#)
+ (some->> (u/prepare-processing (d/db (:conn @yq/*config*))
+ (:com.github.ivarref.yoltq/id job#)
+ ~queue-name
+ (:com.github.ivarref.yoltq/lock job#)
+ (:com.github.ivarref.yoltq/status job#))
+ (i/take! @yq/*config*)
+ (i/execute! (assoc @yq/*config* :mark-status-fn! mark-fails!)))
+ (consume! ~queue-name))
+ (catch Throwable t#
+ (log/error t# "unexpected error in consume-twice!:" (ex-message t#))))
+ (test/is false (str "No job found for queue " ~queue-name)))) \ No newline at end of file