aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq/report_queue.clj
diff options
context:
space:
mode:
authorire <refsdal.ivar@gmail.com>2025-05-21 12:04:15 +0200
committerire <refsdal.ivar@gmail.com>2025-05-21 12:04:15 +0200
commitccfe353aebe8c22429fdaf76a5e0bf34cefca955 (patch)
tree37a28c07942c6a7a48ad8bbb6a549f8579492d85 /src/com/github/ivarref/yoltq/report_queue.clj
parentDoc rationale for waiting for multicaster thread. Handle :end token in report... (diff)
downloadfiinha-ccfe353aebe8c22429fdaf76a5e0bf34cefca955.tar.gz
fiinha-ccfe353aebe8c22429fdaf76a5e0bf34cefca955.tar.xz
Small fixes #7
Diffstat (limited to 'src/com/github/ivarref/yoltq/report_queue.clj')
-rw-r--r--src/com/github/ivarref/yoltq/report_queue.clj86
1 files changed, 43 insertions, 43 deletions
diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj
index 2a2e489..a9f7e07 100644
--- a/src/com/github/ivarref/yoltq/report_queue.clj
+++ b/src/com/github/ivarref/yoltq/report_queue.clj
@@ -6,6 +6,8 @@
(:import (datomic Connection Datom)
(java.util.concurrent LinkedBlockingQueue ScheduledExecutorService BlockingQueue TimeUnit)))
+; Private API, subject to change!
+
(defn process-poll-result! [cfg id-ident poll-result consumer]
(let [{:keys [tx-data db-after]} poll-result]
(when-let [new-ids (->> tx-data
@@ -50,8 +52,9 @@
(when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)]
(if (= poll-result :end)
(do
- (reset! running-local? false)
- #_(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system."))
+ (log/debug "Report queue listener received :end token. Exiting")
+ (reset! running-local? false))
+ ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system."))
(process-poll-result! @config-atom
id-ident
poll-result
@@ -175,7 +178,8 @@
(when (= :timeout (deref ready? 30000 :timeout))
(throw (RuntimeException. "Timed out waiting for multicaster to start")))))
-(defn- wait-multicast-thread-step [conn])
+(defn- wait-multicast-thread-step
+ [conn]
; `get-tx-report-queue-multicast!` should return only when the multicaster thread
; has picked up the new queue.
;
@@ -196,29 +200,34 @@
; Once [:iter-count conn] has changed, we know that the multicaster thread
; will see the new queue. This means that we can be sure that the queue
; will receive the `:end` token if the queue is stopped.
-(let [start-ms (System/currentTimeMillis)
- iter-count (get-in @multicast-state [:iter-count conn] -1)]
- (loop [spin-count 0]
- (if (not= iter-count (get-in @multicast-state [:iter-count conn]))
- nil
- (do
- (let [spent-ms (- (System/currentTimeMillis) start-ms)]
- (if (> spent-ms 30000)
- (throw (RuntimeException. "Timed out waiting for multicaster thread"))
- (do
- (Thread/sleep 16)
- (recur (inc spin-count)))))))))
+ (let [start-ms (System/currentTimeMillis)
+ iter-count (get-in @multicast-state [:iter-count conn] -1)]
+ (loop [spin-count 0]
+ (if (not= iter-count (get-in @multicast-state [:iter-count conn]))
+ nil
+ (do
+ (let [spent-ms (- (System/currentTimeMillis) start-ms)]
+ (if (> spent-ms 30000)
+ (throw (RuntimeException. "Timed out waiting for multicaster thread"))
+ (do
+ (Thread/sleep 16)
+ (recur (inc spin-count))))))))))
(defn get-tx-report-queue-multicast!
"Multicast the datomic.api/tx-report-queue to different consumers.
- The multicaster is started on demand. `conn` and `id` identifies the consumer.
+ A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer.
+ Repeated calls using the same `conn` and `id` returns the same queue.
+
+ The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread
+ to send `:end` if the queue is stopped. The default value is `false`.
- Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`."
+ A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`.
+
+ Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`."
([conn id]
(get-tx-report-queue-multicast! conn id false))
([conn id send-end-token?]
(assert (instance? Connection conn))
- (assert (keyword? id))
(locking consumer-state-lock
(let [the-q
(locking multicast-state-lock
@@ -250,7 +259,7 @@
(wait-multicast-thread-step conn)
the-q))))
-(defn wait-multicast-threads-exit [[old-state new-state]]
+(defn- wait-multicast-threads-exit [[old-state new-state]]
(assert (map? old-state))
(assert (map? new-state))
(assert (map? (get old-state :queues {})))
@@ -275,6 +284,12 @@
; sent by multiple threads.
(let [old-conns (into #{} (keys (get old-state :queues {})))
new-conns (into #{} (keys (get new-state :queues {})))]
+ (assert (every?
+ (fn [x] (instance? Connection x))
+ old-conns))
+ (assert (every?
+ (fn [x] (instance? Connection x))
+ new-conns))
(doseq [old-conn old-conns]
(when-not (contains? new-conns old-conn)
(let [old-threadcount (get-in old-state [:thread-count old-conn] nil)]
@@ -292,6 +307,7 @@
(recur))))))))))))))
(defn stop-multicaster-id! [conn id]
+ (assert (instance? Connection conn))
(locking consumer-state-lock
(wait-multicast-threads-exit
(locking multicast-state-lock
@@ -299,40 +315,23 @@
(let [new-state (dissoc-in old-state [:queues conn id])]
(if (= {} (get-in new-state [:queues conn]))
(dissoc-in old-state [:queues conn])
- new-state))))))))
+ new-state)))))))
+ nil)
(defn stop-multicaster! [conn]
+ (assert (instance? Connection conn))
(locking consumer-state-lock
(wait-multicast-threads-exit
(locking multicast-state-lock
- (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))))))
+ (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn]))))))
+ nil)
(defn stop-all-multicasters! []
(locking consumer-state-lock
(wait-multicast-threads-exit
(locking multicast-state-lock
- (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))))))
-
-(comment
- (do
- (require 'com.github.ivarref.yoltq.log-init)
- (defn drain! [^BlockingQueue q]
- (loop [items []]
- (if-let [elem (.poll q 100 TimeUnit/MILLISECONDS)]
- (recur (conj items elem))
- items)))
- (com.github.ivarref.yoltq.log-init/init-logging!
- [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
- [#{"com.github.ivarref.yoltq.report-queue"} :debug]
- [#{"com.github.ivarref.yoltq.poller"} :info]
- [#{"com.github.ivarref.yoltq"} :debug]
- ;[#{"ivarref.yoltq*"} :info]
- [#{"*"} :info]])
- (defonce conn (let [uri (str "datomic:mem://demo")
- _ (d/delete-database uri)
- _ (d/create-database uri)
- conn (d/connect uri)]
- conn))))
+ (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {}))))))
+ nil)
(comment
(do
@@ -366,6 +365,7 @@
@(d/transact conn [{:db/doc "demo"}])
(log/info "begin drain q1")
(stop-multicaster-id! conn :q1)
+ (stop-multicaster-id! conn :q1)
(println "thread count" @thread-count)
(let [qitems-2 (drain! q2)
qitems-1 (drain! q1)]