aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq/error_poller.clj
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/github/ivarref/yoltq/error_poller.clj')
-rw-r--r--src/com/github/ivarref/yoltq/error_poller.clj124
1 files changed, 124 insertions, 0 deletions
diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj
new file mode 100644
index 0000000..dffff28
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/error_poller.clj
@@ -0,0 +1,124 @@
+(ns com.github.ivarref.yoltq.error-poller
+ (:require [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [com.github.ivarref.yoltq.utils :as u]
+ [datomic.api :as d]))
+
+
+(defn get-state [v]
+ (case v
+ [:error :none] :recovery
+ [:error :some] :error
+ [:error :all] :error
+ [:recovery :none] :recovery
+ [:recovery :some] :recovery
+ [:recovery :all] :error
+ nil))
+
+
+(defn handle-error-count [{:keys [errors last-notify state]
+ :or {errors []
+ last-notify 0
+ state :recovery}}
+ {:keys [system-error-min-count system-error-callback-backoff]
+ :or {system-error-min-count 3}}
+ now-ms
+ error-count]
+ (let [new-errors (->> (conj errors error-count)
+ (take-last system-error-min-count)
+ (vec))
+ classify (fn [coll]
+ (cond
+ (not= system-error-min-count (count coll))
+ :missing
+
+ (every? pos-int? coll)
+ :all
+
+ (every? zero? coll)
+ :none
+
+ :else
+ :some))
+ old-state state]
+ (merge
+ {:errors new-errors
+ :last-notify last-notify}
+ (when-let [new-state (get-state [old-state (classify new-errors)])]
+ (merge
+ {:state new-state}
+ (when (and (= old-state :recovery)
+ (= new-state :error))
+ {:run-callback :error
+ :last-notify now-ms})
+
+ (when (and (= new-state :error)
+ (= old-state :error)
+ (> now-ms
+ (+ last-notify system-error-callback-backoff)))
+ {:run-callback :error
+ :last-notify now-ms})
+
+ (when (and (= new-state :recovery)
+ (= old-state :error))
+ {:run-callback :recovery}))))))
+
+
+(defn do-poll-errors [{:keys [conn
+ system-error
+ on-system-error
+ on-system-recovery
+ healthy?
+ healthy-allowed-error-time]
+ :or {on-system-error (fn []
+ (log/error "There are yoltq queues which have errors")
+ nil)
+ on-system-recovery (fn []
+ (log/info "Yoltq recovered"))}
+ :as config}
+ now-ms]
+ (assert (some? conn) "expected :conn to be present")
+ (assert (some? system-error) "expected :system-error to be present")
+ (assert (nat-int? healthy-allowed-error-time) "expected :healthy-allowed-error-time to be present")
+ (let [max-init-time (- now-ms healthy-allowed-error-time)
+ error-count (or (d/q '[:find (count ?e) .
+ :in $ ?status ?max-init-time
+ :where
+ [?e :com.github.ivarref.yoltq/status ?status]
+ [?e :com.github.ivarref.yoltq/init-time ?init-time]
+ [(<= ?init-time ?max-init-time)]]
+ (d/db conn)
+ u/status-error
+ max-init-time)
+ 0)]
+ (if (pos-int? error-count)
+ (do
+ (log/debug "poll-errors found" error-count "errors in system")
+ (reset! healthy? false))
+ (reset! healthy? true))
+ (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config now-ms error-count)]
+ (when run-callback
+ (cond (= run-callback :error)
+ (on-system-error)
+
+ (= run-callback :recovery)
+ (on-system-recovery)
+
+ :else
+ (log/error "unhandled callback-type" run-callback))
+ (log/debug "run-callback is" run-callback))
+ error-count)))
+
+
+(defn poll-errors [running? config-atom]
+ (try
+ (when @running?
+ (do-poll-errors @config-atom (ext/now-ms)))
+ (catch Throwable t
+ (log/error t "unexpected error in poll-errors:" (ex-message t))
+ nil)))
+
+
+(comment
+ (do-poll-errors @com.github.ivarref.yoltq/*config* (ext/now-ms)))
+