diff options
Diffstat (limited to 'src/com/github/ivarref/yoltq/poller.clj')
| -rw-r--r-- | src/com/github/ivarref/yoltq/poller.clj | 64 |
1 files changed, 64 insertions, 0 deletions
diff --git a/src/com/github/ivarref/yoltq/poller.clj b/src/com/github/ivarref/yoltq/poller.clj new file mode 100644 index 0000000..9cf81c7 --- /dev/null +++ b/src/com/github/ivarref/yoltq/poller.clj @@ -0,0 +1,64 @@ +(ns com.github.ivarref.yoltq.poller + (:require [com.github.ivarref.yoltq.utils :as u] + [com.github.ivarref.yoltq.impl :as i] + [clojure.tools.logging :as log])) + + +(defn poll-once! [cfg q status] + (when-let [item (case status + :init (u/get-init cfg q) + :error (u/get-error cfg q) + :hung (u/get-hung cfg q))] + (with-bindings (get item :bindings {}) + (if (i/depends-on-waiting? cfg item) + nil + (some->> item + (i/take! cfg) + (i/execute! cfg)))))) + + +(defn poll-queue! [running? + {:keys [running-queues] :as cfg} + [queue-name status :as q]] + (try + (let [[old _] (swap-vals! running-queues conj q)] + (if-not (contains? old q) + (try + (log/debug "polling queue" queue-name "for status" status) + (let [start-time (u/now-ms) + last-res (loop [prev-res nil] + (when @running? + (let [res (poll-once! cfg queue-name status)] + (log/debug "poll-once! returned" res) + (if (and res (:success? res)) + (recur res) + prev-res))))] + (let [spent-ms (- (u/now-ms) start-time)] + (log/trace "done polling queue" q "in" spent-ms "ms")) + last-res) + (finally + (swap! running-queues disj q))) + (log/debug "queue" q "is already being polled, doing nothing..."))) + (catch Throwable t + (log/error t "poll-queue! crashed:" (ex-message t))) + (finally))) + +(comment + (def cfg @com.github.ivarref.yoltq/*config*)) + +(comment + (poll-queue! + (atom true) + @com.github.ivarref.yoltq/*config* + [:add-message-thread :init])) + +(defn poll-all-queues! [running? config-atom pool] + (try + (when @running? + (let [{:keys [handlers]} @config-atom] + (doseq [q (shuffle (vec (for [q-name (keys handlers) + status [:init :error :hung]] + [q-name status])))] + (.execute pool (fn [] (poll-queue! running? @config-atom q)))))) + (catch Throwable t + (log/error t "poll-all-queues! crashed:" (ex-message t))))) |
