blob: ee6359ed82e95cbf8f8b58ac886c25953b7d4005 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
(ns com.github.ivarref.yoltq.error-poller
(:require [datomic.api :as d]
[com.github.ivarref.yoltq.utils :as u]
[com.github.ivarref.yoltq.ext-sys :as ext]
[clojure.tools.logging :as log]))
(defn get-state
  "Looks up the next tracker state for a `[current-state classification]`
  pair.

  Returns `:recovery` or `:error` for the six known pairs, and nil for any
  other input."
  [v]
  (get {[:error :none]    :recovery
        [:error :some]    :error
        [:error :all]     :error
        [:recovery :none] :recovery
        [:recovery :some] :recovery
        [:recovery :all]  :error}
       v))
(defn handle-error-count
  "Computes the next system-error tracker value from one poll result.

  Arguments:
  - tracker map with `:errors` (recent error counts, default `[]`),
    `:last-notify` (default 0) and `:state` (default `:recovery`)
  - config map with `:system-error-min-count` (window size, default 3) and
    `:system-error-callback-backoff` (added to `:last-notify` and compared
    against `now-ms`; default 3600000)
  - `now-ms`, the current time
  - `error-count`, the error count observed by this poll

  Returns a map with `:errors` (the last `system-error-min-count` counts,
  including `error-count`) and `:last-notify`. When `get-state` yields a
  state for the current state and the window classification, the result also
  contains `:state`, plus `:run-callback` (`:error` or `:recovery`) when a
  callback should fire. While the window holds fewer than
  `system-error-min-count` entries it is classified `:missing`, `get-state`
  yields nil, and neither `:state` nor `:run-callback` is returned."
  [{:keys [errors last-notify state]
    :or   {errors      []
           last-notify 0
           state       :recovery}}
   {:keys [system-error-min-count system-error-callback-backoff]
    ; Without a fallback for the backoff, a config lacking the key makes
    ; (+ last-notify nil) throw as soon as the tracker stays in :error.
    ; NOTE(review): 3600000 assumes millisecond units (one hour) -- confirm
    ; against the project's configured default.
    :or   {system-error-min-count        3
           system-error-callback-backoff 3600000}}
   now-ms
   error-count]
  (let [new-errors (->> (conj errors error-count)
                        (take-last system-error-min-count)
                        (vec))
        ; Summarises the window: :missing until it is full, then :all / :none /
        ; :some depending on how many entries are positive.
        classify (fn [coll]
                   (cond
                     (not= system-error-min-count (count coll))
                     :missing

                     (every? pos-int? coll)
                     :all

                     (every? zero? coll)
                     :none

                     :else
                     :some))
        old-state state]
    (merge
      {:errors      new-errors
       :last-notify last-notify}
      (when-let [new-state (get-state [old-state (classify new-errors)])]
        (merge
          {:state new-state}
          ; Entering :error: notify immediately and remember when.
          (when (and (= old-state :recovery)
                     (= new-state :error))
            {:run-callback :error
             :last-notify  now-ms})
          ; Staying in :error: notify again only after the backoff has passed.
          (when (and (= new-state :error)
                     (= old-state :error)
                     (> now-ms
                        (+ last-notify system-error-callback-backoff)))
            {:run-callback :error
             :last-notify  now-ms})
          ; Leaving :error: report recovery; :last-notify is left as it was.
          (when (and (= new-state :recovery)
                     (= old-state :error))
            {:run-callback :recovery}))))))
(defn do-poll-errors
  "Runs a single error poll.

  Counts the entities whose `:com.github.ivarref.yoltq/status` equals
  `u/status-error` in the current database of `conn`, resets the `healthy?`
  atom to false when that count is positive and to true otherwise, then
  advances the `system-error` atom via `handle-error-count`. If the new
  tracker value carries `:run-callback`, the matching callback
  (`on-system-error` or `on-system-recovery`) is invoked; both default to
  functions that only log.

  Asserts that `:conn` and `:system-error` are present in the config.
  Returns the new value of the `system-error` atom."
  [{:keys [conn system-error
           on-system-error
           on-system-recovery
           healthy?]
    :or   {on-system-error    (fn []
                                (log/error "There are yoltq queues which have errors")
                                nil)
           on-system-recovery (fn []
                                (log/info "Yoltq recovered"))}
    :as   config}]
  (assert (some? conn) "expected :conn to be present")
  (assert (some? system-error) "expected :system-error to be present")
  (let [found       (d/q '[:find (count ?e) .
                           :in $ ?status
                           :where
                           [?e :com.github.ivarref.yoltq/status ?status]]
                         (d/db conn)
                         u/status-error)
        ; The query result is replaced by 0 when it is nil.
        error-count (or found 0)
        errors?     (pos-int? error-count)]
    (when errors?
      (log/debug "poll-errors found" error-count "errors in system"))
    (reset! healthy? (not errors?))
    (let [next-state (swap! system-error handle-error-count config (ext/now-ms) error-count)
          callback   (:run-callback next-state)]
      (when callback
        (case callback
          :error (on-system-error)
          :recovery (on-system-recovery)
          (log/error "unhandled callback-type" callback))
        (log/debug "run-callback is" callback))
      next-state)))
(defn poll-errors
  "Guarded entry point for one error poll.

  When the dereferenced `running?` is truthy, calls `do-poll-errors` with the
  dereferenced `config-atom` and returns its result; otherwise returns nil.
  Any Throwable raised along the way is logged and swallowed, yielding nil."
  [running? config-atom]
  (try
    (if (deref running?)
      (do-poll-errors (deref config-atom))
      nil)
    (catch Throwable ex
      (log/error ex "unexpected error in poll-errors:" (ex-message ex))
      nil)))
; REPL scratch form: `comment` never evaluates its body, so nothing runs when
; this namespace loads. Evaluate the inner form by hand to run one poll
; against the dereferenced `com.github.ivarref.yoltq/*config*`.
(comment
(do-poll-errors @com.github.ivarref.yoltq/*config*))
|