aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq/report_queue.clj
diff options
context:
space:
mode:
authorire <refsdal.ivar@gmail.com>2025-05-21 09:51:30 +0200
committerire <refsdal.ivar@gmail.com>2025-05-21 09:51:30 +0200
commitaa0b3d0bd9e087c7e1e36e87cd6e10f9e2796449 (patch)
tree077fb665ddc227636d3f4007e1f2367016c98445 /src/com/github/ivarref/yoltq/report_queue.clj
parentImprove tx-report-queue sharing #7 (diff)
downloadfiinha-aa0b3d0bd9e087c7e1e36e87cd6e10f9e2796449.tar.gz
fiinha-aa0b3d0bd9e087c7e1e36e87cd6e10f9e2796449.tar.xz
Doc rationale for waiting for multicaster thread. Handle :end token in report-queue-listener #7
Diffstat (limited to 'src/com/github/ivarref/yoltq/report_queue.clj')
-rw-r--r--src/com/github/ivarref/yoltq/report_queue.clj91
1 files changed, 51 insertions, 40 deletions
diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj
index 239de12..2a2e489 100644
--- a/src/com/github/ivarref/yoltq/report_queue.clj
+++ b/src/com/github/ivarref/yoltq/report_queue.clj
@@ -45,15 +45,20 @@
(assert (instance? BlockingQueue q))
(log/debug "tx-report-queue-given:" tx-report-queue-given)
(try
- (while @running?
- (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)]
- (process-poll-result! @config-atom
- id-ident
- poll-result
- (fn [f]
- (when @running?
- (.execute ^ScheduledExecutorService pool f)))))
- (deliver ready? :ready))
+ (let [running-local? (atom true)]
+ (while (and @running? @running-local?)
+ (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)]
+ (if (= poll-result :end)
+ (do
+ (reset! running-local? false)
+ #_(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system."))
+ (process-poll-result! @config-atom
+ id-ident
+ poll-result
+ (fn [f]
+ (when @running?
+ (.execute ^ScheduledExecutorService pool f))))))
+ (deliver ready? :ready)))
(catch Throwable t
(log/error t "Unexpected error in report-queue-listener:" (.getMessage t)))
(finally
@@ -128,6 +133,7 @@
(defonce ^:private thread-count (atom 0))
(defn- multicaster-loop [init-state conn ready?]
+ (assert (instance? Connection conn))
(let [input-queue (d/tx-report-queue conn)]
(deliver ready? true)
(loop [old-state init-state]
@@ -147,6 +153,7 @@
nil)))))
(defn- start-multicaster! [conn]
+ (assert (instance? Connection conn))
(let [ready? (promise)]
(future
(log/debug "Multicaster starting for conn" conn)
@@ -165,38 +172,42 @@
(finally
(swap! thread-count dec)
(log/debug "Multicaster exiting for conn" conn))))
- @ready?))
+ (when (= :timeout (deref ready? 30000 :timeout))
+ (throw (RuntimeException. "Timed out waiting for multicaster to start")))))
-(defn- wait-multicast-thread-step [conn]
- ; `get-tx-report-queue-multicast!` should return only when the multicaster thread
- ; has picked up the new queue.
- ;
- ; Otherwise the following could happen:
- ; 1. multicast thread is sleeping
- ; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true`
- ; 3: user-thread (or somebody else) calls `stop-multicaster`.
- ; The multicast-state atom is now identical as it was in 1
- ; 4: multicast thread is scheduled and does _not_ detect any state change.
- ; And therefore the multicast thread does _not_ send out an :end token as one would expect.
- ;
- ; Once [:iter-count conn] has changed, we know that the multicaster thread
- ; will see the new queue.
- ; We are still holding the consumer-state-lock, so no other thread
- ; can do any stop-multicasting that would/could corrupt the state.
- ; We can then be sure that the queue will receive the `:end` token when/if
- ; the queue is stopped.
- (let [start-ms (System/currentTimeMillis)
- iter-count (get-in @multicast-state [:iter-count conn] -1)]
- (loop [spin-count 0]
- (if (not= iter-count (get-in @multicast-state [:iter-count conn]))
- nil
- (do
- (let [spent-ms (- (System/currentTimeMillis) start-ms)]
- (if (> spent-ms 30000)
- (throw (RuntimeException. "Timed out waiting for multicaster thread"))
- (do
- (Thread/sleep 16)
- (recur (inc spin-count))))))))))
+(defn- wait-multicast-thread-step [conn])
+; `get-tx-report-queue-multicast!` should return only when the multicaster thread
+; has picked up the new queue.
+;
+; Otherwise the following could happen:
+; 1. multicast thread is sleeping
+; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true`
+; 3: user-thread (or somebody else) calls `stop-multicaster`.
+; The multicast-state atom is now identical as it was in step 1.
+; , Step 2 and 3 happened while the multicast thread was sleeping.
+; 4: The multicast thread is scheduled and does _not_ detect any state change.
+; Therefore the multicast thread does _not_ send out an :end token as one would expect.
+;
+; The new queue is written to memory at this point. No other thread can remove it because
+; we are still, and have been during the modification of multicast-state, holding consumer-state-lock.
+; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock,
+; we can be sure that no other thread changes or has changed the state.
+;
+; Once [:iter-count conn] has changed, we know that the multicaster thread
+; will see the new queue. This means that we can be sure that the queue
+; will receive the `:end` token if the queue is stopped.
+(let [start-ms (System/currentTimeMillis)
+ iter-count (get-in @multicast-state [:iter-count conn] -1)]
+ (loop [spin-count 0]
+ (if (not= iter-count (get-in @multicast-state [:iter-count conn]))
+ nil
+ (do
+ (let [spent-ms (- (System/currentTimeMillis) start-ms)]
+ (if (> spent-ms 30000)
+ (throw (RuntimeException. "Timed out waiting for multicaster thread"))
+ (do
+ (Thread/sleep 16)
+ (recur (inc spin-count)))))))))
(defn get-tx-report-queue-multicast!
"Multicast the datomic.api/tx-report-queue to different consumers.