diff options
Diffstat (limited to 'src/com/github/ivarref/yoltq')
| -rw-r--r-- | src/com/github/ivarref/yoltq/report_queue.clj | 54 |
1 files changed, 27 insertions, 27 deletions
diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index b3685b9..f83e3ba 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -182,7 +182,7 @@ (throw (RuntimeException. "Timed out waiting for multicaster to start"))))) (defn- wait-multicast-thread-step - [conn] + [conn state] ; `get-tx-report-queue-multicast!` should return only when the multicaster thread ; has picked up the new queue. ; @@ -201,20 +201,20 @@ ; we can be sure that no other thread changes or has changed the state. ; ; Once [:iter-count conn] has changed, we know that the multicaster thread - ; will see the new queue. This means that we can be sure that the queue + ; has seen the new queue. This means that we can be sure that the queue ; will receive the `:end` token if the queue is stopped. (let [start-ms (System/currentTimeMillis) - iter-count (get-in @multicast-state [:iter-count conn] -1)] + iter-count (get-in state [:iter-count conn] -1)] (loop [spin-count 0] - (if (not= iter-count (get-in @multicast-state [:iter-count conn])) + (if (not= iter-count (locking multicast-state-lock + (get-in @multicast-state [:iter-count conn] -1))) nil - (do - (let [spent-ms (- (System/currentTimeMillis) start-ms)] - (if (> spent-ms 30000) - (throw (RuntimeException. "Timed out waiting for multicaster thread")) - (do - (Thread/sleep 16) - (recur (inc spin-count)))))))))) + (let [spent-ms (- (System/currentTimeMillis) start-ms)] + (if (> spent-ms 30000) + (throw (RuntimeException. "Timed out waiting for multicaster thread")) + (do + (Thread/sleep 16) + (recur (inc spin-count))))))))) (defn get-tx-report-queue-multicast! "Multicast the datomic.api/tx-report-queue to different consumers. @@ -232,21 +232,21 @@ ([conn id send-end-token?] (assert (instance? Connection conn)) (locking consumer-state-lock - (let [the-q + (let [[new-state the-q] (locking multicast-state-lock (assert (map? @multicast-state)) (if-let [existing-q (get-in @multicast-state [:queues conn id])] (do - (swap! multicast-state - (fn [old-state] - (update-in old-state [:queues conn id] (fn [[end-token? q]] - (if (not= end-token? send-end-token?) - (log/debug "flipped `send-end-token?`") - (log/debug "identical `send-end-token?`")) - [send-end-token? q])))) - (log/debug "Returning existing queue for id" id) - (assert (instance? BlockingQueue (second existing-q))) - (second existing-q)) + (let [new-state (swap! multicast-state + (fn [old-state] + (update-in old-state [:queues conn id] (fn [[end-token? q]] + (if (not= end-token? send-end-token?) + (log/debug "flipped `send-end-token?`") + (log/debug "identical `send-end-token?`")) + [send-end-token? q]))))] + (log/debug "Returning existing queue for id" id) + (assert (instance? BlockingQueue (second existing-q))) + [new-state (second existing-q)])) (let [needs-multicaster? (nil? (get-in @multicast-state [:queues conn])) new-q (LinkedBlockingQueue.) new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [:queues conn id] [send-end-token? new-q])))] @@ -254,12 +254,12 @@ (do (start-multicaster! conn) (log/debug "Returning new queue for id" id "(multicaster thread started)") - new-q) + [new-state new-q]) (do (log/debug "Returning new queue for id" id "(multicaster thread already running)") - new-q)))))] + [new-state new-q])))))] ; wait for multicaster thread to pick up current Queue - (wait-multicast-thread-step conn) + (wait-multicast-thread-step conn new-state) the-q)))) (defn- wait-multicast-threads-exit [[old-state new-state]] @@ -418,8 +418,8 @@ @(d/transact conn [{:db/doc "demo"}]) @(d/transact conn [{:db/doc "demo"}]) (log/info "begin drain q1") - (stop-multicaster-id! conn :q1) - (stop-multicaster-id! conn :q1) + (stop-multicast-consumer-id! conn :q1) + (stop-multicast-consumer-id! conn :q1) (println "thread count" @thread-count) (let [qitems-2 (drain! q2) qitems-1 (drain! q1)] |
