aboutsummaryrefslogtreecommitdiff
path: root/src/com
diff options
context:
space:
mode:
Diffstat (limited to 'src/com')
-rw-r--r--src/com/github/ivarref/yoltq/report_queue.clj54
1 files changed, 27 insertions, 27 deletions
diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj
index b3685b9..f83e3ba 100644
--- a/src/com/github/ivarref/yoltq/report_queue.clj
+++ b/src/com/github/ivarref/yoltq/report_queue.clj
@@ -182,7 +182,7 @@
(throw (RuntimeException. "Timed out waiting for multicaster to start")))))
(defn- wait-multicast-thread-step
- [conn]
+ [conn state]
; `get-tx-report-queue-multicast!` should return only when the multicaster thread
; has picked up the new queue.
;
@@ -201,20 +201,20 @@
; we can be sure that no other thread changes or has changed the state.
;
; Once [:iter-count conn] has changed, we know that the multicaster thread
- ; will see the new queue. This means that we can be sure that the queue
+ ; has seen the new queue. This means that we can be sure that the queue
; will receive the `:end` token if the queue is stopped.
(let [start-ms (System/currentTimeMillis)
- iter-count (get-in @multicast-state [:iter-count conn] -1)]
+ iter-count (get-in state [:iter-count conn] -1)]
(loop [spin-count 0]
- (if (not= iter-count (get-in @multicast-state [:iter-count conn]))
+ (if (not= iter-count (locking multicast-state-lock
+ (get-in @multicast-state [:iter-count conn] -1)))
nil
- (do
- (let [spent-ms (- (System/currentTimeMillis) start-ms)]
- (if (> spent-ms 30000)
- (throw (RuntimeException. "Timed out waiting for multicaster thread"))
- (do
- (Thread/sleep 16)
- (recur (inc spin-count))))))))))
+ (let [spent-ms (- (System/currentTimeMillis) start-ms)]
+ (if (> spent-ms 30000)
+ (throw (RuntimeException. "Timed out waiting for multicaster thread"))
+ (do
+ (Thread/sleep 16)
+ (recur (inc spin-count)))))))))
(defn get-tx-report-queue-multicast!
"Multicast the datomic.api/tx-report-queue to different consumers.
@@ -232,21 +232,21 @@
([conn id send-end-token?]
(assert (instance? Connection conn))
(locking consumer-state-lock
- (let [the-q
+ (let [[new-state the-q]
(locking multicast-state-lock
(assert (map? @multicast-state))
(if-let [existing-q (get-in @multicast-state [:queues conn id])]
(do
- (swap! multicast-state
- (fn [old-state]
- (update-in old-state [:queues conn id] (fn [[end-token? q]]
- (if (not= end-token? send-end-token?)
- (log/debug "flipped `send-end-token?`")
- (log/debug "identical `send-end-token?`"))
- [send-end-token? q]))))
- (log/debug "Returning existing queue for id" id)
- (assert (instance? BlockingQueue (second existing-q)))
- (second existing-q))
+ (let [new-state (swap! multicast-state
+ (fn [old-state]
+ (update-in old-state [:queues conn id] (fn [[end-token? q]]
+ (if (not= end-token? send-end-token?)
+ (log/debug "flipped `send-end-token?`")
+ (log/debug "identical `send-end-token?`"))
+ [send-end-token? q]))))]
+ (log/debug "Returning existing queue for id" id)
+ (assert (instance? BlockingQueue (second existing-q)))
+ [new-state (second existing-q)]))
(let [needs-multicaster? (nil? (get-in @multicast-state [:queues conn]))
new-q (LinkedBlockingQueue.)
new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [:queues conn id] [send-end-token? new-q])))]
@@ -254,12 +254,12 @@
(do
(start-multicaster! conn)
(log/debug "Returning new queue for id" id "(multicaster thread started)")
- new-q)
+ [new-state new-q])
(do
(log/debug "Returning new queue for id" id "(multicaster thread already running)")
- new-q)))))]
+ [new-state new-q])))))]
; wait for multicaster thread to pick up current Queue
- (wait-multicast-thread-step conn)
+ (wait-multicast-thread-step conn new-state)
the-q))))
(defn- wait-multicast-threads-exit [[old-state new-state]]
@@ -418,8 +418,8 @@
@(d/transact conn [{:db/doc "demo"}])
@(d/transact conn [{:db/doc "demo"}])
(log/info "begin drain q1")
- (stop-multicaster-id! conn :q1)
- (stop-multicaster-id! conn :q1)
+ (stop-multicast-consumer-id! conn :q1)
+ (stop-multicast-consumer-id! conn :q1)
(println "thread count" @thread-count)
(let [qitems-2 (drain! q2)
qitems-1 (drain! q1)]