aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref
diff options
context:
space:
mode:
authorire <refsdal.ivar@gmail.com>2025-05-13 19:02:10 +0200
committerire <refsdal.ivar@gmail.com>2025-05-13 19:02:10 +0200
commit1df100143cf935cca10f0afa62ef00f2673c655a (patch)
tree5c70e02f480c113b75bc73354e8af15f2e7fa2ae /src/com/github/ivarref
parentRelease 0.2.64: Allow for infinitive retries (diff)
downloadfiinha-1df100143cf935cca10f0afa62ef00f2673c655a.tar.gz
fiinha-1df100143cf935cca10f0afa62ef00f2673c655a.tar.xz
Fix reflection warnings
Diffstat (limited to 'src/com/github/ivarref')
-rw-r--r--src/com/github/ivarref/yoltq.clj25
1 files changed, 18 insertions, 7 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index 379d701..a7dcddf 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -12,8 +12,7 @@
(:import (datomic Connection)
(java.lang.management ManagementFactory)
(java.time Duration Instant ZoneOffset ZonedDateTime)
- (java.util.concurrent ExecutorService Executors TimeUnit)))
-
+ (java.util.concurrent ExecutorService Executors ScheduledExecutorService TimeUnit)))
(defonce ^:dynamic *config* (atom nil))
(defonce threadpool (atom nil))
@@ -85,7 +84,7 @@
u/duration->millis))
-(defn init! [{:keys [conn] :as cfg}]
+(defn init! [{:keys [conn tx-report-queue] :as cfg}]
(assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil")))
(locking threadpool
@(d/transact conn i/schema)
@@ -97,6 +96,9 @@
:system-error (atom {})
:healthy? (atom nil)
:slow? (atom nil)
+ :get-tx-report-queue (fn []
+ (or tx-report-queue
+ (d/tx-report-queue conn)))
:slow-thread-watcher-done? (promise)}
default-opts
(if *test-mode* old-conf (select-keys old-conf [:handlers]))
@@ -140,9 +142,9 @@
(let [pool (reset! threadpool (Executors/newScheduledThreadPool (+ 1 pool-size)))
queue-listener-ready (promise)]
(reset! *running?* true)
- (.scheduleAtFixedRate pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS)
- (.scheduleAtFixedRate pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS)
- (.execute pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*)))
+ (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS)
+ (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS)
+ (.execute ^ScheduledExecutorService pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*)))
(future (try
(slow-executor/show-slow-threads pool *config*)
(finally
@@ -327,7 +329,7 @@
[{:keys [age-days queue-name now db duration->long]
:or {age-days 30
now (ZonedDateTime/now ZoneOffset/UTC)
- duration->long (fn [duration] (.toSeconds duration))}}]
+ duration->long (fn [duration] (.toSeconds ^Duration duration))}}]
(let [{:keys [conn]} @*config*
db (or db (d/db conn))
->zdt #(.atZone (Instant/ofEpochMilli %) ZoneOffset/UTC)]
@@ -357,6 +359,15 @@
:min (apply min values))})))
(into (sorted-map)))))
+
+
+(defn add-tx-report-queue!
+ ([conn]
+ (add-tx-report-queue! conn :default))
+ ([conn id]
+ (if @*config*
+ :...)))
+
(comment
(do
(require 'com.github.ivarref.yoltq.log-init)