aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq/virtual_queue.clj
blob: e49aca3fd4b337f1636ddcde1c16ac53855d3b2b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
(ns com.github.ivarref.yoltq.virtual-queue
  (:require [clojure.tools.logging :as log]
            [com.github.ivarref.yoltq.report-queue :as rq]
            [com.github.ivarref.yoltq.ext-sys :as ext]
            [com.github.ivarref.yoltq :as dq]
            [datomic.api :as d]
            [com.github.ivarref.yoltq.poller :as poller])
  (:import (java.util.concurrent BlockingQueue TimeUnit)))


(defn bootstrap-poller! [txq running? poller-exited? bootstrapped? conn]
  (let [ready? (promise)]
    (future
      (let [q (d/tx-report-queue conn)]
        (try
          (while @running?
            (when-let [poll-result (.poll ^BlockingQueue q 10 TimeUnit/MILLISECONDS)]
              (swap! txq conj poll-result))
            (deliver ready? true)
            (reset! bootstrapped? true))
          (catch Throwable t
            (log/error t "test-poller crashed: " (ex-message t)))
          (finally
            (try
              (d/remove-tx-report-queue conn)
              (catch Throwable t
                (log/warn t "remove-tx-report-queue failed:" (ex-message t))))
            (deliver poller-exited? true)))))
    @ready?))


(defmacro with-virtual-queue!
  [& body]
  `(let [txq# (atom [])
         poller-exited?# (promise)
         bootstrapped?# (atom false)
         running?# (atom true)
         config# (atom {:bootstrap-poller! (partial bootstrap-poller! txq# running?# poller-exited?# bootstrapped?#)
                        :init-backoff-time 0
                        :hung-log-level    :warn
                        :tx-queue          txq#})]
     (with-bindings {#'dq/*config*       config#
                     #'dq/*running?*     (atom false)
                     #'dq/*test-mode*    true
                     #'ext/*now-ns-atom* (atom 0)
                     #'ext/*random-atom* (atom 0)
                     #'ext/*squuid-atom* (atom 0)}
       (try
         ~@body
         (finally
           (reset! running?# false)
           (when @bootstrapped?#
             @poller-exited?#))))))


(defn call-with-virtual-queue!
  [f]
  (with-virtual-queue!
    (f)))


(defn run-report-queue! [min-items]
  (let [{:keys [tx-queue conn]} @dq/*config*
        id-ident (d/q '[:find ?e .
                        :where [?e :db/ident :com.github.ivarref.yoltq/id]]
                      (d/db conn))]
    (let [timeout (+ 3000 (System/currentTimeMillis))]
      (while (and (< (System/currentTimeMillis) timeout)
                  (< (count @tx-queue) min-items))
        (Thread/sleep 10)))
    (when (< (count @tx-queue) min-items)
      (let [msg (str "run-report-queue: timeout waiting for " min-items " items")]
        (log/error msg)
        (throw (ex-info msg {}))))
    (let [res (atom [])]
      (doseq [itm (first (swap-vals! tx-queue (constantly [])))]
        (rq/process-poll-result!
          @dq/*config*
          id-ident
          itm
          (fn [f] (swap! res conj (f)))))
      @res)))


(defn run-one-report-queue! []
  (first (run-report-queue! 1)))


(defn run-queue-once! [q status]
  (poller/poll-once! @dq/*config* q status))


(defn put! [q payload]
  @(d/transact (:conn @dq/*config*) [(dq/put q payload)]))