aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq.clj
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/github/ivarref/yoltq.clj')
-rw-r--r--src/com/github/ivarref/yoltq.clj32
1 files changed, 29 insertions, 3 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index 3164020..03a364f 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -17,7 +17,6 @@
(defonce ^:dynamic *running?* (atom false))
(defonce ^:dynamic *test-mode* false)
-
(def default-opts
(-> {; Default number of times a queue job will be retried before giving up
; Can be overridden on a per consumer basis with
@@ -79,7 +78,8 @@
(-> (merge-with (fn [a b] (or b a))
{:running-queues (atom #{})
:start-execute-time (atom {})
- :system-error (atom {})}
+ :system-error (atom {})
+ :healthy? (atom nil)}
default-opts
(if *test-mode* old-conf (select-keys old-conf [:handlers]))
cfg)
@@ -148,6 +148,32 @@
(reset! threadpool nil))))))
+(defn healthy? []
+ (some->> @*config*
+ :healthy?
+ (deref)))
+
+(defn queue-stats []
+ (let [{:keys [conn]} @*config*
+ db (d/db conn)]
+ (->> (d/q '[:find ?e ?qname ?status
+ :in $
+ :where
+ [?e :com.github.ivarref.yoltq/queue-name ?qname]
+ [?e :com.github.ivarref.yoltq/status ?status]]
+ db)
+ (mapv (partial zipmap [:e :qname :status]))
+ (mapv #(select-keys % [:qname :status]))
+ (mapv (fn [qitem] {qitem 1}))
+ (reduce (partial merge-with +) {})
+ (mapv (fn [[{:keys [qname status]} v]]
+ (array-map
+ :qname qname
+ :status status
+ :count v)))
+ (sort-by (juxt :qname :status))
+ (vec))))
+
(comment
(do
(require 'com.github.ivarref.yoltq.log-init)
@@ -177,4 +203,4 @@
(start!)
(dotimes [x n]
@(d/transact conn [(put :q {:work 123})]))
- nil)))) \ No newline at end of file
+ nil))))