aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq/poller.clj
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/github/ivarref/yoltq/poller.clj')
-rw-r--r--src/com/github/ivarref/yoltq/poller.clj64
1 files changed, 64 insertions, 0 deletions
diff --git a/src/com/github/ivarref/yoltq/poller.clj b/src/com/github/ivarref/yoltq/poller.clj
new file mode 100644
index 0000000..9cf81c7
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/poller.clj
@@ -0,0 +1,64 @@
+(ns com.github.ivarref.yoltq.poller
+ (:require [com.github.ivarref.yoltq.utils :as u]
+ [com.github.ivarref.yoltq.impl :as i]
+ [clojure.tools.logging :as log]))
+
+
+(defn poll-once! [cfg q status]
+ (when-let [item (case status
+ :init (u/get-init cfg q)
+ :error (u/get-error cfg q)
+ :hung (u/get-hung cfg q))]
+ (with-bindings (get item :bindings {})
+ (if (i/depends-on-waiting? cfg item)
+ nil
+ (some->> item
+ (i/take! cfg)
+ (i/execute! cfg))))))
+
+
+(defn poll-queue! [running?
+ {:keys [running-queues] :as cfg}
+ [queue-name status :as q]]
+ (try
+ (let [[old _] (swap-vals! running-queues conj q)]
+ (if-not (contains? old q)
+ (try
+ (log/debug "polling queue" queue-name "for status" status)
+ (let [start-time (u/now-ms)
+ last-res (loop [prev-res nil]
+ (when @running?
+ (let [res (poll-once! cfg queue-name status)]
+ (log/debug "poll-once! returned" res)
+ (if (and res (:success? res))
+ (recur res)
+ prev-res))))]
+ (let [spent-ms (- (u/now-ms) start-time)]
+ (log/trace "done polling queue" q "in" spent-ms "ms"))
+ last-res)
+ (finally
+ (swap! running-queues disj q)))
+ (log/debug "queue" q "is already being polled, doing nothing...")))
+ (catch Throwable t
+ (log/error t "poll-queue! crashed:" (ex-message t)))
+ (finally)))
+
+(comment
+ (def cfg @com.github.ivarref.yoltq/*config*))
+
+(comment
+ (poll-queue!
+ (atom true)
+ @com.github.ivarref.yoltq/*config*
+ [:add-message-thread :init]))
+
+(defn poll-all-queues! [running? config-atom pool]
+ (try
+ (when @running?
+ (let [{:keys [handlers]} @config-atom]
+ (doseq [q (shuffle (vec (for [q-name (keys handlers)
+ status [:init :error :hung]]
+ [q-name status])))]
+ (.execute pool (fn [] (poll-queue! running? @config-atom q))))))
+ (catch Throwable t
+ (log/error t "poll-all-queues! crashed:" (ex-message t)))))