aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq/report_queue.clj
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/github/ivarref/yoltq/report_queue.clj')
-rw-r--r--src/com/github/ivarref/yoltq/report_queue.clj146
1 files changed, 100 insertions, 46 deletions
diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj
index a9f7e07..c3fd383 100644
--- a/src/com/github/ivarref/yoltq/report_queue.clj
+++ b/src/com/github/ivarref/yoltq/report_queue.clj
@@ -52,9 +52,9 @@
(when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)]
(if (= poll-result :end)
(do
- (log/debug "Report queue listener received :end token. Exiting")
+ (log/debug "report-queue-listener received :end token. Exiting")
(reset! running-local? false))
- ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system."))
+ ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system."))
(process-poll-result! @config-atom
id-ident
poll-result
@@ -115,9 +115,11 @@
(if send-end-token?
(do
#_(log/debug "offering :end token")
- (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS))
+ (if (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS)
+ (log/debug "Multicaster sent :end token")
+ (log/debug "Multicaster failed to send :end token")))
(do
- #_(log/debug "not offering :end token"))))
+ (log/debug "Multicaster not sending :end token"))))
(when (seq new-state)
(if (some? work-item)
(reduce-kv
@@ -125,7 +127,7 @@
(let [ok-offer (.offer ^BlockingQueue q work-item 1 TimeUnit/MICROSECONDS)]
(if (true? ok-offer)
(assoc m id [send-end-token? q])
- (log/warn "Failed to offer item in multicaster for connection" conn "and queue id" id))))
+ (log/error "Multicaster failed to offer item for connection" conn "and queue id" id))))
{}
new-state)
new-state)))
@@ -150,6 +152,7 @@
(do (swap! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))
(swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] dec)))
(d/remove-tx-report-queue conn)
+ (log/debug "Multicaster removed tx-report-queue for conn" conn)
nil)))]
(if new-state
(recur new-state)
@@ -180,26 +183,26 @@
(defn- wait-multicast-thread-step
[conn]
-; `get-tx-report-queue-multicast!` should return only when the multicaster thread
-; has picked up the new queue.
-;
-; Otherwise the following could happen:
-; 1. multicast thread is sleeping
-; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true`
-; 3: user-thread (or somebody else) calls `stop-multicaster`.
-; The multicast-state atom is now identical as it was in step 1.
-; , Step 2 and 3 happened while the multicast thread was sleeping.
-; 4: The multicast thread is scheduled and does _not_ detect any state change.
-; Therefore the multicast thread does _not_ send out an :end token as one would expect.
-;
-; The new queue is written to memory at this point. No other thread can remove it because
-; we are still, and have been during the modification of multicast-state, holding consumer-state-lock.
-; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock,
-; we can be sure that no other thread changes or has changed the state.
-;
-; Once [:iter-count conn] has changed, we know that the multicaster thread
-; will see the new queue. This means that we can be sure that the queue
-; will receive the `:end` token if the queue is stopped.
+ ; `get-tx-report-queue-multicast!` should return only when the multicaster thread
+ ; has picked up the new queue.
+ ;
+ ; Otherwise the following could happen:
+ ; 1. multicast thread is sleeping
+ ; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true`
+ ; 3: user-thread (or somebody else) calls `stop-multicaster`.
+ ; The multicast-state atom is now identical as it was in step 1.
+ ; , Step 2 and 3 happened while the multicast thread was sleeping.
+ ; 4: The multicast thread is scheduled and does _not_ detect any state change.
+ ; Therefore the multicast thread does _not_ send out an :end token as one would expect.
+ ;
+ ; The new queue is written to memory at this point. No other thread can remove it because
+ ; we are still, and have been during the modification of multicast-state, holding consumer-state-lock.
+ ; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock,
+ ; we can be sure that no other thread changes or has changed the state.
+ ;
+ ; Once [:iter-count conn] has changed, we know that the multicaster thread
+ ; will see the new queue. This means that we can be sure that the queue
+ ; will receive the `:end` token if the queue is stopped.
(let [start-ms (System/currentTimeMillis)
iter-count (get-in @multicast-state [:iter-count conn] -1)]
(loop [spin-count 0]
@@ -250,10 +253,10 @@
(if needs-multicaster?
(do
(start-multicaster! conn)
- (log/debug "Multicaster thread started. Returning new queue for id" id)
+ (log/debug "Returning new queue for id" id "(multicaster thread started)")
new-q)
(do
- (log/debug "Multicaster thread already exists. Returning new queue for id" id)
+ (log/debug "Returning new queue for id" id "(multicaster thread already running)")
new-q)))))]
; wait for multicaster thread to pick up current Queue
(wait-multicast-thread-step conn)
@@ -306,32 +309,83 @@
(Thread/sleep 16)
(recur))))))))))))))
+(defn- all-queues [state]
+ (->> (mapcat (fn [[conn qmap]]
+ (mapv (fn [q-id] [conn q-id])
+ (keys qmap)))
+ (seq (get state :queues {})))
+ (into #{})))
+
+(comment
+ (do
+ (assert (= #{}
+ (all-queues {})))
+ (assert (= #{}
+ (all-queues {:queues {}})))
+ (assert (= #{[:conn-a :q-id]}
+ (all-queues {:queues {:conn-a {:q-id 1}}})))
+ (assert (= #{[:conn-a :q-id] [:conn-a :q-id-2]}
+ (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2}}})))
+ (assert (= #{[:conn-a :q-id-2] [:conn-b :q-id-3] [:conn-a :q-id]}
+ (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2}
+ :conn-b {:q-id-3 3}}})))))
+
+(defn- removed-queues? [old new]
+ (not= (all-queues old)
+ (all-queues new)))
+
(defn stop-multicaster-id! [conn id]
(assert (instance? Connection conn))
- (locking consumer-state-lock
- (wait-multicast-threads-exit
- (locking multicast-state-lock
- (swap-vals! multicast-state (fn [old-state]
- (let [new-state (dissoc-in old-state [:queues conn id])]
- (if (= {} (get-in new-state [:queues conn]))
- (dissoc-in old-state [:queues conn])
- new-state)))))))
- nil)
+ (let [did-remove? (atom nil)]
+ (locking consumer-state-lock
+ (wait-multicast-threads-exit
+ (locking multicast-state-lock
+ (let [[old new] (swap-vals! multicast-state (fn [old-state]
+ (let [new-state (dissoc-in old-state [:queues conn id])]
+ (if (= {} (get-in new-state [:queues conn]))
+ (dissoc-in old-state [:queues conn])
+ new-state))))]
+ (reset! did-remove? (removed-queues? old new))
+ [old new]))))
+ @did-remove?))
(defn stop-multicaster! [conn]
(assert (instance? Connection conn))
- (locking consumer-state-lock
- (wait-multicast-threads-exit
- (locking multicast-state-lock
- (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn]))))))
- nil)
+ (let [did-remove? (atom nil)]
+ (locking consumer-state-lock
+ (wait-multicast-threads-exit
+ (locking multicast-state-lock
+ (let [[old new] (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))]
+ (reset! did-remove? (removed-queues? old new))
+ [old new]))))
+ @did-remove?))
(defn stop-all-multicasters! []
- (locking consumer-state-lock
- (wait-multicast-threads-exit
- (locking multicast-state-lock
- (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {}))))))
- nil)
+ (let [did-remove? (atom nil)]
+ (locking consumer-state-lock
+ (wait-multicast-threads-exit
+ (locking multicast-state-lock
+ (let [[old new] (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))]
+ (reset! did-remove? (removed-queues? old new))
+ [old new]))))
+ @did-remove?))
+
+(comment
+ (do
+ (require 'com.github.ivarref.yoltq.log-init)
+ (require '[datomic.api :as d])
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq"} :debug]
+ ;[#{"ivarref.yoltq*"} :info]
+ [#{"*"} :info]])
+ (defonce conn (let [uri (str "datomic:mem://demo")
+ _ (d/delete-database uri)
+ _ (d/create-database uri)
+ conn (d/connect uri)]
+ conn))))
(comment
(do