blob: 9cf81c761e3c1c51d6fe54765223d2c6e9755eb4 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
(ns com.github.ivarref.yoltq.poller
(:require [com.github.ivarref.yoltq.utils :as u]
[com.github.ivarref.yoltq.impl :as i]
[clojure.tools.logging :as log]))
(defn poll-once!
  "Tries to process a single item from queue `q` that is in state `status`.

  `status` selects the lookup: `:init` -> `u/get-init`, `:error` ->
  `u/get-error`, `:hung` -> `u/get-hung`. Any other value makes `case`
  throw, since there is no default clause.

  Returns nil when no item was found, when `i/depends-on-waiting?` is truthy
  for the item, or when `i/take!` returns nil. Otherwise returns the result
  of `i/execute!` applied to what `i/take!` returned."
  [cfg q status]
  (when-let [item (case status
                    :init (u/get-init cfg q)
                    :error (u/get-error cfg q)
                    :hung (u/get-hung cfg q))]
    ;; Run the rest with the var bindings stored on the item (empty map when
    ;; the item has no :bindings key).
    (with-bindings (get item :bindings {})
      (when-not (i/depends-on-waiting? cfg item)
        ;; Only execute when take! actually handed something back (non-nil).
        (when-some [taken (i/take! cfg item)]
          (i/execute! cfg taken))))))
(defn poll-queue!
  "Repeatedly calls `poll-once!` for the `[queue-name status]` pair `q` until
  a poll does not succeed or `@running?` turns falsy.

  The `running-queues` atom in `cfg` is used as a guard: `q` is added to it
  up front, and if it was already present this call only logs and does
  nothing else. The call that did add `q` removes it again in a `finally`.

  Returns the last result whose `:success?` was truthy when the loop stops
  because a poll did not succeed (nil if none succeeded). Returns nil when
  the loop stops because `@running?` became falsy. In the already-polled and
  crashed cases the return value is whatever the logging call returns.
  Any Throwable is caught and logged rather than propagated."
  [running?
   {:keys [running-queues] :as cfg}
   [queue-name status :as q]]
  (try
    ;; swap-vals! hands back the set as it was *before* our conj, which tells
    ;; us whether this call was the one that added q.
    (let [[before _] (swap-vals! running-queues conj q)]
      (if (contains? before q)
        (log/debug "queue" q "is already being polled, doing nothing...")
        (try
          (log/debug "polling queue" queue-name "for status" status)
          (let [started-at (u/now-ms)
                final-res (loop [last-ok nil]
                            (when @running?
                              (let [res (poll-once! cfg queue-name status)]
                                (log/debug "poll-once! returned" res)
                                (if-not (and res (:success? res))
                                  last-ok
                                  (recur res)))))
                elapsed-ms (- (u/now-ms) started-at)]
            (log/trace "done polling queue" q "in" elapsed-ms "ms")
            final-res)
          (finally
            ;; Only reached by the call that added q, so it is safe to drop it.
            (swap! running-queues disj q)))))
    (catch Throwable t
      (log/error t "poll-queue! crashed:" (ex-message t)))))
;; REPL scratch: the forms inside `comment` are never evaluated when the
;; namespace is loaded. This one grabs the current config for interactive use.
(comment
(def cfg @com.github.ivarref.yoltq/*config*))
;; REPL scratch: manually poll the [:add-message-thread :init] queue once,
;; with an always-true `running?` atom and the current config.
(comment
(poll-queue!
(atom true)
@com.github.ivarref.yoltq/*config*
[:add-message-thread :init]))
(defn poll-all-queues!
  "Submits one `poll-queue!` task to `pool` for every combination of a handler
  key in `@config-atom` and a status in `[:init :error :hung]`, in shuffled
  order. Does nothing when `@running?` is falsy.

  `config-atom` is dereferenced again inside each task, so a task sees the
  config as of the time it runs, not as of submission. Any Throwable raised
  while building or submitting the tasks is caught and logged."
  [running? config-atom pool]
  (try
    (when @running?
      (let [queue-names (keys (get @config-atom :handlers))
            targets (vec (for [q-name queue-names
                               status [:init :error :hung]]
                           [q-name status]))]
        (doseq [target (shuffle targets)]
          (.execute pool #(poll-queue! running? @config-atom target)))))
    (catch Throwable t
      (log/error t "poll-all-queues! crashed:" (ex-message t)))))
|