aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorire <refsdal.ivar@gmail.com>2025-05-20 22:43:39 +0200
committerire <refsdal.ivar@gmail.com>2025-05-20 22:43:39 +0200
commit4797e559410bce644c40b05fa9a321171a781e78 (patch)
treef0d0b2a63b8a3a262391c12e6f9cf2dce01f8417
parentFix tx-report-queue sharing #7 (diff)
downloadfiinha-4797e559410bce644c40b05fa9a321171a781e78.tar.gz
fiinha-4797e559410bce644c40b05fa9a321171a781e78.tar.xz
Improve tx-report-queue sharing #7
-rw-r--r--src/com/github/ivarref/yoltq/report_queue.clj342
-rw-r--r--test/com/github/ivarref/yoltq/log_init.clj2
2 files changed, 283 insertions, 61 deletions
diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj
index 9cddc93..239de12 100644
--- a/src/com/github/ivarref/yoltq/report_queue.clj
+++ b/src/com/github/ivarref/yoltq/report_queue.clj
@@ -6,7 +6,6 @@
(:import (datomic Connection Datom)
(java.util.concurrent LinkedBlockingQueue ScheduledExecutorService BlockingQueue TimeUnit)))
-
(defn process-poll-result! [cfg id-ident poll-result consumer]
(let [{:keys [tx-data db-after]} poll-result]
(when-let [new-ids (->> tx-data
@@ -30,7 +29,6 @@
(catch Throwable t
(log/error t "Unexpected error in process-poll-result!")))))))))
-
(defn report-queue-listener [running?
ready?
^ScheduledExecutorService pool
@@ -65,80 +63,253 @@
(log/debug "Remove tx-report-queue")
(d/remove-tx-report-queue conn)))))))
-(defonce ^:private multicast-state-lock (Object.))
+; https://stackoverflow.com/a/14488425
+(defn- dissoc-in
+ "Dissociates an entry from a nested associative structure returning a new
+ nested structure. keys is a sequence of keys. Any empty maps that result
+ will not be present in the new structure."
+ [m [k & ks :as keys]]
+ (if ks
+ (if-let [nextmap (get m k)]
+ (let [newmap (dissoc-in nextmap ks)]
+ (if (seq newmap)
+ (assoc m k newmap)
+ (dissoc m k)))
+ m)
+ (dissoc m k)))
+
+(defn- queues-to-shutdown [old-state new-state]
+ (assert (map? old-state))
+ (assert (map? new-state))
+ (doseq [x (vals new-state)]
+ (assert (vector? x)))
+ (doseq [x (vals old-state)]
+ (assert (vector? x)))
+ (let [new-qs (into #{} (mapv second (vals new-state)))]
+ (reduce
+ (fn [o [send-end-token? old-q]]
+ ;(assert (boolean? send-end-token?))
+ ;(assert (instance? BlockingQueue old-q))
+ (if (contains? new-qs old-q)
+ o
+ (conj o [send-end-token? old-q])))
+ []
+ (vals old-state))))
+
+(comment
+ (queues-to-shutdown {:a [true 999] :b [false 777]}
+ {:a [true 123] :b [true 777]}))
+(defn- multicast-once [conn work-item old-state new-state]
+ (assert (map? old-state))
+ (assert (map? new-state))
+ (doseq [[send-end-token? q-to-shutdown] (queues-to-shutdown old-state new-state)]
+ (if send-end-token?
+ (do
+ #_(log/debug "offering :end token")
+ (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS))
+ (do
+ #_(log/debug "not offering :end token"))))
+ (when (seq new-state)
+ (if (some? work-item)
+ (reduce-kv
+ (fn [m id [send-end-token? q]]
+ (let [ok-offer (.offer ^BlockingQueue q work-item 1 TimeUnit/MICROSECONDS)]
+ (if (true? ok-offer)
+ (assoc m id [send-end-token? q])
+ (log/warn "Failed to offer item in multicaster for connection" conn "and queue id" id))))
+ {}
+ new-state)
+ new-state)))
+
+(defonce ^:private multicast-state-lock (Object.))
+(defonce ^:private consumer-state-lock (Object.))
(defonce ^:private multicast-state (atom {}))
+(defonce ^:private thread-count (atom 0))
+
+(defn- multicaster-loop [init-state conn ready?]
+ (let [input-queue (d/tx-report-queue conn)]
+ (deliver ready? true)
+ (loop [old-state init-state]
+ (let [work-item (.poll ^BlockingQueue input-queue 16 TimeUnit/MILLISECONDS)
+ new-state (locking multicast-state-lock
+ ; writer to `multicast-state` must be protected by `multicast-state-lock`
+ ; it should block minimally / spend minimum amount of time
+ (swap! multicast-state (fn [old-state] (update-in old-state [:iter-count conn] (fnil inc 0))))
+ (if-let [new-state (multicast-once conn work-item old-state (get-in @multicast-state [:queues conn] {}))]
+ new-state
+ (do (swap! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))
+ (swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] dec)))
+ (d/remove-tx-report-queue conn)
+ nil)))]
+ (if new-state
+ (recur new-state)
+ nil)))))
(defn- start-multicaster! [conn]
- (let [multicaster-ready? (promise)]
+ (let [ready? (promise)]
(future
(log/debug "Multicaster starting for conn" conn)
(try
- (let [input-queue (d/tx-report-queue conn)]
- (loop []
- (when-let [mcast-state (get @multicast-state conn)]
- (when-let [dest-queues (vals mcast-state)]
- (let [element (.poll ^BlockingQueue input-queue 1 TimeUnit/SECONDS)]
- (deliver multicaster-ready? :ready)
- (when (some? element)
- (doseq [q dest-queues]
- (let [ok-offer (.offer ^BlockingQueue q element 30 TimeUnit/MINUTES)]
- (when (false? ok-offer)
- (log/error "Failed to offer item in multicaster for connection" conn))))))
- (recur)))))
+ (swap! thread-count inc)
+ (let [new-state (swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] (fnil inc 0))))]
+ (assert (= 1 (get-in new-state [:thread-count conn])))
+ ; "parent" thread holds `multicast-state-lock` and
+ ; waits for `ready?` promise, so effectively this new thread also holds
+ ; the lock until `ready?` is delivered. That is: it is safe
+ ; for this thread to modify multicast-state regardless of what other threads are doing
+ (multicaster-loop (get-in new-state [:queues conn]) conn ready?))
(catch Throwable t
- (deliver multicaster-ready? :error)
- (log/error t "Unexpected error in multicaster:" (.getMessage t)))
+ (log/error t "Unexpected error in multicaster:" (.getMessage t))
+ (log/error "Multicaster exiting for conn"))
(finally
- (d/remove-tx-report-queue conn)
+ (swap! thread-count dec)
(log/debug "Multicaster exiting for conn" conn))))
- multicaster-ready?))
+ @ready?))
+
+(defn- wait-multicast-thread-step [conn]
+ ; `get-tx-report-queue-multicast!` should return only when the multicaster thread
+ ; has picked up the new queue.
+ ;
+ ; Otherwise the following could happen:
+ ; 1. multicast thread is sleeping
+ ; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true`
+ ; 3: user-thread (or somebody else) calls `stop-multicaster`.
+ ; The multicast-state atom is now identical as it was in 1
+ ; 4: multicast thread is scheduled and does _not_ detect any state change.
+ ; And therefore the multicast thread does _not_ send out an :end token as one would expect.
+ ;
+ ; Once [:iter-count conn] has changed, we know that the multicaster thread
+ ; will see the new queue.
+ ; We are still holding the consumer-state-lock, so no other thread
+ ; can do any stop-multicasting that would/could corrupt the state.
+ ; We can then be sure that the queue will receive the `:end` token when/if
+ ; the queue is stopped.
+ (let [start-ms (System/currentTimeMillis)
+ iter-count (get-in @multicast-state [:iter-count conn] -1)]
+ (loop [spin-count 0]
+ (if (not= iter-count (get-in @multicast-state [:iter-count conn]))
+ nil
+ (do
+ (let [spent-ms (- (System/currentTimeMillis) start-ms)]
+ (if (> spent-ms 30000)
+ (throw (RuntimeException. "Timed out waiting for multicaster thread"))
+ (do
+ (Thread/sleep 16)
+ (recur (inc spin-count))))))))))
(defn get-tx-report-queue-multicast!
"Multicast the datomic.api/tx-report-queue to different consumers.
The multicaster is started on demand. `conn` and `id` identifies the consumer.
Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`."
- [conn id]
- (assert (instance? Connection conn))
- (assert (keyword? id))
- (locking multicast-state-lock
- (assert (map? @multicast-state))
- (if-let [existing-q (get-in @multicast-state [conn id])]
- (do
- (log/debug "returning existing queue for id" id)
- (assert (instance? BlockingQueue existing-q))
- existing-q)
- (let [needs-multicaster? (not (contains? @multicast-state conn))
- new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [conn id] (LinkedBlockingQueue.))))]
- (when needs-multicaster?
- (let [multicaster-promise (start-multicaster! conn)
- multicaster-result (deref multicaster-promise (* 30 60000) :timeout)]
- (cond (= multicaster-result :timeout)
- (do
- (log/error "Timeout waiting for multicaster to start")
- (throw (RuntimeException. "Timeout waiting for multicaster to start")))
- (= multicaster-result :error)
- (do
- (log/error "Multicaster failed to start")
- (throw (RuntimeException. "Multicaster failed to start")))
- (= multicaster-result :ready)
- (log/debug "Multicaster is ready")
+ ([conn id]
+ (get-tx-report-queue-multicast! conn id false))
+ ([conn id send-end-token?]
+ (assert (instance? Connection conn))
+ (assert (keyword? id))
+ (locking consumer-state-lock
+ (let [the-q
+ (locking multicast-state-lock
+ (assert (map? @multicast-state))
+ (if-let [existing-q (get-in @multicast-state [:queues conn id])]
+ (do
+ (swap! multicast-state
+ (fn [old-state]
+ (update-in old-state [:queues conn id] (fn [[end-token? q]]
+ (if (not= end-token? send-end-token?)
+ (log/debug "flipped `send-end-token?`")
+ (log/debug "identical `send-end-token?`"))
+ [send-end-token? q]))))
+ (log/debug "Returning existing queue for id" id)
+ (assert (instance? BlockingQueue (second existing-q)))
+ (second existing-q))
+ (let [needs-multicaster? (nil? (get-in @multicast-state [:queues conn]))
+ new-q (LinkedBlockingQueue.)
+ new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [:queues conn id] [send-end-token? new-q])))]
+ (if needs-multicaster?
+ (do
+ (start-multicaster! conn)
+ (log/debug "Multicaster thread started. Returning new queue for id" id)
+ new-q)
+ (do
+ (log/debug "Multicaster thread already exists. Returning new queue for id" id)
+ new-q)))))]
+ ; wait for multicaster thread to pick up current Queue
+ (wait-multicast-thread-step conn)
+ the-q))))
- :else
+(defn wait-multicast-threads-exit [[old-state new-state]]
+ (assert (map? old-state))
+ (assert (map? new-state))
+ (assert (map? (get old-state :queues {})))
+ (assert (map? (get new-state :queues {})))
+ (assert (map? (get old-state :thread-count {})))
+ (assert (map? (get new-state :thread-count {})))
+ (locking consumer-state-lock
+ ; No new multicast threads will be launched inside this block.
+ ; The lock is already held by parent function.
+ ;
+ ; Why do we need to _wait_ for multicaster thread(s) to exit after
+ ; removing all queue ids for a given connection?
+ ; Otherwise the following could happen:
+ ; 1. multicaster thread is sleeping
+ ; 2. user calls stop-multicaster!
+ ; One would expect that multicaster thread would exit, but it is still sleeping
+ ; 3. user calls get-tx-report-queue-multicast! with the same conn
+ ; The state is now empty, so a new multicaster thread is spawned.
+ ; 4. Now there is two multicaster threads for the same connection!
+ ; ... and since the datomic report queue can be shared between threads
+ ; it will seemingly work, but when the end event is sent, it will be
+ ; sent by multiple threads.
+ (let [old-conns (into #{} (keys (get old-state :queues {})))
+ new-conns (into #{} (keys (get new-state :queues {})))]
+ (doseq [old-conn old-conns]
+ (when-not (contains? new-conns old-conn)
+ (let [old-threadcount (get-in old-state [:thread-count old-conn] nil)]
+ (assert (= 1 old-threadcount))
+ (let [start-ms (System/currentTimeMillis)]
+ (loop []
+ (if (= 0 (get-in @multicast-state [:thread-count old-conn]))
+ :ok
(do
- (log/error "Unexpected state from multicaster:" multicaster-result)
- (throw (RuntimeException. (str "Unexpected state from multicaster: " multicaster-result)))))))
- (let [new-q (get-in new-state [conn id])]
- (assert (instance? BlockingQueue new-q))
- new-q)))))
+ (let [spent-ms (- (System/currentTimeMillis) start-ms)]
+ (if (> spent-ms 30000)
+ (throw (RuntimeException. "Timed out waiting for multicaster thread to exit"))
+ (do
+ (Thread/sleep 16)
+ (recur))))))))))))))
+
+(defn stop-multicaster-id! [conn id]
+ (locking consumer-state-lock
+ (wait-multicast-threads-exit
+ (locking multicast-state-lock
+ (swap-vals! multicast-state (fn [old-state]
+ (let [new-state (dissoc-in old-state [:queues conn id])]
+ (if (= {} (get-in new-state [:queues conn]))
+ (dissoc-in old-state [:queues conn])
+ new-state))))))))
+
+(defn stop-multicaster! [conn]
+ (locking consumer-state-lock
+ (wait-multicast-threads-exit
+ (locking multicast-state-lock
+ (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))))))
(defn stop-all-multicasters! []
- (reset! multicast-state {}))
+ (locking consumer-state-lock
+ (wait-multicast-threads-exit
+ (locking multicast-state-lock
+ (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))))))
(comment
(do
(require 'com.github.ivarref.yoltq.log-init)
+ (defn drain! [^BlockingQueue q]
+ (loop [items []]
+ (if-let [elem (.poll q 100 TimeUnit/MILLISECONDS)]
+ (recur (conj items elem))
+ items)))
(com.github.ivarref.yoltq.log-init/init-logging!
[[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
[#{"com.github.ivarref.yoltq.report-queue"} :debug]
@@ -153,20 +324,71 @@
conn))))
(comment
- (defn drain! [^BlockingQueue q]
- (loop [cnt 0]
- (if (nil? (.poll q 1 TimeUnit/SECONDS))
- cnt
- (recur (inc cnt))))))
+ (do
+ (require 'com.github.ivarref.yoltq.log-init)
+ (defn drain! [^BlockingQueue q]
+ (loop [items []]
+ (if-let [elem (.poll q 100 TimeUnit/MILLISECONDS)]
+ (recur (conj items elem))
+ items)))
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq"} :debug]
+ ;[#{"ivarref.yoltq*"} :info]
+ [#{"*"} :info]])
+ (log/info "********************************")
+ (defonce conn (let [uri (str "datomic:mem://demo")
+ _ (d/delete-database uri)
+ _ (d/create-database uri)
+ conn (d/connect uri)]
+ conn))
+ (log/info "stop-all!")
+ (stop-all-multicasters!)
+ (assert (= 0 @thread-count))
+ (let [q1 (get-tx-report-queue-multicast! conn :q1 false)
+ q2 (get-tx-report-queue-multicast! conn :q2 false)
+ _ (get-tx-report-queue-multicast! conn :q1 true)]
+ @(d/transact conn [{:db/doc "demo"}])
+ @(d/transact conn [{:db/doc "demo"}])
+ @(d/transact conn [{:db/doc "demo"}])
+ (log/info "begin drain q1")
+ (stop-multicaster-id! conn :q1)
+ (println "thread count" @thread-count)
+ (let [qitems-2 (drain! q2)
+ qitems-1 (drain! q1)]
+ (assert (= :end (last qitems-1)))
+ (println "drain count q1:" (count qitems-1))
+ (println "drain count q2:" (count qitems-2))))))
+
+(comment
+ (do
+ (let [q (get-tx-report-queue-multicast! conn :q1 true)]
+ (log/debug "stopping id :q1")
+ (stop-multicaster-id! conn :q1)
+ (let [drained (drain! q)]
+ (println "drained:" drained)
+ (assert (= [:end] drained)))
+ @multicast-state)))
(comment
- (let [q-1 (get-tx-report-queue-multicast! conn :q1)
- q-2 (get-tx-report-queue-multicast! conn :q2)]))
+ (stop-all-multicasters!))
(comment
- (drain! (get-tx-report-queue-multicast! conn :q1)))
+ (do
+ (let [q (get-tx-report-queue-multicast! conn :q2 false)]
+ (println "drain count:" (count (drain! q)))
+ @multicast-state
+ nil)))
+
+(comment
+ (get-tx-report-queue-multicast! conn :q1 false)
+ (get-tx-report-queue-multicast! conn :q1 true))
(comment
(do
@(d/transact conn [{:db/doc "demo"}])
+ @(d/transact conn [{:db/doc "demo"}])
+ @(d/transact conn [{:db/doc "demo"}])
:yay)) \ No newline at end of file
diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj
index f3fb6dc..7eae557 100644
--- a/test/com/github/ivarref/yoltq/log_init.clj
+++ b/test/com/github/ivarref/yoltq/log_init.clj
@@ -48,7 +48,7 @@
(color-f (force msg_))
- #_maybe-stacktrace))))
+ maybe-stacktrace))))
(catch Throwable t