aboutsummaryrefslogtreecommitdiff
path: root/src/com
diff options
context:
space:
mode:
Diffstat (limited to 'src/com')
-rw-r--r--src/com/github/ivarref/yoltq.clj33
1 files changed, 28 insertions, 5 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index ba27d2c..f4c2bf7 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -10,7 +10,8 @@
[datomic.api :as d])
(:import (datomic Connection)
(java.time Duration)
- (java.util.concurrent ExecutorService Executors TimeUnit)))
+ (java.util.concurrent ExecutorService Executors TimeUnit)
+ (java.lang.management ManagementFactory)))
(defonce ^:dynamic *config* (atom nil))
@@ -168,12 +169,14 @@
(defn healthy? []
- (some->> @*config*
- :healthy?
- (deref)))
+ (or
+ (< (.toMinutes (Duration/ofMillis (.getUptime (ManagementFactory/getRuntimeMXBean)))) 10)
+ (some->> @*config*
+ :healthy?
+ (deref))))
(defn unhealthy?
- "Returns `true` if there are queues in error, otherwise `false`."
+ "Returns `true` if there are queues in error and application has been up for over 10 minutes, otherwise `false`."
[]
(false? (healthy?)))
@@ -198,6 +201,26 @@
(sort-by (juxt :qname :status))
(vec))))
+(defn get-errors [qname]
+ (let [{:keys [conn]} @*config*
+ db (d/db conn)]
+ (->> (d/q '[:find [?id ...]
+ :in $ ?qname ?status
+ :where
+ [?e :com.github.ivarref.yoltq/queue-name ?qname]
+ [?e :com.github.ivarref.yoltq/status ?status]
+ [?e :com.github.ivarref.yoltq/id ?id]]
+ db
+ qname
+ :error)
+ (mapv (partial u/get-queue-item db)))))
+
+(defn retry-one-error! [qname]
+ (let [{:keys [handlers] :as cfg} @*config*
+ _ (assert (contains? handlers qname) "Queue not found")
+ cfg (assoc-in cfg [:handlers qname :max-retries] Integer/MAX_VALUE)]
+ (poller/poll-once! cfg qname :error)))
+
(comment
(do
(require 'com.github.ivarref.yoltq.log-init)