From ea40c5dbc2b47d6fd2a23236828dc9e4ab1f77dc Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Sat, 4 Sep 2021 13:23:07 +0200 Subject: Initial commit Add release script Release 0.1.3 Use com.github.ivarref.yoltq namespace Use com.github.ivarref.yoltq namespace --- deps.edn | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 deps.edn (limited to 'deps.edn') diff --git a/deps.edn b/deps.edn new file mode 100644 index 0000000..cf8297c --- /dev/null +++ b/deps.edn @@ -0,0 +1,28 @@ +{:deps {org.clojure/tools.logging {:mvn/version "1.1.0"} + org.clojure/clojure {:mvn/version "1.10.3"}} + + :paths ["src"] + + :aliases {:test {:extra-paths ["test"] + :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} + com.taoensso/timbre {:mvn/version "5.1.2"} + com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} + clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} + com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]} + org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} + :jvm-opts ["-DDISABLE_SPY=true" + "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] + :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]} + + :jar {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git" + :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}} + :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]} + + :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} + + :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.1.3"}} + :main-opts ["-m" "deps-deploy.deps-deploy" "deploy" + "target/out.jar" "true"]}} + + :mvn/repos {"my.datomic.com" {:url "https://my.datomic.com/repo"}}} -- cgit v1.2.3 From d13b0cb0b72a9cef9f8e9bd82616899796a4853f Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Fri, 17 Sep 2021 14:51:01 +0200 Subject: Use [#'taoensso.timbre/*context*] as default :capture-bindings if present --- README.md | 10 +++++++--- deps.edn | 3 ++- src/com/github/ivarref/yoltq.clj | 7 ++++--- test/com/github/ivarref/yoltq/log_init.clj | 3 +++ test/com/github/ivarref/yoltq/virtual_test.clj | 24 ++++++++++++++++++------ 5 files changed, 34 insertions(+), 13 deletions(-) (limited to 'deps.edn') diff --git a/README.md b/README.md index 514c4a3..902be2f 100644 --- a/README.md +++ b/README.md @@ -115,7 +115,7 @@ Inspecting `(yq/put :q {:work 123})]` you will see something like this: :queue-name :q, ; Destination queue :status :init, ; Status :payload "{:work 123}", ; Payload persisted to the database with pr-str - :bindings "{}", + :bindings "{}", ; Bindings that will be applied before executing consumer function :lock #uuid"037d7da1-5158-4243-8f72-feb1e47e15ca", ; Lock to protect from multiple consumers :tries 0, ; How many times the job has been executed :init-time 4305758012289 ; Time of initialization (System/nanoTime) @@ -153,7 +153,7 @@ The `payload` will be deserialized from the database using `clojure.edn/read-str you will get back what you put into `yq/put`. The yoltq system treats a queue consumer function invocation as successful if it does not throw an exception. -Thus any regular return value, be it `nil`, `false`, `true`, etc. is considered a success. +Any return value, be it `nil`, `false`, `true`, etc. is considered a success. ### Listening for queue jobs @@ -166,7 +166,7 @@ and process newly created queue jobs fairly quickly. This also means that queue jobs in status `:init` will almost always be processed without any type of backoff*. -This pool also schedules polling jobs that will regularly check for various statuses: +The threadpool also schedules polling jobs that will check for various statuses regularly: * Jobs in status `:error` that have waited for at least `:error-backoff-time` (default: 5 seconds) will be retried. * Jobs that have been in `:processing` for at least `:hung-backoff-time` (default: 30 minutes) will be considered hung and retried. @@ -212,6 +212,10 @@ A queue job will remain in status `:error` once `:max-retries` (default: 100) ha Ideally this will not happen. +### Logging + + + ### Total health and system sanity diff --git a/deps.edn b/deps.edn index cf8297c..a457628 100644 --- a/deps.edn +++ b/deps.edn @@ -3,7 +3,8 @@ :paths ["src"] - :aliases {:test {:extra-paths ["test"] + :aliases {:datomic {:extra-deps {com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]}}} + :test {:extra-paths ["test"] :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} com.taoensso/timbre {:mvn/version "5.1.2"} com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 6341e41..58efca1 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -1,6 +1,5 @@ (ns com.github.ivarref.yoltq - (:require [datomic-schema.core] - [datomic.api :as d] + (:require [datomic.api :as d] [clojure.tools.logging :as log] [com.github.ivarref.yoltq.impl :as i] [com.github.ivarref.yoltq.report-queue :as rq] @@ -58,7 +57,9 @@ ; contain the stacktrace of the stuck threads. :pool-size 4 - :capture-bindings [] + :capture-bindings (if-let [s (resolve (symbol "taoensso.timbre/*context*"))] + [s] + []) ; How often should the system be polled for failed queue jobs :system-error-poll-delay (Duration/ofMinutes 1) diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj index cf69e55..1aa6c02 100644 --- a/test/com/github/ivarref/yoltq/log_init.clj +++ b/test/com/github/ivarref/yoltq/log_init.clj @@ -39,6 +39,9 @@ " " (color-f (min-length 5 (str/upper-case (name level)))) " " + + (when-let [x-req-id (:x-request-id context)] + (str "[" x-req-id "] ")) #_(.getName ^Thread (Thread/currentThread)) (color-f (force msg_)) diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index e2ea19b..3c7c5b4 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -9,8 +9,7 @@ [clojure.tools.logging :as log] [com.github.ivarref.yoltq.impl :as i] [com.github.ivarref.yoltq :as yq] - [clojure.pprint :as pprint] - [clojure.edn :as edn])) + [taoensso.timbre :as timbre])) (use-fixtures :each vq/call-with-virtual-queue!) @@ -254,11 +253,24 @@ (deftest binding-test (let [conn (u/empty-conn)] (dq/init! {:conn conn - :capture-bindings [#'*some-binding*]}) + :capture-bindings [#'*some-binding* #'timbre/*context*]}) (dq/add-consumer! :q (fn [_] *some-binding*)) - (binding [*some-binding* 1] @(d/transact conn [(dq/put :q nil)])) - (binding [*some-binding* 2] @(d/transact conn [(dq/put :q nil)])) - @(d/transact conn [(dq/put :q nil)]) + (binding [timbre/*context* {:x-request-id "wooho"}] + (binding [*some-binding* 1] + @(d/transact conn [(dq/put :q nil)])) + (binding [*some-binding* 2] + @(d/transact conn [(dq/put :q nil)])) + @(d/transact conn [(dq/put :q nil)])) + (is (= 1 (vq/consume-expect! :q :done))) (is (= 2 (vq/consume-expect! :q :done))) (is (nil? (vq/consume-expect! :q :done))))) + + +(deftest default-binding-test + (let [conn (u/empty-conn)] + (dq/init! {:conn conn}) + (dq/add-consumer! :q (fn [_] (:x-request-id timbre/*context*))) + (binding [timbre/*context* {:x-request-id "123"}] + @(d/transact conn [(dq/put :q nil)])) + (is (= "123" (vq/consume-expect! :q :done))))) -- cgit v1.2.3 From b28837ea804fbc6abd14fae23a92933b9406d5e1 Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Sun, 27 Mar 2022 13:49:50 +0200 Subject: Add healthy?, queue-stats functions and default functions for :on-system-error and :on-system-recovery --- README.md | 30 +++++++++++++++++++- deps.edn | 8 ++++-- pom.xml | 4 +-- release.sh | 4 +-- src/com/github/ivarref/yoltq.clj | 32 ++++++++++++++++++++-- src/com/github/ivarref/yoltq/error_poller.clj | 19 +++++++++---- .../com/github/ivarref/yoltq/error_poller_test.clj | 2 +- 7 files changed, 80 insertions(+), 19 deletions(-) (limited to 'deps.edn') diff --git a/README.md b/README.md index 7e49431..f62d46c 100644 --- a/README.md +++ b/README.md @@ -331,6 +331,34 @@ These dynamic bindings will be in place when yoltq logs errors, warnings etc. about failing consumer functions, possibly making troubleshooting easier. +## Change log + +### 2022-03-27 v0.2.41 +``` + Added function `healthy?` that returns: + true if no errors + false if one or more errors + nil if error-poller is yet to be executed. + + Added default functions for `:on-system-error` and `:on-system-recovery` + that simply logs that the system is in error (ERROR level) or has + recovered (INFO level). + + Added function `queue-stats` that returns a nicely "formatted" + vector of queue stats, for example: + (queue-stats) + => + [{:qname :add-message-thread, :status :done, :count 10274} + {:qname :add-message-thread, :status :init, :count 30} + {:qname :add-message-thread, :status :processing, :count 1} + {:qname :send-message, :status :done, :count 21106} + {:qname :send-message, :status :init, :count 56}] +``` + +### 2021-09-27 v0.2.39: ? +### 2021-09-27 v0.2.37: ? + +### 2021-09-24 v0.2.33: First publicly announced release. ## License @@ -345,4 +373,4 @@ Licenses when the conditions for such availability set forth in the Eclipse Public License, v. 2.0 are satisfied: GNU General Public License as published by the Free Software Foundation, either version 2 of the License, or (at your option) any later version, with the GNU Classpath Exception which is available -at https://www.gnu.org/software/classpath/license.html. \ No newline at end of file +at https://www.gnu.org/software/classpath/license.html. diff --git a/deps.edn b/deps.edn index a457628..d0f0a26 100644 --- a/deps.edn +++ b/deps.edn @@ -22,8 +22,10 @@ :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} - :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.1.3"}} - :main-opts ["-m" "deps-deploy.deps-deploy" "deploy" - "target/out.jar" "true"]}} + :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}} + :exec-fn deps-deploy.deps-deploy/deploy + :exec-args {:installer :remote + :sign-releases? false + :artifact "target/out.jar"}}} :mvn/repos {"my.datomic.com" {:url "https://my.datomic.com/repo"}}} diff --git a/pom.xml b/pom.xml index 9784836..e486fb1 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ jar com.github.ivarref yoltq - 0.2.39 + 0.2.40 yoltq @@ -30,7 +30,7 @@ scm:git:git://github.com/ivarref/yoltq.git scm:git:ssh://git@github.com/ivarref/yoltq.git - v0.2.39 + v0.2.40 https://github.com/ivarref/yoltq \ No newline at end of file diff --git a/release.sh b/release.sh index 70f67b5..dec59a2 100755 --- a/release.sh +++ b/release.sh @@ -13,8 +13,6 @@ git commit -m "Release $VERSION" git tag -a v$VERSION -m "Release v$VERSION" git push --follow-tags -clojure -M:deploy +clojure -X:deploy echo "Released $VERSION" - -rm *.pom.asc \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 3164020..03a364f 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -17,7 +17,6 @@ (defonce ^:dynamic *running?* (atom false)) (defonce ^:dynamic *test-mode* false) - (def default-opts (-> {; Default number of times a queue job will be retried before giving up ; Can be overridden on a per consumer basis with @@ -79,7 +78,8 @@ (-> (merge-with (fn [a b] (or b a)) {:running-queues (atom #{}) :start-execute-time (atom {}) - :system-error (atom {})} + :system-error (atom {}) + :healthy? (atom nil)} default-opts (if *test-mode* old-conf (select-keys old-conf [:handlers])) cfg) @@ -148,6 +148,32 @@ (reset! threadpool nil)))))) +(defn healthy? [] + (some->> @*config* + :healthy? + (deref))) + +(defn queue-stats [] + (let [{:keys [conn]} @*config* + db (d/db conn)] + (->> (d/q '[:find ?e ?qname ?status + :in $ + :where + [?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/status ?status]] + db) + (mapv (partial zipmap [:e :qname :status])) + (mapv #(select-keys % [:qname :status])) + (mapv (fn [qitem] {qitem 1})) + (reduce (partial merge-with +) {}) + (mapv (fn [[{:keys [qname status]} v]] + (array-map + :qname qname + :status status + :count v))) + (sort-by (juxt :qname :status)) + (vec)))) + (comment (do (require 'com.github.ivarref.yoltq.log-init) @@ -177,4 +203,4 @@ (start!) (dotimes [x n] @(d/transact conn [(put :q {:work 123})])) - nil)))) \ No newline at end of file + nil)))) diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj index 77339f7..1268482 100644 --- a/src/com/github/ivarref/yoltq/error_poller.clj +++ b/src/com/github/ivarref/yoltq/error_poller.clj @@ -66,9 +66,13 @@ (defn do-poll-errors [{:keys [conn system-error on-system-error - on-system-recovery] - :or {on-system-error (fn [] nil) - on-system-recovery (fn [] nil)} + on-system-recovery + healthy?] + :or {on-system-error (fn [] + (log/error "There are yoltq queues which have errors") + nil) + on-system-recovery (fn [] + (log/info "Yoltq recovered"))} :as config}] (assert (some? conn) "expected :conn to be present") (assert (some? system-error) "expected :system-error to be present") @@ -79,8 +83,11 @@ (d/db conn) u/status-error) 0)] - (when (pos-int? error-count) - (log/debug "poll-errors found" error-count "errors in system")) + (if (pos-int? error-count) + (do + (log/debug "poll-errors found" error-count "errors in system") + (reset! healthy? false)) + (reset! healthy? true)) (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ns) error-count)] (when run-callback (cond (= run-callback :error) @@ -100,7 +107,7 @@ (when @running? (do-poll-errors @config-atom)) (catch Throwable t - (log/error t "unexpected error in poll-erros:" (ex-message t)) + (log/error t "unexpected error in poll-errors:" (ex-message t)) nil))) diff --git a/test/com/github/ivarref/yoltq/error_poller_test.clj b/test/com/github/ivarref/yoltq/error_poller_test.clj index 2e0873e..18f0aa7 100644 --- a/test/com/github/ivarref/yoltq/error_poller_test.clj +++ b/test/com/github/ivarref/yoltq/error_poller_test.clj @@ -1,5 +1,5 @@ (ns com.github.ivarref.yoltq.error-poller-test - (:require [clojure.test :refer :all] + (:require [clojure.test :refer [deftest is]] [com.github.ivarref.yoltq.error-poller :as ep] [clojure.tools.logging :as log] [com.github.ivarref.yoltq.log-init :as logconfig] -- cgit v1.2.3 From 6c26a3b6871286510bb8e9770ee7f7e3abf97abe Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Sun, 27 Mar 2022 18:39:44 +0200 Subject: Start use current millis in the database, not nano offset --- .gitignore | 3 +- README.md | 38 ++++++++++++++-------- deps.edn | 2 +- src/com/github/ivarref/yoltq.clj | 8 ++--- src/com/github/ivarref/yoltq/error_poller.clj | 10 +++--- src/com/github/ivarref/yoltq/ext_sys.clj | 13 ++++---- src/com/github/ivarref/yoltq/impl.clj | 14 ++++---- src/com/github/ivarref/yoltq/poller.clj | 19 +++++++---- .../ivarref/yoltq/slow_executor_detector.clj | 4 +-- src/com/github/ivarref/yoltq/test_queue.clj | 2 +- src/com/github/ivarref/yoltq/utils.clj | 19 ++++++----- test/com/github/ivarref/yoltq/test_utils.clj | 9 ++--- 12 files changed, 81 insertions(+), 60 deletions(-) (limited to 'deps.edn') diff --git a/.gitignore b/.gitignore index cb9a7ca..c82fdd7 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ tree.txt .stage-url.txt *.pom.asc *.pom -temp/ \ No newline at end of file +temp/ +.clj-kondo/ diff --git a/README.md b/README.md index 45ba8c4..9c5669c 100644 --- a/README.md +++ b/README.md @@ -333,12 +333,21 @@ easier. ## Change log -### 2022-03-27 [v0.2.41](https://github.com/ivarref/yoltq/compare/v0.2.39...v0.2.41) +### 20..-..-.. vHEAD [diff](https://github.com/ivarref/yoltq/compare/v0.2.41...HEAD) +* Critical bugfix. +``` +Started using (System/currentTimeMillis) and not (System/nanoTime) +when storing time in the database. +``` + +* Bump Clojure to `1.11.0`. + +### 2022-03-27 v0.2.41 [diff](https://github.com/ivarref/yoltq/compare/v0.2.39...v0.2.41) * Added function `healthy?` that returns: ``` - true if no errors - false if one or more errors - nil if error-poller is yet to be executed. + true if no errors + false if one or more errors + nil if error-poller is yet to be executed. ``` * Added default functions for `:on-system-error` and `:on-system-recovery` @@ -348,22 +357,23 @@ easier. * Added function `queue-stats` that returns a nicely "formatted" vector of queue stats, for example: ``` -(queue-stats) -=> -[{:qname :add-message-thread, :status :done, :count 10274} - {:qname :add-message-thread, :status :init, :count 30} - {:qname :add-message-thread, :status :processing, :count 1} - {:qname :send-message, :status :done, :count 21106} - {:qname :send-message, :status :init, :count 56}] + (queue-stats) + => + [{:qname :add-message-thread, :status :done, :count 10274} + {:qname :add-message-thread, :status :init, :count 30} + {:qname :add-message-thread, :status :processing, :count 1} + {:qname :send-message, :status :done, :count 21106} + {:qname :send-message, :status :init, :count 56}] ``` -### 2021-09-27 [v0.2.39](https://github.com/ivarref/yoltq/compare/v0.2.37...v0.2.39) +### 2021-09-27 v0.2.39 [diff](https://github.com/ivarref/yoltq/compare/v0.2.37...v0.2.39) Added `:valid-payload?` option for queue consumers. -### 2021-09-27 [v0.2.37](https://github.com/ivarref/yoltq/compare/v0.2.33...v0.2.37) +### 2021-09-27 v0.2.37 [diff](https://github.com/ivarref/yoltq/compare/v0.2.33...v0.2.37) Improved error reporting. -### 2021-09-24 v0.2.33: First publicly announced release. +### 2021-09-24 v0.2.33 +First publicly announced release. ## License diff --git a/deps.edn b/deps.edn index d0f0a26..8e769e1 100644 --- a/deps.edn +++ b/deps.edn @@ -1,5 +1,5 @@ {:deps {org.clojure/tools.logging {:mvn/version "1.1.0"} - org.clojure/clojure {:mvn/version "1.10.3"}} + org.clojure/clojure {:mvn/version "1.11.0"}} :paths ["src"] diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 03a364f..17aa40a 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -19,7 +19,7 @@ (def default-opts (-> {; Default number of times a queue job will be retried before giving up - ; Can be overridden on a per consumer basis with + ; Can be overridden on a per-consumer basis with ; (yq/add-consumer! :q (fn [payload] ...) {:max-retries 200}) :max-retries 100 @@ -34,7 +34,7 @@ :hung-backoff-time (Duration/ofMinutes 30) ; Most queue jobs in init state will be consumed by the tx-report-queue listener. - ; However in the case where a init job was added right before the application + ; However, in the case where an init job was added right before the application ; was shut down and did not have time to be processed by the tx-report-queue listener, ; it will be consumer by the init poller. This init poller backs off by ; :init-backoff-time to avoid unnecessary compare-and-swap lock failures that could @@ -66,7 +66,7 @@ ; How often should the system invoke :system-error-callback-backoff (Duration/ofHours 1)} - u/duration->nanos)) + u/duration->millis)) (defn init! [{:keys [conn] :as cfg}] @@ -83,7 +83,7 @@ default-opts (if *test-mode* old-conf (select-keys old-conf [:handlers])) cfg) - u/duration->nanos)))] + u/duration->millis)))] new-cfg))) diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj index 1268482..ee6359e 100644 --- a/src/com/github/ivarref/yoltq/error_poller.clj +++ b/src/com/github/ivarref/yoltq/error_poller.clj @@ -22,7 +22,7 @@ state :recovery}} {:keys [system-error-min-count system-error-callback-backoff] :or {system-error-min-count 3}} - now-ns + now-ms error-count] (let [new-errors (->> (conj errors error-count) (take-last system-error-min-count) @@ -50,14 +50,14 @@ (when (and (= old-state :recovery) (= new-state :error)) {:run-callback :error - :last-notify now-ns}) + :last-notify now-ms}) (when (and (= new-state :error) (= old-state :error) - (> now-ns + (> now-ms (+ last-notify system-error-callback-backoff))) {:run-callback :error - :last-notify now-ns}) + :last-notify now-ms}) (when (and (= new-state :recovery) (= old-state :error)) @@ -88,7 +88,7 @@ (log/debug "poll-errors found" error-count "errors in system") (reset! healthy? false)) (reset! healthy? true)) - (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ns) error-count)] + (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ms) error-count)] (when run-callback (cond (= run-callback :error) (on-system-error) diff --git a/src/com/github/ivarref/yoltq/ext_sys.clj b/src/com/github/ivarref/yoltq/ext_sys.clj index 3480475..692b934 100644 --- a/src/com/github/ivarref/yoltq/ext_sys.clj +++ b/src/com/github/ivarref/yoltq/ext_sys.clj @@ -1,17 +1,18 @@ (ns com.github.ivarref.yoltq.ext-sys (:require [datomic.api :as d]) + (:refer-clojure :exclude [random-uuid]) (:import (java.util UUID))) -(def ^:dynamic *now-ns-atom* nil) +(def ^:dynamic *now-ms-atom* nil) (def ^:dynamic *squuid-atom* nil) (def ^:dynamic *random-atom* nil) -(defn now-ns [] - (if *now-ns-atom* - @*now-ns-atom* - (System/nanoTime))) +(defn now-ms [] + (if *now-ms-atom* + @*now-ms-atom* + (System/currentTimeMillis))) (defn squuid [] @@ -23,4 +24,4 @@ (defn random-uuid [] (if *random-atom* (UUID/fromString (str "00000000-0000-0000-0000-" (format "%012d" (swap! *random-atom* inc)))) - (UUID/randomUUID))) \ No newline at end of file + (UUID/randomUUID))) diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index 8b75fc3..b4eef8d 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -63,7 +63,7 @@ :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts) :com.github.ivarref.yoltq/lock (u/random-uuid) :com.github.ivarref.yoltq/tries 0 - :com.github.ivarref.yoltq/init-time (u/now-ns)} + :com.github.ivarref.yoltq/init-time (u/now-ms)} (when-let [[q ext-id] (:depends-on opts)] (when-not (d/q '[:find ?e . :in $ ?ext-id @@ -138,8 +138,8 @@ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing new-status] (if (= new-status u/status-done) - {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ns)} - {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ns)})] + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ms)} + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ms)})] start-time (System/nanoTime) {:keys [db-after]} @(d/transact conn tx)] (when tx-spent-time! (tx-spent-time! (- (System/nanoTime) start-time))) @@ -171,7 +171,7 @@ (log/debug "queue item" (str id) "for queue" queue-name "is now processing") (let [{:keys [retval exception]} (try - (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ns) id queue-name]) + (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ms) id queue-name]) (let [v (f payload)] {:retval v}) (catch Throwable t @@ -188,7 +188,7 @@ (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)] (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item] (log/info (fmt id queue-name u/status-done tries (- done-time init-time))) - (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time))) + (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time))) (assoc q-item :retval retval :success? true :allow-cas-failure? true))) (some? exception) @@ -198,14 +198,14 @@ (log/logp level exception (fmt id queue-name u/status-error tries (- error-time init-time))) (log/logp level exception "error message was:" (str \" (ex-message exception) \") "for queue-item" (str id)) (log/logp level exception "ex-data was:" (ex-data exception) "for queue-item" (str id)) - (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time))) + (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time))) (assoc q-item :exception exception))) :else (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)] (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item] (log/info (fmt id queue-name u/status-done tries (- done-time init-time))) - (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time))) + (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time))) (assoc q-item :retval retval :success? true)))))) (do (log/error "no handler for queue" queue-name) diff --git a/src/com/github/ivarref/yoltq/poller.clj b/src/com/github/ivarref/yoltq/poller.clj index 28b158f..9cf81c7 100644 --- a/src/com/github/ivarref/yoltq/poller.clj +++ b/src/com/github/ivarref/yoltq/poller.clj @@ -25,17 +25,16 @@ (if-not (contains? old q) (try (log/debug "polling queue" queue-name "for status" status) - (let [start-time (u/now-ns) + (let [start-time (u/now-ms) last-res (loop [prev-res nil] (when @running? (let [res (poll-once! cfg queue-name status)] + (log/debug "poll-once! returned" res) (if (and res (:success? res)) (recur res) prev-res))))] - (let [spent-ns (- (u/now-ns) start-time)] - (log/trace "done polling queue" q "in" - (format "%.1f" (double (/ spent-ns 1e6))) - "ms")) + (let [spent-ms (- (u/now-ms) start-time)] + (log/trace "done polling queue" q "in" spent-ms "ms")) last-res) (finally (swap! running-queues disj q))) @@ -44,6 +43,14 @@ (log/error t "poll-queue! crashed:" (ex-message t))) (finally))) +(comment + (def cfg @com.github.ivarref.yoltq/*config*)) + +(comment + (poll-queue! + (atom true) + @com.github.ivarref.yoltq/*config* + [:add-message-thread :init])) (defn poll-all-queues! [running? config-atom pool] (try @@ -54,4 +61,4 @@ [q-name status])))] (.execute pool (fn [] (poll-queue! running? @config-atom q)))))) (catch Throwable t - (log/error t "poll-all-queues! crashed:" (ex-message t))))) \ No newline at end of file + (log/error t "poll-all-queues! crashed:" (ex-message t))))) diff --git a/src/com/github/ivarref/yoltq/slow_executor_detector.clj b/src/com/github/ivarref/yoltq/slow_executor_detector.clj index f15ef7d..80d3718 100644 --- a/src/com/github/ivarref/yoltq/slow_executor_detector.clj +++ b/src/com/github/ivarref/yoltq/slow_executor_detector.clj @@ -7,7 +7,7 @@ (defn- do-show-slow-threads [{:keys [start-execute-time max-execute-time]}] (doseq [[^Thread thread [start-time queue-id queue-name]] @start-execute-time] - (when (> (ext/now-ns) (+ start-time max-execute-time)) + (when (> (ext/now-ms) (+ start-time max-execute-time)) (log/error "thread" (.getName thread) "spent too much time on" "queue item" (str queue-id) "for queue" queue-name @@ -25,4 +25,4 @@ (dotimes [_ 3] (when @running? (Thread/sleep 1000)))) (catch Throwable t - (log/error t "reap! crashed:" (ex-message t))))) \ No newline at end of file + (log/error t "reap! crashed:" (ex-message t))))) diff --git a/src/com/github/ivarref/yoltq/test_queue.clj b/src/com/github/ivarref/yoltq/test_queue.clj index 6183216..ee9cd54 100644 --- a/src/com/github/ivarref/yoltq/test_queue.clj +++ b/src/com/github/ivarref/yoltq/test_queue.clj @@ -47,7 +47,7 @@ (with-bindings {#'yq/*config* config# #'yq/*running?* (atom false) #'yq/*test-mode* true - #'ext/*now-ns-atom* (atom 0) + #'ext/*now-ms-atom* (atom 0) #'ext/*random-atom* (atom 0) #'ext/*squuid-atom* (atom 0)} (try diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj index d551510..ad2444a 100644 --- a/src/com/github/ivarref/yoltq/utils.clj +++ b/src/com/github/ivarref/yoltq/utils.clj @@ -3,6 +3,7 @@ [clojure.edn :as edn] [com.github.ivarref.yoltq.ext-sys :as ext] [clojure.tools.logging :as log]) + (:refer-clojure :exclude [random-uuid]) (:import (datomic Connection) (java.time Duration))) @@ -13,10 +14,10 @@ (def status-error :error) -(defn duration->nanos [m] +(defn duration->millis [m] (reduce-kv (fn [o k v] (if (instance? Duration v) - (assoc o k (.toNanos v)) + (assoc o k (.toMillis v)) (assoc o k v))) {} m)) @@ -30,8 +31,8 @@ (ext/random-uuid)) -(defn now-ns [] - (ext/now-ns)) +(defn now-ms [] + (ext/now-ms)) (defn root-cause [e] @@ -75,7 +76,7 @@ :bindings (get (get-queue-item db id) :com.github.ivarref.yoltq/bindings {}) :tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock] [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status old-status status-processing] - {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ns)}]})) + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ms)}]})) (defn get-init [{:keys [conn db init-backoff-time] :as cfg} queue-name] @@ -94,11 +95,11 @@ [?e :com.github.ivarref.yoltq/lock ?lock]] db queue-name - (- (now-ns) init-backoff-time)) + (- (now-ms) init-backoff-time)) (not-empty))] (let [[id old-lock] (rand-nth (into [] ids))] (prepare-processing db id queue-name old-lock :init)) - (log/trace "no new-items in :init status for queue" queue-name)))) + (log/debug "no new-items in :init status for queue" queue-name)))) (defn get-error [{:keys [conn db error-backoff-time max-retries] :as cfg} queue-name] @@ -120,7 +121,7 @@ [?e :com.github.ivarref.yoltq/lock ?lock]] db queue-name - (- (now-ns) error-backoff-time) + (- (now-ms) error-backoff-time) (inc max-retries)) (not-empty))] (let [[id old-lock] (rand-nth (into [] ids))] @@ -131,7 +132,7 @@ (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: " (str (if (nil? conn) "nil" conn)) "\nConfig was: " (str cfg))) - (let [now (or now (now-ns)) + (let [now (or now (now-ms)) max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries) db (or db (d/db conn))] (when-let [ids (->> (d/q '[:find ?id ?lock ?tries diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj index 5427ff5..e4151c2 100644 --- a/test/com/github/ivarref/yoltq/test_utils.clj +++ b/test/com/github/ivarref/yoltq/test_utils.clj @@ -8,7 +8,8 @@ [com.github.ivarref.yoltq.impl :as i] [clojure.edn :as edn] [com.github.ivarref.yoltq.ext-sys :as ext]) - (:import (java.util UUID))) + (:import (java.util UUID) + (java.time Duration))) (logconfig/init-logging! @@ -39,10 +40,10 @@ (defn advance! [tp] - (assert (some? ext/*now-ns-atom*) "Expected to be running in test-mode!") - (swap! ext/*now-ns-atom* + (if (number? tp) + (assert (some? ext/*now-ms-atom*) "Expected to be running in test-mode!") + (swap! ext/*now-ms-atom* + (if (number? tp) tp - (.toNanos tp)))) + (.toMillis ^Duration tp)))) (defn done-count [] -- cgit v1.2.3 From 7cf016c691fc08c81138fc592a7657087151c3ca Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Wed, 22 Jun 2022 10:26:16 +0200 Subject: Release 0.2.56 Fix line break issue? Added support for `:yoltq/queue-id` metadata on functions --- README.md | 37 +++++++++++++++++++------- deps.edn | 8 +++--- pom.xml | 8 +++--- release.sh | 6 +++-- src/com/github/ivarref/yoltq.clj | 32 +++++++++++++++------- test/com/github/ivarref/yoltq/virtual_test.clj | 33 +++++++++++++++++------ 6 files changed, 87 insertions(+), 37 deletions(-) (limited to 'deps.edn') diff --git a/README.md b/README.md index e5b2059..8ead585 100644 --- a/README.md +++ b/README.md @@ -333,22 +333,41 @@ easier. ## Change log -### 2022-03-29 v0.2.55 [diff](https://github.com/ivarref/yoltq/compare/v0.2.54...v0.2.55) +#### 2022-06-22 v0.2.56 [diff](https://github.com/ivarref/yoltq/compare/v0.2.55...v0.2.56) +Added support for `:yoltq/queue-id` metadata on functions. I.e. it's possible to write +the following: +```clojure +(defn my-consumer + {:yoltq/queue-id :some-queue} + [payload] + :work-work-work) + +(yq/add-consumer! #'my-consumer ; <-- will resolve to :some-queue + my-consumer) + +@(d/transact conn [(yq/put #'my-consumer ; <-- will resolve to :some-queue + {:id "a"})]) +``` + +The idea here is that it is simpler to jump to var definitions than going via keywords, +which essentially refers to a var/function anyway. + +#### 2022-03-29 v0.2.55 [diff](https://github.com/ivarref/yoltq/compare/v0.2.54...v0.2.55) Added: `unhealthy?` function which returns `true` if there are queues in error, or `false` otherwise. -### 2022-03-28 v0.2.54 [diff](https://github.com/ivarref/yoltq/compare/v0.2.51...v0.2.54) +#### 2022-03-28 v0.2.54 [diff](https://github.com/ivarref/yoltq/compare/v0.2.51...v0.2.54) Fixed: Schedules should now be using milliseconds and not nanoseconds. -### 2022-03-28 v0.2.51 [diff](https://github.com/ivarref/yoltq/compare/v0.2.48...v0.2.51) +#### 2022-03-28 v0.2.51 [diff](https://github.com/ivarref/yoltq/compare/v0.2.48...v0.2.51) * Don't OOM on migrating large amounts of data. * Respect `:auto-migrate? false`. -### 2022-03-27 v0.2.48 [diff](https://github.com/ivarref/yoltq/compare/v0.2.46...v0.2.48) +#### 2022-03-27 v0.2.48 [diff](https://github.com/ivarref/yoltq/compare/v0.2.46...v0.2.48) * Auto migration is done in the background. * Only poll for current version of jobs, thus no races for auto migration. -### 2022-03-27 v0.2.46 [diff](https://github.com/ivarref/yoltq/compare/v0.2.41...v0.2.46) +#### 2022-03-27 v0.2.46 [diff](https://github.com/ivarref/yoltq/compare/v0.2.41...v0.2.46) * Critical bugfix that in some cases can lead to stalled jobs. ``` Started using (System/currentTimeMillis) and not (System/nanoTime) @@ -357,7 +376,7 @@ when storing time in the database. * Bump Clojure to `1.11.0`. -### 2022-03-27 v0.2.41 [diff](https://github.com/ivarref/yoltq/compare/v0.2.39...v0.2.41) +#### 2022-03-27 v0.2.41 [diff](https://github.com/ivarref/yoltq/compare/v0.2.39...v0.2.41) * Added function `healthy?` that returns: ``` true if no errors @@ -381,13 +400,13 @@ when storing time in the database. {:qname :send-message, :status :init, :count 56}] ``` -### 2021-09-27 v0.2.39 [diff](https://github.com/ivarref/yoltq/compare/v0.2.37...v0.2.39) +#### 2021-09-27 v0.2.39 [diff](https://github.com/ivarref/yoltq/compare/v0.2.37...v0.2.39) Added `:valid-payload?` option for queue consumers. -### 2021-09-27 v0.2.37 [diff](https://github.com/ivarref/yoltq/compare/v0.2.33...v0.2.37) +#### 2021-09-27 v0.2.37 [diff](https://github.com/ivarref/yoltq/compare/v0.2.33...v0.2.37) Improved error reporting. -### 2021-09-24 v0.2.33 +#### 2021-09-24 v0.2.33 First publicly announced release. ## License diff --git a/deps.edn b/deps.edn index 8e769e1..6923881 100644 --- a/deps.edn +++ b/deps.edn @@ -1,12 +1,12 @@ -{:deps {org.clojure/tools.logging {:mvn/version "1.1.0"} - org.clojure/clojure {:mvn/version "1.11.0"}} +{:deps {org.clojure/tools.logging {:mvn/version "1.2.4"} + org.clojure/clojure {:mvn/version "1.11.1"}} :paths ["src"] - :aliases {:datomic {:extra-deps {com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]}}} + :aliases {:datomic {:extra-deps {com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]}}} :test {:extra-paths ["test"] :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} - com.taoensso/timbre {:mvn/version "5.1.2"} + com.taoensso/timbre {:mvn/version "5.2.1"} com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]} diff --git a/pom.xml b/pom.xml index 9f591b9..c45ccd9 100644 --- a/pom.xml +++ b/pom.xml @@ -4,18 +4,18 @@ jar com.github.ivarref yoltq - 0.2.55 + 0.2.56 yoltq org.clojure clojure - 1.11.0 + 1.11.1 org.clojure tools.logging - 1.1.0 + 1.2.4 @@ -30,7 +30,7 @@ scm:git:git://github.com/ivarref/yoltq.git scm:git:ssh://git@github.com/ivarref/yoltq.git - v0.2.55 + v0.2.56 https://github.com/ivarref/yoltq \ No newline at end of file diff --git a/release.sh b/release.sh index cf0f09f..d27d125 100755 --- a/release.sh +++ b/release.sh @@ -23,9 +23,11 @@ sed -i "s/HEAD/v$VERSION/g" ./README.md git add pom.xml README.md git commit -m "Release $VERSION" git reset --soft HEAD~2 -git commit -m"Release $VERSION\n$MSG" +git commit -m"Release $VERSION +$MSG" -git tag -a v"$VERSION" -m "Release v$VERSION\n$MSG" +git tag -a v"$VERSION" -m "Release v$VERSION +$MSG" git push --follow-tags --force clojure -X:deploy diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index bb7a43e..ba27d2c 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -1,16 +1,16 @@ (ns com.github.ivarref.yoltq - (:require [datomic.api :as d] - [clojure.tools.logging :as log] + (:require [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.error-poller :as errpoller] [com.github.ivarref.yoltq.impl :as i] - [com.github.ivarref.yoltq.report-queue :as rq] + [com.github.ivarref.yoltq.migrate :as migrate] [com.github.ivarref.yoltq.poller :as poller] - [com.github.ivarref.yoltq.error-poller :as errpoller] + [com.github.ivarref.yoltq.report-queue :as rq] [com.github.ivarref.yoltq.slow-executor-detector :as slow-executor] - [com.github.ivarref.yoltq.migrate :as migrate] - [com.github.ivarref.yoltq.utils :as u]) + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d]) (:import (datomic Connection) - (java.util.concurrent Executors TimeUnit ExecutorService) - (java.time Duration))) + (java.time Duration) + (java.util.concurrent ExecutorService Executors TimeUnit))) (defonce ^:dynamic *config* (atom nil)) @@ -92,11 +92,23 @@ new-cfg))) +(defn get-queue-id + [queue-id-or-var] + (cond (and (var? queue-id-or-var) + (keyword? (:yoltq/queue-id (meta queue-id-or-var)))) + (:yoltq/queue-id (meta queue-id-or-var)) + + (keyword? queue-id-or-var) + queue-id-or-var + + :else + (throw (ex-info (str "Could not get queue-id for " queue-id-or-var) {:queue-id queue-id-or-var})))) + (defn add-consumer! ([queue-id f] (add-consumer! queue-id f {})) ([queue-id f opts] - (swap! *config* (fn [old-config] (assoc-in old-config [:handlers queue-id] (merge opts {:f f})))))) + (swap! *config* (fn [old-config] (assoc-in old-config [:handlers (get-queue-id queue-id)] (merge opts {:f f})))))) (defn put @@ -105,7 +117,7 @@ (let [{:keys [bootstrap-poller! conn] :as cfg} @*config*] (when (and *test-mode* bootstrap-poller!) (bootstrap-poller! conn)) - (i/put cfg queue-id payload opts)))) + (i/put cfg (get-queue-id queue-id) payload opts)))) (defn- do-start! [] diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 34c9026..e077517 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -1,15 +1,15 @@ (ns com.github.ivarref.yoltq.virtual-test - (:require [datomic-schema.core] - [clojure.test :refer [use-fixtures deftest is] :refer-macros [thrown?]] + (:require [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]] + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq :as yq] + [com.github.ivarref.yoltq.impl :as i] + [com.github.ivarref.yoltq.migrate :as migrate] [com.github.ivarref.yoltq.test-queue :as tq] [com.github.ivarref.yoltq.test-utils :as u] - [datomic.api :as d] [com.github.ivarref.yoltq.utils :as uu] - [clojure.tools.logging :as log] - [com.github.ivarref.yoltq.impl :as i] - [com.github.ivarref.yoltq :as yq] - [taoensso.timbre :as timbre] - [com.github.ivarref.yoltq.migrate :as migrate])) + [datomic-schema.core] + [datomic.api :as d] + [taoensso.timbre :as timbre])) (use-fixtures :each tq/call-with-virtual-queue!) @@ -350,3 +350,20 @@ @(d/transact conn [(yq/put :q {:id "a"})]) (timbre/with-level :fatal (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) + + +(defn my-consumer + {:yoltq/queue-id :some-q} + [state payload] + (swap! state conj payload)) + +(deftest queue-id-can-be-var + (let [conn (u/empty-conn) + received (atom #{})] + (yq/init! {:conn conn}) + (yq/add-consumer! #'my-consumer (partial my-consumer received)) + @(d/transact conn [(yq/put #'my-consumer {:id "a"})]) + (tq/consume! :some-q) + (is (= #{{:id "a"}} @received)) + #_(timbre/with-level :fatal + (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) -- cgit v1.2.3 From 8f945d8c0189ad73d862c988faa511e0a7b017df Mon Sep 17 00:00:00 2001 From: Ivar Refsdal Date: Fri, 18 Nov 2022 14:12:50 +0100 Subject: Release 0.2.63: Add support for :encode and :decode function. Add :partition-fn. Fixes #1 --- README.md | 65 ++++++++++++++++- deps.edn | 6 +- pom.xml | 9 ++- src/com/github/ivarref/yoltq/impl.clj | 95 +++++++++++++++---------- src/com/github/ivarref/yoltq/utils.clj | 1 - test/com/github/ivarref/yoltq/virtual_test.clj | 98 ++++++++++++++++++++++---- 6 files changed, 218 insertions(+), 56 deletions(-) (limited to 'deps.edn') diff --git a/README.md b/README.md index 63b9ad3..a914cc7 100644 --- a/README.md +++ b/README.md @@ -131,7 +131,8 @@ Inspecting `(yq/put :q {:work 123})]` you will see something like this: This is the queue job as it will be stored into the database. You can see that the payload, i.e. the second argument of `yq/put`, -is persisted into the database. Thus the payload must be `pr-str`-able. +is persisted into the database. Thus the payload must be `pr-str`-able (unless you have specified +custom `:encode` and `:decode` functions that override this). A queue job will initially have status `:init`. @@ -220,6 +221,47 @@ is shut down abruptly during processing of queue jobs. A queue job will remain in status `:error` once `:max-retries` (default: 100) have been reached. Ideally this will not happen. ¯\\\_(ツ)\_/¯ +### Custom encoding and decoding + +Yoltq will use `pr-str` and `clojure.edn/read-string` by default to encode and decode data. +You may specify `:encode` and `:decode` either globally or per queue to override this behaviour. +The `:encode` function must return a byte array or a string. + +For example if you want to use [nippy](https://github.com/ptaoussanis/nippy): +```clojure +(require '[taoensso.nippy :as nippy]) + +; Globally for all queues: +(yq/init! + {:conn conn + :encode nippy/freeze + :decode nippy/thaw}) + +; Or per queue: +(yq/add-consumer! + :q ; Queue to consume + (fn [payload] (println "got payload:" payload)) ; Queue consumer function + {:encode nippy/freeze + :decode nippy/thaw}) ; Queue options, here with :encode and :decode +``` + +### Partitions + +Yoltq supports specifying which [partition](https://docs.datomic.com/on-prem/schema/schema.html#partitions) +queue entities should belong to. +The default function is: +```clojure +(defn default-partition-fn [_queue-name] + (keyword "yoltq" (str "queue_" (.getValue (java.time.Year/now))))) +``` +This is to say that there will be a single partition per year for yoltq. +Yoltq will take care of creating the partition if it does not exist. + +You may override this function, either globally or per queue, with the keyword `:partition-fn`. +E.g.: +```clojure +(yq/init! {:conn conn :partition-fn (fn [_queue-name] :my-partition)}) +``` ### All configuration options @@ -376,8 +418,29 @@ For Redis there is [carmine](https://github.com/ptaoussanis/carmine). Note: I have not tried these libraries myself. +## Other stuff + +If you liked this library, you may also like: + +* [conformity](https://github.com/avescodes/conformity): A Clojure/Datomic library for idempotently transacting norms into your database – be they schema, data, or otherwise. +* [datomic-schema](https://github.com/ivarref/datomic-schema): Simplified writing of Datomic schemas (works with conformity). +* [double-trouble](https://github.com/ivarref/double-trouble): Handle duplicate Datomic transactions with ease. +* [gen-fn](https://github.com/ivarref/gen-fn): Generate Datomic function literals from regular Clojure namespaces. +* [rewriting-history](https://github.com/ivarref/rewriting-history): A library to rewrite Datomic history. + ## Change log +#### 2022-11-18 v0.2.63 [diff](https://github.com/ivarref/yoltq/compare/v0.2.62...v0.2.63) +Added custom `:encode` and `:decode` support. + +Added support for specifying `:partifion-fn` to specify which partition a queue item should belong to. +It defaults to: +```clojure +(defn default-partition-fn [_queue-name] + (keyword "yoltq" (str "queue_" (.getValue (Year/now))))) +``` +Yoltq takes care of creating the partition if it does not exist. + #### 2022-11-15 v0.2.62 [diff](https://github.com/ivarref/yoltq/compare/v0.2.61...v0.2.62) Added function `processing-time-stats`: diff --git a/deps.edn b/deps.edn index 6923881..e36885e 100644 --- a/deps.edn +++ b/deps.edn @@ -1,5 +1,6 @@ -{:deps {org.clojure/tools.logging {:mvn/version "1.2.4"} - org.clojure/clojure {:mvn/version "1.11.1"}} +{:deps {com.github.ivarref/double-trouble {:mvn/version "0.1.102"} + org.clojure/tools.logging {:mvn/version "1.2.4"} + org.clojure/clojure {:mvn/version "1.11.1"}} :paths ["src"] @@ -11,6 +12,7 @@ clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]} org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"} io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} :jvm-opts ["-DDISABLE_SPY=true" "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] diff --git a/pom.xml b/pom.xml index 2c11984..463899d 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ jar com.github.ivarref yoltq - 0.2.62 + 0.2.63 yoltq @@ -12,6 +12,11 @@ clojure 1.11.1 + + com.github.ivarref + double-trouble + 0.1.102 + org.clojure tools.logging @@ -30,7 +35,7 @@ scm:git:git://github.com/ivarref/yoltq.git scm:git:ssh://git@github.com/ivarref/yoltq.git - v0.2.62 + v0.2.63 https://github.com/ivarref/yoltq \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index c37b0e6..ac573d1 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -1,11 +1,12 @@ (ns com.github.ivarref.yoltq.impl - (:require [datomic.api :as d] - [clojure.tools.logging :as log] + (:require [clojure.edn :as edn] [clojure.string :as str] - [com.github.ivarref.yoltq.utils :as u] + [clojure.tools.logging :as log] + [com.github.ivarref.double-trouble :as dt] [com.github.ivarref.yoltq.ext-sys :as ext] - [clojure.edn :as edn])) - + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d]) + (:import (java.time Year))) (def schema [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity} @@ -13,6 +14,7 @@ #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string} + #:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes} #:db{:ident :com.github.ivarref.yoltq/opts, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true} @@ -41,13 +43,22 @@ (log/error "could not read-string" what ":" (ex-message e)) (throw e)))) +(defn default-partition-fn [_queue-keyword] + (keyword "yoltq" (str "queue_" (.getValue (Year/now))))) -(defn put [{:keys [capture-bindings conn] :as config} +(defn put [{:keys [capture-bindings conn encode partition-fn] + :or {partition-fn default-partition-fn + encode (partial pr-str-safe :payload)} + :as config} queue-name payload opts] (if-let [q-config (get-in config [:handlers queue-name])] (let [id (u/squuid) + encode (get q-config :encode encode) + partition-fn (get q-config :partition-fn partition-fn) + partition (partition-fn queue-name) + _ (assert (keyword? partition) "Partition must be a keyword") depends-on (get q-config :depends-on (fn [_] nil)) valid-payload? (get q-config :valid-payload? (fn [_] true)) opts (merge @@ -58,32 +69,41 @@ (assoc o (symbol k) (deref k))) {} (or capture-bindings [])) - (pr-str-safe :capture-bindings))] - (when-not (valid-payload? payload) - (log/error "Payload was not valid. Payload was:" payload) - (throw (ex-info (str "Payload was not valid: " payload) {:payload payload}))) + (pr-str-safe :capture-bindings)) + _ (when-not (valid-payload? payload) + (log/error "Payload was not valid. Payload was:" payload) + (throw (ex-info (str "Payload was not valid: " payload) {:payload payload}))) + encoded (encode payload) + _ (when (not (or (bytes? encoded) (string? encoded))) + (log/error "Payload must be encoded to either a string or a byte array") + (throw (ex-info (str "Payload must be encoded to a string or a byte array. Payload: " payload) {:payload payload})))] (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init) - (merge - {:com.github.ivarref.yoltq/id id - :com.github.ivarref.yoltq/queue-name queue-name - :com.github.ivarref.yoltq/status u/status-init - :com.github.ivarref.yoltq/payload (pr-str-safe :payload payload) - :com.github.ivarref.yoltq/bindings str-bindings - :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts) - :com.github.ivarref.yoltq/lock (u/random-uuid) - :com.github.ivarref.yoltq/tries 0 - :com.github.ivarref.yoltq/init-time (u/now-ms) - :com.github.ivarref.yoltq/version "2"} - (when-let [[q ext-id] (:depends-on opts)] - (when-not (d/q '[:find ?e . - :in $ ?ext-id - :where - [?e :com.github.ivarref.yoltq/ext-id ?ext-id]] - (d/db conn) - (pr-str-safe :depends-on [q ext-id])) - (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts)))) - (when-let [ext-id (:id opts)] - {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}))) + (do + (dt/ensure-partition! conn partition) + (merge + (if (bytes? encoded) + {:com.github.ivarref.yoltq/payload-bytes encoded} + {:com.github.ivarref.yoltq/payload encoded}) + {:db/id (d/tempid partition) + :com.github.ivarref.yoltq/id id + :com.github.ivarref.yoltq/queue-name queue-name + :com.github.ivarref.yoltq/status u/status-init + :com.github.ivarref.yoltq/bindings str-bindings + :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts) + :com.github.ivarref.yoltq/lock (u/random-uuid) + :com.github.ivarref.yoltq/tries 0 + :com.github.ivarref.yoltq/init-time (u/now-ms) + :com.github.ivarref.yoltq/version "2"} + (when-let [[q ext-id] (:depends-on opts)] + (when-not (d/q '[:find ?e . + :in $ ?ext-id + :where + [?e :com.github.ivarref.yoltq/ext-id ?ext-id]] + (d/db conn) + (pr-str-safe :depends-on [q ext-id])) + (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts)))) + (when-let [ext-id (:id opts)] + {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])})))) (do (log/error "Did not find registered handler for queue" queue-name) (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) @@ -169,20 +189,23 @@ "in" (format "%.1f" (double (/ spent-ns 1e6))) "ms"])) -(defn execute! [{:keys [handlers mark-status-fn! start-execute-time collect-spent-time!] - :or {mark-status-fn! mark-status!} +(defn execute! [{:keys [decode handlers mark-status-fn! start-execute-time collect-spent-time!] + :or {mark-status-fn! mark-status! + decode edn/read-string} :as cfg} - {:com.github.ivarref.yoltq/keys [status id queue-name payload] :as queue-item}] + {:com.github.ivarref.yoltq/keys [status id queue-name payload payload-bytes] :as queue-item}] (when queue-item (if (= :error status) (assoc queue-item :failed? true) (if-let [queue (get handlers queue-name)] - (let [{:keys [f allow-cas-failure?]} queue] + (let [{:keys [f allow-cas-failure?]} queue + decode (get queue :decode decode)] (log/debug "queue item" (str id) "for queue" queue-name "is now processing") (let [{:keys [retval exception]} (try (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ms) id queue-name]) - (let [v (f payload)] + (let [payload (decode (or payload payload-bytes)) + v (f payload)] {:retval v}) (catch Throwable t {:exception t}) diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj index 39572a9..7665b6d 100644 --- a/src/com/github/ivarref/yoltq/utils.clj +++ b/src/com/github/ivarref/yoltq/utils.clj @@ -57,7 +57,6 @@ (defn get-queue-item [db id] (-> (d/pull db '[:*] [:com.github.ivarref.yoltq/id id]) (dissoc :db/id) - (update :com.github.ivarref.yoltq/payload edn/read-string) (update :com.github.ivarref.yoltq/opts (fn [s] (or (when s (edn/read-string s)) {}))) (update :com.github.ivarref.yoltq/bindings (fn [s] diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 996792e..2800c21 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -1,18 +1,21 @@ (ns com.github.ivarref.yoltq.virtual-test - (:require [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]] - [clojure.tools.logging :as log] - [com.github.ivarref.yoltq :as yq] - [com.github.ivarref.yoltq.error-poller :as error-poller] - [com.github.ivarref.yoltq.ext-sys :as ext] - [com.github.ivarref.yoltq.impl :as i] - [com.github.ivarref.yoltq.migrate :as migrate] - [com.github.ivarref.yoltq.test-queue :as tq] - [com.github.ivarref.yoltq.test-utils :as u] - [com.github.ivarref.yoltq.utils :as uu] - [datomic-schema.core] - [datomic.api :as d] - [taoensso.timbre :as timbre]) - (:import (java.time Duration))) + (:require + [clojure.string :as str] + [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]] + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq :as yq] + [com.github.ivarref.yoltq.error-poller :as error-poller] + [com.github.ivarref.yoltq.ext-sys :as ext] + [com.github.ivarref.yoltq.impl :as i] + [com.github.ivarref.yoltq.migrate :as migrate] + [com.github.ivarref.yoltq.test-queue :as tq] + [com.github.ivarref.yoltq.test-utils :as u] + [com.github.ivarref.yoltq.utils :as uu] + [datomic-schema.core] + [datomic.api :as d] + [taoensso.nippy :as nippy] + [taoensso.timbre :as timbre]) + (:import (java.time Duration LocalDateTime))) (use-fixtures :each tq/call-with-virtual-queue!) @@ -380,3 +383,70 @@ (is (= 0 (error-poller/do-poll-errors @yq/*config* (ext/now-ms)))) (is (= 0 (error-poller/do-poll-errors @yq/*config* (+ (dec (.toMillis (Duration/ofMinutes 15))) (ext/now-ms))))) (is (= 1 (error-poller/do-poll-errors @yq/*config* (+ (.toMillis (Duration/ofMinutes 15)) (ext/now-ms))))))) + +(deftest global-encode-decode + (let [conn (u/empty-conn) + ldt (LocalDateTime/now) + got-work (atom nil)] + (yq/init! {:conn conn + :encode nippy/freeze + :decode nippy/thaw}) + (yq/add-consumer! :q (fn [work] (reset! got-work work))) + @(d/transact conn [(yq/put :q {:work ldt})]) + (tq/consume! :q) + (is (= @got-work {:work ldt})))) + +(deftest queue-encode-decode + (let [conn (u/empty-conn) + ldt (LocalDateTime/now) + got-work (atom nil)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [work] (reset! got-work work)) + {:encode nippy/freeze + :decode nippy/thaw}) + @(d/transact conn [(yq/put :q {:work ldt})]) + (tq/consume! :q) + (is (= @got-work {:work ldt})))) + +(deftest global-partition + (let [conn (u/empty-conn) + got-work (atom nil)] + (yq/init! {:conn conn + :partition-fn (fn [_queue-name] :my-part)}) + (yq/add-consumer! :q (fn [work] (reset! got-work work))) + @(d/transact conn [(yq/put :q {:work 123})]) + (tq/consume! :q) + (is (some? (d/q '[:find ?e . + :in $ ?part + :where + [?e :db/ident ?part]] + (d/db conn) + :my-part))) + (is (= @got-work {:work 123})))) + +(deftest partition-per-queue + (let [conn (u/empty-conn) + got-work (atom nil)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [work] (reset! got-work work)) + {:partition-fn (fn [_queue-name] :my-part)}) + @(d/transact conn [(yq/put :q {:work 123})]) + (tq/consume! :q) + (is (some? (d/q '[:find ?e . + :in $ ?part + :where + [?e :db/ident ?part]] + (d/db conn) + :my-part))) + (is (= @got-work {:work 123})))) + +(deftest string-encode-decode + (let [conn (u/empty-conn) + got-work (atom nil)] + (yq/init! {:conn conn + :encode (fn [x] (str/join (reverse x))) + :decode (fn [x] (str/join (reverse x)))}) + (yq/add-consumer! :q (fn [work] (reset! got-work work))) + @(d/transact conn [(yq/put :q "asdf")]) + (tq/consume! :q) + (is (= @got-work "asdf")))) -- cgit v1.2.3 From ae49a7ec82ecd3988e0f7825b0adead1dc77c911 Mon Sep 17 00:00:00 2001 From: ire Date: Tue, 13 May 2025 21:39:07 +0200 Subject: Fix tx-report-queue sharing #7 --- README.md | 34 +++++++ deps.edn | 54 +++++------ src/com/github/ivarref/yoltq.clj | 86 ++++++++++++++--- src/com/github/ivarref/yoltq/report_queue.clj | 133 ++++++++++++++++++++++++-- test/com/github/ivarref/yoltq/log_init.clj | 2 + 5 files changed, 258 insertions(+), 51 deletions(-) (limited to 'deps.edn') diff --git a/README.md b/README.md index c5f2bdb..f84a336 100644 --- a/README.md +++ b/README.md @@ -434,6 +434,40 @@ If you liked this library, you may also like: ## Change log +#### 2025-05-13 v0.2.?? [diff](https://github.com/ivarref/yoltq/compare/v0.2.64...HEAD) +Added support for specifying `tx-report-queue` as a keyword in `init!`. Yoltq will +then not grab the datomic report queue, but use the one provided: + +```clojure +(require '[com.github.ivarref.yoltq :as yq]) +(yq/init! {:conn conn + :tx-report-queue (yq/get-tx-report-queue-multicast! my-conn :yoltq) + ; ^^ can be any `java.util.concurrent.BlockingQueue` value + }) + +(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! my-conn :another-consumer-id)) + +``` + +Added multicast support for `datomic.api/tx-report-queue`: +```clojure +(def my-q1 (yq/get-tx-report-queue-multicast! my-conn :q-id-1)) +; ^^ consume my-q1 just like you would do `datomic.api/tx-report-queue` + +(def my-q2 (yq/get-tx-report-queue-multicast! my-conn :q-id-2)) +; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue` +``` + +`yq/get-tx-report-queue-multicast!` returns, like +`datomic.api/tx-report-queue`, +`java.util.concurrent.BlockingQueue` and starts a background thread that does +the multicasting as needed. Identical calls to `yq/get-tx-report-queue-multicast!` +returns the same `BlockingQueue`. + +Changed the default for `max-retries` from `10000` to `9223372036854775807`. + +Fixed reflection warnings. + #### 2023-03-20 v0.2.64 [diff](https://github.com/ivarref/yoltq/compare/v0.2.63...v0.2.64) Added support for `max-retries` being `0`, meaning the job should be retried forever (or at least 9223372036854775807 times). diff --git a/deps.edn b/deps.edn index e36885e..1e3fa9d 100644 --- a/deps.edn +++ b/deps.edn @@ -1,33 +1,31 @@ -{:deps {com.github.ivarref/double-trouble {:mvn/version "0.1.102"} - org.clojure/tools.logging {:mvn/version "1.2.4"} - org.clojure/clojure {:mvn/version "1.11.1"}} +{:deps {com.github.ivarref/double-trouble {:mvn/version "0.1.102"} + org.clojure/tools.logging {:mvn/version "1.2.4"} + org.clojure/clojure {:mvn/version "1.11.1"} + com.datomic/peer {:mvn/version "1.0.7364"}} - :paths ["src"] + :paths ["src"] - :aliases {:datomic {:extra-deps {com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]}}} - :test {:extra-paths ["test"] - :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} - com.taoensso/timbre {:mvn/version "5.2.1"} - com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} - clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} - com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]} - org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} - com.taoensso/nippy {:mvn/version "3.2.0"} - io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} - :jvm-opts ["-DDISABLE_SPY=true" - "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] - :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]} + :aliases {:test {:extra-paths ["test"] + :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} + com.taoensso/timbre {:mvn/version "5.2.1"} + com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} + clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} + org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"} + io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} + :exec-fn cognitect.test-runner.api/test + :jvm-opts ["-DDISABLE_SPY=true" + "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] + :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]} - :jar {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git" - :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}} - :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]} + :jar {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git" + :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}} + :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]} - :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} + :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} - :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}} - :exec-fn deps-deploy.deps-deploy/deploy - :exec-args {:installer :remote - :sign-releases? false - :artifact "target/out.jar"}}} - - :mvn/repos {"my.datomic.com" {:url "https://my.datomic.com/repo"}}} + :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}} + :exec-fn deps-deploy.deps-deploy/deploy + :exec-args {:installer :remote + :sign-releases? false + :artifact "target/out.jar"}}}} \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index a7dcddf..32298b7 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -12,7 +12,7 @@ (:import (datomic Connection) (java.lang.management ManagementFactory) (java.time Duration Instant ZoneOffset ZonedDateTime) - (java.util.concurrent ExecutorService Executors ScheduledExecutorService TimeUnit))) + (java.util.concurrent BlockingQueue ExecutorService Executors ScheduledExecutorService TimeUnit))) (defonce ^:dynamic *config* (atom nil)) (defonce threadpool (atom nil)) @@ -26,7 +26,7 @@ ; If you want no limit on the number of retries, specify ; the value `0`. That will set the effective retry limit to ; 9223372036854775807 times. - :max-retries 10000 + :max-retries 9223372036854775807 ; Minimum amount of time to wait before a failed queue job is retried :error-backoff-time (Duration/ofSeconds 5) @@ -86,6 +86,9 @@ (defn init! [{:keys [conn tx-report-queue] :as cfg}] (assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil"))) + (when (some? tx-report-queue) + (assert (instance? BlockingQueue tx-report-queue) + (str "Expected :tx-report-queue to be of type java.util.concurrent.BlockingQueue"))) (locking threadpool @(d/transact conn i/schema) (let [new-cfg (swap! *config* @@ -96,9 +99,6 @@ :system-error (atom {}) :healthy? (atom nil) :slow? (atom nil) - :get-tx-report-queue (fn [] - (or tx-report-queue - (d/tx-report-queue conn))) :slow-thread-watcher-done? (promise)} default-opts (if *test-mode* old-conf (select-keys old-conf [:handlers])) @@ -144,12 +144,37 @@ (reset! *running?* true) (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS) (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS) - (.execute ^ScheduledExecutorService pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*))) + (.execute ^ScheduledExecutorService pool + (fn [] + (try + (log/debug "report-queue-listener starting") + (rq/report-queue-listener *running?* queue-listener-ready pool *config*) + (finally + (log/debug "report-queue-listener exiting") + (deliver queue-listener-ready :finally))))) (future (try (slow-executor/show-slow-threads pool *config*) (finally (deliver slow-thread-watcher-done? :done)))) - @queue-listener-ready))) + (let [q-listener-retval (deref queue-listener-ready 30000 :timeout)] + (cond (= :timeout q-listener-retval) + (do + (log/error "Timed out waiting for report-queue-listener to start") + (throw (IllegalStateException. "Timed out waiting for report-queue-listener to start"))) + + (= :finally q-listener-retval) + (do + (log/error "report-queue-listener did not start") + (throw (IllegalStateException. "report-queue-listener did not start"))) + + (= :ready q-listener-retval) + (do + (log/debug "report-queue-listener is ready")) + + :else + (do + (log/error (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval))) + (throw (IllegalStateException. (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval)))))))))) (defn start! [] @@ -359,14 +384,13 @@ :min (apply min values))}))) (into (sorted-map))))) +(defn get-tx-report-queue-multicast! + "Multicast the datomic.api/tx-report-queue to different consumers. + The multicaster is started on demand. `conn` and `id` identifies the consumer. - -(defn add-tx-report-queue! - ([conn] - (add-tx-report-queue! conn :default)) - ([conn id] - (if @*config* - :...))) + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + [conn id] + (rq/get-tx-report-queue-multicast! conn id)) (comment (do @@ -401,3 +425,37 @@ @started-consuming? (stop!) nil))))) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq.migrate"} :warn] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (stop!) + (let [received (atom []) + uri (str "datomic:mem://demo")] + (d/delete-database uri) + (d/create-database uri) + (let [conn (d/connect uri) + started-consuming? (promise) + n 1] + (init! {:conn conn + :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq) + :slow-thread-show-stacktrace? false}) + (add-consumer! :q (fn [_] + (deliver started-consuming? true))) + (log/info "begin start! ...") + (start!) + (log/info "begin start! ... Done") + (Thread/sleep 2000) + @(d/transact conn [(put :q {:work 123})]) + @started-consuming? + (stop!) + (log/info "stop! done") + nil)))) \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index 20e0a93..9cddc93 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -3,8 +3,8 @@ [com.github.ivarref.yoltq.impl :as i] [datomic.api :as d] [clojure.tools.logging :as log]) - (:import (datomic Datom) - (java.util.concurrent ScheduledExecutorService BlockingQueue TimeUnit))) + (:import (datomic Connection Datom) + (java.util.concurrent LinkedBlockingQueue ScheduledExecutorService BlockingQueue TimeUnit))) (defn process-poll-result! [cfg id-ident poll-result consumer] @@ -28,18 +28,24 @@ (i/take! cfg) (i/execute! cfg))))) (catch Throwable t - (log/error t "unexpected error in process-poll-result!"))))))))) + (log/error t "Unexpected error in process-poll-result!"))))))))) (defn report-queue-listener [running? ready? ^ScheduledExecutorService pool config-atom] - (let [conn (:conn @config-atom) - ^BlockingQueue q (d/tx-report-queue conn) + (let [cfg @config-atom + conn (:conn cfg) + tx-report-queue-given (contains? cfg :tx-report-queue) + ^BlockingQueue q (if tx-report-queue-given + (get cfg :tx-report-queue) + (d/tx-report-queue conn)) id-ident (d/q '[:find ?e . :where [?e :db/ident :com.github.ivarref.yoltq/id]] (d/db conn))] + (assert (instance? BlockingQueue q)) + (log/debug "tx-report-queue-given:" tx-report-queue-given) (try (while @running? (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] @@ -49,9 +55,118 @@ (fn [f] (when @running? (.execute ^ScheduledExecutorService pool f))))) - (deliver ready? true)) + (deliver ready? :ready)) (catch Throwable t - (log/error t "unexpected error in report-queue-listener")) + (log/error t "Unexpected error in report-queue-listener:" (.getMessage t))) (finally - (log/debug "remove tx-report-queue") - (d/remove-tx-report-queue conn))))) \ No newline at end of file + (if tx-report-queue-given + (log/debug "Remove tx-report-queue handled elsewhere") + (do + (log/debug "Remove tx-report-queue") + (d/remove-tx-report-queue conn))))))) + +(defonce ^:private multicast-state-lock (Object.)) + +(defonce ^:private multicast-state (atom {})) + +(defn- start-multicaster! [conn] + (let [multicaster-ready? (promise)] + (future + (log/debug "Multicaster starting for conn" conn) + (try + (let [input-queue (d/tx-report-queue conn)] + (loop [] + (when-let [mcast-state (get @multicast-state conn)] + (when-let [dest-queues (vals mcast-state)] + (let [element (.poll ^BlockingQueue input-queue 1 TimeUnit/SECONDS)] + (deliver multicaster-ready? :ready) + (when (some? element) + (doseq [q dest-queues] + (let [ok-offer (.offer ^BlockingQueue q element 30 TimeUnit/MINUTES)] + (when (false? ok-offer) + (log/error "Failed to offer item in multicaster for connection" conn)))))) + (recur))))) + (catch Throwable t + (deliver multicaster-ready? :error) + (log/error t "Unexpected error in multicaster:" (.getMessage t))) + (finally + (d/remove-tx-report-queue conn) + (log/debug "Multicaster exiting for conn" conn)))) + multicaster-ready?)) + +(defn get-tx-report-queue-multicast! + "Multicast the datomic.api/tx-report-queue to different consumers. + The multicaster is started on demand. `conn` and `id` identifies the consumer. + + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + [conn id] + (assert (instance? Connection conn)) + (assert (keyword? id)) + (locking multicast-state-lock + (assert (map? @multicast-state)) + (if-let [existing-q (get-in @multicast-state [conn id])] + (do + (log/debug "returning existing queue for id" id) + (assert (instance? BlockingQueue existing-q)) + existing-q) + (let [needs-multicaster? (not (contains? @multicast-state conn)) + new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [conn id] (LinkedBlockingQueue.))))] + (when needs-multicaster? + (let [multicaster-promise (start-multicaster! conn) + multicaster-result (deref multicaster-promise (* 30 60000) :timeout)] + (cond (= multicaster-result :timeout) + (do + (log/error "Timeout waiting for multicaster to start") + (throw (RuntimeException. "Timeout waiting for multicaster to start"))) + (= multicaster-result :error) + (do + (log/error "Multicaster failed to start") + (throw (RuntimeException. "Multicaster failed to start"))) + (= multicaster-result :ready) + (log/debug "Multicaster is ready") + + :else + (do + (log/error "Unexpected state from multicaster:" multicaster-result) + (throw (RuntimeException. (str "Unexpected state from multicaster: " multicaster-result))))))) + (let [new-q (get-in new-state [conn id])] + (assert (instance? BlockingQueue new-q)) + new-q))))) + +(defn stop-all-multicasters! [] + (reset! multicast-state {})) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (defonce conn (let [uri (str "datomic:mem://demo") + _ (d/delete-database uri) + _ (d/create-database uri) + conn (d/connect uri)] + conn)))) + +(comment + (defn drain! [^BlockingQueue q] + (loop [cnt 0] + (if (nil? (.poll q 1 TimeUnit/SECONDS)) + cnt + (recur (inc cnt)))))) + +(comment + (let [q-1 (get-tx-report-queue-multicast! conn :q1) + q-2 (get-tx-report-queue-multicast! conn :q2)])) + +(comment + (drain! (get-tx-report-queue-multicast! conn :q1))) + +(comment + (do + @(d/transact conn [{:db/doc "demo"}]) + :yay)) \ No newline at end of file diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj index 1aa6c02..f3fb6dc 100644 --- a/test/com/github/ivarref/yoltq/log_init.clj +++ b/test/com/github/ivarref/yoltq/log_init.clj @@ -3,6 +3,8 @@ [taoensso.timbre :as timbre] [clojure.string :as str])) +(set! *warn-on-reflection* true) + (def level-colors {;:warn colors/red :error colors/red}) -- cgit v1.2.3 From a1e4e1b96fd254ec7d7e467648dd5e88f1c9530b Mon Sep 17 00:00:00 2001 From: ire Date: Wed, 21 May 2025 13:38:51 +0200 Subject: Doc. Return true/false if queues were stopped or not #7 --- README.md | 37 ++++++- deps.edn | 12 +++ src/com/github/ivarref/yoltq.clj | 23 +++- src/com/github/ivarref/yoltq/report_queue.clj | 146 ++++++++++++++++++-------- 4 files changed, 163 insertions(+), 55 deletions(-) (limited to 'deps.edn') diff --git a/README.md b/README.md index f84a336..675fa9a 100644 --- a/README.md +++ b/README.md @@ -441,21 +441,48 @@ then not grab the datomic report queue, but use the one provided: ```clojure (require '[com.github.ivarref.yoltq :as yq]) (yq/init! {:conn conn - :tx-report-queue (yq/get-tx-report-queue-multicast! my-conn :yoltq) + :tx-report-queue (yq/get-tx-report-queue-multicast! conn :yoltq) ; ^^ can be any `java.util.concurrent.BlockingQueue` value }) -(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! my-conn :another-consumer-id)) +(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! conn :another-consumer-id)) ``` Added multicast support for `datomic.api/tx-report-queue`: ```clojure -(def my-q1 (yq/get-tx-report-queue-multicast! my-conn :q-id-1)) +(require '[com.github.ivarref.yoltq :as yq]) +(def my-q1 (yq/get-tx-report-queue-multicast! conn :q-id-1)) ; ^^ consume my-q1 just like you would do `datomic.api/tx-report-queue` -(def my-q2 (yq/get-tx-report-queue-multicast! my-conn :q-id-2)) -; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue` +(def my-q2 (yq/get-tx-report-queue-multicast! conn :q-id-2)) +; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue` for the given `conn` + +(def my-q3 (yq/get-tx-report-queue-multicast! conn :q-id-3 true)) +; my-q3 specifies the third argument, `send-end-token?`, to true, so it will receive `:end` if the queue is stopped. +; This can enable simpler consuming of queues: +(future + (loop [] + (let [q-item (.take ^java.util.concurrent.BlockingQueue my-q3)] + (if (= q-item :end) + (println "Time to exit. Goodbye!") + (do + (println "Processing q-item" q-item) + (recur)))))) + +@(d/transact conn [{:db/doc "new-data"}]) + +; Stop the queue: +(yq/stop-multicaster-id! conn :q-id-3) +=> true +; The multicaster thread will send `:end` and the consumer thread will then print "Time to exit. Goodbye!". + +; if the queue is already stopped (or never was started), `stop-multicaster...` functions will return false: +(yq/stop-multicaster-id! conn :already-stopped-queue-or-typo) +=> false + +; Stop all queues for all connections: +(yq/stop-all-multicasters!) ``` `yq/get-tx-report-queue-multicast!` returns, like diff --git a/deps.edn b/deps.edn index 1e3fa9d..a328c86 100644 --- a/deps.edn +++ b/deps.edn @@ -22,6 +22,18 @@ :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}} :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]} + :repl {:extra-paths ["test"] + :extra-deps {com.bhauman/rebel-readline {:mvn/version "0.1.5"} + ivarref/datomic-schema {:mvn/version "0.2.0"} + com.taoensso/timbre {:mvn/version "5.2.1"} + com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} + clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} + org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"}} + :exec-fn rebel-readline.tool/repl + :exec-args {} + :main-opts ["-m" "rebel-readline.main"]} + :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}} diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 80c9491..298b9d5 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -407,7 +407,11 @@ If this is the last report destination for the given `conn`, the multicaster thread will exit. Repeated calls are no-op. - Returns nil." + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if the queue was stopped. + Return `false` if the queue does not exist." [conn id] (assert (instance? Connection conn)) (rq/stop-multicaster-id! conn id)) @@ -417,7 +421,11 @@ The multicaster thread will exit. Repeated calls are no-op. - Returns nil." + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if any queue belonging to `conn` was stopped. + Returns `false` is `conn` did not have any associated queues." [conn] (assert (instance? Connection conn)) (rq/stop-multicaster! conn)) @@ -427,7 +435,11 @@ All multicaster threads will exit. Repeated calls are no-op. - Returns nil." + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if any queue was stopped. + Returns `false` if no queues existed." [] (rq/stop-all-multicasters!)) @@ -485,7 +497,7 @@ started-consuming? (promise) n 1] (init! {:conn conn - :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq) + :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq true) :slow-thread-show-stacktrace? false}) (add-consumer! :q (fn [_] (deliver started-consuming? true))) @@ -493,8 +505,11 @@ (start!) (log/info "begin start! ... Done") (Thread/sleep 2000) + (log/info "*******************************************") @(d/transact conn [(put :q {:work 123})]) @started-consuming? + (stop-multicaster! conn) + (log/info "*******************************************") (stop!) (log/info "stop! done") nil)))) \ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj index a9f7e07..c3fd383 100644 --- a/src/com/github/ivarref/yoltq/report_queue.clj +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -52,9 +52,9 @@ (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] (if (= poll-result :end) (do - (log/debug "Report queue listener received :end token. Exiting") + (log/debug "report-queue-listener received :end token. Exiting") (reset! running-local? false)) - ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system.")) + ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system.")) (process-poll-result! @config-atom id-ident poll-result @@ -115,9 +115,11 @@ (if send-end-token? (do #_(log/debug "offering :end token") - (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS)) + (if (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS) + (log/debug "Multicaster sent :end token") + (log/debug "Multicaster failed to send :end token"))) (do - #_(log/debug "not offering :end token")))) + (log/debug "Multicaster not sending :end token")))) (when (seq new-state) (if (some? work-item) (reduce-kv @@ -125,7 +127,7 @@ (let [ok-offer (.offer ^BlockingQueue q work-item 1 TimeUnit/MICROSECONDS)] (if (true? ok-offer) (assoc m id [send-end-token? q]) - (log/warn "Failed to offer item in multicaster for connection" conn "and queue id" id)))) + (log/error "Multicaster failed to offer item for connection" conn "and queue id" id)))) {} new-state) new-state))) @@ -150,6 +152,7 @@ (do (swap! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn]))) (swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] dec))) (d/remove-tx-report-queue conn) + (log/debug "Multicaster removed tx-report-queue for conn" conn) nil)))] (if new-state (recur new-state) @@ -180,26 +183,26 @@ (defn- wait-multicast-thread-step [conn] -; `get-tx-report-queue-multicast!` should return only when the multicaster thread -; has picked up the new queue. -; -; Otherwise the following could happen: -; 1. multicast thread is sleeping -; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true` -; 3: user-thread (or somebody else) calls `stop-multicaster`. -; The multicast-state atom is now identical as it was in step 1. -; , Step 2 and 3 happened while the multicast thread was sleeping. -; 4: The multicast thread is scheduled and does _not_ detect any state change. -; Therefore the multicast thread does _not_ send out an :end token as one would expect. -; -; The new queue is written to memory at this point. No other thread can remove it because -; we are still, and have been during the modification of multicast-state, holding consumer-state-lock. -; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock, -; we can be sure that no other thread changes or has changed the state. -; -; Once [:iter-count conn] has changed, we know that the multicaster thread -; will see the new queue. This means that we can be sure that the queue -; will receive the `:end` token if the queue is stopped. + ; `get-tx-report-queue-multicast!` should return only when the multicaster thread + ; has picked up the new queue. + ; + ; Otherwise the following could happen: + ; 1. multicast thread is sleeping + ; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true` + ; 3: user-thread (or somebody else) calls `stop-multicaster`. + ; The multicast-state atom is now identical as it was in step 1. + ; , Step 2 and 3 happened while the multicast thread was sleeping. + ; 4: The multicast thread is scheduled and does _not_ detect any state change. + ; Therefore the multicast thread does _not_ send out an :end token as one would expect. + ; + ; The new queue is written to memory at this point. No other thread can remove it because + ; we are still, and have been during the modification of multicast-state, holding consumer-state-lock. + ; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock, + ; we can be sure that no other thread changes or has changed the state. + ; + ; Once [:iter-count conn] has changed, we know that the multicaster thread + ; will see the new queue. This means that we can be sure that the queue + ; will receive the `:end` token if the queue is stopped. (let [start-ms (System/currentTimeMillis) iter-count (get-in @multicast-state [:iter-count conn] -1)] (loop [spin-count 0] @@ -250,10 +253,10 @@ (if needs-multicaster? (do (start-multicaster! conn) - (log/debug "Multicaster thread started. Returning new queue for id" id) + (log/debug "Returning new queue for id" id "(multicaster thread started)") new-q) (do - (log/debug "Multicaster thread already exists. Returning new queue for id" id) + (log/debug "Returning new queue for id" id "(multicaster thread already running)") new-q)))))] ; wait for multicaster thread to pick up current Queue (wait-multicast-thread-step conn) @@ -306,32 +309,83 @@ (Thread/sleep 16) (recur)))))))))))))) +(defn- all-queues [state] + (->> (mapcat (fn [[conn qmap]] + (mapv (fn [q-id] [conn q-id]) + (keys qmap))) + (seq (get state :queues {}))) + (into #{}))) + +(comment + (do + (assert (= #{} + (all-queues {}))) + (assert (= #{} + (all-queues {:queues {}}))) + (assert (= #{[:conn-a :q-id]} + (all-queues {:queues {:conn-a {:q-id 1}}}))) + (assert (= #{[:conn-a :q-id] [:conn-a :q-id-2]} + (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2}}}))) + (assert (= #{[:conn-a :q-id-2] [:conn-b :q-id-3] [:conn-a :q-id]} + (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2} + :conn-b {:q-id-3 3}}}))))) + +(defn- removed-queues? [old new] + (not= (all-queues old) + (all-queues new))) + (defn stop-multicaster-id! [conn id] (assert (instance? Connection conn)) - (locking consumer-state-lock - (wait-multicast-threads-exit - (locking multicast-state-lock - (swap-vals! multicast-state (fn [old-state] - (let [new-state (dissoc-in old-state [:queues conn id])] - (if (= {} (get-in new-state [:queues conn])) - (dissoc-in old-state [:queues conn]) - new-state))))))) - nil) + (let [did-remove? (atom nil)] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (let [[old new] (swap-vals! multicast-state (fn [old-state] + (let [new-state (dissoc-in old-state [:queues conn id])] + (if (= {} (get-in new-state [:queues conn])) + (dissoc-in old-state [:queues conn]) + new-state))))] + (reset! did-remove? (removed-queues? old new)) + [old new])))) + @did-remove?)) (defn stop-multicaster! [conn] (assert (instance? Connection conn)) - (locking consumer-state-lock - (wait-multicast-threads-exit - (locking multicast-state-lock - (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))))) - nil) + (let [did-remove? (atom nil)] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (let [[old new] (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))] + (reset! did-remove? (removed-queues? old new)) + [old new])))) + @did-remove?)) (defn stop-all-multicasters! [] - (locking consumer-state-lock - (wait-multicast-threads-exit - (locking multicast-state-lock - (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))))) - nil) + (let [did-remove? (atom nil)] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (let [[old new] (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))] + (reset! did-remove? (removed-queues? old new)) + [old new])))) + @did-remove?)) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (require '[datomic.api :as d]) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (defonce conn (let [uri (str "datomic:mem://demo") + _ (d/delete-database uri) + _ (d/create-database uri) + conn (d/connect uri)] + conn)))) (comment (do -- cgit v1.2.3 From 9bbf1698b1017611053b7930f25ce8dd2553c571 Mon Sep 17 00:00:00 2001 From: ire Date: Wed, 18 Jun 2025 09:09:48 +0200 Subject: Release using GitHub --- deps.edn | 61 ++++++++++++++++++++++++++++--------------------------------- 1 file changed, 28 insertions(+), 33 deletions(-) (limited to 'deps.edn') diff --git a/deps.edn b/deps.edn index a328c86..2ca2ddf 100644 --- a/deps.edn +++ b/deps.edn @@ -5,39 +5,34 @@ :paths ["src"] - :aliases {:test {:extra-paths ["test"] - :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} - com.taoensso/timbre {:mvn/version "5.2.1"} - com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} - clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} - org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} - com.taoensso/nippy {:mvn/version "3.2.0"} - io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} - :exec-fn cognitect.test-runner.api/test - :jvm-opts ["-DDISABLE_SPY=true" - "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] - :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]} + :aliases {:test {:extra-paths ["test"] + :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} + com.taoensso/timbre {:mvn/version "5.2.1"} + com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} + clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} + org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"} + io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} + :exec-fn cognitect.test-runner.api/test + :jvm-opts ["-DDISABLE_SPY=true" + "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] + :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]} - :jar {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git" - :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}} - :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]} + :jar {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git" + :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}} + :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]} - :repl {:extra-paths ["test"] - :extra-deps {com.bhauman/rebel-readline {:mvn/version "0.1.5"} - ivarref/datomic-schema {:mvn/version "0.2.0"} - com.taoensso/timbre {:mvn/version "5.2.1"} - com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} - clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} - org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} - com.taoensso/nippy {:mvn/version "3.2.0"}} - :exec-fn rebel-readline.tool/repl - :exec-args {} - :main-opts ["-m" "rebel-readline.main"]} + :repl {:extra-paths ["test"] + :extra-deps {com.bhauman/rebel-readline {:mvn/version "0.1.5"} + ivarref/datomic-schema {:mvn/version "0.2.0"} + com.taoensso/timbre {:mvn/version "5.2.1"} + com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} + clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} + org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"}} + :exec-fn rebel-readline.tool/repl + :exec-args {} + :main-opts ["-m" "rebel-readline.main"]} - :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} - - :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.2.0"}} - :exec-fn deps-deploy.deps-deploy/deploy - :exec-args {:installer :remote - :sign-releases? false - :artifact "target/out.jar"}}}} \ No newline at end of file + :build {:deps {com.github.liquidz/build.edn {:mvn/version "0.11.241"}} + :ns-default build-edn.main}}} -- cgit v1.2.3 From f0b2c9a2a5ff4191c9226f349e41306bec8e0278 Mon Sep 17 00:00:00 2001 From: ire Date: Wed, 18 Jun 2025 09:14:15 +0200 Subject: Release using GitHub --- deps.edn | 70 ++++++++++++++++++++++++++++++++------------------------------ pom.xml | 41 ------------------------------------ release.sh | 37 --------------------------------- 3 files changed, 36 insertions(+), 112 deletions(-) delete mode 100644 pom.xml delete mode 100755 release.sh (limited to 'deps.edn') diff --git a/deps.edn b/deps.edn index 2ca2ddf..f0488fa 100644 --- a/deps.edn +++ b/deps.edn @@ -1,38 +1,40 @@ -{:deps {com.github.ivarref/double-trouble {:mvn/version "0.1.102"} - org.clojure/tools.logging {:mvn/version "1.2.4"} - org.clojure/clojure {:mvn/version "1.11.1"} - com.datomic/peer {:mvn/version "1.0.7364"}} +{:deps + {com.github.ivarref/double-trouble {:mvn/version "0.1.102"} + org.clojure/tools.logging {:mvn/version "1.2.4"} + org.clojure/clojure {:mvn/version "1.11.1"} + com.datomic/peer {:mvn/version "1.0.7364"}} - :paths ["src"] + :paths + ["src"] - :aliases {:test {:extra-paths ["test"] - :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} - com.taoensso/timbre {:mvn/version "5.2.1"} - com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} - clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} - org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} - com.taoensso/nippy {:mvn/version "3.2.0"} - io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} - :exec-fn cognitect.test-runner.api/test - :jvm-opts ["-DDISABLE_SPY=true" - "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] - :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]} + :aliases + {:test + {:extra-paths ["test"] + :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} + com.taoensso/timbre {:mvn/version "5.2.1"} + com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} + clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} + org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"} + io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} + :exec-fn cognitect.test-runner.api/test + :jvm-opts ["-DDISABLE_SPY=true" + "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] + :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]} - :jar {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git" - :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}} - :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]} + :repl + {:extra-paths ["test"] + :extra-deps {com.bhauman/rebel-readline {:mvn/version "0.1.5"} + ivarref/datomic-schema {:mvn/version "0.2.0"} + com.taoensso/timbre {:mvn/version "5.2.1"} + com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} + clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} + org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"}} + :exec-fn rebel-readline.tool/repl + :exec-args {} + :main-opts ["-m" "rebel-readline.main"]} - :repl {:extra-paths ["test"] - :extra-deps {com.bhauman/rebel-readline {:mvn/version "0.1.5"} - ivarref/datomic-schema {:mvn/version "0.2.0"} - com.taoensso/timbre {:mvn/version "5.2.1"} - com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} - clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} - org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} - com.taoensso/nippy {:mvn/version "3.2.0"}} - :exec-fn rebel-readline.tool/repl - :exec-args {} - :main-opts ["-m" "rebel-readline.main"]} - - :build {:deps {com.github.liquidz/build.edn {:mvn/version "0.11.241"}} - :ns-default build-edn.main}}} + :build + {:deps {com.github.liquidz/build.edn {:mvn/version "0.11.241"}} + :ns-default build-edn.main}}} diff --git a/pom.xml b/pom.xml deleted file mode 100644 index 466f47a..0000000 --- a/pom.xml +++ /dev/null @@ -1,41 +0,0 @@ - - - 4.0.0 - jar - com.github.ivarref - yoltq - 0.2.64 - yoltq - - - org.clojure - clojure - 1.11.1 - - - com.github.ivarref - double-trouble - 0.1.102 - - - org.clojure - tools.logging - 1.2.4 - - - - src - - - - clojars - https://repo.clojars.org/ - - - - scm:git:git://github.com/ivarref/yoltq.git - scm:git:ssh://git@github.com/ivarref/yoltq.git - v0.2.64 - https://github.com/ivarref/yoltq - - \ No newline at end of file diff --git a/release.sh b/release.sh deleted file mode 100755 index 3d06135..0000000 --- a/release.sh +++ /dev/null @@ -1,37 +0,0 @@ -#!/bin/bash - -if [[ $# -ne 1 ]]; then - echo "Illegal number of parameters" >&2 - exit 2 -fi - -set -ex - -git update-index --refresh -git diff-index --quiet HEAD -- - -clojure -Spom -clojure -M:test -clojure -M:jar -clojure -X:release ivarref.pom-patch/clojars-repo-only! - -LAST_TAG="$(git rev-list --tags --no-walk --max-count=1)" -COMMITS_SINCE_LAST_TAG="$(git rev-list "$LAST_TAG"..HEAD --count)" -echo "Squashing $COMMITS_SINCE_LAST_TAG commits ..." -git reset --soft HEAD~"$COMMITS_SINCE_LAST_TAG" -MSG="$(git log --format=%B --reverse HEAD..HEAD@{1})" -git commit -m"$MSG" - -VERSION="$(clojure -X:release ivarref.pom-patch/set-patch-version! :patch :commit-count)" -echo "Releasing $VERSION" -sed -i "s/HEAD/v$VERSION/g" ./README.md -git add pom.xml README.md -git commit -m "Release $VERSION" -git reset --soft HEAD~2 -git commit -m"Release $VERSION: $1" - -git tag -a v"$VERSION" -m "Release v$VERSION: $1" -git push --follow-tags --force - -clojure -X:deploy -echo "Released $VERSION" -- cgit v1.2.3