diff options
| author | Ivar Refsdal <ivar.refsdal@nsd.no> | 2021-09-17 22:16:14 +0200 |
|---|---|---|
| committer | Ivar Refsdal <ivar.refsdal@nsd.no> | 2021-09-17 22:16:14 +0200 |
| commit | 2a236e6d90410821370761434fad45b13621fbdf (patch) | |
| tree | 2950201cc14368bffbd3aedb6fcefb71cc6c90af /src | |
| parent | Use [#'taoensso.timbre/*context*] as default :capture-bindings if present (diff) | |
| download | fiinha-2a236e6d90410821370761434fad45b13621fbdf.tar.gz fiinha-2a236e6d90410821370761434fad45b13621fbdf.tar.xz | |
Add consume-twice! test function for verifying idempotence
Diffstat (limited to 'src')
| -rw-r--r-- | src/com/github/ivarref/yoltq/virtual_queue.clj | 37 |
1 files changed, 36 insertions, 1 deletions
diff --git a/src/com/github/ivarref/yoltq/virtual_queue.clj b/src/com/github/ivarref/yoltq/virtual_queue.clj index 71c7b6d..db429a8 100644 --- a/src/com/github/ivarref/yoltq/virtual_queue.clj +++ b/src/com/github/ivarref/yoltq/virtual_queue.clj @@ -160,4 +160,39 @@ (:exception res#)))) (catch Throwable t# (log/error t# "unexpected error in consume-expect:" (ex-message t#)))) - (test/is nil (str "No job found for queue " ~queue-name))))
\ No newline at end of file + (test/is false (str "No job found for queue " ~queue-name)))) +tx-spent-time! + +(defmacro consume! [queue-name] + `(consume-expect! ~queue-name :done)) + + +(defn mark-fails! [{:keys [conn]} + {:com.github.ivarref.yoltq/keys [id lock tries]} + _] + (try + (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing :init]]] + @(d/transact conn tx) + nil) + (catch Throwable t + (log/error t "unexpected error in mark-status!: " (ex-message t)) + nil))) + + +(defmacro consume-twice! [queue-name] + `(if-let [job# (get-tx-q-job ~queue-name)] + (try + (with-bindings (:com.github.ivarref.yoltq/bindings job#) + (some->> (u/prepare-processing (d/db (:conn @yq/*config*)) + (:com.github.ivarref.yoltq/id job#) + ~queue-name + (:com.github.ivarref.yoltq/lock job#) + (:com.github.ivarref.yoltq/status job#)) + (i/take! @yq/*config*) + (i/execute! (assoc @yq/*config* :mark-status-fn! mark-fails!))) + (consume! ~queue-name)) + (catch Throwable t# + (log/error t# "unexpected error in consume-twice!:" (ex-message t#)))) + (test/is false (str "No job found for queue " ~queue-name))))
\ No newline at end of file |
