diff options
| author | Ivar Refsdal <ivar.refsdal@nsd.no> | 2021-09-23 11:30:03 +0200 |
|---|---|---|
| committer | Ivar Refsdal <ivar.refsdal@nsd.no> | 2021-09-23 11:30:03 +0200 |
| commit | 79cc3f448949d755c59265e2316408d037be20cb (patch) | |
| tree | 19f239c3bbfc63acf2715e8817334b2323880d93 /src | |
| parent | To test-queue namespace (diff) | |
| download | fiinha-79cc3f448949d755c59265e2316408d037be20cb.tar.gz fiinha-79cc3f448949d755c59265e2316408d037be20cb.tar.xz | |
Add force-retry!
Diffstat (limited to 'src')
| -rw-r--r-- | src/com/github/ivarref/yoltq/impl.clj | 2 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/test_queue.clj | 49 |
2 files changed, 21 insertions, 30 deletions
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index cb99b08..9c95cff 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -67,7 +67,7 @@ (cond (= :db.error/cas-failed error) (do - (log/info ":db.error/cas-failed for queue item" (str id) "and attribute" (:a m)) + (log/info "take! :db.error/cas-failed for queue item" (str id) "and attribute" (:a m)) (when cas-failures (swap! cas-failures inc)) nil) diff --git a/src/com/github/ivarref/yoltq/test_queue.clj b/src/com/github/ivarref/yoltq/test_queue.clj index 6aeb959..4c4f903 100644 --- a/src/com/github/ivarref/yoltq/test_queue.clj +++ b/src/com/github/ivarref/yoltq/test_queue.clj @@ -42,6 +42,7 @@ config# (atom {:bootstrap-poller! (partial bootstrap-poller! txq# running?# poller-exited?# bootstrapped?#) :init-backoff-time 0 :hung-log-level :warn + :prev-consumed (atom {}) :tx-queue txq#})] (with-bindings {#'yq/*config* config# #'yq/*running?* (atom false) @@ -154,44 +155,34 @@ (:com.github.ivarref.yoltq/status job#)) (i/take! @yq/*config*) (i/execute! @yq/*config*))] + (swap! (:prev-consumed @yq/*config*) assoc ~queue-name res#) (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#))) (if (:retval res#) (:retval res#) (:exception res#)))) (catch Throwable t# - (log/error t# "unexpected error in consume-expect:" (ex-message t#)))) + (log/error t# "unexpected error in consume-expect:" (ex-message t#)) + (throw t#))) (test/is false (str "No job found for queue " ~queue-name)))) (defmacro consume! [queue-name] `(consume-expect! ~queue-name :done)) -(defn mark-fails! [{:keys [conn]} - {:com.github.ivarref.yoltq/keys [id lock tries]} - _] - (try - (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)] - [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] - [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing :init]]] - @(d/transact conn tx) - nil) - (catch Throwable t - (log/error t "unexpected error in mark-status!: " (ex-message t)) - nil))) - - (defmacro force-retry! [queue-name] - `(if-let [job# (get-tx-q-job ~queue-name)] - (try - (with-bindings (:com.github.ivarref.yoltq/bindings job#) - (some->> (u/prepare-processing (d/db (:conn @yq/*config*)) - (:com.github.ivarref.yoltq/id job#) - ~queue-name - (:com.github.ivarref.yoltq/lock job#) - (:com.github.ivarref.yoltq/status job#)) - (i/take! @yq/*config*) - (i/execute! (assoc @yq/*config* :mark-status-fn! mark-fails!))) - (consume! ~queue-name)) - (catch Throwable t# - (log/error t# "unexpected error in consume-twice!:" (ex-message t#)))) - (test/is false (str "No job found for queue " ~queue-name))))
\ No newline at end of file + `(if-let [job# (some-> @yq/*config* :prev-consumed deref (get ~queue-name))] + (let [db-res# @(d/transact (:conn @yq/*config*) [{:com.github.ivarref.yoltq/id (:com.github.ivarref.yoltq/id job#) + :com.github.ivarref.yoltq/status :init}]) + res# (some->> (u/prepare-processing (:db-after db-res#) + (:com.github.ivarref.yoltq/id job#) + ~queue-name + (:com.github.ivarref.yoltq/lock job#) + :init) + (i/take! @yq/*config*) + (i/execute! @yq/*config*))] + (swap! (:prev-consumed @yq/*config*) assoc ~queue-name res#) + (test/is (= :done (:com.github.ivarref.yoltq/status res#))) + (if (:retval res#) + (:retval res#) + (:exception res#))) + (test/is false "Expected to have previously consumed something. Was nil."))) |
