aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq/report_queue.clj
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/github/ivarref/yoltq/report_queue.clj')
-rw-r--r--src/com/github/ivarref/yoltq/report_queue.clj133
1 files changed, 124 insertions, 9 deletions
diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj
index 20e0a93..9cddc93 100644
--- a/src/com/github/ivarref/yoltq/report_queue.clj
+++ b/src/com/github/ivarref/yoltq/report_queue.clj
@@ -3,8 +3,8 @@
[com.github.ivarref.yoltq.impl :as i]
[datomic.api :as d]
[clojure.tools.logging :as log])
- (:import (datomic Datom)
- (java.util.concurrent ScheduledExecutorService BlockingQueue TimeUnit)))
+ (:import (datomic Connection Datom)
+ (java.util.concurrent LinkedBlockingQueue ScheduledExecutorService BlockingQueue TimeUnit)))
(defn process-poll-result! [cfg id-ident poll-result consumer]
@@ -28,18 +28,24 @@
(i/take! cfg)
(i/execute! cfg)))))
(catch Throwable t
- (log/error t "unexpected error in process-poll-result!")))))))))
+ (log/error t "Unexpected error in process-poll-result!")))))))))
(defn report-queue-listener [running?
ready?
^ScheduledExecutorService pool
config-atom]
- (let [conn (:conn @config-atom)
- ^BlockingQueue q (d/tx-report-queue conn)
+ (let [cfg @config-atom
+ conn (:conn cfg)
+ tx-report-queue-given (contains? cfg :tx-report-queue)
+ ^BlockingQueue q (if tx-report-queue-given
+ (get cfg :tx-report-queue)
+ (d/tx-report-queue conn))
id-ident (d/q '[:find ?e .
:where [?e :db/ident :com.github.ivarref.yoltq/id]]
(d/db conn))]
+ (assert (instance? BlockingQueue q))
+ (log/debug "tx-report-queue-given:" tx-report-queue-given)
(try
(while @running?
(when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)]
@@ -49,9 +55,118 @@
(fn [f]
(when @running?
(.execute ^ScheduledExecutorService pool f)))))
- (deliver ready? true))
+ (deliver ready? :ready))
(catch Throwable t
- (log/error t "unexpected error in report-queue-listener"))
+ (log/error t "Unexpected error in report-queue-listener:" (.getMessage t)))
(finally
- (log/debug "remove tx-report-queue")
- (d/remove-tx-report-queue conn))))) \ No newline at end of file
+ (if tx-report-queue-given
+ (log/debug "Remove tx-report-queue handled elsewhere")
+ (do
+ (log/debug "Remove tx-report-queue")
+ (d/remove-tx-report-queue conn)))))))
+
+(defonce ^:private multicast-state-lock (Object.))
+
+(defonce ^:private multicast-state (atom {}))
+
+(defn- start-multicaster! [conn]
+ (let [multicaster-ready? (promise)]
+ (future
+ (log/debug "Multicaster starting for conn" conn)
+ (try
+ (let [input-queue (d/tx-report-queue conn)]
+ (loop []
+ (when-let [mcast-state (get @multicast-state conn)]
+ (when-let [dest-queues (vals mcast-state)]
+ (let [element (.poll ^BlockingQueue input-queue 1 TimeUnit/SECONDS)]
+ (deliver multicaster-ready? :ready)
+ (when (some? element)
+ (doseq [q dest-queues]
+ (let [ok-offer (.offer ^BlockingQueue q element 30 TimeUnit/MINUTES)]
+ (when (false? ok-offer)
+ (log/error "Failed to offer item in multicaster for connection" conn))))))
+ (recur)))))
+ (catch Throwable t
+ (deliver multicaster-ready? :error)
+ (log/error t "Unexpected error in multicaster:" (.getMessage t)))
+ (finally
+ (d/remove-tx-report-queue conn)
+ (log/debug "Multicaster exiting for conn" conn))))
+ multicaster-ready?))
+
+(defn get-tx-report-queue-multicast!
+ "Multicast the datomic.api/tx-report-queue to different consumers.
+ The multicaster is started on demand. `conn` and `id` identifies the consumer.
+
+ Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`."
+ [conn id]
+ (assert (instance? Connection conn))
+ (assert (keyword? id))
+ (locking multicast-state-lock
+ (assert (map? @multicast-state))
+ (if-let [existing-q (get-in @multicast-state [conn id])]
+ (do
+ (log/debug "returning existing queue for id" id)
+ (assert (instance? BlockingQueue existing-q))
+ existing-q)
+ (let [needs-multicaster? (not (contains? @multicast-state conn))
+ new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [conn id] (LinkedBlockingQueue.))))]
+ (when needs-multicaster?
+ (let [multicaster-promise (start-multicaster! conn)
+ multicaster-result (deref multicaster-promise (* 30 60000) :timeout)]
+ (cond (= multicaster-result :timeout)
+ (do
+ (log/error "Timeout waiting for multicaster to start")
+ (throw (RuntimeException. "Timeout waiting for multicaster to start")))
+ (= multicaster-result :error)
+ (do
+ (log/error "Multicaster failed to start")
+ (throw (RuntimeException. "Multicaster failed to start")))
+ (= multicaster-result :ready)
+ (log/debug "Multicaster is ready")
+
+ :else
+ (do
+ (log/error "Unexpected state from multicaster:" multicaster-result)
+ (throw (RuntimeException. (str "Unexpected state from multicaster: " multicaster-result)))))))
+ (let [new-q (get-in new-state [conn id])]
+ (assert (instance? BlockingQueue new-q))
+ new-q)))))
+
+(defn stop-all-multicasters! []
+ (reset! multicast-state {}))
+
+(comment
+ (do
+ (require 'com.github.ivarref.yoltq.log-init)
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq"} :debug]
+ ;[#{"ivarref.yoltq*"} :info]
+ [#{"*"} :info]])
+ (defonce conn (let [uri (str "datomic:mem://demo")
+ _ (d/delete-database uri)
+ _ (d/create-database uri)
+ conn (d/connect uri)]
+ conn))))
+
+(comment
+ (defn drain! [^BlockingQueue q]
+ (loop [cnt 0]
+ (if (nil? (.poll q 1 TimeUnit/SECONDS))
+ cnt
+ (recur (inc cnt))))))
+
+(comment
+ (let [q-1 (get-tx-report-queue-multicast! conn :q1)
+ q-2 (get-tx-report-queue-multicast! conn :q2)]))
+
+(comment
+ (drain! (get-tx-report-queue-multicast! conn :q1)))
+
+(comment
+ (do
+ @(d/transact conn [{:db/doc "demo"}])
+ :yay)) \ No newline at end of file