aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq/report_queue.clj
diff options
context:
space:
mode:
authorIvar Refsdal <ivar.refsdal@nsd.no>2021-09-04 13:23:07 +0200
committerIvar Refsdal <ivar.refsdal@nsd.no>2021-09-14 12:52:42 +0200
commitea40c5dbc2b47d6fd2a23236828dc9e4ab1f77dc (patch)
tree38db9a13c41576dd39a18ec4f4b2d498322a30c2 /src/com/github/ivarref/yoltq/report_queue.clj
downloadfiinha-ea40c5dbc2b47d6fd2a23236828dc9e4ab1f77dc.tar.gz
fiinha-ea40c5dbc2b47d6fd2a23236828dc9e4ab1f77dc.tar.xz
Initial commit
Add release script Release 0.1.3 Use com.github.ivarref.yoltq namespace Use com.github.ivarref.yoltq namespace
Diffstat (limited to 'src/com/github/ivarref/yoltq/report_queue.clj')
-rw-r--r--src/com/github/ivarref/yoltq/report_queue.clj54
1 files changed, 54 insertions, 0 deletions
diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj
new file mode 100644
index 0000000..a40d29a
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/report_queue.clj
@@ -0,0 +1,54 @@
+(ns com.github.ivarref.yoltq.report-queue
+ (:require [com.github.ivarref.yoltq.utils :as u]
+ [com.github.ivarref.yoltq.impl :as i]
+ [datomic.api :as d]
+ [clojure.tools.logging :as log])
+ (:import (datomic Datom)
+ (java.util.concurrent ScheduledExecutorService BlockingQueue TimeUnit)))
+
+
+(defn process-poll-result! [cfg id-ident poll-result consumer]
+ (let [{:keys [tx-data db-after]} poll-result]
+ (when-let [new-ids (->> tx-data
+ (filter (fn [^Datom datom] (and
+ (= (.a datom) id-ident)
+ (.added datom))))
+ (mapv (fn [^Datom datom] (.v datom)))
+ (into [])
+ (not-empty))]
+ (doseq [id new-ids]
+ (consumer (fn []
+ (try
+ (let [{:com.github.ivarref.yoltq/keys [lock id status queue-name]} (u/get-queue-item db-after id)]
+ (some->>
+ (u/prepare-processing id queue-name lock status)
+ (i/take! cfg)
+ (i/execute! cfg)))
+ (catch Throwable t
+ (log/error t "unexpected error in process-poll-result!")))))))))
+
+
+(defn report-queue-listener [running?
+ ready?
+ ^ScheduledExecutorService pool
+ config-atom]
+ (let [conn (:conn @config-atom)
+ ^BlockingQueue q (d/tx-report-queue conn)
+ id-ident (d/q '[:find ?e .
+ :where [?e :db/ident :com.github.ivarref.yoltq/id]]
+ (d/db conn))]
+ (try
+ (while @running?
+ (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)]
+ (process-poll-result! @config-atom
+ id-ident
+ poll-result
+ (fn [f]
+ (when @running?
+ (.execute ^ScheduledExecutorService pool f)))))
+ (deliver ready? true))
+ (catch Throwable t
+ (log/error t "unexpected error in report-queue-listener"))
+ (finally
+ (log/debug "remove tx-report-queue")
+ (d/remove-tx-report-queue conn))))) \ No newline at end of file