aboutsummaryrefslogtreecommitdiff
path: root/src/com
diff options
context:
space:
mode:
Diffstat (limited to 'src/com')
-rw-r--r--src/com/github/ivarref/yoltq.clj32
-rw-r--r--src/com/github/ivarref/yoltq/error_poller.clj19
2 files changed, 42 insertions, 9 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index 3164020..03a364f 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -17,7 +17,6 @@
(defonce ^:dynamic *running?* (atom false))
(defonce ^:dynamic *test-mode* false)
-
(def default-opts
(-> {; Default number of times a queue job will be retried before giving up
; Can be overridden on a per consumer basis with
@@ -79,7 +78,8 @@
(-> (merge-with (fn [a b] (or b a))
{:running-queues (atom #{})
:start-execute-time (atom {})
- :system-error (atom {})}
+ :system-error (atom {})
+ :healthy? (atom nil)}
default-opts
(if *test-mode* old-conf (select-keys old-conf [:handlers]))
cfg)
@@ -148,6 +148,32 @@
(reset! threadpool nil))))))
+(defn healthy? []
+ (some->> @*config*
+ :healthy?
+ (deref)))
+
+(defn queue-stats []
+ (let [{:keys [conn]} @*config*
+ db (d/db conn)]
+ (->> (d/q '[:find ?e ?qname ?status
+ :in $
+ :where
+ [?e :com.github.ivarref.yoltq/queue-name ?qname]
+ [?e :com.github.ivarref.yoltq/status ?status]]
+ db)
+ (mapv (partial zipmap [:e :qname :status]))
+ (mapv #(select-keys % [:qname :status]))
+ (mapv (fn [qitem] {qitem 1}))
+ (reduce (partial merge-with +) {})
+ (mapv (fn [[{:keys [qname status]} v]]
+ (array-map
+ :qname qname
+ :status status
+ :count v)))
+ (sort-by (juxt :qname :status))
+ (vec))))
+
(comment
(do
(require 'com.github.ivarref.yoltq.log-init)
@@ -177,4 +203,4 @@
(start!)
(dotimes [x n]
@(d/transact conn [(put :q {:work 123})]))
- nil)))) \ No newline at end of file
+ nil))))
diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj
index 77339f7..1268482 100644
--- a/src/com/github/ivarref/yoltq/error_poller.clj
+++ b/src/com/github/ivarref/yoltq/error_poller.clj
@@ -66,9 +66,13 @@
(defn do-poll-errors [{:keys [conn system-error
on-system-error
- on-system-recovery]
- :or {on-system-error (fn [] nil)
- on-system-recovery (fn [] nil)}
+ on-system-recovery
+ healthy?]
+ :or {on-system-error (fn []
+ (log/error "There are yoltq queues which have errors")
+ nil)
+ on-system-recovery (fn []
+ (log/info "Yoltq recovered"))}
:as config}]
(assert (some? conn) "expected :conn to be present")
(assert (some? system-error) "expected :system-error to be present")
@@ -79,8 +83,11 @@
(d/db conn)
u/status-error)
0)]
- (when (pos-int? error-count)
- (log/debug "poll-errors found" error-count "errors in system"))
+ (if (pos-int? error-count)
+ (do
+ (log/debug "poll-errors found" error-count "errors in system")
+ (reset! healthy? false))
+ (reset! healthy? true))
(let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ns) error-count)]
(when run-callback
(cond (= run-callback :error)
@@ -100,7 +107,7 @@
(when @running?
(do-poll-errors @config-atom))
(catch Throwable t
- (log/error t "unexpected error in poll-erros:" (ex-message t))
+ (log/error t "unexpected error in poll-errors:" (ex-message t))
nil)))