diff options
Diffstat (limited to 'README.md')
| -rw-r--r-- | README.md | 130 |
1 files changed, 110 insertions, 20 deletions
@@ -1,7 +1,8 @@ # yoltq -An opinionated Datomic queue for building (more) reliable systems. -Implements the [transactional outbox](https://microservices.io/patterns/data/transactional-outbox.html) +An opinionated Datomic queue for building (more) reliable systems. +Implements the +[transactional outbox](https://microservices.io/patterns/data/transactional-outbox.html) pattern. Supports retries, backoff, ordering and more. On-prem only. @@ -62,8 +63,8 @@ Imagine the following code: ```clojure (defn post-handler [user-input] (let [db-item (process user-input) - ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; timeout in milliseconds - :socket-timeout 10000 ; timeout in milliseconds + ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; milliseconds + :socket-timeout 10000 ; milliseconds ...})] ; may throw exception @(d/transact conn [(assoc db-item :some/ext-ref ext-ref)]))) ``` @@ -78,8 +79,8 @@ The queue way to solve this would be: ```clojure (defn get-ext-ref [{:keys [id]}] - (let [ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; timeout in milliseconds - :socket-timeout 10000 ; timeout in milliseconds + (let [ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; milliseconds + :socket-timeout 10000 ; milliseconds ...})] ; may throw exception @(d/transact conn [[:db/cas [:some/id id] :some/ext-ref @@ -105,7 +106,8 @@ Thus `get-ext-ref` and any queue consumer should tolerate to be executed successfully several times. For `get-ext-ref` this is solved by using -the database function [:db/cas (compare-and-swap)](https://docs.datomic.com/on-prem/transactions/transaction-functions.html#dbfn-cas) +the database function +[:db/cas (compare-and-swap)](https://docs.datomic.com/on-prem/transactions/transaction-functions.html#dbfn-cas) to achieve a write-once behaviour. The yoltq system treats cas failures as job successes when a consumer has `:allow-cas-failure?` set to `true` in its options. @@ -154,18 +156,19 @@ the payload. It can be added like this: ; An optional map of queue opts {:allow-cas-failure? true ; Treat [:db.cas ...] failures as success. This is one way for the ; consumer function to ensure idempotence. - :valid-payload? (fn [payload] (some? (:id payload))) ; Function that verifies payload. Should return truthy for valid payloads. + :valid-payload? (fn [payload] (some? (:id payload))) ; Function that verifies payload. + ; Should return truthy for valid payloads. ; The default function always returns true. :max-retries 10}) ; Specify maximum number of times an item will be retried. Default: 10000. ; If :max-retries is given as 0, the job will ~always be retried, i.e. ; 9223372036854775807 times (Long/MAX_VALUE). ``` -The `payload` will be deserialized from the database using `clojure.edn/read-string` before invocation, i.e. -you will get back what you put into `yq/put`. +The `payload` will be deserialized from the database using `clojure.edn/read-string` before +invocation, i.e. you will get back what you put into `yq/put`. -The yoltq system treats a queue consumer function invocation as successful if it does not throw an exception. -Any return value, be it `nil`, `false`, `true`, etc. is considered a success. +The yoltq system treats a queue consumer function invocation as successful if it does not throw +an exception. Any return value, be it `nil`, `false`, `true`, etc. is considered a success. ### Listening for queue jobs @@ -251,7 +254,8 @@ For example if you want to use [nippy](https://github.com/ptaoussanis/nippy): ### Partitions -Yoltq supports specifying which [partition](https://docs.datomic.com/on-prem/schema/schema.html#partitions) +Yoltq supports specifying which +[partition](https://docs.datomic.com/on-prem/schema/schema.html#partitions) queue entities should belong to. The default function is: ```clojure @@ -270,7 +274,8 @@ E.g.: ### All configuration options For an exhaustive list of all configuration options, -see [yq/default-opts](https://github.com/ivarref/yoltq/blob/main/src/com/github/ivarref/yoltq.clj#L21). +see +[yq/default-opts](https://github.com/ivarref/yoltq/blob/main/src/com/github/ivarref/yoltq.clj#L21). # Regular and REPL usage @@ -426,15 +431,94 @@ Note: I have not tried these libraries myself. If you liked this library, you may also like: -* [conformity](https://github.com/avescodes/conformity): A Clojure/Datomic library for idempotently transacting norms into your database – be they schema, data, or otherwise. -* [datomic-schema](https://github.com/ivarref/datomic-schema): Simplified writing of Datomic schemas (works with conformity). -* [double-trouble](https://github.com/ivarref/double-trouble): Handle duplicate Datomic transactions with ease. -* [gen-fn](https://github.com/ivarref/gen-fn): Generate Datomic function literals from regular Clojure namespaces. -* [rewriting-history](https://github.com/ivarref/rewriting-history): A library to rewrite Datomic history. +* [conformity](https://github.com/avescodes/conformity): + A Clojure/Datomic library for idempotently transacting norms into your database – be they schema, + data, or otherwise. +* [datomic-schema](https://github.com/ivarref/datomic-schema): + Simplified writing of Datomic schemas (works with conformity). +* [double-trouble](https://github.com/ivarref/double-trouble): + Handle duplicate Datomic transactions with ease. +* [gen-fn](https://github.com/ivarref/gen-fn): + Generate Datomic function literals from regular Clojure namespaces. +* [rewriting-history](https://github.com/ivarref/rewriting-history): + A library to rewrite Datomic history. ## Change log +#### [Unreleased] + +#### [0.2.85] - 2025-07-29 + +#### [v0.2.82] - 2025-06-18 +Added support for specifying `tx-report-queue` as a keyword in `init!`. Yoltq will +then not grab the datomic report queue, but use the one provided: + +```clojure +(require '[com.github.ivarref.yoltq :as yq]) +(yq/init! {:conn conn + :tx-report-queue (yq/get-tx-report-queue-multicast! conn :yoltq) + ; ^^ can be any `java.util.concurrent.BlockingQueue` value + }) + +(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! conn :another-consumer-id)) + +``` + +Added multicast support for `datomic.api/tx-report-queue`: +```clojure +(require '[com.github.ivarref.yoltq :as yq]) +(def my-q1 (yq/get-tx-report-queue-multicast! conn :q-id-1)) +; ^^ consume my-q1 just like you would do `datomic.api/tx-report-queue` + +(def my-q2 (yq/get-tx-report-queue-multicast! conn :q-id-2)) +; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue` +; for the given `conn` + +(def my-q3 (yq/get-tx-report-queue-multicast! conn :q-id-3 true)) +; my-q3 sets the optional third argument, `send-end-token?`, to true. +; The queue will then receive `:end` if the queue is stopped. +; This can enable simpler consuming of queues: +(future + (loop [] + (let [q-item (.take ^java.util.concurrent.BlockingQueue my-q3)] + (if (= q-item :end) + (println "Time to exit. Goodbye!") + (do + (println "Processing q-item" q-item) + (recur)))))) + +; The default value for `send-end-token?` is `false`, i.e. the behaviour will be +; identical to that of datomic.api/tx-report-queue. + +@(d/transact conn [{:db/doc "new-data"}]) + +; Stop the queue: +(yq/stop-multicast-consumer-id! conn :q-id-3) +=> true +; The multicaster thread will send `:end`. +; The consumer thread will then print "Time to exit. Goodbye!". + +; if the queue is already stopped (or never was started), the `stop-multicaster...` +; functions will return false: +(yq/stop-multicast-consumer-id! conn :already-stopped-queue-or-typo) +=> false + +; Stop all queues for all connections: +(yq/stop-all-multicasters!) +``` + +`yq/get-tx-report-queue-multicast!` returns, like +`datomic.api/tx-report-queue`, +`java.util.concurrent.BlockingQueue` and starts a background thread that does +the multicasting as needed. Identical calls to `yq/get-tx-report-queue-multicast!` +returns the same `BlockingQueue`. + +Changed the default for `max-retries` from `10000` to `9223372036854775807`. + +Fixed reflection warnings. + #### 2023-03-20 v0.2.64 [diff](https://github.com/ivarref/yoltq/compare/v0.2.63...v0.2.64) + Added support for `max-retries` being `0`, meaning the job should be retried forever (or at least 9223372036854775807 times). @@ -443,7 +527,8 @@ Changed the default for `max-retries` from `100` to `10000`. #### 2022-11-18 v0.2.63 [diff](https://github.com/ivarref/yoltq/compare/v0.2.62...v0.2.63) Added custom `:encode` and `:decode` support. -Added support for specifying `:partifion-fn` to specify which partition a queue item should belong to. +Added support for specifying `:partifion-fn` to specify +which partition a queue item should belong to. It defaults to: ```clojure (defn default-partition-fn [_queue-name] @@ -619,8 +704,13 @@ Added `:valid-payload?` option for queue consumers. Improved error reporting. #### 2021-09-24 v0.2.33 + First publicly announced release. +## Making a new release + +Go to https://github.com/ivarref/yoltq/actions/workflows/release.yml and press `Run workflow`. + ## License Copyright © 2021-2022 Ivar Refsdal |
