diff options
Diffstat (limited to 'src/com/github/ivarref/yoltq/report_queue.clj')
| -rw-r--r-- | src/com/github/ivarref/yoltq/report_queue.clj | 91 |
1 files changed, 51 insertions, 40 deletions
diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index 239de12..2a2e489 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -45,15 +45,20 @@ (assert (instance? BlockingQueue q)) (log/debug "tx-report-queue-given:" tx-report-queue-given) (try - (while @running? - (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] - (process-poll-result! @config-atom - id-ident - poll-result - (fn [f] - (when @running? - (.execute ^ScheduledExecutorService pool f))))) - (deliver ready? :ready)) + (let [running-local? (atom true)] + (while (and @running? @running-local?) + (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] + (if (= poll-result :end) + (do + (reset! running-local? false) + #_(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system.")) + (process-poll-result! @config-atom + id-ident + poll-result + (fn [f] + (when @running? + (.execute ^ScheduledExecutorService pool f)))))) + (deliver ready? :ready))) (catch Throwable t (log/error t "Unexpected error in report-queue-listener:" (.getMessage t))) (finally @@ -128,6 +133,7 @@ (defonce ^:private thread-count (atom 0)) (defn- multicaster-loop [init-state conn ready?] + (assert (instance? Connection conn)) (let [input-queue (d/tx-report-queue conn)] (deliver ready? true) (loop [old-state init-state] @@ -147,6 +153,7 @@ nil))))) (defn- start-multicaster! [conn] + (assert (instance? Connection conn)) (let [ready? (promise)] (future (log/debug "Multicaster starting for conn" conn) @@ -165,38 +172,42 @@ (finally (swap! thread-count dec) (log/debug "Multicaster exiting for conn" conn)))) - @ready?)) + (when (= :timeout (deref ready? 30000 :timeout)) + (throw (RuntimeException. "Timed out waiting for multicaster to start"))))) -(defn- wait-multicast-thread-step [conn] - ; `get-tx-report-queue-multicast!` should return only when the multicaster thread - ; has picked up the new queue. - ; - ; Otherwise the following could happen: - ; 1. multicast thread is sleeping - ; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true` - ; 3: user-thread (or somebody else) calls `stop-multicaster`. - ; The multicast-state atom is now identical as it was in 1 - ; 4: multicast thread is scheduled and does _not_ detect any state change. - ; And therefore the multicast thread does _not_ send out an :end token as one would expect. - ; - ; Once [:iter-count conn] has changed, we know that the multicaster thread - ; will see the new queue. - ; We are still holding the consumer-state-lock, so no other thread - ; can do any stop-multicasting that would/could corrupt the state. - ; We can then be sure that the queue will receive the `:end` token when/if - ; the queue is stopped. - (let [start-ms (System/currentTimeMillis) - iter-count (get-in @multicast-state [:iter-count conn] -1)] - (loop [spin-count 0] - (if (not= iter-count (get-in @multicast-state [:iter-count conn])) - nil - (do - (let [spent-ms (- (System/currentTimeMillis) start-ms)] - (if (> spent-ms 30000) - (throw (RuntimeException. "Timed out waiting for multicaster thread")) - (do - (Thread/sleep 16) - (recur (inc spin-count)))))))))) +(defn- wait-multicast-thread-step [conn]) +; `get-tx-report-queue-multicast!` should return only when the multicaster thread +; has picked up the new queue. +; +; Otherwise the following could happen: +; 1. multicast thread is sleeping +; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true` +; 3: user-thread (or somebody else) calls `stop-multicaster`. +; The multicast-state atom is now identical as it was in step 1. +; , Step 2 and 3 happened while the multicast thread was sleeping. +; 4: The multicast thread is scheduled and does _not_ detect any state change. +; Therefore the multicast thread does _not_ send out an :end token as one would expect. +; +; The new queue is written to memory at this point. No other thread can remove it because +; we are still, and have been during the modification of multicast-state, holding consumer-state-lock. +; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock, +; we can be sure that no other thread changes or has changed the state. +; +; Once [:iter-count conn] has changed, we know that the multicaster thread +; will see the new queue. This means that we can be sure that the queue +; will receive the `:end` token if the queue is stopped. +(let [start-ms (System/currentTimeMillis) + iter-count (get-in @multicast-state [:iter-count conn] -1)] + (loop [spin-count 0] + (if (not= iter-count (get-in @multicast-state [:iter-count conn])) + nil + (do + (let [spent-ms (- (System/currentTimeMillis) start-ms)] + (if (> spent-ms 30000) + (throw (RuntimeException. "Timed out waiting for multicaster thread")) + (do + (Thread/sleep 16) + (recur (inc spin-count))))))))) (defn get-tx-report-queue-multicast! "Multicast the datomic.api/tx-report-queue to different consumers. |
