diff options
Diffstat (limited to 'src/com/github/ivarref/yoltq/error_poller.clj')
| -rw-r--r-- | src/com/github/ivarref/yoltq/error_poller.clj | 36 |
1 files changed, 22 insertions, 14 deletions
diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj index ee6359e..dffff28 100644 --- a/src/com/github/ivarref/yoltq/error_poller.clj +++ b/src/com/github/ivarref/yoltq/error_poller.clj @@ -1,8 +1,8 @@ (ns com.github.ivarref.yoltq.error-poller - (:require [datomic.api :as d] - [com.github.ivarref.yoltq.utils :as u] + (:require [clojure.tools.logging :as log] [com.github.ivarref.yoltq.ext-sys :as ext] - [clojure.tools.logging :as log])) + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d])) (defn get-state [v] @@ -64,31 +64,39 @@ {:run-callback :recovery})))))) -(defn do-poll-errors [{:keys [conn system-error +(defn do-poll-errors [{:keys [conn + system-error on-system-error on-system-recovery - healthy?] + healthy? + healthy-allowed-error-time] :or {on-system-error (fn [] (log/error "There are yoltq queues which have errors") nil) on-system-recovery (fn [] (log/info "Yoltq recovered"))} - :as config}] + :as config} + now-ms] (assert (some? conn) "expected :conn to be present") (assert (some? system-error) "expected :system-error to be present") - (let [error-count (or (d/q '[:find (count ?e) . - :in $ ?status + (assert (nat-int? healthy-allowed-error-time) "expected :healthy-allowed-error-time to be present") + (let [max-init-time (- now-ms healthy-allowed-error-time) + error-count (or (d/q '[:find (count ?e) . + :in $ ?status ?max-init-time :where - [?e :com.github.ivarref.yoltq/status ?status]] + [?e :com.github.ivarref.yoltq/status ?status] + [?e :com.github.ivarref.yoltq/init-time ?init-time] + [(<= ?init-time ?max-init-time)]] (d/db conn) - u/status-error) + u/status-error + max-init-time) 0)] (if (pos-int? error-count) (do (log/debug "poll-errors found" error-count "errors in system") (reset! healthy? false)) (reset! healthy? true)) - (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ms) error-count)] + (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config now-ms error-count)] (when run-callback (cond (= run-callback :error) (on-system-error) @@ -99,18 +107,18 @@ :else (log/error "unhandled callback-type" run-callback)) (log/debug "run-callback is" run-callback)) - new-state))) + error-count))) (defn poll-errors [running? config-atom] (try (when @running? - (do-poll-errors @config-atom)) + (do-poll-errors @config-atom (ext/now-ms))) (catch Throwable t (log/error t "unexpected error in poll-errors:" (ex-message t)) nil))) (comment - (do-poll-errors @com.github.ivarref.yoltq/*config*)) + (do-poll-errors @com.github.ivarref.yoltq/*config* (ext/now-ms))) |
