aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq/report_queue.clj
blob: 20e0a934ff6a063910c66164dedf4e089aef78e6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
(ns com.github.ivarref.yoltq.report-queue
  (:require [com.github.ivarref.yoltq.utils :as u]
            [com.github.ivarref.yoltq.impl :as i]
            [datomic.api :as d]
            [clojure.tools.logging :as log])
  (:import (datomic Datom)
           (java.util.concurrent ScheduledExecutorService BlockingQueue TimeUnit)))


(defn process-poll-result! [cfg id-ident poll-result consumer]
  (let [{:keys [tx-data db-after]} poll-result]
    (when-let [new-ids (->> tx-data
                            (filter (fn [^Datom datom] (and
                                                         (= (.a datom) id-ident)
                                                         (.added datom))))
                            (mapv (fn [^Datom datom] (.v datom)))
                            (into [])
                            (not-empty))]
      (doseq [id new-ids]
        (consumer (fn []
                    (try
                      (let [{:com.github.ivarref.yoltq/keys [lock id status queue-name bindings]} (u/get-queue-item db-after id)]
                        (with-bindings (or bindings {})
                          (if (i/depends-on-waiting? cfg {:id id})
                            nil
                            (some->>
                              (u/prepare-processing db-after id queue-name lock status)
                              (i/take! cfg)
                              (i/execute! cfg)))))
                      (catch Throwable t
                        (log/error t "unexpected error in process-poll-result!")))))))))


(defn report-queue-listener [running?
                             ready?
                             ^ScheduledExecutorService pool
                             config-atom]
  (let [conn (:conn @config-atom)
        ^BlockingQueue q (d/tx-report-queue conn)
        id-ident (d/q '[:find ?e .
                        :where [?e :db/ident :com.github.ivarref.yoltq/id]]
                      (d/db conn))]
    (try
      (while @running?
        (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)]
          (process-poll-result! @config-atom
                                id-ident
                                poll-result
                                (fn [f]
                                  (when @running?
                                    (.execute ^ScheduledExecutorService pool f)))))
        (deliver ready? true))
      (catch Throwable t
        (log/error t "unexpected error in report-queue-listener"))
      (finally
        (log/debug "remove tx-report-queue")
        (d/remove-tx-report-queue conn)))))