aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md37
-rw-r--r--deps.edn12
-rw-r--r--src/com/github/ivarref/yoltq.clj23
-rw-r--r--src/com/github/ivarref/yoltq/report_queue.clj146
4 files changed, 163 insertions, 55 deletions
diff --git a/README.md b/README.md
index f84a336..675fa9a 100644
--- a/README.md
+++ b/README.md
@@ -441,21 +441,48 @@ then not grab the datomic report queue, but use the one provided:
```clojure
(require '[com.github.ivarref.yoltq :as yq])
(yq/init! {:conn conn
- :tx-report-queue (yq/get-tx-report-queue-multicast! my-conn :yoltq)
+ :tx-report-queue (yq/get-tx-report-queue-multicast! conn :yoltq)
; ^^ can be any `java.util.concurrent.BlockingQueue` value
})
-(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! my-conn :another-consumer-id))
+(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! conn :another-consumer-id))
```
Added multicast support for `datomic.api/tx-report-queue`:
```clojure
-(def my-q1 (yq/get-tx-report-queue-multicast! my-conn :q-id-1))
+(require '[com.github.ivarref.yoltq :as yq])
+(def my-q1 (yq/get-tx-report-queue-multicast! conn :q-id-1))
; ^^ consume my-q1 just like you would do `datomic.api/tx-report-queue`
-(def my-q2 (yq/get-tx-report-queue-multicast! my-conn :q-id-2))
-; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue`
+(def my-q2 (yq/get-tx-report-queue-multicast! conn :q-id-2))
+; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue` for the given `conn`
+
+(def my-q3 (yq/get-tx-report-queue-multicast! conn :q-id-3 true))
+; my-q3 specifies the third argument, `send-end-token?`, to true, so it will receive `:end` if the queue is stopped.
+; This can enable simpler consuming of queues:
+(future
+ (loop []
+ (let [q-item (.take ^java.util.concurrent.BlockingQueue my-q3)]
+ (if (= q-item :end)
+ (println "Time to exit. Goodbye!")
+ (do
+ (println "Processing q-item" q-item)
+ (recur))))))
+
+@(d/transact conn [{:db/doc "new-data"}])
+
+; Stop the queue:
+(yq/stop-multicaster-id! conn :q-id-3)
+=> true
+; The multicaster thread will send `:end` and the consumer thread will then print "Time to exit. Goodbye!".
+
+; if the queue is already stopped (or never was started), `stop-multicaster...` functions will return false:
+(yq/stop-multicaster-id! conn :already-stopped-queue-or-typo)
+=> false
+
+; Stop all queues for all connections:
+(yq/stop-all-multicasters!)
```
`yq/get-tx-report-queue-multicast!` returns, like
diff --git a/deps.edn b/deps.edn
index 1e3fa9d..a328c86 100644
--- a/deps.edn
+++ b/deps.edn
@@ -22,6 +22,18 @@
:sha "0e8731e0f24db05b74769e219051b0e92b50624a"}}
:main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]}
+ :repl {:extra-paths ["test"]
+ :extra-deps {com.bhauman/rebel-readline {:mvn/version "0.1.5"}
+ ivarref/datomic-schema {:mvn/version "0.2.0"}
+ com.taoensso/timbre {:mvn/version "5.2.1"}
+ com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"}
+ clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"}
+ org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"}
+ com.taoensso/nippy {:mvn/version "3.2.0"}}
+ :exec-fn rebel-readline.tool/repl
+ :exec-args {}
+ :main-opts ["-m" "rebel-readline.main"]}
+
:release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}}
:deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}}
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index 80c9491..298b9d5 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -407,7 +407,11 @@
If this is the last report destination for the given `conn`, the multicaster thread will exit.
Repeated calls are no-op.
- Returns nil."
+ The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!`
+ was called.
+
+ Returns `true` if the queue was stopped.
+ Return `false` if the queue does not exist."
[conn id]
(assert (instance? Connection conn))
(rq/stop-multicaster-id! conn id))
@@ -417,7 +421,11 @@
The multicaster thread will exit.
Repeated calls are no-op.
- Returns nil."
+ The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!`
+ was called.
+
+ Returns `true` if any queue belonging to `conn` was stopped.
+ Returns `false` is `conn` did not have any associated queues."
[conn]
(assert (instance? Connection conn))
(rq/stop-multicaster! conn))
@@ -427,7 +435,11 @@
All multicaster threads will exit.
Repeated calls are no-op.
- Returns nil."
+ The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!`
+ was called.
+
+ Returns `true` if any queue was stopped.
+ Returns `false` if no queues existed."
[]
(rq/stop-all-multicasters!))
@@ -485,7 +497,7 @@
started-consuming? (promise)
n 1]
(init! {:conn conn
- :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq)
+ :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq true)
:slow-thread-show-stacktrace? false})
(add-consumer! :q (fn [_]
(deliver started-consuming? true)))
@@ -493,8 +505,11 @@
(start!)
(log/info "begin start! ... Done")
(Thread/sleep 2000)
+ (log/info "*******************************************")
@(d/transact conn [(put :q {:work 123})])
@started-consuming?
+ (stop-multicaster! conn)
+ (log/info "*******************************************")
(stop!)
(log/info "stop! done")
nil)))) \ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj
index a9f7e07..c3fd383 100644
--- a/src/com/github/ivarref/yoltq/report_queue.clj
+++ b/src/com/github/ivarref/yoltq/report_queue.clj
@@ -52,9 +52,9 @@
(when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)]
(if (= poll-result :end)
(do
- (log/debug "Report queue listener received :end token. Exiting")
+ (log/debug "report-queue-listener received :end token. Exiting")
(reset! running-local? false))
- ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system."))
+ ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system."))
(process-poll-result! @config-atom
id-ident
poll-result
@@ -115,9 +115,11 @@
(if send-end-token?
(do
#_(log/debug "offering :end token")
- (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS))
+ (if (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS)
+ (log/debug "Multicaster sent :end token")
+ (log/debug "Multicaster failed to send :end token")))
(do
- #_(log/debug "not offering :end token"))))
+ (log/debug "Multicaster not sending :end token"))))
(when (seq new-state)
(if (some? work-item)
(reduce-kv
@@ -125,7 +127,7 @@
(let [ok-offer (.offer ^BlockingQueue q work-item 1 TimeUnit/MICROSECONDS)]
(if (true? ok-offer)
(assoc m id [send-end-token? q])
- (log/warn "Failed to offer item in multicaster for connection" conn "and queue id" id))))
+ (log/error "Multicaster failed to offer item for connection" conn "and queue id" id))))
{}
new-state)
new-state)))
@@ -150,6 +152,7 @@
(do (swap! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))
(swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] dec)))
(d/remove-tx-report-queue conn)
+ (log/debug "Multicaster removed tx-report-queue for conn" conn)
nil)))]
(if new-state
(recur new-state)
@@ -180,26 +183,26 @@
(defn- wait-multicast-thread-step
[conn]
-; `get-tx-report-queue-multicast!` should return only when the multicaster thread
-; has picked up the new queue.
-;
-; Otherwise the following could happen:
-; 1. multicast thread is sleeping
-; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true`
-; 3: user-thread (or somebody else) calls `stop-multicaster`.
-; The multicast-state atom is now identical as it was in step 1.
-; , Step 2 and 3 happened while the multicast thread was sleeping.
-; 4: The multicast thread is scheduled and does _not_ detect any state change.
-; Therefore the multicast thread does _not_ send out an :end token as one would expect.
-;
-; The new queue is written to memory at this point. No other thread can remove it because
-; we are still, and have been during the modification of multicast-state, holding consumer-state-lock.
-; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock,
-; we can be sure that no other thread changes or has changed the state.
-;
-; Once [:iter-count conn] has changed, we know that the multicaster thread
-; will see the new queue. This means that we can be sure that the queue
-; will receive the `:end` token if the queue is stopped.
+ ; `get-tx-report-queue-multicast!` should return only when the multicaster thread
+ ; has picked up the new queue.
+ ;
+ ; Otherwise the following could happen:
+ ; 1. multicast thread is sleeping
+ ; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true`
+ ; 3: user-thread (or somebody else) calls `stop-multicaster`.
+ ; The multicast-state atom is now identical as it was in step 1.
+ ; , Step 2 and 3 happened while the multicast thread was sleeping.
+ ; 4: The multicast thread is scheduled and does _not_ detect any state change.
+ ; Therefore the multicast thread does _not_ send out an :end token as one would expect.
+ ;
+ ; The new queue is written to memory at this point. No other thread can remove it because
+ ; we are still, and have been during the modification of multicast-state, holding consumer-state-lock.
+ ; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock,
+ ; we can be sure that no other thread changes or has changed the state.
+ ;
+ ; Once [:iter-count conn] has changed, we know that the multicaster thread
+ ; will see the new queue. This means that we can be sure that the queue
+ ; will receive the `:end` token if the queue is stopped.
(let [start-ms (System/currentTimeMillis)
iter-count (get-in @multicast-state [:iter-count conn] -1)]
(loop [spin-count 0]
@@ -250,10 +253,10 @@
(if needs-multicaster?
(do
(start-multicaster! conn)
- (log/debug "Multicaster thread started. Returning new queue for id" id)
+ (log/debug "Returning new queue for id" id "(multicaster thread started)")
new-q)
(do
- (log/debug "Multicaster thread already exists. Returning new queue for id" id)
+ (log/debug "Returning new queue for id" id "(multicaster thread already running)")
new-q)))))]
; wait for multicaster thread to pick up current Queue
(wait-multicast-thread-step conn)
@@ -306,32 +309,83 @@
(Thread/sleep 16)
(recur))))))))))))))
+(defn- all-queues [state]
+ (->> (mapcat (fn [[conn qmap]]
+ (mapv (fn [q-id] [conn q-id])
+ (keys qmap)))
+ (seq (get state :queues {})))
+ (into #{})))
+
+(comment
+ (do
+ (assert (= #{}
+ (all-queues {})))
+ (assert (= #{}
+ (all-queues {:queues {}})))
+ (assert (= #{[:conn-a :q-id]}
+ (all-queues {:queues {:conn-a {:q-id 1}}})))
+ (assert (= #{[:conn-a :q-id] [:conn-a :q-id-2]}
+ (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2}}})))
+ (assert (= #{[:conn-a :q-id-2] [:conn-b :q-id-3] [:conn-a :q-id]}
+ (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2}
+ :conn-b {:q-id-3 3}}})))))
+
+(defn- removed-queues? [old new]
+ (not= (all-queues old)
+ (all-queues new)))
+
(defn stop-multicaster-id! [conn id]
(assert (instance? Connection conn))
- (locking consumer-state-lock
- (wait-multicast-threads-exit
- (locking multicast-state-lock
- (swap-vals! multicast-state (fn [old-state]
- (let [new-state (dissoc-in old-state [:queues conn id])]
- (if (= {} (get-in new-state [:queues conn]))
- (dissoc-in old-state [:queues conn])
- new-state)))))))
- nil)
+ (let [did-remove? (atom nil)]
+ (locking consumer-state-lock
+ (wait-multicast-threads-exit
+ (locking multicast-state-lock
+ (let [[old new] (swap-vals! multicast-state (fn [old-state]
+ (let [new-state (dissoc-in old-state [:queues conn id])]
+ (if (= {} (get-in new-state [:queues conn]))
+ (dissoc-in old-state [:queues conn])
+ new-state))))]
+ (reset! did-remove? (removed-queues? old new))
+ [old new]))))
+ @did-remove?))
(defn stop-multicaster! [conn]
(assert (instance? Connection conn))
- (locking consumer-state-lock
- (wait-multicast-threads-exit
- (locking multicast-state-lock
- (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn]))))))
- nil)
+ (let [did-remove? (atom nil)]
+ (locking consumer-state-lock
+ (wait-multicast-threads-exit
+ (locking multicast-state-lock
+ (let [[old new] (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))]
+ (reset! did-remove? (removed-queues? old new))
+ [old new]))))
+ @did-remove?))
(defn stop-all-multicasters! []
- (locking consumer-state-lock
- (wait-multicast-threads-exit
- (locking multicast-state-lock
- (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {}))))))
- nil)
+ (let [did-remove? (atom nil)]
+ (locking consumer-state-lock
+ (wait-multicast-threads-exit
+ (locking multicast-state-lock
+ (let [[old new] (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))]
+ (reset! did-remove? (removed-queues? old new))
+ [old new]))))
+ @did-remove?))
+
+(comment
+ (do
+ (require 'com.github.ivarref.yoltq.log-init)
+ (require '[datomic.api :as d])
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq"} :debug]
+ ;[#{"ivarref.yoltq*"} :info]
+ [#{"*"} :info]])
+ (defonce conn (let [uri (str "datomic:mem://demo")
+ _ (d/delete-database uri)
+ _ (d/create-database uri)
+ conn (d/connect uri)]
+ conn))))
(comment
(do