aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEuAndreh <eu@euandre.org>2025-12-05 19:57:57 -0300
committerEuAndreh <eu@euandre.org>2025-12-05 19:57:57 -0300
commite1b99db2364971d456a62e8ec7567e6b5250d004 (patch)
tree8b1b7772cc44d90b97856d5d02481f3649e65908
parentrm -rf *: setup change to Clojure (diff)
parentUpdate for release (diff)
downloadfiinha-e1b99db2364971d456a62e8ec7567e6b5250d004.tar.gz
fiinha-e1b99db2364971d456a62e8ec7567e6b5250d004.tar.xz
Merge remote-tracking branch 'upstream/main', where "upstream" is yoltq.
-rw-r--r--.github/workflows/release.yml52
-rw-r--r--.gitignore14
-rw-r--r--LICENSE277
-rw-r--r--README.md769
-rw-r--r--build.edn34
-rw-r--r--deps.edn40
-rw-r--r--src/com/github/ivarref/yoltq.clj539
-rw-r--r--src/com/github/ivarref/yoltq/error_poller.clj124
-rw-r--r--src/com/github/ivarref/yoltq/ext_sys.clj27
-rw-r--r--src/com/github/ivarref/yoltq/impl.clj248
-rw-r--r--src/com/github/ivarref/yoltq/migrate.clj61
-rw-r--r--src/com/github/ivarref/yoltq/poller.clj64
-rw-r--r--src/com/github/ivarref/yoltq/report_queue.clj459
-rw-r--r--src/com/github/ivarref/yoltq/slow_executor_detector.clj36
-rw-r--r--src/com/github/ivarref/yoltq/test_queue.clj191
-rw-r--r--src/com/github/ivarref/yoltq/utils.clj179
-rw-r--r--test/com/github/ivarref/yoltq/error_poller_test.clj35
-rw-r--r--test/com/github/ivarref/yoltq/http_hang_demo.clj45
-rw-r--r--test/com/github/ivarref/yoltq/log_init.clj66
-rw-r--r--test/com/github/ivarref/yoltq/migrate_test.clj92
-rw-r--r--test/com/github/ivarref/yoltq/readme_demo.clj48
-rw-r--r--test/com/github/ivarref/yoltq/test_utils.clj80
-rw-r--r--test/com/github/ivarref/yoltq/virtual_test.clj474
23 files changed, 3954 insertions, 0 deletions
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
new file mode 100644
index 0000000..1350854
--- /dev/null
+++ b/.github/workflows/release.yml
@@ -0,0 +1,52 @@
+name: Tag and Release
+on: workflow_dispatch
+
+jobs:
+ tag-and-release:
+ runs-on: ubuntu-22.04
+ steps:
+ - uses: actions/checkout@v3
+ with:
+ # NOTE: Fetch all for counting commits
+ fetch-depth: 0
+ - uses: actions/setup-java@v3
+ with:
+ distribution: 'adopt'
+ java-version: 21
+ - uses: DeLaGuardo/setup-clojure@13.4
+ with:
+ cli: 1.12.0.1530
+
+ - name: Show versions
+ run: |
+ java -version
+ clojure --version
+
+ - name: deploy to clojars
+ # NOTE: Specify ID to refer outputs from other steps
+ id: deploy
+ run: |
+ clojure -T:build deploy
+ env:
+ CLOJARS_PASSWORD: ${{secrets.CLOJARS_PASSWORD}}
+ CLOJARS_USERNAME: ${{secrets.CLOJARS_USERNAME}}
+
+ - uses: actions/create-release@v1
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ with:
+ # NOTE: Refer outputs
+ tag_name: ${{ steps.deploy.outputs.version }}
+ release_name: ${{ steps.deploy.outputs.version }}
+ body: released
+ draft: false
+ prerelease: false
+
+ - run: |
+ clojure -T:build update-documents
+ git diff
+ git config --global user.email "github-actions@example.com"
+ git config --global user.name "github-actions"
+ git add -A
+ git commit -m "Update for release"
+ git push \ No newline at end of file
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..707e1be
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,14 @@
+.idea/
+*.iml
+.cpcache/
+.nrepl-port
+target/
+.connkey
+tree.txt
+.db.url
+.stage-url.txt
+*.pom.asc
+*.pom
+temp/
+.clj-kondo/
+.rebel_readline_history \ No newline at end of file
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..d3087e4
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,277 @@
+Eclipse Public License - v 2.0
+
+ THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE
+ PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION
+ OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT.
+
+1. DEFINITIONS
+
+"Contribution" means:
+
+ a) in the case of the initial Contributor, the initial content
+ Distributed under this Agreement, and
+
+ b) in the case of each subsequent Contributor:
+ i) changes to the Program, and
+ ii) additions to the Program;
+ where such changes and/or additions to the Program originate from
+ and are Distributed by that particular Contributor. A Contribution
+ "originates" from a Contributor if it was added to the Program by
+ such Contributor itself or anyone acting on such Contributor's behalf.
+ Contributions do not include changes or additions to the Program that
+ are not Modified Works.
+
+"Contributor" means any person or entity that Distributes the Program.
+
+"Licensed Patents" mean patent claims licensable by a Contributor which
+are necessarily infringed by the use or sale of its Contribution alone
+or when combined with the Program.
+
+"Program" means the Contributions Distributed in accordance with this
+Agreement.
+
+"Recipient" means anyone who receives the Program under this Agreement
+or any Secondary License (as applicable), including Contributors.
+
+"Derivative Works" shall mean any work, whether in Source Code or other
+form, that is based on (or derived from) the Program and for which the
+editorial revisions, annotations, elaborations, or other modifications
+represent, as a whole, an original work of authorship.
+
+"Modified Works" shall mean any work in Source Code or other form that
+results from an addition to, deletion from, or modification of the
+contents of the Program, including, for purposes of clarity any new file
+in Source Code form that contains any contents of the Program. Modified
+Works shall not include works that contain only declarations,
+interfaces, types, classes, structures, or files of the Program solely
+in each case in order to link to, bind by name, or subclass the Program
+or Modified Works thereof.
+
+"Distribute" means the acts of a) distributing or b) making available
+in any manner that enables the transfer of a copy.
+
+"Source Code" means the form of a Program preferred for making
+modifications, including but not limited to software source code,
+documentation source, and configuration files.
+
+"Secondary License" means either the GNU General Public License,
+Version 2.0, or any later versions of that license, including any
+exceptions or additional permissions as identified by the initial
+Contributor.
+
+2. GRANT OF RIGHTS
+
+ a) Subject to the terms of this Agreement, each Contributor hereby
+ grants Recipient a non-exclusive, worldwide, royalty-free copyright
+ license to reproduce, prepare Derivative Works of, publicly display,
+ publicly perform, Distribute and sublicense the Contribution of such
+ Contributor, if any, and such Derivative Works.
+
+ b) Subject to the terms of this Agreement, each Contributor hereby
+ grants Recipient a non-exclusive, worldwide, royalty-free patent
+ license under Licensed Patents to make, use, sell, offer to sell,
+ import and otherwise transfer the Contribution of such Contributor,
+ if any, in Source Code or other form. This patent license shall
+ apply to the combination of the Contribution and the Program if, at
+ the time the Contribution is added by the Contributor, such addition
+ of the Contribution causes such combination to be covered by the
+ Licensed Patents. The patent license shall not apply to any other
+ combinations which include the Contribution. No hardware per se is
+ licensed hereunder.
+
+ c) Recipient understands that although each Contributor grants the
+ licenses to its Contributions set forth herein, no assurances are
+ provided by any Contributor that the Program does not infringe the
+ patent or other intellectual property rights of any other entity.
+ Each Contributor disclaims any liability to Recipient for claims
+ brought by any other entity based on infringement of intellectual
+ property rights or otherwise. As a condition to exercising the
+ rights and licenses granted hereunder, each Recipient hereby
+ assumes sole responsibility to secure any other intellectual
+ property rights needed, if any. For example, if a third party
+ patent license is required to allow Recipient to Distribute the
+ Program, it is Recipient's responsibility to acquire that license
+ before distributing the Program.
+
+ d) Each Contributor represents that to its knowledge it has
+ sufficient copyright rights in its Contribution, if any, to grant
+ the copyright license set forth in this Agreement.
+
+ e) Notwithstanding the terms of any Secondary License, no
+ Contributor makes additional grants to any Recipient (other than
+ those set forth in this Agreement) as a result of such Recipient's
+ receipt of the Program under the terms of a Secondary License
+ (if permitted under the terms of Section 3).
+
+3. REQUIREMENTS
+
+3.1 If a Contributor Distributes the Program in any form, then:
+
+ a) the Program must also be made available as Source Code, in
+ accordance with section 3.2, and the Contributor must accompany
+ the Program with a statement that the Source Code for the Program
+ is available under this Agreement, and informs Recipients how to
+ obtain it in a reasonable manner on or through a medium customarily
+ used for software exchange; and
+
+ b) the Contributor may Distribute the Program under a license
+ different than this Agreement, provided that such license:
+ i) effectively disclaims on behalf of all other Contributors all
+ warranties and conditions, express and implied, including
+ warranties or conditions of title and non-infringement, and
+ implied warranties or conditions of merchantability and fitness
+ for a particular purpose;
+
+ ii) effectively excludes on behalf of all other Contributors all
+ liability for damages, including direct, indirect, special,
+ incidental and consequential damages, such as lost profits;
+
+ iii) does not attempt to limit or alter the recipients' rights
+ in the Source Code under section 3.2; and
+
+ iv) requires any subsequent distribution of the Program by any
+ party to be under a license that satisfies the requirements
+ of this section 3.
+
+3.2 When the Program is Distributed as Source Code:
+
+ a) it must be made available under this Agreement, or if the
+ Program (i) is combined with other material in a separate file or
+ files made available under a Secondary License, and (ii) the initial
+ Contributor attached to the Source Code the notice described in
+ Exhibit A of this Agreement, then the Program may be made available
+ under the terms of such Secondary Licenses, and
+
+ b) a copy of this Agreement must be included with each copy of
+ the Program.
+
+3.3 Contributors may not remove or alter any copyright, patent,
+trademark, attribution notices, disclaimers of warranty, or limitations
+of liability ("notices") contained within the Program from any copy of
+the Program which they Distribute, provided that Contributors may add
+their own appropriate notices.
+
+4. COMMERCIAL DISTRIBUTION
+
+Commercial distributors of software may accept certain responsibilities
+with respect to end users, business partners and the like. While this
+license is intended to facilitate the commercial use of the Program,
+the Contributor who includes the Program in a commercial product
+offering should do so in a manner which does not create potential
+liability for other Contributors. Therefore, if a Contributor includes
+the Program in a commercial product offering, such Contributor
+("Commercial Contributor") hereby agrees to defend and indemnify every
+other Contributor ("Indemnified Contributor") against any losses,
+damages and costs (collectively "Losses") arising from claims, lawsuits
+and other legal actions brought by a third party against the Indemnified
+Contributor to the extent caused by the acts or omissions of such
+Commercial Contributor in connection with its distribution of the Program
+in a commercial product offering. The obligations in this section do not
+apply to any claims or Losses relating to any actual or alleged
+intellectual property infringement. In order to qualify, an Indemnified
+Contributor must: a) promptly notify the Commercial Contributor in
+writing of such claim, and b) allow the Commercial Contributor to control,
+and cooperate with the Commercial Contributor in, the defense and any
+related settlement negotiations. The Indemnified Contributor may
+participate in any such claim at its own expense.
+
+For example, a Contributor might include the Program in a commercial
+product offering, Product X. That Contributor is then a Commercial
+Contributor. If that Commercial Contributor then makes performance
+claims, or offers warranties related to Product X, those performance
+claims and warranties are such Commercial Contributor's responsibility
+alone. Under this section, the Commercial Contributor would have to
+defend claims against the other Contributors related to those performance
+claims and warranties, and if a court requires any other Contributor to
+pay any damages as a result, the Commercial Contributor must pay
+those damages.
+
+5. NO WARRANTY
+
+EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT
+PERMITTED BY APPLICABLE LAW, THE PROGRAM IS PROVIDED ON AN "AS IS"
+BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR
+IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF
+TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR
+PURPOSE. Each Recipient is solely responsible for determining the
+appropriateness of using and distributing the Program and assumes all
+risks associated with its exercise of rights under this Agreement,
+including but not limited to the risks and costs of program errors,
+compliance with applicable laws, damage to or loss of data, programs
+or equipment, and unavailability or interruption of operations.
+
+6. DISCLAIMER OF LIABILITY
+
+EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT
+PERMITTED BY APPLICABLE LAW, NEITHER RECIPIENT NOR ANY CONTRIBUTORS
+SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST
+PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE
+EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGES.
+
+7. GENERAL
+
+If any provision of this Agreement is invalid or unenforceable under
+applicable law, it shall not affect the validity or enforceability of
+the remainder of the terms of this Agreement, and without further
+action by the parties hereto, such provision shall be reformed to the
+minimum extent necessary to make such provision valid and enforceable.
+
+If Recipient institutes patent litigation against any entity
+(including a cross-claim or counterclaim in a lawsuit) alleging that the
+Program itself (excluding combinations of the Program with other software
+or hardware) infringes such Recipient's patent(s), then such Recipient's
+rights granted under Section 2(b) shall terminate as of the date such
+litigation is filed.
+
+All Recipient's rights under this Agreement shall terminate if it
+fails to comply with any of the material terms or conditions of this
+Agreement and does not cure such failure in a reasonable period of
+time after becoming aware of such noncompliance. If all Recipient's
+rights under this Agreement terminate, Recipient agrees to cease use
+and distribution of the Program as soon as reasonably practicable.
+However, Recipient's obligations under this Agreement and any licenses
+granted by Recipient relating to the Program shall continue and survive.
+
+Everyone is permitted to copy and distribute copies of this Agreement,
+but in order to avoid inconsistency the Agreement is copyrighted and
+may only be modified in the following manner. The Agreement Steward
+reserves the right to publish new versions (including revisions) of
+this Agreement from time to time. No one other than the Agreement
+Steward has the right to modify this Agreement. The Eclipse Foundation
+is the initial Agreement Steward. The Eclipse Foundation may assign the
+responsibility to serve as the Agreement Steward to a suitable separate
+entity. Each new version of the Agreement will be given a distinguishing
+version number. The Program (including Contributions) may always be
+Distributed subject to the version of the Agreement under which it was
+received. In addition, after a new version of the Agreement is published,
+Contributor may elect to Distribute the Program (including its
+Contributions) under the new version.
+
+Except as expressly stated in Sections 2(a) and 2(b) above, Recipient
+receives no rights or licenses to the intellectual property of any
+Contributor under this Agreement, whether expressly, by implication,
+estoppel or otherwise. All rights in the Program not expressly granted
+under this Agreement are reserved. Nothing in this Agreement is intended
+to be enforceable by any entity that is not a Contributor or Recipient.
+No third-party beneficiary rights are created under this Agreement.
+
+Exhibit A - Form of Secondary Licenses Notice
+
+"This Source Code may also be made available under the following
+Secondary Licenses when the conditions for such availability set forth
+in the Eclipse Public License, v. 2.0 are satisfied: {name license(s),
+version(s), and exceptions or additional permissions here}."
+
+ Simply including a copy of this Agreement, including this Exhibit A
+ is not sufficient to license the Source Code under Secondary Licenses.
+
+ If it is not possible or desirable to put the notice in a particular
+ file, then You may include the notice in a location (such as a LICENSE
+ file in a relevant directory) where a recipient would be likely to
+ look for such a notice.
+
+ You may add additional accurate notices of copyright ownership.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..b64ed12
--- /dev/null
+++ b/README.md
@@ -0,0 +1,769 @@
+# yoltq
+
+An opinionated Datomic queue for building (more) reliable systems.
+Implements the
+[transactional outbox](https://microservices.io/patterns/data/transactional-outbox.html)
+pattern.
+Supports retries, backoff, ordering and more.
+On-prem only.
+
+## Installation
+
+[![Clojars Project](https://img.shields.io/clojars/v/com.github.ivarref/yoltq.svg)](https://clojars.org/com.github.ivarref/yoltq)
+
+## 1-minute example
+
+```clojure
+(require '[com.github.ivarref.yoltq :as yq])
+
+(def conn (datomic.api/connect "..."))
+
+; Initialize system
+(yq/init! {:conn conn})
+
+; Add a queue consumer that will intentionally fail on the first attempt
+(yq/add-consumer! :q
+ (let [cnt (atom 0)]
+ (fn [payload]
+ (when (= 1 (swap! cnt inc))
+ ; A consumer throwing an exception is considered a queue job failure
+ (throw (ex-info "failed" {})))
+ ; Anything else than a throwing exception is considered a queue job success
+ ; This includes nil, false and everything else.
+ (log/info "got payload" payload))))
+
+; Start threadpool that picks up queue jobs
+(yq/start!)
+
+; Queue a job
+@(d/transact conn [(yq/put :q {:work 123})])
+
+; On your console you will see something like this:
+; 17:29:54.598 DEBUG queue item 613... for queue :q is pending status :init
+; 17:29:54.602 DEBUG queue item 613... for queue :q now has status :processing
+; 17:29:54.603 DEBUG queue item 613... for queue :q is now processing
+; 17:29:54.605 WARN queue-item 613... for queue :q now has status :error after 1 try in 4.8 ms
+; 17:29:54.607 WARN error message was: "failed" for queue-item 613...
+; 17:29:54.615 WARN ex-data was: {} for queue-item 613...
+; The item is so far failed...
+
+; But after approximately 10 seconds have elapsed, the item will be retried:
+; 17:30:05.596 DEBUG queue item 613... for queue :q now has status :processing
+; 17:30:05.597 DEBUG queue item 613... for queue :q is now processing
+; 17:30:05.597 INFO got payload {:work 123}
+; 17:30:05.599 INFO queue-item 613... for queue :q now has status :done after 2 tries in 5999.3 ms
+; And then it has succeeded.
+```
+
+## Rationale
+
+Integrating with external systems that may be unavailable can be tricky.
+Imagine the following code:
+
+```clojure
+(defn post-handler [user-input]
+ (let [db-item (process user-input)
+ ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; milliseconds
+ :socket-timeout 10000 ; milliseconds
+ ...})] ; may throw exception
+ @(d/transact conn [(assoc db-item :some/ext-ref ext-ref)])))
+```
+
+What if the POST request fails? Should it be retried? For how long?
+Should it be allowed to fail? How do you then process failures later?
+
+PS: If you do not set connection/socket-timeout, there is a chance that
+clj-http/client will wait for all eternity in the case of a dropped TCP connection.
+
+The queue way to solve this would be:
+
+```clojure
+(defn get-ext-ref [{:keys [id]}]
+ (let [ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; milliseconds
+ :socket-timeout 10000 ; milliseconds
+ ...})] ; may throw exception
+ @(d/transact conn [[:db/cas [:some/id id]
+ :some/ext-ref
+ nil
+ ext-ref]])))
+
+(yq/add-consumer! :get-ext-ref get-ext-ref {:allow-cas-failure? true})
+
+(defn post-handler [user-input]
+ (let [{:some/keys [id] :as db-item} (process user-input)]
+ @(d/transact conn [db-item
+ (yq/put :get-ext-ref {:id id})])))
+```
+
+Here `post-handler` will always succeed as long as the transaction commits.
+
+`get-ext-ref` may fail multiple times if `ext-service` is down.
+This is fine as long as it eventually succeeds.
+
+There is a special case where `get-ext-ref` succeeds, but
+saving the new queue job status to the database fails.
+Thus `get-ext-ref` and any queue consumer should tolerate to
+be executed successfully several times.
+
+For `get-ext-ref` this is solved by using
+the database function
+[:db/cas (compare-and-swap)](https://docs.datomic.com/on-prem/transactions/transaction-functions.html#dbfn-cas)
+to achieve a write-once behaviour.
+The yoltq system treats cas failures as job successes
+when a consumer has `:allow-cas-failure?` set to `true` in its options.
+
+## How it works
+
+### Queue jobs
+
+Creating queue jobs is done by `@(d/transact conn [...other data... (yq/put :q {:work 123})])`.
+Inspecting `(yq/put :q {:work 123})]` you will see something like this:
+
+```clojure
+#:com.github.ivarref.yoltq{:id #uuid"614232a8-e031-45bb-8660-be146eaa32a2", ; Queue job id
+ :queue-name :q, ; Destination queue
+ :status :init, ; Status
+ :payload "{:work 123}", ; Payload persisted to the database with pr-str
+ :bindings "{}", ; Bindings that will be applied before executing consumer function
+ :lock #uuid"037d7da1-5158-4243-8f72-feb1e47e15ca", ; Lock to protect from multiple consumers
+ :tries 0, ; How many times the job has been executed
+ :init-time 4305758012289 ; Time of initialization (System/nanoTime)
+ }
+```
+
+This is the queue job as it will be stored into the database.
+You can see that the payload, i.e. the second argument of `yq/put`,
+is persisted into the database. Thus the payload must be `pr-str`-able (unless you have specified
+custom `:encode` and `:decode` functions that override this).
+
+
+A queue job will initially have status `:init`.
+It will then transition to the following statuses:
+
+* `:processing`: When the queue job begins processing in the queue consumer function.
+* `:done`: If the queue consumer function returns normally.
+* `:error`: If the queue consumer function throws an exception.
+
+### Queue consumers
+
+Queue jobs will be consumed by queue consumers. A consumer is a function taking a single argument,
+the payload. It can be added like this:
+
+```clojure
+(yq/add-consumer!
+ :q ; Queue to consume
+ (fn [payload] (println "got payload:" payload)) ; Queue consumer function
+ ; An optional map of queue opts
+ {:allow-cas-failure? true ; Treat [:db.cas ...] failures as success. This is one way for the
+ ; consumer function to ensure idempotence.
+ :valid-payload? (fn [payload] (some? (:id payload))) ; Function that verifies payload.
+ ; Should return truthy for valid payloads.
+ ; The default function always returns true.
+ :max-retries 10}) ; Specify maximum number of times an item will be retried. Default: 10000.
+ ; If :max-retries is given as 0, the job will ~always be retried, i.e.
+ ; 9223372036854775807 times (Long/MAX_VALUE).
+```
+
+The `payload` will be deserialized from the database using `clojure.edn/read-string` before
+invocation, i.e. you will get back what you put into `yq/put`.
+
+The yoltq system treats a queue consumer function invocation as successful if it does not throw
+an exception. Any return value, be it `nil`, `false`, `true`, etc. is considered a success.
+
+### Listening for queue jobs
+
+When `(yq/start!)` is invoked, a threadpool is started.
+
+One thread is permanently allocated for listening to the
+[tx-report-queue](https://docs.datomic.com/on-prem/clojure/index.html#datomic.api/tx-report-queue)
+and responding to changes. This means that yoltq will respond
+and process newly created queue jobs fairly quickly.
+This also means that queue jobs in status `:init` will almost always be processed without
+any type of backoff.
+
+The threadpool also schedules polling jobs that will check for various statuses regularly:
+
+* Jobs in status `:error` that have waited for at least `:error-backoff-time` (default: 5 seconds) will be retried.
+* Jobs that have been in `:processing` for at least `:hung-backoff-time` (default: 30 minutes) will be considered hung and retried.
+* Old `:init-backoff-time` (default: 1 minute) `:init` jobs that have not been processed. Queue jobs can be left in status `:init` during application restart/upgrade, and thus the need for this strategy.
+
+
+### Retry and backoff strategy
+
+Yoltq assumes that if a queue consumer throws an exception for one item, it
+will also do the same for another item in the immediate future,
+assuming the remote system that the queue consumer represents is still down.
+Thus if there are ten failures for queue `:q`, it does not make sense to
+retry all of them at once.
+
+The retry polling job that runs regularly (`:poll-delay`, default: every 10 seconds)
+thus stops at the first failure.
+Each queue have their own polling job, so if one queue is down, it will *not* stop
+other queues from retrying.
+
+The retry polling job will continue to eagerly process queue jobs as long as it
+encounters only successes.
+
+While the `:error-backoff-time` of default 5 seconds may seem short, in practice
+if there is a lot of failed items and the external system is still down,
+the actual backoff time will be longer.
+
+
+### Stuck threads and stale jobs
+
+A single thread is dedicated to monitoring how much time a queue consumer
+spends on a single job. If this exceeds `:max-execute-time` (default: 5 minutes)
+the stack trace of the offending consumer will be logged as `:ERROR`.
+
+If a job is found stale, that is if the database spent time exceeds
+`:hung-backoff-time` (default: 30 minutes),
+the job will either be retried or marked as `:error`. This case may happen if the application
+is shut down abruptly during processing of queue jobs.
+
+
+### Giving up
+
+A queue job will remain in status `:error` once `:max-retries` (default: 10000) have been reached.
+If `:max-retries` is given as `0`, the job will be retried 9223372036854775807 times before
+giving up.
+Ideally this should not happen. ¯\\\_(ツ)\_/¯
+
+### Custom encoding and decoding
+
+Yoltq will use `pr-str` and `clojure.edn/read-string` by default to encode and decode data.
+You may specify `:encode` and `:decode` either globally or per queue to override this behaviour.
+The `:encode` function must return a byte array or a string.
+
+For example if you want to use [nippy](https://github.com/ptaoussanis/nippy):
+```clojure
+(require '[taoensso.nippy :as nippy])
+
+; Globally for all queues:
+(yq/init!
+ {:conn conn
+ :encode nippy/freeze
+ :decode nippy/thaw})
+
+; Or per queue:
+(yq/add-consumer!
+ :q ; Queue to consume
+ (fn [payload] (println "got payload:" payload)) ; Queue consumer function
+ {:encode nippy/freeze
+ :decode nippy/thaw}) ; Queue options, here with :encode and :decode
+```
+
+### Partitions
+
+Yoltq supports specifying which
+[partition](https://docs.datomic.com/on-prem/schema/schema.html#partitions)
+queue entities should belong to.
+The default function is:
+```clojure
+(defn default-partition-fn [_queue-name]
+ (keyword "yoltq" (str "queue_" (.getValue (java.time.Year/now)))))
+```
+This is to say that there will be a single partition per year for yoltq.
+Yoltq will take care of creating the partition if it does not exist.
+
+You may override this function, either globally or per queue, with the keyword `:partition-fn`.
+E.g.:
+```clojure
+(yq/init! {:conn conn :partition-fn (fn [_queue-name] :my-partition)})
+```
+
+### All configuration options
+
+For an exhaustive list of all configuration options,
+see
+[yq/default-opts](https://github.com/ivarref/yoltq/blob/main/src/com/github/ivarref/yoltq.clj#L21).
+
+# Groups of Jobs
+
+Yoltq supports grouping jobs in a queue, and tracking the progress of such a
+group of jobs. Consider this example: your system is used by the marketing
+department to send emails to groups of users. Multiple colleagues in the
+marketing department could potentially do this at the same time, but they want
+to see the progress of their _own_ campagne, not that of _all_ emails being
+sent. When adding the jobs to the queue, you can specify the `job-group`
+parameter, in this case indicate which marketeer is running the jobs:
+
+```clojure
+(doseq [uid user-ids]
+ @(d/transact conn [(yq/put :send-mail
+ ; Payload:
+ {:user-id uid :from ... :to ... :body ...}
+ ; Job options:
+ {:job-group :mail-campagne/for-marketeer-42})]))
+```
+
+When you want to know the progress of that specific job group, and display it in
+your user interface, you can use `job-group-progress`, which returns a structure
+similar to `queue-stats`:
+
+```clojure
+(yq/job-group-progress :send-mail :mail-campagne/for-marketeer-42)
+;; => [{:qname :send-mail
+;; :job-group :mail-campagne/for-marketeer-42
+;; :status :init
+;; :count 78}
+;; {:qname :send-mail
+;; :job-group :mail-campagne/for-marketeer-42
+;; :status :done
+;; :count 24}]
+```
+
+# Regular and REPL usage
+
+For a regular system and/or REPL session you'll want to do:
+
+```clojure
+(require '[com.github.ivarref.yoltq :as yq])
+
+(yq/init! {:conn conn})
+
+(yq/add-consumer! :q-one ...)
+(yq/add-consumer! :q-two ...)
+
+; Start yoltq system
+(yq/start!)
+
+; Oops I need another consumer. This works fine:
+(yq/add-consumer! :q-three ...)
+
+; When the application is shutting down:
+(yq/stop!)
+```
+
+You may invoke `yq/add-consumer!` and `yq/init!` on a live system as you like.
+
+If you change `:pool-size` or `:poll-delay` you will have to `(yq/stop!)` and
+`(yq/start!)` to make changes take effect.
+
+## Queue job dependencies and ordering
+
+It is possible to specify that one queue job must wait for another queue
+job to complete before it will be executed:
+
+```clojure
+@(d/transact conn [(yq/put :a
+ ; Payload:
+ {:id "a1"}
+ ; Job options:
+ {:id "a1"})])
+
+@(d/transact conn [(yq/put :b
+ ; Payload:
+ {:id "b1" :a-ref "a1"}
+ ; Jobs options:
+ {:depends-on [:a "a1"]})])
+
+; depends-on may also be specified as a function of the payload when
+; adding the consumer:
+(yq/add-consumer! :b
+ (fn [payload] ...)
+ {:depends-on (fn [payload]
+ [:a (:a-ref payload)])})
+```
+
+Here queue job `b1` will not execute before `a1` is `:done`.
+
+Note that queue-name plus `:id` in job options must be an unique value.
+In the example above that means `:a` plus `a1` must be unique.
+
+When specifying `:depends-on`, the referred job must at least exist in the database,
+otherwise `yq/put` will throw an exception.
+
+Other than this there is no attempt at ordering the execution of queue jobs.
+In fact the opposite is done in the poller to guard against the case that a single failing queue job
+could effectively take down the entire retry polling job.
+
+## Retrying jobs in the REPL
+
+```clojure
+(require '[com.github.ivarref.yoltq :as yq])
+
+; List jobs that are in state error:
+(yq/get-errors :q)
+
+; This will retry a single job that is in error, regardless
+; of how many times it has been retried earlier.
+; If the job fails, you will get the full stacktrace on the REPL.
+(yq/retry-one-error! :q)
+; Returns a map containing the new state of the job.
+; Returns nil if there are no (more) jobs in state error for this queue.
+```
+
+# Testing
+
+For testing you will probably want determinism over an extra threadpool
+by using the test queue:
+
+```clojure
+...
+(:require [clojure.test :refer :all]
+ [com.github.ivarref.yoltq :as yq]
+ [com.github.ivarref.yoltq.test-queue :as tq])
+
+; Enables the test queue and disables the threadpool for each test.
+; yq/start! and yq/stop! becomes a no-op.
+(use-fixtures :each tq/call-with-virtual-queue!)
+
+(deftest demo
+ (let [conn ...]
+ (yq/init! {:conn conn}) ; Setup
+ (yq/add-consumer! :q identity)
+
+ @(d/transact conn [(yq/put :q {:work 123})]) ; Add work
+
+ ; tq/consume! consumes one job and asserts that it succeeds.
+ ; It returns the return value of the consumer function
+ (is (= {:work 123} (tq/consume! :q)))
+
+ ; If you want to test the idempotence of your function,
+ ; you may force retry a consumer function:
+ ; This may for example be useful to verify that the
+ ; :db.cas logic is correct.
+ (is (= {:work 123} (tq/force-retry! :q)))))
+```
+
+## Logging and capturing bindings
+
+Yoltq can capture and restore dynamic bindings.
+It will capture during `yq/put` and restore them when the consumer function
+is invoked. This is specified in the `:capture-bindings` setting.
+It defaults to `['#taoensso.timbre/*context*]`,
+i.e. the [timbre](https://github.com/ptaoussanis/timbre) log context,
+if available, otherwise an empty vector.
+
+These dynamic bindings will be in place when yoltq logs errors, warnings
+etc. about failing consumer functions, possibly making troubleshooting
+easier.
+
+## Limitations
+
+Datomic does not have anything like `for update skip locked`.
+Thus consuming a queue should be limited to a single JVM process.
+This library will take queue jobs by compare-and-swapping a lock+state,
+process the item and then compare-and-swapping the lock+new-state.
+It does so eagerly, thus if you have multiple JVM consumers you will
+most likely get many locking conflicts. It should work, but it's far
+from optimal.
+
+## Alternatives
+
+I did not find any alternatives for Datomic.
+
+If I were using PostgreSQL or any other database that supports
+`for update skip locked`, I'd use a queue that uses this.
+For Clojure there is [proletarian](https://github.com/msolli/proletarian).
+
+For Redis there is [carmine](https://github.com/ptaoussanis/carmine).
+
+Note: I have not tried these libraries myself.
+
+## Other stuff
+
+If you liked this library, you may also like:
+
+* [conformity](https://github.com/avescodes/conformity):
+ A Clojure/Datomic library for idempotently transacting norms into your database – be they schema,
+ data, or otherwise.
+* [datomic-schema](https://github.com/ivarref/datomic-schema):
+ Simplified writing of Datomic schemas (works with conformity).
+* [double-trouble](https://github.com/ivarref/double-trouble):
+ Handle duplicate Datomic transactions with ease.
+* [gen-fn](https://github.com/ivarref/gen-fn):
+ Generate Datomic function literals from regular Clojure namespaces.
+* [rewriting-history](https://github.com/ivarref/rewriting-history):
+ A library to rewrite Datomic history.
+
+## Change log
+
+#### [Unreleased]
+
+#### [0.2.94] - 2025-09-22
+
+Added support for [groups of jobs](#groups-of-jobs).
+Thanks [Stefan van den Oord](https://github.com/svdo)!
+
+#### [0.2.85] - 2025-07-29
+
+Same as v0.2.82, but without the `v` prefix.
+
+#### [v0.2.82] - 2025-06-18
+
+Added support for specifying `tx-report-queue` as a keyword in `init!`. Yoltq will
+then not grab the datomic report queue, but use the one provided:
+
+```clojure
+(require '[com.github.ivarref.yoltq :as yq])
+(yq/init! {:conn conn
+ :tx-report-queue (yq/get-tx-report-queue-multicast! conn :yoltq)
+ ; ^^ can be any `java.util.concurrent.BlockingQueue` value
+ })
+
+(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! conn :another-consumer-id))
+
+```
+
+Added multicast support for `datomic.api/tx-report-queue`:
+```clojure
+(require '[com.github.ivarref.yoltq :as yq])
+(def my-q1 (yq/get-tx-report-queue-multicast! conn :q-id-1))
+; ^^ consume my-q1 just like you would do `datomic.api/tx-report-queue`
+
+(def my-q2 (yq/get-tx-report-queue-multicast! conn :q-id-2))
+; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue`
+; for the given `conn`
+
+(def my-q3 (yq/get-tx-report-queue-multicast! conn :q-id-3 true))
+; my-q3 sets the optional third argument, `send-end-token?`, to true.
+; The queue will then receive `:end` if the queue is stopped.
+; This can enable simpler consuming of queues:
+(future
+ (loop []
+ (let [q-item (.take ^java.util.concurrent.BlockingQueue my-q3)]
+ (if (= q-item :end)
+ (println "Time to exit. Goodbye!")
+ (do
+ (println "Processing q-item" q-item)
+ (recur))))))
+
+; The default value for `send-end-token?` is `false`, i.e. the behaviour will be
+; identical to that of datomic.api/tx-report-queue.
+
+@(d/transact conn [{:db/doc "new-data"}])
+
+; Stop the queue:
+(yq/stop-multicast-consumer-id! conn :q-id-3)
+=> true
+; The multicaster thread will send `:end`.
+; The consumer thread will then print "Time to exit. Goodbye!".
+
+; if the queue is already stopped (or never was started), the `stop-multicaster...`
+; functions will return false:
+(yq/stop-multicast-consumer-id! conn :already-stopped-queue-or-typo)
+=> false
+
+; Stop all queues for all connections:
+(yq/stop-all-multicasters!)
+```
+
+`yq/get-tx-report-queue-multicast!` returns, like
+`datomic.api/tx-report-queue`,
+`java.util.concurrent.BlockingQueue` and starts a background thread that does
+the multicasting as needed. Identical calls to `yq/get-tx-report-queue-multicast!`
+returns the same `BlockingQueue`.
+
+Changed the default for `max-retries` from `10000` to `9223372036854775807`.
+
+Fixed reflection warnings.
+
+#### 2023-03-20 v0.2.64 [diff](https://github.com/ivarref/yoltq/compare/v0.2.63...v0.2.64)
+
+Added support for `max-retries` being `0`, meaning the job should be retried forever
+(or at least 9223372036854775807 times).
+
+Changed the default for `max-retries` from `100` to `10000`.
+
+#### 2022-11-18 v0.2.63 [diff](https://github.com/ivarref/yoltq/compare/v0.2.62...v0.2.63)
+Added custom `:encode` and `:decode` support.
+
+Added support for specifying `:partifion-fn` to specify
+which partition a queue item should belong to.
+It defaults to:
+```clojure
+(defn default-partition-fn [_queue-name]
+ (keyword "yoltq" (str "queue_" (.getValue (Year/now)))))
+```
+Yoltq takes care of creating the partition if it does not exist.
+
+#### 2022-11-15 v0.2.62 [diff](https://github.com/ivarref/yoltq/compare/v0.2.61...v0.2.62)
+Added function `processing-time-stats`:
+
+```clojure
+(ns com.github.ivarref.yoltq)
+
+(defn processing-time-stats
+ "Gather processing time statistics.
+
+ Optional keyword arguments:
+ * :age-days — last number of days to look at data from. Defaults to 30.
+ Use nil to have no limit.
+
+ * :queue-name — only gather statistics for this queue name. Defaults to nil, meaning all queues.
+
+ * :duration->long - Specify what unit should be used for values.
+ Must take a java.time.Duration as input and return a long.
+
+ Defaults to (fn [duration] (.toSeconds duration).
+ I.e. the default unit is seconds.
+
+ Example return value:
+ {:queue-a {:avg 1
+ :max 10
+ :min 0
+ :p50 ...
+ :p90 ...
+ :p95 ...
+ :p99 ...}}"
+ [{:keys [age-days queue-name now db duration->long]
+ :or {age-days 30
+ now (ZonedDateTime/now ZoneOffset/UTC)
+ duration->long (fn [duration] (.toSeconds duration))}}]
+ ...)
+```
+
+#### 2022-09-07 v0.2.61 [diff](https://github.com/ivarref/yoltq/compare/v0.2.60...v0.2.61)
+Added function `retry-stats`:
+
+```clojure
+(ns com.github.ivarref.yoltq)
+
+(defn retry-stats
+ "Gather retry statistics.
+
+ Optional keyword arguments:
+ * :age-days — last number of days to look at data from. Defaults to 30.
+ * :queue-name — only gather statistics for this queue name. Defaults to nil, meaning all queues.
+
+ Example return value:
+ {:queue-a {:ok 100, :retries 2, :retry-percentage 2.0}
+ :queue-b {:ok 100, :retries 75, :retry-percentage 75.0}}
+
+ From the example value above, we can see that :queue-b fails at a much higher rate than :queue-a.
+ Assuming that the queue consumers are correctly implemented, this means that the service representing :queue-b
+ is much more unstable than the one representing :queue-a. This again implies
+ that you will probably want to fix the downstream service of :queue-b, if that is possible.
+ "
+ [{:keys [age-days queue-name now]
+ :or {age-days 30
+ now (ZonedDateTime/now ZoneOffset/UTC)}}]
+ ...)
+```
+
+#### 2022-08-18 v0.2.60 [diff](https://github.com/ivarref/yoltq/compare/v0.2.59...v0.2.60)
+Improved: Added config option `:healthy-allowed-error-time`:
+```
+ ; If you are dealing with a flaky downstream service, you may not want
+ ; yoltq to mark itself as unhealthy on the first failure encounter with
+ ; the downstream service. Change this setting to let yoltq mark itself
+ ; as healthy even though a queue item has been failing for some time.
+ :healthy-allowed-error-time (Duration/ofMinutes 15)
+```
+
+#### 2022-08-15 v0.2.59 [diff](https://github.com/ivarref/yoltq/compare/v0.2.58...v0.2.59)
+Fixed:
+* Race condition that made the following possible: `stop!` would terminate the slow thread
+watcher, and a stuck thread could keep `stop!` from completing!
+
+#### 2022-06-30 v0.2.58 [diff](https://github.com/ivarref/yoltq/compare/v0.2.57...v0.2.58)
+Slightly more safe EDN printing and parsing.
+Recommended reading:
+[Pitfalls and bumps in Clojure's Extensible Data Notation (EDN)](https://nitor.com/en/articles/pitfalls-and-bumps-clojures-extensible-data-notation-edn)
+
+#### 2022-06-29 v0.2.57 [diff](https://github.com/ivarref/yoltq/compare/v0.2.56...v0.2.57)
+Added `(get-errors qname)` and `(retry-one-error! qname)`.
+
+Improved:
+`unhealthy?` will return `false` for the first 10 minutes of the application lifetime.
+This was done in order to push new code while a queue was in error in an earlier
+version of the code. In this way rolling upgrades are possible regardless if there
+are queue errors.
+Can you tell that this issue hit me? ¯\\\_(ツ)\_/¯
+
+#### 2022-06-22 v0.2.56 [diff](https://github.com/ivarref/yoltq/compare/v0.2.55...v0.2.56)
+Added support for `:yoltq/queue-id` metadata on functions. I.e. it's possible to write
+the following:
+```clojure
+(defn my-consumer
+ {:yoltq/queue-id :some-queue}
+ [payload]
+ :work-work-work)
+
+(yq/add-consumer! #'my-consumer ; <-- will resolve to :some-queue
+ my-consumer)
+
+@(d/transact conn [(yq/put #'my-consumer ; <-- will resolve to :some-queue
+ {:id "a"})])
+```
+
+The idea here is that it is simpler to jump to var definitions than going via keywords,
+which essentially refers to a var/function anyway.
+
+#### 2022-03-29 v0.2.55 [diff](https://github.com/ivarref/yoltq/compare/v0.2.54...v0.2.55)
+Added: `unhealthy?` function which returns `true` if there are queues in error,
+or `false` otherwise.
+
+#### 2022-03-28 v0.2.54 [diff](https://github.com/ivarref/yoltq/compare/v0.2.51...v0.2.54)
+Fixed: Schedules should now be using milliseconds and not nanoseconds.
+
+#### 2022-03-28 v0.2.51 [diff](https://github.com/ivarref/yoltq/compare/v0.2.48...v0.2.51)
+* Don't OOM on migrating large amounts of data.
+* Respect `:auto-migrate? false`.
+
+#### 2022-03-27 v0.2.48 [diff](https://github.com/ivarref/yoltq/compare/v0.2.46...v0.2.48)
+* Auto migration is done in the background.
+* Only poll for current version of jobs, thus no races for auto migration.
+
+#### 2022-03-27 v0.2.46 [diff](https://github.com/ivarref/yoltq/compare/v0.2.41...v0.2.46)
+* Critical bugfix that in some cases can lead to stalled jobs.
+```
+Started using (System/currentTimeMillis) and not (System/nanoTime)
+when storing time in the database.
+```
+
+* Bump Clojure to `1.11.0`.
+
+#### 2022-03-27 v0.2.41 [diff](https://github.com/ivarref/yoltq/compare/v0.2.39...v0.2.41)
+* Added function `healthy?` that returns:
+```
+ true if no errors
+ false if one or more errors
+ nil if error-poller is yet to be executed.
+```
+
+* Added default functions for `:on-system-error` and `:on-system-recovery`
+ that simply logs that the system is in error (ERROR level) or has
+ recovered (INFO level).
+
+* Added function `queue-stats` that returns a nicely "formatted"
+ vector of queue stats, for example:
+```
+ (queue-stats)
+ =>
+ [{:qname :add-message-thread, :status :done, :count 10274}
+ {:qname :add-message-thread, :status :init, :count 30}
+ {:qname :add-message-thread, :status :processing, :count 1}
+ {:qname :send-message, :status :done, :count 21106}
+ {:qname :send-message, :status :init, :count 56}]
+```
+
+#### 2021-09-27 v0.2.39 [diff](https://github.com/ivarref/yoltq/compare/v0.2.37...v0.2.39)
+Added `:valid-payload?` option for queue consumers.
+
+#### 2021-09-27 v0.2.37 [diff](https://github.com/ivarref/yoltq/compare/v0.2.33...v0.2.37)
+Improved error reporting.
+
+#### 2021-09-24 v0.2.33
+
+First publicly announced release.
+
+## Making a new release
+
+Go to https://github.com/ivarref/yoltq/actions/workflows/release.yml and press `Run workflow`.
+
+## License
+
+Copyright © 2021-2022 Ivar Refsdal
+
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License 2.0 which is available at
+http://www.eclipse.org/legal/epl-2.0.
+
+This Source Code may also be made available under the following Secondary
+Licenses when the conditions for such availability set forth in the Eclipse
+Public License, v. 2.0 are satisfied: GNU General Public License as published by
+the Free Software Foundation, either version 2 of the License, or (at your
+option) any later version, with the GNU Classpath Exception which is available
+at https://www.gnu.org/software/classpath/license.html.
diff --git a/build.edn b/build.edn
new file mode 100644
index 0000000..3e1f016
--- /dev/null
+++ b/build.edn
@@ -0,0 +1,34 @@
+{:lib
+ com.github.ivarref/yoltq
+
+ :version
+ "0.2.{{git/commit-count}}"
+
+ :github-actions?
+ true
+
+ :scm
+ {:connection "scm:git:git://github.com/ivarref/yoltq.git"
+ :developerConnection "scm:git:ssh://git@github.com/ivarref/yoltq.git"
+ :url "https://github.com/ivarref/yoltq"}
+
+ :documents
+ [{:file "README.md"
+ :match-exactly "#### [Unreleased]"
+ :action :append-after
+ :text "\n#### [{{version}}] - {{now/yyyy}}-{{now/mm}}-{{now/dd}}"}
+ {:file "README.md"
+ :match-exactly "com.github.ivarref/yoltq {:git/tag"
+ :action :replace
+ :keep-indent? true
+ :text
+ "com.github.ivarref/yoltq {:git/tag \"{{version}}\" :git/sha \"{{git/head-long-sha}}\"}"}
+ {:file "README.md"
+ :match-exactly "com.github.ivarref/yoltq {:mvn/version"
+ :action :replace
+ :keep-indent? true
+ :text "com.github.ivarref/yoltq {:mvn/version \"{{version}}\"}"}]
+
+ :licenses
+ [{:name "Eclipse Public License - v 2.0"
+ :url "https://www.eclipse.org/legal/epl-2.0/"}]} \ No newline at end of file
diff --git a/deps.edn b/deps.edn
new file mode 100644
index 0000000..f0488fa
--- /dev/null
+++ b/deps.edn
@@ -0,0 +1,40 @@
+{:deps
+ {com.github.ivarref/double-trouble {:mvn/version "0.1.102"}
+ org.clojure/tools.logging {:mvn/version "1.2.4"}
+ org.clojure/clojure {:mvn/version "1.11.1"}
+ com.datomic/peer {:mvn/version "1.0.7364"}}
+
+ :paths
+ ["src"]
+
+ :aliases
+ {:test
+ {:extra-paths ["test"]
+ :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"}
+ com.taoensso/timbre {:mvn/version "5.2.1"}
+ com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"}
+ clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"}
+ org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"}
+ com.taoensso/nippy {:mvn/version "3.2.0"}
+ io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}}
+ :exec-fn cognitect.test-runner.api/test
+ :jvm-opts ["-DDISABLE_SPY=true"
+ "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"]
+ :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]}
+
+ :repl
+ {:extra-paths ["test"]
+ :extra-deps {com.bhauman/rebel-readline {:mvn/version "0.1.5"}
+ ivarref/datomic-schema {:mvn/version "0.2.0"}
+ com.taoensso/timbre {:mvn/version "5.2.1"}
+ com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"}
+ clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"}
+ org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"}
+ com.taoensso/nippy {:mvn/version "3.2.0"}}
+ :exec-fn rebel-readline.tool/repl
+ :exec-args {}
+ :main-opts ["-m" "rebel-readline.main"]}
+
+ :build
+ {:deps {com.github.liquidz/build.edn {:mvn/version "0.11.241"}}
+ :ns-default build-edn.main}}}
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
new file mode 100644
index 0000000..8c8ca7a
--- /dev/null
+++ b/src/com/github/ivarref/yoltq.clj
@@ -0,0 +1,539 @@
+(ns com.github.ivarref.yoltq
+ (:require
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.error-poller :as errpoller]
+ [com.github.ivarref.yoltq.impl :as i]
+ [com.github.ivarref.yoltq.migrate :as migrate]
+ [com.github.ivarref.yoltq.poller :as poller]
+ [com.github.ivarref.yoltq.report-queue :as rq]
+ [com.github.ivarref.yoltq.slow-executor-detector :as slow-executor]
+ [com.github.ivarref.yoltq.utils :as u]
+ [datomic.api :as d])
+ (:import (datomic Connection)
+ (java.lang.management ManagementFactory)
+ (java.time Duration Instant ZoneOffset ZonedDateTime)
+ (java.util.concurrent BlockingQueue ExecutorService Executors ScheduledExecutorService TimeUnit)))
+
+(defonce ^:dynamic *config* (atom nil))
+(defonce threadpool (atom nil))
+(defonce ^:dynamic *running?* (atom false))
+(defonce ^:dynamic *test-mode* false)
+
+(def default-opts
+ (-> {; Default number of times a queue job will be retried before giving up
+ ; Can be overridden on a per-consumer basis with
+ ; (yq/add-consumer! :q (fn [payload] ...) {:max-retries 200})
+ ; If you want no limit on the number of retries, specify
+ ; the value `0`. That will set the effective retry limit to
+ ; 9223372036854775807 times.
+ :max-retries 9223372036854775807
+
+ ; Minimum amount of time to wait before a failed queue job is retried
+ :error-backoff-time (Duration/ofSeconds 5)
+
+ ; Max time a queue job can execute before an error is logged
+ :max-execute-time (Duration/ofMinutes 5)
+
+ ; Amount of time an in progress queue job can run before it is considered failed
+ ; and will be marked as such.
+ :hung-backoff-time (Duration/ofMinutes 30)
+
+ ; Most queue jobs in init state will be consumed by the tx-report-queue listener.
+ ; However, in the case where an init job was added right before the application
+ ; was shut down and did not have time to be processed by the tx-report-queue listener,
+ ; it will be consumer by the init poller. This init poller backs off by
+ ; :init-backoff-time to avoid unnecessary compare-and-swap lock failures that could
+ ; otherwise occur if competing with the tx-report-queue listener.
+ :init-backoff-time (Duration/ofSeconds 60)
+
+ ; If you are dealing with a flaky downstream service, you may not want
+ ; yoltq to mark itself as unhealthy on the first failure encounter with
+ ; the downstream service. Change this setting to let yoltq mark itself
+ ; as healthy even though a queue item has been failing for some time.
+ :healthy-allowed-error-time (Duration/ofMinutes 15)
+
+ ; How frequent polling for init, error and hung jobs should be done.
+ :poll-delay (Duration/ofSeconds 10)
+
+ ; Specifies the number of threads available for executing queue and polling jobs.
+ ; The final thread pool will be this size + 2.
+ ;
+ ; One thread is permanently allocated for listening to the
+ ; tx-report-queue.
+ ;
+ ; Another thread is permanently allocated for checking :max-execute-time.
+ ; This means that if all executing queue jobs are stuck and the thread pool is unavailable
+ ; as such, at least an error will be logged about this. The log entry will
+ ; contain the stacktrace of the stuck threads.
+ :pool-size 4
+
+ :capture-bindings (if-let [s (resolve (symbol "taoensso.timbre/*context*"))]
+ [s]
+ [])
+
+ ; How often should the system be polled for failed queue jobs
+ :system-error-poll-delay (Duration/ofMinutes 1)
+
+ ; How often should the system invoke
+ :system-error-callback-backoff (Duration/ofHours 1)
+
+ ; Should old, possibly stalled jobs be automatically be migrated
+ ; as part of `start!`?
+ :auto-migrate? true}
+
+ u/duration->millis))
+
+
+(defn init! [{:keys [conn tx-report-queue] :as cfg}]
+ (assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil")))
+ (when (some? tx-report-queue)
+ (assert (instance? BlockingQueue tx-report-queue)
+ (str "Expected :tx-report-queue to be of type java.util.concurrent.BlockingQueue")))
+ (locking threadpool
+ @(d/transact conn i/schema)
+ (let [new-cfg (swap! *config*
+ (fn [old-conf]
+ (-> (merge-with (fn [_ b] b)
+ {:running-queues (atom #{})
+ :start-execute-time (atom {})
+ :system-error (atom {})
+ :healthy? (atom nil)
+ :slow? (atom nil)
+ :slow-thread-watcher-done? (promise)}
+ default-opts
+ (if *test-mode* old-conf (select-keys old-conf [:handlers]))
+ cfg)
+ u/duration->millis)))]
+ new-cfg)))
+
+
+(defn get-queue-id
+ [queue-id-or-var]
+ (cond (and (var? queue-id-or-var)
+ (keyword? (:yoltq/queue-id (meta queue-id-or-var))))
+ (:yoltq/queue-id (meta queue-id-or-var))
+
+ (keyword? queue-id-or-var)
+ queue-id-or-var
+
+ :else
+ (throw (ex-info (str "Could not get queue-id for " queue-id-or-var) {:queue-id queue-id-or-var}))))
+
+(defn add-consumer!
+ ([queue-id f]
+ (add-consumer! queue-id f {}))
+ ([queue-id f opts]
+ (swap! *config* (fn [old-config] (assoc-in old-config [:handlers (get-queue-id queue-id)] (merge opts {:f f}))))))
+
+
+(defn put
+ ([queue-id payload] (put queue-id payload {}))
+ ([queue-id payload opts]
+ (let [{:keys [bootstrap-poller! conn] :as cfg} @*config*]
+ (when (and *test-mode* bootstrap-poller!)
+ (bootstrap-poller! conn))
+ (i/put cfg (get-queue-id queue-id) payload opts))))
+
+
+(defn- do-start! []
+ (let [{:keys [poll-delay pool-size system-error-poll-delay auto-migrate? slow-thread-watcher-done?] :as cfg} @*config*]
+ (when auto-migrate?
+ (future (migrate/migrate! cfg)))
+ (let [pool (reset! threadpool (Executors/newScheduledThreadPool (+ 1 pool-size)))
+ queue-listener-ready (promise)]
+ (reset! *running?* true)
+ (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS)
+ (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS)
+ (.execute ^ScheduledExecutorService pool
+ (fn []
+ (try
+ (log/debug "report-queue-listener starting")
+ (rq/report-queue-listener *running?* queue-listener-ready pool *config*)
+ (finally
+ (log/debug "report-queue-listener exiting")
+ (deliver queue-listener-ready :finally)))))
+ (future (try
+ (slow-executor/show-slow-threads pool *config*)
+ (finally
+ (deliver slow-thread-watcher-done? :done))))
+ (let [q-listener-retval (deref queue-listener-ready 30000 :timeout)]
+ (cond (= :timeout q-listener-retval)
+ (do
+ (log/error "Timed out waiting for report-queue-listener to start")
+ (throw (IllegalStateException. "Timed out waiting for report-queue-listener to start")))
+
+ (= :finally q-listener-retval)
+ (do
+ (log/error "report-queue-listener did not start")
+ (throw (IllegalStateException. "report-queue-listener did not start")))
+
+ (= :ready q-listener-retval)
+ (do
+ (log/debug "report-queue-listener is ready"))
+
+ :else
+ (do
+ (log/error (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval)))
+ (throw (IllegalStateException. (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval))))))))))
+
+
+(defn start! []
+ (locking threadpool
+ (cond (true? *test-mode*)
+ (log/info "test mode enabled, doing nothing for start!")
+
+ (true? @*running?*)
+ nil
+
+ (false? @*running?*)
+ (do-start!))))
+
+
+(defn stop! []
+ (locking threadpool
+ (cond (true? *test-mode*)
+ (log/info "test mode enabled, doing nothing for stop!")
+
+ (false? @*running?*)
+ nil
+
+ (true? @*running?*)
+ (do
+ (reset! *running?* false)
+ (when-let [^ExecutorService tp @threadpool]
+ (log/debug "shutting down threadpool")
+ (.shutdown tp)
+ (while (not (.awaitTermination tp 1 TimeUnit/SECONDS))
+ (log/trace "waiting for threadpool to stop"))
+ (log/debug "stopped!")
+ (reset! threadpool nil))
+ (when-let [wait-slow-threads (some->> *config* deref :slow-thread-watcher-done?)]
+ (log/debug "waiting for slow-thread-watcher to stop ...")
+ @wait-slow-threads
+ (log/debug "waiting for slow-thread-watcher to stop ... OK"))))))
+
+
+(defn healthy? []
+ (cond
+ (< (.toMinutes (Duration/ofMillis (.getUptime (ManagementFactory/getRuntimeMXBean)))) 10)
+ true
+
+ (false? (some->> @*config*
+ :healthy?
+ (deref)))
+ false
+
+ (true? (some->> @*config*
+ :slow?
+ (deref)))
+ false
+
+ :else
+ true))
+
+(defn unhealthy?
+ "Returns `true` if there are queues in error or a thread is slow, and the application has been up for over 10 minutes, otherwise `false`."
+ []
+ (false? (healthy?)))
+
+(defn queue-stats []
+ (let [{:keys [conn]} @*config*
+ db (d/db conn)]
+ (->> (d/q '[:find ?e ?qname ?status
+ :in $
+ :where
+ [?e :com.github.ivarref.yoltq/queue-name ?qname]
+ [?e :com.github.ivarref.yoltq/status ?status]]
+ db)
+ (mapv (partial zipmap [:e :qname :status]))
+ (mapv #(select-keys % [:qname :status]))
+ (mapv (fn [qitem] {qitem 1}))
+ (reduce (partial merge-with +) {})
+ (mapv (fn [[{:keys [qname status]} v]]
+ (array-map
+ :qname qname
+ :status status
+ :count v)))
+ (sort-by (juxt :qname :status))
+ (vec))))
+
+(defn job-group-progress [queue-name job-group]
+ (let [{:keys [conn]} @*config*
+ db (d/db conn)]
+ (->> (d/q '[:find ?e ?qname ?job-group ?status
+ :keys :e :qname :job-group :status
+ :in $ ?qname ?job-group
+ :where
+ [?e :com.github.ivarref.yoltq/queue-name ?qname]
+ [?e :com.github.ivarref.yoltq/job-group ?job-group]
+ [?e :com.github.ivarref.yoltq/status ?status]]
+ db queue-name job-group)
+ (mapv #(select-keys % [:qname :job-group :status]))
+ (mapv (fn [qitem] {qitem 1}))
+ (reduce (partial merge-with +) {})
+ (mapv (fn [[{:keys [qname job-group status]} v]]
+ (array-map
+ :qname qname
+ :job-group job-group
+ :status status
+ :count v)))
+ (sort-by (juxt :qname :job-group :status))
+ (vec))))
+
+(defn get-errors [qname]
+ (let [{:keys [conn]} @*config*
+ db (d/db conn)]
+ (->> (d/q '[:find [?id ...]
+ :in $ ?qname ?status
+ :where
+ [?e :com.github.ivarref.yoltq/queue-name ?qname]
+ [?e :com.github.ivarref.yoltq/status ?status]
+ [?e :com.github.ivarref.yoltq/id ?id]]
+ db
+ qname
+ :error)
+ (mapv (partial u/get-queue-item db)))))
+
+(defn retry-one-error! [qname]
+ (let [{:keys [handlers] :as cfg} @*config*
+ _ (assert (contains? handlers qname) "Queue not found")
+ cfg (assoc-in cfg [:handlers qname :max-retries] Long/MAX_VALUE)]
+ (poller/poll-once! cfg qname :error)))
+
+(defn retry-stats
+ "Gather retry statistics.
+
+ Optional keyword arguments:
+ * :age-days — last number of days to look at data from. Defaults to 30.
+ * :queue-name — only gather statistics for this queue name. Defaults to nil, meaning all queues.
+
+ Example return value:
+ {:queue-a {:ok 100, :retries 2, :retry-percentage 2.0}
+ :queue-b {:ok 100, :retries 75, :retry-percentage 75.0}}
+
+ From the example value above, we can see that :queue-b fails at a much higher rate than :queue-a.
+ Assuming that the queue consumers are correctly implemented, this means that the service representing :queue-b
+ is much more unstable than the one representing :queue-a. This again implies
+ that you will probably want to fix the downstream service of :queue-b, if that is possible.
+ "
+ [{:keys [age-days queue-name now db]
+ :or {age-days 30
+ now (ZonedDateTime/now ZoneOffset/UTC)}}]
+ (let [{:keys [conn]} @*config*
+ db (or db (d/db conn))]
+ (->> (d/query {:query {:find '[?qname ?status ?tries ?init-time]
+ :in (into '[$] (when queue-name '[?qname]))
+ :where '[[?e :com.github.ivarref.yoltq/queue-name ?qname]
+ [?e :com.github.ivarref.yoltq/status ?status]
+ [?e :com.github.ivarref.yoltq/tries ?tries]
+ [?e :com.github.ivarref.yoltq/init-time ?init-time]]}
+ :args (remove nil? [db queue-name])})
+ (mapv (partial zipmap [:qname :status :tries :init-time]))
+ (mapv #(update % :init-time (fn [init-time] (.atZone (Instant/ofEpochMilli init-time) ZoneOffset/UTC))))
+ (mapv #(assoc % :age-days (.toDays (Duration/between (:init-time %) now))))
+ (filter #(<= (:age-days %) age-days))
+ (group-by :qname)
+ (mapv (fn [[q values]]
+ {q (let [{:keys [ok retries] :as m} (->> values
+ (mapv (fn [{:keys [tries status]}]
+ (condp = status
+ u/status-init {}
+ u/status-processing {:processing 1 :retries (dec tries)}
+ u/status-done {:ok 1 :retries (dec tries)}
+ u/status-error {:error 1 :retries (dec tries)})))
+ (reduce (partial merge-with +) {}))]
+ (into (sorted-map) (merge m
+ (when (pos-int? ok)
+ {:retry-percentage (double (* 100 (/ retries ok)))}))))}))
+ (into (sorted-map)))))
+
+(defn- percentile [n values]
+ (let [idx (int (Math/floor (* (count values) (/ n 100))))]
+ (nth values idx)))
+
+(defn processing-time-stats
+ "Gather processing time statistics. Default unit is seconds.
+
+ Optional keyword arguments:
+ * :age-days — last number of days to look at data from. Defaults to 30.
+ Use nil to have no limit.
+
+ * :queue-name — only gather statistics for this queue name. Defaults to nil, meaning all queues.
+
+ * :duration->long - Specify what unit should be used for values.
+ Must take a java.time.Duration as input and return a long.
+
+ Defaults to (fn [duration] (.toSeconds duration).
+ I.e. the default unit is seconds.
+
+ Example return value:
+ {:queue-a {:avg 1
+ :max 10
+ :min 0
+ :p50 ...
+ :p90 ...
+ :p95 ...
+ :p99 ...}}"
+ [{:keys [age-days queue-name now db duration->long]
+ :or {age-days 30
+ now (ZonedDateTime/now ZoneOffset/UTC)
+ duration->long (fn [duration] (.toSeconds ^Duration duration))}}]
+ (let [{:keys [conn]} @*config*
+ db (or db (d/db conn))
+ ->zdt #(.atZone (Instant/ofEpochMilli %) ZoneOffset/UTC)]
+ (->> (d/query {:query {:find '[?qname ?status ?init-time ?done-time]
+ :in (into '[$ ?status] (when queue-name '[?qname]))
+ :where '[[?e :com.github.ivarref.yoltq/queue-name ?qname]
+ [?e :com.github.ivarref.yoltq/status ?status]
+ [?e :com.github.ivarref.yoltq/init-time ?init-time]
+ [?e :com.github.ivarref.yoltq/done-time ?done-time]]}
+ :args (vec (remove nil? [db u/status-done queue-name]))})
+ (mapv (partial zipmap [:qname :status :init-time :done-time]))
+ (mapv #(update % :init-time ->zdt))
+ (mapv #(update % :done-time ->zdt))
+ (mapv #(assoc % :age-days (.toDays (Duration/between (:init-time %) now))))
+ (mapv #(assoc % :spent-time (duration->long (Duration/between (:init-time %) (:done-time %)))))
+ (filter #(or (nil? age-days) (<= (:age-days %) age-days)))
+ (group-by :qname)
+ (mapv (fn [[q values]]
+ (let [values (vec (sort (mapv :spent-time values)))]
+ {q (sorted-map
+ :max (apply max values)
+ :avg (int (Math/floor (/ (reduce + 0 values) (count values))))
+ :p50 (percentile 50 values)
+ :p90 (percentile 90 values)
+ :p95 (percentile 95 values)
+ :p99 (percentile 99 values)
+ :min (apply min values))})))
+ (into (sorted-map)))))
+
+(defn get-tx-report-queue-multicast!
+ "Multicast the datomic.api/tx-report-queue to different consumers.
+ A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer.
+ Repeated calls using the same `conn` and `id` returns the same queue.
+
+ The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread
+ to send `:end` if the queue is stopped.
+ The default value for `send-end-token?` is `false`.
+
+ A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`.
+
+ Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`."
+ ([conn id]
+ (get-tx-report-queue-multicast! conn id false))
+ ([conn id send-end-token?]
+ (assert (instance? Connection conn))
+ (assert (boolean? send-end-token?))
+ (rq/get-tx-report-queue-multicast! conn id send-end-token?)))
+
+(defn stop-multicast-consumer-id!
+ "Stop forwarding reports from datomic.api/tx-report-queue to the queue identified by `conn` and `id`.
+ If this is the last report destination for the given `conn`, the multicaster thread will exit.
+ Repeated calls are no-op.
+
+ The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!`
+ was called.
+
+ Returns `true` if the queue was stopped.
+ Return `false` if the queue does not exist."
+ [conn id]
+ (assert (instance? Connection conn))
+ (rq/stop-multicast-consumer-id! conn id))
+
+(defn stop-multicaster!
+ "Stop forwarding reports from datomic.api/tx-report-queue to any queues belonging to `conn`.
+ The multicaster thread will exit.
+ Repeated calls are no-op.
+
+ The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!`
+ was called.
+
+ Returns `true` if any queue belonging to `conn` was stopped.
+ Returns `false` is `conn` did not have any associated queues."
+ [conn]
+ (assert (instance? Connection conn))
+ (rq/stop-multicaster! conn))
+
+(defn stop-all-multicasters!
+ "Stop forwarding all reports from datomic.api/tx-report-queue for any `conn`.
+ All multicaster threads will exit.
+ Repeated calls are no-op.
+
+ The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!`
+ was called.
+
+ Returns `true` if any queue was stopped.
+ Returns `false` if no queues existed."
+ []
+ (rq/stop-all-multicasters!))
+
+(comment
+ (do
+ (require 'com.github.ivarref.yoltq.log-init)
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"ivarref.yoltq.report-queue"} :info]
+ [#{"ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq"} :debug]
+ ;[#{"ivarref.yoltq*"} :info]
+ [#{"*"} :info]])
+ (stop!)
+ (future (let [received (atom [])
+ uri (str "datomic:mem://demo")]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (let [conn (d/connect uri)
+ started-consuming? (promise)
+ n 1]
+ (init! {:conn conn
+ :error-backoff-time (Duration/ofSeconds 1)
+ :poll-delay (Duration/ofSeconds 1)
+ :max-execute-time (Duration/ofSeconds 3)
+ :slow-thread-show-stacktrace? false})
+ (add-consumer! :q (fn [_]
+ (deliver started-consuming? true)
+ (log/info "sleeping...")
+ (Thread/sleep (.toMillis (Duration/ofSeconds 60)))
+ (log/info "done sleeping")))
+ (start!)
+ @(d/transact conn [(put :q {:work 123})])
+ @started-consuming?
+ (stop!)
+ nil)))))
+
+(comment
+ (do
+ (require 'com.github.ivarref.yoltq.log-init)
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq.migrate"} :warn]
+ [#{"com.github.ivarref.yoltq"} :debug]
+ ;[#{"ivarref.yoltq*"} :info]
+ [#{"*"} :info]])
+ (stop!)
+ (let [received (atom [])
+ uri (str "datomic:mem://demo")]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (let [conn (d/connect uri)
+ started-consuming? (promise)
+ n 1]
+ (init! {:conn conn
+ :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq true)
+ :slow-thread-show-stacktrace? false})
+ (add-consumer! :q (fn [_]
+ (deliver started-consuming? true)))
+ (log/info "begin start! ...")
+ (start!)
+ (log/info "begin start! ... Done")
+ (Thread/sleep 2000)
+ (log/info "*******************************************")
+ @(d/transact conn [(put :q {:work 123})])
+ @started-consuming?
+ (stop-multicaster! conn)
+ (log/info "*******************************************")
+ (stop!)
+ (log/info "stop! done")
+ nil)))) \ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj
new file mode 100644
index 0000000..dffff28
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/error_poller.clj
@@ -0,0 +1,124 @@
+(ns com.github.ivarref.yoltq.error-poller
+ (:require [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [com.github.ivarref.yoltq.utils :as u]
+ [datomic.api :as d]))
+
+
+(defn get-state [v]
+ (case v
+ [:error :none] :recovery
+ [:error :some] :error
+ [:error :all] :error
+ [:recovery :none] :recovery
+ [:recovery :some] :recovery
+ [:recovery :all] :error
+ nil))
+
+
+(defn handle-error-count [{:keys [errors last-notify state]
+ :or {errors []
+ last-notify 0
+ state :recovery}}
+ {:keys [system-error-min-count system-error-callback-backoff]
+ :or {system-error-min-count 3}}
+ now-ms
+ error-count]
+ (let [new-errors (->> (conj errors error-count)
+ (take-last system-error-min-count)
+ (vec))
+ classify (fn [coll]
+ (cond
+ (not= system-error-min-count (count coll))
+ :missing
+
+ (every? pos-int? coll)
+ :all
+
+ (every? zero? coll)
+ :none
+
+ :else
+ :some))
+ old-state state]
+ (merge
+ {:errors new-errors
+ :last-notify last-notify}
+ (when-let [new-state (get-state [old-state (classify new-errors)])]
+ (merge
+ {:state new-state}
+ (when (and (= old-state :recovery)
+ (= new-state :error))
+ {:run-callback :error
+ :last-notify now-ms})
+
+ (when (and (= new-state :error)
+ (= old-state :error)
+ (> now-ms
+ (+ last-notify system-error-callback-backoff)))
+ {:run-callback :error
+ :last-notify now-ms})
+
+ (when (and (= new-state :recovery)
+ (= old-state :error))
+ {:run-callback :recovery}))))))
+
+
+(defn do-poll-errors [{:keys [conn
+ system-error
+ on-system-error
+ on-system-recovery
+ healthy?
+ healthy-allowed-error-time]
+ :or {on-system-error (fn []
+ (log/error "There are yoltq queues which have errors")
+ nil)
+ on-system-recovery (fn []
+ (log/info "Yoltq recovered"))}
+ :as config}
+ now-ms]
+ (assert (some? conn) "expected :conn to be present")
+ (assert (some? system-error) "expected :system-error to be present")
+ (assert (nat-int? healthy-allowed-error-time) "expected :healthy-allowed-error-time to be present")
+ (let [max-init-time (- now-ms healthy-allowed-error-time)
+ error-count (or (d/q '[:find (count ?e) .
+ :in $ ?status ?max-init-time
+ :where
+ [?e :com.github.ivarref.yoltq/status ?status]
+ [?e :com.github.ivarref.yoltq/init-time ?init-time]
+ [(<= ?init-time ?max-init-time)]]
+ (d/db conn)
+ u/status-error
+ max-init-time)
+ 0)]
+ (if (pos-int? error-count)
+ (do
+ (log/debug "poll-errors found" error-count "errors in system")
+ (reset! healthy? false))
+ (reset! healthy? true))
+ (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config now-ms error-count)]
+ (when run-callback
+ (cond (= run-callback :error)
+ (on-system-error)
+
+ (= run-callback :recovery)
+ (on-system-recovery)
+
+ :else
+ (log/error "unhandled callback-type" run-callback))
+ (log/debug "run-callback is" run-callback))
+ error-count)))
+
+
+(defn poll-errors [running? config-atom]
+ (try
+ (when @running?
+ (do-poll-errors @config-atom (ext/now-ms)))
+ (catch Throwable t
+ (log/error t "unexpected error in poll-errors:" (ex-message t))
+ nil)))
+
+
+(comment
+ (do-poll-errors @com.github.ivarref.yoltq/*config* (ext/now-ms)))
+
diff --git a/src/com/github/ivarref/yoltq/ext_sys.clj b/src/com/github/ivarref/yoltq/ext_sys.clj
new file mode 100644
index 0000000..692b934
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/ext_sys.clj
@@ -0,0 +1,27 @@
+(ns com.github.ivarref.yoltq.ext-sys
+ (:require [datomic.api :as d])
+ (:refer-clojure :exclude [random-uuid])
+ (:import (java.util UUID)))
+
+
+(def ^:dynamic *now-ms-atom* nil)
+(def ^:dynamic *squuid-atom* nil)
+(def ^:dynamic *random-atom* nil)
+
+
+(defn now-ms []
+ (if *now-ms-atom*
+ @*now-ms-atom*
+ (System/currentTimeMillis)))
+
+
+(defn squuid []
+ (if *squuid-atom*
+ (UUID/fromString (str "00000000-0000-0000-0000-" (format "%012d" (swap! *squuid-atom* inc))))
+ (d/squuid)))
+
+
+(defn random-uuid []
+ (if *random-atom*
+ (UUID/fromString (str "00000000-0000-0000-0000-" (format "%012d" (swap! *random-atom* inc))))
+ (UUID/randomUUID)))
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj
new file mode 100644
index 0000000..ffb1ad8
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/impl.clj
@@ -0,0 +1,248 @@
+(ns com.github.ivarref.yoltq.impl
+ (:require [clojure.edn :as edn]
+ [clojure.string :as str]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.double-trouble :as dt]
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [com.github.ivarref.yoltq.utils :as u]
+ [datomic.api :as d])
+ (:import (java.time Year)))
+
+(def schema
+ [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity}
+ #:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value}
+ #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
+ #:db{:ident :com.github.ivarref.yoltq/job-group, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
+ #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
+ #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string}
+ #:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes}
+ #:db{:ident :com.github.ivarref.yoltq/opts, :cardinality :db.cardinality/one, :valueType :db.type/string}
+ #:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string}
+ #:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true}
+ #:db{:ident :com.github.ivarref.yoltq/lock, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :noHistory true}
+ #:db{:ident :com.github.ivarref.yoltq/init-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
+ #:db{:ident :com.github.ivarref.yoltq/processing-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
+ #:db{:ident :com.github.ivarref.yoltq/done-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
+ #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
+ #:db{:ident :com.github.ivarref.yoltq/version, :cardinality :db.cardinality/one, :valueType :db.type/string, :index true}])
+
+(defn pr-str-inner [x]
+ (binding [*print-dup* false
+ *print-meta* false
+ *print-readably* true
+ *print-length* nil
+ *print-level* nil
+ *print-namespace-maps* false]
+ (pr-str x)))
+
+(defn pr-str-safe [what x]
+ (try
+ (if (= x (edn/read-string (pr-str-inner x)))
+ (pr-str-inner x)
+ (throw (ex-info (str "Could not read-string " what) {:input x})))
+ (catch Exception e
+ (log/error "could not read-string" what ":" (ex-message e))
+ (throw e))))
+
+(defn default-partition-fn [_queue-keyword]
+ (keyword "yoltq" (str "queue_" (.getValue (Year/now)))))
+
+(defn put [{:keys [capture-bindings conn encode partition-fn]
+ :or {partition-fn default-partition-fn
+ encode (partial pr-str-safe :payload)}
+ :as config}
+ queue-name
+ payload
+ opts]
+ (if-let [q-config (get-in config [:handlers queue-name])]
+ (let [id (u/squuid)
+ encode (get q-config :encode encode)
+ partition-fn (get q-config :partition-fn partition-fn)
+ partition (partition-fn queue-name)
+ _ (assert (keyword? partition) "Partition must be a keyword")
+ depends-on (get q-config :depends-on (fn [_] nil))
+ valid-payload? (get q-config :valid-payload? (fn [_] true))
+ opts (merge
+ (when-let [deps (depends-on payload)]
+ {:depends-on deps})
+ (or opts {}))
+ str-bindings (->> (reduce (fn [o k]
+ (assoc o (symbol k) (deref k)))
+ {}
+ (or capture-bindings []))
+ (pr-str-safe :capture-bindings))
+ _ (when-not (valid-payload? payload)
+ (log/error "Payload was not valid. Payload was:" payload)
+ (throw (ex-info (str "Payload was not valid: " payload) {:payload payload})))
+ encoded (encode payload)
+ _ (when (not (or (bytes? encoded) (string? encoded)))
+ (log/error "Payload must be encoded to either a string or a byte array")
+ (throw (ex-info (str "Payload must be encoded to a string or a byte array. Payload: " payload) {:payload payload})))]
+ (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init)
+ (do
+ (dt/ensure-partition! conn partition)
+ (merge
+ (if (bytes? encoded)
+ {:com.github.ivarref.yoltq/payload-bytes encoded}
+ {:com.github.ivarref.yoltq/payload encoded})
+ {:db/id (d/tempid partition)
+ :com.github.ivarref.yoltq/id id
+ :com.github.ivarref.yoltq/queue-name queue-name
+ :com.github.ivarref.yoltq/status u/status-init
+ :com.github.ivarref.yoltq/bindings str-bindings
+ :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts)
+ :com.github.ivarref.yoltq/lock (u/random-uuid)
+ :com.github.ivarref.yoltq/tries 0
+ :com.github.ivarref.yoltq/init-time (u/now-ms)
+ :com.github.ivarref.yoltq/version "2"}
+ (when-let [[q ext-id] (:depends-on opts)]
+ (when-not (d/q '[:find ?e .
+ :in $ ?ext-id
+ :where
+ [?e :com.github.ivarref.yoltq/ext-id ?ext-id]]
+ (d/db conn)
+ (pr-str-safe :depends-on [q ext-id]))
+ (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts))))
+ (when-let [ext-id (:id opts)]
+ {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])})
+ (when-let [job-group (:job-group opts)]
+ {:com.github.ivarref.yoltq/job-group job-group}))))
+ (do
+ (log/error "Did not find registered handler for queue" queue-name)
+ (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name})))))
+
+
+(defn depends-on-waiting? [{:keys [conn]}
+ q-item]
+ (let [db (d/db conn)]
+ (when-let [{:com.github.ivarref.yoltq/keys [opts]} (u/get-queue-item db (:id q-item))]
+ (when-let [[q id :as depends-on] (:depends-on opts)]
+ (when-not (d/q '[:find ?e .
+ :in $ ?ext-id
+ :where
+ [?e :com.github.ivarref.yoltq/ext-id ?ext-id]
+ [?e :com.github.ivarref.yoltq/status :done]]
+ db
+ (pr-str [q id]))
+ (log/info "queue item" (str (:id q-item)) "is waiting on" depends-on)
+ {:depends-on depends-on})))))
+
+
+(defn take! [{:keys [conn cas-failures hung-log-level tx-spent-time!]
+ :or {hung-log-level :error}}
+ {:keys [tx id queue-name was-hung? to-error?] :as queue-item-info}]
+ (when queue-item-info
+ (try
+ (cond to-error?
+ (log/logp hung-log-level "queue-item" (str id) "was hung and retried too many times. Giving up!")
+
+ was-hung?
+ (log/logp hung-log-level "queue-item" (str id) "was hung, retrying ...")
+
+ :else
+ nil)
+ (let [start-time (System/nanoTime)
+ {:keys [db-after]} @(d/transact conn tx)
+ _ (when tx-spent-time! (tx-spent-time! (- (System/nanoTime) start-time)))
+ {:com.github.ivarref.yoltq/keys [status] :as q-item} (u/get-queue-item db-after id)]
+ (log/debug "queue item" (str id) "for queue" queue-name "now has status" status)
+ q-item)
+ (catch Throwable t
+ (let [{:db/keys [error] :as m} (u/db-error-map t)]
+ (cond
+ (= :db.error/cas-failed error)
+ (do
+ (log/info "take! :db.error/cas-failed for queue item" (str id) "and attribute" (:a m))
+ (when cas-failures
+ (swap! cas-failures inc))
+ nil)
+
+ :else
+ (do
+ (log/error t "Unexpected failure for queue item" (str id) ":" (ex-message t))
+ nil)))))))
+
+
+(defn mark-status! [{:keys [conn tx-spent-time!]}
+ {:com.github.ivarref.yoltq/keys [id lock tries]}
+ new-status]
+ (try
+ (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)]
+ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)]
+ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing new-status]
+ (if (= new-status u/status-done)
+ {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ms)}
+ {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ms)})]
+ start-time (System/nanoTime)
+ {:keys [db-after]} @(d/transact conn tx)]
+ (when tx-spent-time! (tx-spent-time! (- (System/nanoTime) start-time)))
+ (u/get-queue-item db-after id))
+ (catch Throwable t
+ (log/error t "unexpected error in mark-status!: " (ex-message t))
+ nil)))
+
+
+(defn fmt [id queue-name new-status tries spent-ns]
+ (str/join " " ["queue-item" (str id)
+ "for queue" queue-name
+ "now has status" new-status
+ "after" tries (if (= 1 tries)
+ "try"
+ "tries")
+ "in" (format "%.1f" (double (/ spent-ns 1e6))) "ms"]))
+
+
+(defn execute! [{:keys [decode handlers mark-status-fn! start-execute-time collect-spent-time!]
+ :or {mark-status-fn! mark-status!
+ decode edn/read-string}
+ :as cfg}
+ {:com.github.ivarref.yoltq/keys [status id queue-name payload payload-bytes] :as queue-item}]
+ (when queue-item
+ (if (= :error status)
+ (assoc queue-item :failed? true)
+ (if-let [queue (get handlers queue-name)]
+ (let [{:keys [f allow-cas-failure?]} queue
+ decode (get queue :decode decode)]
+ (log/debug "queue item" (str id) "for queue" queue-name "is now processing")
+ (let [{:keys [retval exception]}
+ (try
+ (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ms) id queue-name])
+ (let [payload (decode (or payload payload-bytes))
+ v (f payload)]
+ {:retval v})
+ (catch Throwable t
+ {:exception t})
+ (finally
+ (swap! start-execute-time dissoc (Thread/currentThread))))
+ {:db/keys [error] :as m} (u/db-error-map exception)]
+ (cond
+ (and (some? exception)
+ allow-cas-failure?
+ (= :db.error/cas-failed error)
+ (or (true? allow-cas-failure?)
+ (allow-cas-failure? (:a m))))
+ (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)]
+ (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item]
+ (log/info (fmt id queue-name u/status-done tries (- done-time init-time)))
+ (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time)))
+ (assoc q-item :retval retval :success? true :allow-cas-failure? true)))
+
+ (some? exception)
+ (when-let [q-item (mark-status-fn! cfg queue-item u/status-error)]
+ (let [{:com.github.ivarref.yoltq/keys [init-time error-time tries]} q-item
+ level (if (>= tries 3) :error :warn)]
+ (log/logp level exception (fmt id queue-name u/status-error tries (- error-time init-time)))
+ (log/logp level exception "error message was:" (str \" (ex-message exception) \") "for queue-item" (str id))
+ (log/logp level exception "ex-data was:" (ex-data exception) "for queue-item" (str id))
+ (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time)))
+ (assoc q-item :exception exception)))
+
+ :else
+ (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)]
+ (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item]
+ (log/info (fmt id queue-name u/status-done tries (- done-time init-time)))
+ (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time)))
+ (assoc q-item :retval retval :success? true))))))
+ (do
+ (log/error "no handler for queue" queue-name)
+ nil)))))
diff --git a/src/com/github/ivarref/yoltq/migrate.clj b/src/com/github/ivarref/yoltq/migrate.clj
new file mode 100644
index 0000000..c97f679
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/migrate.clj
@@ -0,0 +1,61 @@
+(ns com.github.ivarref.yoltq.migrate
+ (:require [datomic.api :as d]
+ [clojure.tools.logging :as log]))
+
+(defn to->v2-ent [{:keys [conn]} now-ms id]
+ (log/info "Migrating id" id)
+ (let [attr-val (fn [attr]
+ (when-let [old (d/q '[:find ?time .
+ :in $ ?e ?a
+ :where
+ [?e ?a ?time]]
+ (d/db conn)
+ [:com.github.ivarref.yoltq/id id]
+ attr)]
+ (let [now-ms (or now-ms
+ (.getTime (d/q '[:find (max ?txinst) .
+ :in $ ?e ?a
+ :where
+ [?e ?a _ ?tx true]
+ [?tx :db/txInstant ?txinst]]
+ (d/history (d/db conn))
+ [:com.github.ivarref.yoltq/id id]
+ attr)))]
+ (log/info "Updating" id attr "to" now-ms)
+ [[:db/cas [:com.github.ivarref.yoltq/id id]
+ attr old now-ms]])))]
+ (vec (concat [[:db/cas [:com.github.ivarref.yoltq/id id]
+ :com.github.ivarref.yoltq/version nil "2"]]
+ (mapcat attr-val [:com.github.ivarref.yoltq/init-time
+ :com.github.ivarref.yoltq/processing-time
+ :com.github.ivarref.yoltq/done-time
+ :com.github.ivarref.yoltq/error-time])))))
+
+(defn to->v2 [{:keys [conn loop? now-ms]
+ :or {loop? true}
+ :as cfg}]
+ (loop [tx-vec []]
+ (if-let [id (some->> (d/q '[:find [?id ...]
+ :in $
+ :where
+ [?e :com.github.ivarref.yoltq/id ?id]
+ [(missing? $ ?e :com.github.ivarref.yoltq/version)]]
+ (d/db conn))
+ (sort)
+ (not-empty)
+ (first))]
+ (let [tx (to->v2-ent cfg now-ms id)]
+ @(d/transact conn tx)
+ (if loop?
+ (recur (vec (take 10 (conj tx-vec tx))))
+ tx))
+ (do
+ (log/info "No items left to migrate")
+ tx-vec))))
+
+
+(defn migrate! [cfg]
+ (to->v2 cfg))
+
+(comment
+ (migrate! @com.github.ivarref.yoltq/*config*))
diff --git a/src/com/github/ivarref/yoltq/poller.clj b/src/com/github/ivarref/yoltq/poller.clj
new file mode 100644
index 0000000..9cf81c7
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/poller.clj
@@ -0,0 +1,64 @@
+(ns com.github.ivarref.yoltq.poller
+ (:require [com.github.ivarref.yoltq.utils :as u]
+ [com.github.ivarref.yoltq.impl :as i]
+ [clojure.tools.logging :as log]))
+
+
+(defn poll-once! [cfg q status]
+ (when-let [item (case status
+ :init (u/get-init cfg q)
+ :error (u/get-error cfg q)
+ :hung (u/get-hung cfg q))]
+ (with-bindings (get item :bindings {})
+ (if (i/depends-on-waiting? cfg item)
+ nil
+ (some->> item
+ (i/take! cfg)
+ (i/execute! cfg))))))
+
+
+(defn poll-queue! [running?
+ {:keys [running-queues] :as cfg}
+ [queue-name status :as q]]
+ (try
+ (let [[old _] (swap-vals! running-queues conj q)]
+ (if-not (contains? old q)
+ (try
+ (log/debug "polling queue" queue-name "for status" status)
+ (let [start-time (u/now-ms)
+ last-res (loop [prev-res nil]
+ (when @running?
+ (let [res (poll-once! cfg queue-name status)]
+ (log/debug "poll-once! returned" res)
+ (if (and res (:success? res))
+ (recur res)
+ prev-res))))]
+ (let [spent-ms (- (u/now-ms) start-time)]
+ (log/trace "done polling queue" q "in" spent-ms "ms"))
+ last-res)
+ (finally
+ (swap! running-queues disj q)))
+ (log/debug "queue" q "is already being polled, doing nothing...")))
+ (catch Throwable t
+ (log/error t "poll-queue! crashed:" (ex-message t)))
+ (finally)))
+
+(comment
+ (def cfg @com.github.ivarref.yoltq/*config*))
+
+(comment
+ (poll-queue!
+ (atom true)
+ @com.github.ivarref.yoltq/*config*
+ [:add-message-thread :init]))
+
+(defn poll-all-queues! [running? config-atom pool]
+ (try
+ (when @running?
+ (let [{:keys [handlers]} @config-atom]
+ (doseq [q (shuffle (vec (for [q-name (keys handlers)
+ status [:init :error :hung]]
+ [q-name status])))]
+ (.execute pool (fn [] (poll-queue! running? @config-atom q))))))
+ (catch Throwable t
+ (log/error t "poll-all-queues! crashed:" (ex-message t)))))
diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj
new file mode 100644
index 0000000..f83e3ba
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/report_queue.clj
@@ -0,0 +1,459 @@
+(ns com.github.ivarref.yoltq.report-queue
+ (:require [com.github.ivarref.yoltq.utils :as u]
+ [com.github.ivarref.yoltq.impl :as i]
+ [datomic.api :as d]
+ [clojure.tools.logging :as log])
+ (:import (datomic Connection Datom)
+ (java.util.concurrent LinkedBlockingQueue ScheduledExecutorService BlockingQueue TimeUnit)))
+
+; Private API, subject to change!
+
+(defn process-poll-result! [cfg id-ident poll-result consumer]
+ (let [{:keys [tx-data db-after]} poll-result]
+ (when-let [new-ids (->> tx-data
+ (filter (fn [^Datom datom] (and
+ (= (.a datom) id-ident)
+ (.added datom))))
+ (mapv (fn [^Datom datom] (.v datom)))
+ (into [])
+ (not-empty))]
+ (doseq [id new-ids]
+ (consumer (fn []
+ (try
+ (let [{:com.github.ivarref.yoltq/keys [lock id status queue-name bindings]} (u/get-queue-item db-after id)]
+ (with-bindings (or bindings {})
+ (if (i/depends-on-waiting? cfg {:id id})
+ nil
+ (some->>
+ (u/prepare-processing db-after id queue-name lock status)
+ (i/take! cfg)
+ (i/execute! cfg)))))
+ (catch Throwable t
+ (log/error t "Unexpected error in process-poll-result!")))))))))
+
+(defn report-queue-listener [running?
+ ready?
+ ^ScheduledExecutorService pool
+ config-atom]
+ (let [cfg @config-atom
+ conn (:conn cfg)
+ tx-report-queue-given (contains? cfg :tx-report-queue)
+ ^BlockingQueue q (if tx-report-queue-given
+ (get cfg :tx-report-queue)
+ (d/tx-report-queue conn))
+ id-ident (d/q '[:find ?e .
+ :where [?e :db/ident :com.github.ivarref.yoltq/id]]
+ (d/db conn))]
+ (assert (instance? BlockingQueue q))
+ (log/debug "tx-report-queue-given:" tx-report-queue-given)
+ (try
+ (let [running-local? (atom true)]
+ (while (and @running? @running-local?)
+ (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)]
+ (if (= poll-result :end)
+ (do
+ (log/debug "report-queue-listener received :end token. Exiting")
+ (reset! running-local? false))
+ ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system."))
+ (process-poll-result! @config-atom
+ id-ident
+ poll-result
+ (fn [f]
+ (when @running?
+ (.execute ^ScheduledExecutorService pool f))))))
+ (deliver ready? :ready)))
+ (catch Throwable t
+ (log/error t "Unexpected error in report-queue-listener:" (.getMessage t)))
+ (finally
+ (if tx-report-queue-given
+ (log/debug "Remove tx-report-queue handled elsewhere")
+ (do
+ (log/debug "Remove tx-report-queue")
+ (d/remove-tx-report-queue conn)))))))
+
+; https://stackoverflow.com/a/14488425
+(defn- dissoc-in
+ "Dissociates an entry from a nested associative structure returning a new
+ nested structure. keys is a sequence of keys. Any empty maps that result
+ will not be present in the new structure."
+ [m [k & ks :as keys]]
+ (if ks
+ (if-let [nextmap (get m k)]
+ (let [newmap (dissoc-in nextmap ks)]
+ (if (seq newmap)
+ (assoc m k newmap)
+ (dissoc m k)))
+ m)
+ (dissoc m k)))
+
+(defn- queues-to-shutdown [old-state new-state]
+ (assert (map? old-state))
+ (assert (map? new-state))
+ (doseq [x (vals new-state)]
+ (assert (vector? x)))
+ (doseq [x (vals old-state)]
+ (assert (vector? x)))
+ (let [new-qs (into #{} (mapv second (vals new-state)))]
+ (reduce
+ (fn [o [send-end-token? old-q]]
+ ;(assert (boolean? send-end-token?))
+ ;(assert (instance? BlockingQueue old-q))
+ (if (contains? new-qs old-q)
+ o
+ (conj o [send-end-token? old-q])))
+ []
+ (vals old-state))))
+
+(comment
+ (queues-to-shutdown {:a [true 999] :b [false 777]}
+ {:a [true 123] :b [true 777]}))
+
+(defn- multicast-once [conn work-item old-state new-state]
+ (assert (map? old-state))
+ (assert (map? new-state))
+ (doseq [[send-end-token? q-to-shutdown] (queues-to-shutdown old-state new-state)]
+ (if send-end-token?
+ (do
+ #_(log/debug "offering :end token")
+ (if (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS)
+ (log/debug "Multicaster sent :end token")
+ (log/debug "Multicaster failed to send :end token")))
+ (do
+ (log/debug "Multicaster not sending :end token"))))
+ (when (seq new-state)
+ (if (some? work-item)
+ (reduce-kv
+ (fn [m id [send-end-token? q]]
+ (let [ok-offer (.offer ^BlockingQueue q work-item 1 TimeUnit/MICROSECONDS)]
+ (if (true? ok-offer)
+ (assoc m id [send-end-token? q])
+ (log/error "Multicaster failed to offer item for connection" conn "and queue id" id))))
+ {}
+ new-state)
+ new-state)))
+
+(defonce ^:private multicast-state-lock (Object.))
+(defonce ^:private consumer-state-lock (Object.))
+(defonce ^:private multicast-state (atom {}))
+(defonce ^:private thread-count (atom 0))
+
+(defn- multicaster-loop [init-state conn ready?]
+ (assert (instance? Connection conn))
+ (let [input-queue (d/tx-report-queue conn)]
+ (deliver ready? true)
+ (loop [old-state init-state]
+ (let [work-item (.poll ^BlockingQueue input-queue 16 TimeUnit/MILLISECONDS)
+ new-state (locking multicast-state-lock
+ ; writer to `multicast-state` must be protected by `multicast-state-lock`
+ ; it should block minimally / spend minimum amount of time
+ (swap! multicast-state (fn [old-state] (update-in old-state [:iter-count conn] (fnil inc 0))))
+ (if-let [new-state (multicast-once conn work-item old-state (get-in @multicast-state [:queues conn] {}))]
+ new-state
+ (do (swap! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))
+ (swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] dec)))
+ (d/remove-tx-report-queue conn)
+ (log/debug "Multicaster removed tx-report-queue for conn" conn)
+ nil)))]
+ (if new-state
+ (recur new-state)
+ nil)))))
+
+(defn- start-multicaster! [conn]
+ (assert (instance? Connection conn))
+ (let [ready? (promise)]
+ (future
+ (log/debug "Multicaster starting for conn" conn)
+ (try
+ (swap! thread-count inc)
+ (let [new-state (swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] (fnil inc 0))))]
+ (assert (= 1 (get-in new-state [:thread-count conn])))
+ ; "parent" thread holds `multicast-state-lock` and
+ ; waits for `ready?` promise, so effectively this new thread also holds
+ ; the lock until `ready?` is delivered. That is: it is safe
+ ; for this thread to modify multicast-state regardless of what other threads are doing
+ (multicaster-loop (get-in new-state [:queues conn]) conn ready?))
+ (catch Throwable t
+ (log/error t "Unexpected error in multicaster:" (.getMessage t))
+ (log/error "Multicaster exiting for conn"))
+ (finally
+ (swap! thread-count dec)
+ (log/debug "Multicaster exiting for conn" conn))))
+ (when (= :timeout (deref ready? 30000 :timeout))
+ (throw (RuntimeException. "Timed out waiting for multicaster to start")))))
+
+(defn- wait-multicast-thread-step
+ [conn state]
+ ; `get-tx-report-queue-multicast!` should return only when the multicaster thread
+ ; has picked up the new queue.
+ ;
+ ; Otherwise the following could happen:
+ ; 1. multicast thread is sleeping
+ ; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true`
+ ; 3: user-thread (or somebody else) calls `stop-multicaster`.
+ ; The multicast-state atom is now identical as it was in step 1.
+ ; , Step 2 and 3 happened while the multicast thread was sleeping.
+ ; 4: The multicast thread is scheduled and does _not_ detect any state change.
+ ; Therefore the multicast thread does _not_ send out an :end token as one would expect.
+ ;
+ ; The new queue is written to memory at this point. No other thread can remove it because
+ ; we are still, and have been during the modification of multicast-state, holding consumer-state-lock.
+ ; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock,
+ ; we can be sure that no other thread changes or has changed the state.
+ ;
+ ; Once [:iter-count conn] has changed, we know that the multicaster thread
+ ; has seen the new queue. This means that we can be sure that the queue
+ ; will receive the `:end` token if the queue is stopped.
+ (let [start-ms (System/currentTimeMillis)
+ iter-count (get-in state [:iter-count conn] -1)]
+ (loop [spin-count 0]
+ (if (not= iter-count (locking multicast-state-lock
+ (get-in @multicast-state [:iter-count conn] -1)))
+ nil
+ (let [spent-ms (- (System/currentTimeMillis) start-ms)]
+ (if (> spent-ms 30000)
+ (throw (RuntimeException. "Timed out waiting for multicaster thread"))
+ (do
+ (Thread/sleep 16)
+ (recur (inc spin-count)))))))))
+
+(defn get-tx-report-queue-multicast!
+ "Multicast the datomic.api/tx-report-queue to different consumers.
+ A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer.
+ Repeated calls using the same `conn` and `id` returns the same queue.
+
+ The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread
+ to send `:end` if the queue is stopped. The default value is `false`.
+
+ A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`.
+
+ Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`."
+ ([conn id]
+ (get-tx-report-queue-multicast! conn id false))
+ ([conn id send-end-token?]
+ (assert (instance? Connection conn))
+ (locking consumer-state-lock
+ (let [[new-state the-q]
+ (locking multicast-state-lock
+ (assert (map? @multicast-state))
+ (if-let [existing-q (get-in @multicast-state [:queues conn id])]
+ (do
+ (let [new-state (swap! multicast-state
+ (fn [old-state]
+ (update-in old-state [:queues conn id] (fn [[end-token? q]]
+ (if (not= end-token? send-end-token?)
+ (log/debug "flipped `send-end-token?`")
+ (log/debug "identical `send-end-token?`"))
+ [send-end-token? q]))))]
+ (log/debug "Returning existing queue for id" id)
+ (assert (instance? BlockingQueue (second existing-q)))
+ [new-state (second existing-q)]))
+ (let [needs-multicaster? (nil? (get-in @multicast-state [:queues conn]))
+ new-q (LinkedBlockingQueue.)
+ new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [:queues conn id] [send-end-token? new-q])))]
+ (if needs-multicaster?
+ (do
+ (start-multicaster! conn)
+ (log/debug "Returning new queue for id" id "(multicaster thread started)")
+ [new-state new-q])
+ (do
+ (log/debug "Returning new queue for id" id "(multicaster thread already running)")
+ [new-state new-q])))))]
+ ; wait for multicaster thread to pick up current Queue
+ (wait-multicast-thread-step conn new-state)
+ the-q))))
+
+(defn- wait-multicast-threads-exit [[old-state new-state]]
+ (assert (map? old-state))
+ (assert (map? new-state))
+ (assert (map? (get old-state :queues {})))
+ (assert (map? (get new-state :queues {})))
+ (assert (map? (get old-state :thread-count {})))
+ (assert (map? (get new-state :thread-count {})))
+ (locking consumer-state-lock
+ ; No new multicast threads will be launched inside this block.
+ ; The lock is already held by parent function.
+ ;
+ ; Why do we need to _wait_ for multicaster thread(s) to exit after
+ ; removing all queue ids for a given connection?
+ ; Otherwise the following could happen:
+ ; 1. multicaster thread is sleeping
+ ; 2. user calls stop-multicaster!
+ ; One would expect that multicaster thread would exit, but it is still sleeping
+ ; 3. user calls get-tx-report-queue-multicast! with the same conn
+ ; The state is now empty, so a new multicaster thread is spawned.
+ ; 4. Now there is two multicaster threads for the same connection!
+ ; ... and since the datomic report queue can be shared between threads
+ ; it will seemingly work, but when the end event is sent, it will be
+ ; sent by multiple threads.
+ (let [old-conns (into #{} (keys (get old-state :queues {})))
+ new-conns (into #{} (keys (get new-state :queues {})))]
+ (assert (every?
+ (fn [x] (instance? Connection x))
+ old-conns))
+ (assert (every?
+ (fn [x] (instance? Connection x))
+ new-conns))
+ (doseq [old-conn old-conns]
+ (when-not (contains? new-conns old-conn)
+ (let [old-threadcount (get-in old-state [:thread-count old-conn] nil)]
+ (assert (= 1 old-threadcount))
+ (let [start-ms (System/currentTimeMillis)]
+ (loop []
+ (if (= 0 (get-in @multicast-state [:thread-count old-conn]))
+ :ok
+ (do
+ (let [spent-ms (- (System/currentTimeMillis) start-ms)]
+ (if (> spent-ms 30000)
+ (throw (RuntimeException. "Timed out waiting for multicaster thread to exit"))
+ (do
+ (Thread/sleep 16)
+ (recur))))))))))))))
+
+(defn- all-queues [state]
+ (->> (mapcat (fn [[conn qmap]]
+ (mapv (fn [q-id] [conn q-id])
+ (keys qmap)))
+ (seq (get state :queues {})))
+ (into #{})))
+
+(comment
+ (do
+ (assert (= #{}
+ (all-queues {})))
+ (assert (= #{}
+ (all-queues {:queues {}})))
+ (assert (= #{[:conn-a :q-id]}
+ (all-queues {:queues {:conn-a {:q-id 1}}})))
+ (assert (= #{[:conn-a :q-id] [:conn-a :q-id-2]}
+ (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2}}})))
+ (assert (= #{[:conn-a :q-id-2] [:conn-b :q-id-3] [:conn-a :q-id]}
+ (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2}
+ :conn-b {:q-id-3 3}}})))))
+
+(defn- removed-queues? [old new]
+ (not= (all-queues old)
+ (all-queues new)))
+
+(defn stop-multicast-consumer-id! [conn id]
+ (assert (instance? Connection conn))
+ (let [did-remove? (atom nil)]
+ (locking consumer-state-lock
+ (wait-multicast-threads-exit
+ (locking multicast-state-lock
+ (let [[old new] (swap-vals! multicast-state (fn [old-state]
+ (let [new-state (dissoc-in old-state [:queues conn id])]
+ (if (= {} (get-in new-state [:queues conn]))
+ (dissoc-in old-state [:queues conn])
+ new-state))))]
+ (reset! did-remove? (removed-queues? old new))
+ [old new]))))
+ @did-remove?))
+
+(defn stop-multicaster! [conn]
+ (assert (instance? Connection conn))
+ (let [did-remove? (atom nil)]
+ (locking consumer-state-lock
+ (wait-multicast-threads-exit
+ (locking multicast-state-lock
+ (let [[old new] (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))]
+ (reset! did-remove? (removed-queues? old new))
+ [old new]))))
+ @did-remove?))
+
+(defn stop-all-multicasters! []
+ (let [did-remove? (atom nil)]
+ (locking consumer-state-lock
+ (wait-multicast-threads-exit
+ (locking multicast-state-lock
+ (let [[old new] (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))]
+ (reset! did-remove? (removed-queues? old new))
+ [old new]))))
+ @did-remove?))
+
+(comment
+ (do
+ (require 'com.github.ivarref.yoltq.log-init)
+ (require '[datomic.api :as d])
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq"} :debug]
+ ;[#{"ivarref.yoltq*"} :info]
+ [#{"*"} :info]])
+ (defonce conn (let [uri (str "datomic:mem://demo")
+ _ (d/delete-database uri)
+ _ (d/create-database uri)
+ conn (d/connect uri)]
+ conn))))
+
+(comment
+ (do
+ (require 'com.github.ivarref.yoltq.log-init)
+ (defn drain! [^BlockingQueue q]
+ (loop [items []]
+ (if-let [elem (.poll q 100 TimeUnit/MILLISECONDS)]
+ (recur (conj items elem))
+ items)))
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq"} :debug]
+ ;[#{"ivarref.yoltq*"} :info]
+ [#{"*"} :info]])
+ (log/info "********************************")
+ (defonce conn (let [uri (str "datomic:mem://demo")
+ _ (d/delete-database uri)
+ _ (d/create-database uri)
+ conn (d/connect uri)]
+ conn))
+ (log/info "stop-all!")
+ (stop-all-multicasters!)
+ (assert (= 0 @thread-count))
+ (let [q1 (get-tx-report-queue-multicast! conn :q1 false)
+ q2 (get-tx-report-queue-multicast! conn :q2 false)
+ _ (get-tx-report-queue-multicast! conn :q1 true)]
+ @(d/transact conn [{:db/doc "demo"}])
+ @(d/transact conn [{:db/doc "demo"}])
+ @(d/transact conn [{:db/doc "demo"}])
+ (log/info "begin drain q1")
+ (stop-multicast-consumer-id! conn :q1)
+ (stop-multicast-consumer-id! conn :q1)
+ (println "thread count" @thread-count)
+ (let [qitems-2 (drain! q2)
+ qitems-1 (drain! q1)]
+ (assert (= :end (last qitems-1)))
+ (println "drain count q1:" (count qitems-1))
+ (println "drain count q2:" (count qitems-2))))))
+
+(comment
+ (do
+ (let [q (get-tx-report-queue-multicast! conn :q1 true)]
+ (log/debug "stopping id :q1")
+ (stop-multicaster-id! conn :q1)
+ (let [drained (drain! q)]
+ (println "drained:" drained)
+ (assert (= [:end] drained)))
+ @multicast-state)))
+
+(comment
+ (stop-all-multicasters!))
+
+(comment
+ (do
+ (let [q (get-tx-report-queue-multicast! conn :q2 false)]
+ (println "drain count:" (count (drain! q)))
+ @multicast-state
+ nil)))
+
+(comment
+ (get-tx-report-queue-multicast! conn :q1 false)
+ (get-tx-report-queue-multicast! conn :q1 true))
+
+(comment
+ (do
+ @(d/transact conn [{:db/doc "demo"}])
+ @(d/transact conn [{:db/doc "demo"}])
+ @(d/transact conn [{:db/doc "demo"}])
+ :yay)) \ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq/slow_executor_detector.clj b/src/com/github/ivarref/yoltq/slow_executor_detector.clj
new file mode 100644
index 0000000..53dfe89
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/slow_executor_detector.clj
@@ -0,0 +1,36 @@
+(ns com.github.ivarref.yoltq.slow-executor-detector
+ (:require [clojure.string :as str]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.ext-sys :as ext])
+ (:import (java.util.concurrent ExecutorService)))
+
+(defn- do-show-slow-threads [{:keys [start-execute-time
+ max-execute-time
+ slow?
+ slow-thread-show-stacktrace?]
+ :or {slow-thread-show-stacktrace? true}}]
+ (let [new-slow-val (atom false)]
+ (doseq [[^Thread thread [start-time queue-id queue-name]] @start-execute-time]
+ (when (> (ext/now-ms) (+ start-time max-execute-time))
+ (reset! new-slow-val true)
+ (log/error "thread" (.getName thread) "spent too much time on"
+ "queue item" (str queue-id)
+ "for queue" queue-name
+ (if slow-thread-show-stacktrace?
+ (str "stacktrace: \n" (str/join "\n" (mapv str (seq (.getStackTrace thread)))))
+ ""))))
+ (reset! slow? @new-slow-val)))
+
+(defn show-slow-threads [^ExecutorService pool config-atom]
+ (try
+ (while (not (.isTerminated pool))
+ (try
+ (do-show-slow-threads @config-atom)
+ (catch Throwable t
+ (log/error t "do-show-slow-threads crashed:" (ex-message t))))
+ (dotimes [_ 3]
+ (when (not (.isTerminated pool))
+ (Thread/sleep 1000))))
+ (log/debug "show-slow-threads exiting")
+ (catch Throwable t
+ (log/error t "reap! crashed:" (ex-message t)))))
diff --git a/src/com/github/ivarref/yoltq/test_queue.clj b/src/com/github/ivarref/yoltq/test_queue.clj
new file mode 100644
index 0000000..ee9cd54
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/test_queue.clj
@@ -0,0 +1,191 @@
+(ns com.github.ivarref.yoltq.test-queue
+ (:require [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.report-queue :as rq]
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [com.github.ivarref.yoltq :as yq]
+ [datomic.api :as d]
+ [com.github.ivarref.yoltq.poller :as poller]
+ [clojure.test :as test]
+ [com.github.ivarref.yoltq.utils :as u]
+ [com.github.ivarref.yoltq.impl :as i])
+ (:import (java.util.concurrent BlockingQueue TimeUnit)
+ (datomic Datom)))
+
+
+(defn bootstrap-poller! [txq running? poller-exited? bootstrapped? conn]
+ (let [ready? (promise)]
+ (future
+ (let [q (d/tx-report-queue conn)]
+ (try
+ (while @running?
+ (when-let [poll-result (.poll ^BlockingQueue q 10 TimeUnit/MILLISECONDS)]
+ (swap! txq conj poll-result))
+ (deliver ready? true)
+ (reset! bootstrapped? true))
+ (catch Throwable t
+ (log/error t "test-poller crashed: " (ex-message t)))
+ (finally
+ (try
+ (d/remove-tx-report-queue conn)
+ (catch Throwable t
+ (log/warn t "remove-tx-report-queue failed:" (ex-message t))))
+ (deliver poller-exited? true)))))
+ @ready?))
+
+
+(defmacro with-virtual-queue!
+ [& body]
+ `(let [txq# (atom [])
+ poller-exited?# (promise)
+ bootstrapped?# (atom false)
+ running?# (atom true)
+ config# (atom {:bootstrap-poller! (partial bootstrap-poller! txq# running?# poller-exited?# bootstrapped?#)
+ :init-backoff-time 0
+ :hung-log-level :warn
+ :prev-consumed (atom {})
+ :tx-queue txq#})]
+ (with-bindings {#'yq/*config* config#
+ #'yq/*running?* (atom false)
+ #'yq/*test-mode* true
+ #'ext/*now-ms-atom* (atom 0)
+ #'ext/*random-atom* (atom 0)
+ #'ext/*squuid-atom* (atom 0)}
+ (try
+ ~@body
+ (finally
+ (reset! running?# false)
+ (when @bootstrapped?#
+ @poller-exited?#))))))
+
+
+(defn call-with-virtual-queue!
+ [f]
+ (with-virtual-queue!
+ (f)))
+
+
+(defn run-report-queue! [min-items]
+ (let [{:keys [tx-queue conn]} @yq/*config*
+ id-ident (d/q '[:find ?e .
+ :where [?e :db/ident :com.github.ivarref.yoltq/id]]
+ (d/db conn))]
+ (let [timeout (+ 3000 (System/currentTimeMillis))]
+ (while (and (< (System/currentTimeMillis) timeout)
+ (< (count @tx-queue) min-items))
+ (Thread/sleep 10)))
+ (when (< (count @tx-queue) min-items)
+ (let [msg (str "run-report-queue: timeout waiting for " min-items " items")]
+ (log/error msg)
+ (throw (ex-info msg {}))))
+ (let [res (atom [])]
+ (doseq [itm (first (swap-vals! tx-queue (constantly [])))]
+ (rq/process-poll-result!
+ @yq/*config*
+ id-ident
+ itm
+ (fn [f] (swap! res conj (f)))))
+ @res)))
+
+
+(defn run-one-report-queue! []
+ (first (run-report-queue! 1)))
+
+
+(defn run-queue-once! [q status]
+ (poller/poll-once! @yq/*config* q status))
+
+
+(defn put! [q payload]
+ @(d/transact (:conn @yq/*config*) [(yq/put q payload)]))
+
+
+(defn transact-result->maps [{:keys [tx-data db-after]}]
+ (let [m (->> tx-data
+ (group-by (fn [^Datom d] (.e d)))
+ (vals)
+ (mapv (fn [datoms]
+ (reduce (fn [o ^Datom d]
+ (if (.added d)
+ (assoc o (d/q '[:find ?r .
+ :in $ ?e
+ :where [?e :db/ident ?r]]
+ db-after
+ (.a d))
+ (.v d))
+ o))
+ {}
+ datoms))))]
+ m))
+
+(defn contains-queue-job?
+ [queue-id conn {::yq/keys [id queue-name status] :as m}]
+ (when (and (= queue-id queue-name)
+ (= status :init)
+ (d/q '[:find ?e .
+ :in $ ?id
+ :where
+ [?e ::yq/id ?id]
+ [?e ::yq/status :init]]
+ (d/db conn)
+ id))
+ m))
+
+
+(defn get-tx-q-job [q-id]
+ (let [{:keys [tx-queue conn]} @yq/*config*]
+ (loop [timeout (+ 3000 (System/currentTimeMillis))]
+ (if-let [job (->> @tx-queue
+ (mapcat transact-result->maps)
+ (filter (partial contains-queue-job? q-id conn))
+ (first))]
+ (u/get-queue-item (d/db conn) (::yq/id job))
+ (if (< (System/currentTimeMillis) timeout)
+ (do (Thread/sleep 10)
+ (recur timeout))
+ nil)))))
+
+(defmacro consume-expect! [queue-name expected-status]
+ `(if-let [job# (get-tx-q-job ~queue-name)]
+ (try
+ (with-bindings (:com.github.ivarref.yoltq/bindings job#)
+ (let [prep# (u/prepare-processing (d/db (:conn @yq/*config*))
+ (:com.github.ivarref.yoltq/id job#)
+ ~queue-name
+ (:com.github.ivarref.yoltq/lock job#)
+ (:com.github.ivarref.yoltq/status job#))]
+ (if-let [depends-on# (i/depends-on-waiting? @yq/*config* prep#)]
+ depends-on#
+ (let [res# (some->> prep#
+ (i/take! @yq/*config*)
+ (i/execute! @yq/*config*))]
+ (swap! (:prev-consumed @yq/*config*) assoc ~queue-name res#)
+ (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#)))
+ (if (:retval res#)
+ (:retval res#)
+ (:exception res#))))))
+ (catch Throwable t#
+ (log/error t# "unexpected error in consume-expect:" (ex-message t#))
+ (throw t#)))
+ (test/is false (str "No job found for queue " ~queue-name))))
+
+(defmacro consume! [queue-name]
+ `(consume-expect! ~queue-name :done))
+
+
+(defmacro force-retry! [queue-name]
+ `(if-let [job# (some-> @yq/*config* :prev-consumed deref (get ~queue-name))]
+ (let [db-res# @(d/transact (:conn @yq/*config*) [{:com.github.ivarref.yoltq/id (:com.github.ivarref.yoltq/id job#)
+ :com.github.ivarref.yoltq/status :init}])
+ res# (some->> (u/prepare-processing (:db-after db-res#)
+ (:com.github.ivarref.yoltq/id job#)
+ ~queue-name
+ (:com.github.ivarref.yoltq/lock job#)
+ :init)
+ (i/take! @yq/*config*)
+ (i/execute! @yq/*config*))]
+ (swap! (:prev-consumed @yq/*config*) assoc ~queue-name res#)
+ (test/is (= :done (:com.github.ivarref.yoltq/status res#)))
+ (if (:retval res#)
+ (:retval res#)
+ (:exception res#)))
+ (test/is false "Expected to have previously consumed something. Was nil.")))
diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj
new file mode 100644
index 0000000..9defd0e
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/utils.clj
@@ -0,0 +1,179 @@
+(ns com.github.ivarref.yoltq.utils
+ (:require [datomic.api :as d]
+ [clojure.edn :as edn]
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [clojure.tools.logging :as log])
+ (:refer-clojure :exclude [random-uuid])
+ (:import (datomic Connection)
+ (java.time Duration)))
+
+
+(def status-init :init)
+(def status-processing :processing)
+(def status-done :done)
+(def status-error :error)
+
+(def current-version "2")
+
+(defn duration->millis [m]
+ (reduce-kv (fn [o k v]
+ (if (instance? Duration v)
+ (assoc o k (.toMillis v))
+ (assoc o k v)))
+ {}
+ m))
+
+
+(defn squuid []
+ (ext/squuid))
+
+
+(defn random-uuid []
+ (ext/random-uuid))
+
+
+(defn now-ms []
+ (ext/now-ms))
+
+
+(defn root-cause [e]
+ (if-let [root (ex-cause e)]
+ (root-cause root)
+ e))
+
+
+(defn db-error-map [^Throwable t]
+ (loop [e t]
+ (cond (nil? e) nil
+
+ (and (map? (ex-data e))
+ (contains? (ex-data e) :db/error))
+ (ex-data e)
+
+ :else
+ (recur (ex-cause e)))))
+
+
+(defn get-queue-item [db id]
+ (-> (d/pull db '[:*] [:com.github.ivarref.yoltq/id id])
+ (dissoc :db/id)
+ (update :com.github.ivarref.yoltq/opts (fn [s] (or (when s (edn/read-string s)) {})))
+ (update :com.github.ivarref.yoltq/bindings
+ (fn [s]
+ (when s
+ (->> s
+ (edn/read-string)
+ (reduce-kv (fn [o k v]
+ (assoc o (resolve k) v))
+ {})))))))
+
+
+(defn prepare-processing [db id queue-name old-lock old-status]
+ (let [new-lock (random-uuid)]
+ {:id id
+ :lock new-lock
+ :queue-name queue-name
+ :bindings (get (get-queue-item db id) :com.github.ivarref.yoltq/bindings {})
+ :tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock]
+ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status old-status status-processing]
+ {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ms)}]}))
+
+
+(defn get-init [{:keys [conn db init-backoff-time] :as cfg} queue-name]
+ (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: "
+ (str (if (nil? conn) "nil" conn))
+ "\nConfig was: " (str cfg)))
+ (let [db (or db (d/db conn))]
+ (if-let [ids (->> (d/q '[:find ?id ?lock
+ :in $ ?queue-name ?backoff ?current-version
+ :where
+ [?e :com.github.ivarref.yoltq/status :init]
+ [?e :com.github.ivarref.yoltq/queue-name ?queue-name]
+ [?e :com.github.ivarref.yoltq/init-time ?init-time]
+ [(>= ?backoff ?init-time)]
+ [?e :com.github.ivarref.yoltq/id ?id]
+ [?e :com.github.ivarref.yoltq/lock ?lock]
+ [?e :com.github.ivarref.yoltq/version ?current-version]]
+ db
+ queue-name
+ (- (now-ms) init-backoff-time)
+ current-version)
+ (not-empty))]
+ (let [[id old-lock] (rand-nth (into [] ids))]
+ (prepare-processing db id queue-name old-lock :init))
+ (log/debug "no new-items in :init status for queue" queue-name))))
+
+(defn- get-max-retries [cfg queue-name]
+ (let [v (get-in cfg [:handlers queue-name :max-retries] (:max-retries cfg))]
+ (if (and (number? v) (pos-int? v))
+ v
+ Long/MAX_VALUE)))
+
+(defn get-error [{:keys [conn db error-backoff-time] :as cfg} queue-name]
+ (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: "
+ (str (if (nil? conn) "nil" conn))
+ "\nConfig was: " (str cfg)))
+ (let [db (or db (d/db conn))
+ max-retries (get-max-retries cfg queue-name)]
+ (when-let [ids (->> (d/q '[:find ?id ?lock
+ :in $ ?queue-name ?backoff ?max-tries ?current-version
+ :where
+ [?e :com.github.ivarref.yoltq/status :error]
+ [?e :com.github.ivarref.yoltq/queue-name ?queue-name]
+ [?e :com.github.ivarref.yoltq/error-time ?time]
+ [(>= ?backoff ?time)]
+ [?e :com.github.ivarref.yoltq/tries ?tries]
+ [(>= ?max-tries ?tries)]
+ [?e :com.github.ivarref.yoltq/id ?id]
+ [?e :com.github.ivarref.yoltq/lock ?lock]
+ [?e :com.github.ivarref.yoltq/version ?current-version]]
+ db
+ queue-name
+ (- (now-ms) error-backoff-time)
+ max-retries
+ current-version)
+ (not-empty))]
+ (let [[id old-lock] (rand-nth (into [] ids))]
+ (prepare-processing db id queue-name old-lock :error)))))
+
+
+(defn get-hung [{:keys [conn db now hung-backoff-time] :as cfg} queue-name]
+ (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: "
+ (str (if (nil? conn) "nil" conn))
+ "\nConfig was: " (str cfg)))
+ (let [now (or now (now-ms))
+ max-retries (get-max-retries cfg queue-name)
+ db (or db (d/db conn))]
+ (when-let [ids (->> (d/q '[:find ?id ?lock ?tries
+ :in $ ?qname ?backoff ?current-version
+ :where
+ [?e :com.github.ivarref.yoltq/status :processing]
+ [?e :com.github.ivarref.yoltq/queue-name ?qname]
+ [?e :com.github.ivarref.yoltq/processing-time ?time]
+ [(>= ?backoff ?time)]
+ [?e :com.github.ivarref.yoltq/tries ?tries]
+ [?e :com.github.ivarref.yoltq/id ?id]
+ [?e :com.github.ivarref.yoltq/lock ?lock]
+ [?e :com.github.ivarref.yoltq/version ?current-version]]
+ db
+ queue-name
+ (- now hung-backoff-time)
+ current-version)
+ (not-empty))]
+ (let [new-lock (random-uuid)
+ [id old-lock tries _t] (rand-nth (into [] ids))
+ to-error? (>= tries max-retries)]
+ {:id id
+ :lock new-lock
+ :queue-name queue-name
+ :was-hung? true
+ :to-error? to-error?
+ :bindings (get (get-queue-item db id) :com.github.ivarref.yoltq/bindings {})
+ :tx (if (not to-error?)
+ [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock]
+ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)]
+ {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time now}]
+ [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock]
+ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)]
+ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status status-processing status-error]
+ {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time now}])}))))
diff --git a/test/com/github/ivarref/yoltq/error_poller_test.clj b/test/com/github/ivarref/yoltq/error_poller_test.clj
new file mode 100644
index 0000000..4d92b81
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/error_poller_test.clj
@@ -0,0 +1,35 @@
+(ns com.github.ivarref.yoltq.error-poller-test
+ (:require [clojure.edn :as edn]
+ [clojure.test :refer [deftest is]]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.error-poller :as ep]
+ [com.github.ivarref.yoltq.log-init :as logconfig]))
+
+
+(deftest error-poller
+ (logconfig/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"*"} (edn/read-string
+ (System/getProperty "TAOENSSO_TIMBRE_MIN_LEVEL_EDN" ":info"))]])
+ (let [cfg {:system-error-callback-backoff 100}
+ time (atom 0)
+ tick! (fn [& [amount]]
+ (swap! time + (or amount 1)))
+ verify (fn [state now-ns error-count expected-callback]
+ (let [{:keys [errors state run-callback] :as res} (ep/handle-error-count state cfg now-ns error-count)]
+ (log/info errors "=>" state "::" run-callback)
+ (is (= expected-callback run-callback))
+ res))]
+ (-> {}
+ (verify (tick!) 0 nil)
+ (verify (tick!) 1 nil)
+ (verify (tick!) 1 nil)
+ (verify (tick!) 1 :error)
+ (verify (tick! 100) 0 nil)
+ (verify (tick!) 0 :error)
+ (verify (tick!) 0 :recovery)
+ (verify (tick!) 1 nil)
+ (verify (tick!) 1 nil)
+ (verify (tick!) 1 :error)
+ (verify (tick! 100) 1 nil)
+ (verify (tick!) 1 :error))))
diff --git a/test/com/github/ivarref/yoltq/http_hang_demo.clj b/test/com/github/ivarref/yoltq/http_hang_demo.clj
new file mode 100644
index 0000000..06d877b
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/http_hang_demo.clj
@@ -0,0 +1,45 @@
+(ns com.github.ivarref.yoltq.http-hang-demo
+ (:require [datomic.api :as d]
+ [com.github.ivarref.yoltq :as yq]
+ [com.github.ivarref.yoltq.log-init])
+ (:import (java.net.http HttpClient HttpRequest HttpResponse$BodyHandlers)))
+
+(comment
+ (do
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq*"} :debug]
+ [#{"*"} :info]])
+ (yq/stop!)
+ (let [received (atom [])
+ uri (str "datomic:mem://hello-world-" (java.util.UUID/randomUUID))]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (let [conn (d/connect uri)]
+ (init! {:conn conn
+ :error-backoff-time (Duration/ofSeconds 5)
+ :poll-delay 5
+ :system-error-poll-interval [1 TimeUnit/MINUTES]
+ :system-error-callback-backoff (Duration/ofHours 1)
+ :max-execute-time (Duration/ofSeconds 3)
+ :on-system-error (fn [] (log/error "system in error state"))
+ :on-system-recovery (fn [] (log/info "system recovered"))})
+ (yq/add-consumer! :slow (fn [_payload]
+ (log/info "start slow handler...")
+ ; sudo tc qdisc del dev wlp3s0 root netem
+ ; sudo tc qdisc add dev wlp3s0 root netem delay 10000ms
+ ; https://jvns.ca/blog/2017/04/01/slow-down-your-internet-with-tc/
+ (let [request (-> (HttpRequest/newBuilder)
+ (.uri (java.net.URI. "https://postman-echo.com/get"))
+ (.timeout (Duration/ofSeconds 10))
+ (.GET)
+ (.build))]
+ (log/info "body is:" (-> (.send (HttpClient/newHttpClient) request (HttpResponse$BodyHandlers/ofString))
+ (.body))))
+ (log/info "slow handler is done!")))
+ (yq/start!)
+ @(d/transact conn [(put :slow {:work 123})])
+ #_(dotimes [x 1] @(d/transact conn [(put :q {:work x})]))
+ nil)))) \ No newline at end of file
diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj
new file mode 100644
index 0000000..7eae557
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/log_init.clj
@@ -0,0 +1,66 @@
+(ns com.github.ivarref.yoltq.log-init
+ (:require [clojure.term.colors :as colors]
+ [taoensso.timbre :as timbre]
+ [clojure.string :as str]))
+
+(set! *warn-on-reflection* true)
+
+(def level-colors
+ {;:warn colors/red
+ :error colors/red})
+
+(def ^:dynamic *override-color* nil)
+
+(defn min-length [n s]
+ (loop [s s]
+ (if (>= (count s) n)
+ s
+ (recur (str s " ")))))
+
+(defn local-console-format-fn
+ "A simpler log format, suitable for readable logs during development. colorized stacktraces"
+ [data]
+ (try
+ (let [{:keys [level ?err msg_ ?ns-str ?file hostname_
+ timestamp_ ?line context]} data
+ ts (force timestamp_)]
+ (let [color-f (if (nil? *override-color*)
+ (get level-colors level str)
+ colors/green)
+ maybe-stacktrace (when ?err
+ (str "\n" (timbre/stacktrace ?err)))]
+ (cond-> (str #_(str ?ns-str ":" ?line)
+ #_(min-length (count "[yoltq:326] ")
+ (str
+ "["
+ (some-> ?ns-str
+ (str/split #"\.")
+ (last))
+ ":" ?line))
+ ts
+ " "
+ (color-f (min-length 5 (str/upper-case (name level))))
+ " "
+
+ (when-let [x-req-id (:x-request-id context)]
+ (str "[" x-req-id "] "))
+ #_(.getName ^Thread (Thread/currentThread))
+
+ (color-f (force msg_))
+
+ maybe-stacktrace))))
+
+
+ (catch Throwable t
+ (println "error in local-console-format-fn:" (ex-message t))
+ nil)))
+
+
+(defn init-logging! [min-levels]
+ (timbre/merge-config!
+ {:min-level min-levels
+ :timestamp-opts {:pattern "HH:mm:ss.SSS"
+ :timezone :jvm-default}
+ :output-fn (fn [data] (local-console-format-fn data))
+ :appenders {:println (timbre/println-appender {:stream :std-out})}}))
+
diff --git a/test/com/github/ivarref/yoltq/migrate_test.clj b/test/com/github/ivarref/yoltq/migrate_test.clj
new file mode 100644
index 0000000..0063631
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/migrate_test.clj
@@ -0,0 +1,92 @@
+(ns com.github.ivarref.yoltq.migrate-test
+ (:require [clojure.test :refer [deftest is]]
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [com.github.ivarref.yoltq.migrate :as m]
+ [com.github.ivarref.yoltq.impl :as impl]
+ [com.github.ivarref.yoltq.test-utils :as tu]
+ [com.github.ivarref.yoltq.utils :as u]
+ [datomic.api :as d]))
+
+
+(deftest to-v2-migration
+ (with-bindings {#'ext/*squuid-atom* (atom 0)}
+ (let [conn (tu/empty-conn)]
+ @(d/transact conn impl/schema)
+ @(d/transact conn [{:com.github.ivarref.yoltq/id (u/squuid)
+ :com.github.ivarref.yoltq/queue-name :dummy
+ :com.github.ivarref.yoltq/status u/status-processing
+ :com.github.ivarref.yoltq/init-time 1
+ :com.github.ivarref.yoltq/processing-time 2}])
+ @(d/transact conn [{:com.github.ivarref.yoltq/id (u/squuid)
+ :com.github.ivarref.yoltq/queue-name :dummy
+ :com.github.ivarref.yoltq/status u/status-init
+ :com.github.ivarref.yoltq/init-time 3}])
+ (is (= [[[:db/cas
+ [:com.github.ivarref.yoltq/id
+ #uuid "00000000-0000-0000-0000-000000000001"]
+ :com.github.ivarref.yoltq/version
+ nil
+ "2"]
+ [:db/cas
+ [:com.github.ivarref.yoltq/id
+ #uuid "00000000-0000-0000-0000-000000000001"]
+ :com.github.ivarref.yoltq/init-time
+ 1
+ 1000]
+ [:db/cas
+ [:com.github.ivarref.yoltq/id
+ #uuid "00000000-0000-0000-0000-000000000001"]
+ :com.github.ivarref.yoltq/processing-time
+ 2
+ 1000]]
+ [[:db/cas
+ [:com.github.ivarref.yoltq/id
+ #uuid "00000000-0000-0000-0000-000000000002"]
+ :com.github.ivarref.yoltq/version
+ nil
+ "2"]
+ [:db/cas
+ [:com.github.ivarref.yoltq/id
+ #uuid "00000000-0000-0000-0000-000000000002"]
+ :com.github.ivarref.yoltq/init-time
+ 3
+ 1000]]]
+ (m/migrate! {:conn conn
+ :now-ms 1000
+ :loop? true})))
+ (is (= []
+ (m/migrate! {:conn conn
+ :now-ms 1000
+ :loop? true}))))))
+
+
+(deftest to-v2-migration-real-time
+ (with-bindings {#'ext/*squuid-atom* (atom 0)}
+ (let [conn (tu/empty-conn)
+ id (u/squuid)]
+ @(d/transact conn impl/schema)
+ @(d/transact conn [{:com.github.ivarref.yoltq/id id
+ :com.github.ivarref.yoltq/queue-name :dummy
+ :com.github.ivarref.yoltq/status u/status-init
+ :com.github.ivarref.yoltq/init-time 1}])
+ (Thread/sleep 100)
+ @(d/transact conn [{:com.github.ivarref.yoltq/id id
+ :com.github.ivarref.yoltq/init-time 2}])
+ (let [tx-times (->> (d/q '[:find [?txinst ...]
+ :in $ ?e
+ :where
+ [?e :com.github.ivarref.yoltq/init-time _ ?tx true]
+ [?tx :db/txInstant ?txinst]]
+ (d/history (d/db conn))
+ [:com.github.ivarref.yoltq/id id])
+ (sort)
+ (vec))]
+ (is (= 2 (count tx-times)))
+ (m/migrate! {:conn conn})
+ (is (= (.getTime (last tx-times))
+ (d/q '[:find ?init-time .
+ :in $ ?e
+ :where
+ [?e :com.github.ivarref.yoltq/init-time ?init-time]]
+ (d/db conn)
+ [:com.github.ivarref.yoltq/id id])))))))
diff --git a/test/com/github/ivarref/yoltq/readme_demo.clj b/test/com/github/ivarref/yoltq/readme_demo.clj
new file mode 100644
index 0000000..eae0a3e
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/readme_demo.clj
@@ -0,0 +1,48 @@
+(ns com.github.ivarref.yoltq.readme-demo
+ (:require [clojure.tools.logging :as log]
+ [datomic.api :as d]
+ [com.github.ivarref.yoltq :as yq])
+ (:import (java.util UUID)))
+
+
+(defonce conn
+ (let [uri (str "datomic:mem://hello-world-" (UUID/randomUUID))]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (d/connect uri)))
+
+
+(com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq*"} :debug]
+ [#{"*"} :info]])
+
+
+(yq/stop!)
+
+(yq/init! {:conn conn})
+
+
+(yq/add-consumer! :q
+ (let [cnt (atom 0)]
+ (fn [payload]
+ (when (= 1 (swap! cnt inc))
+ (throw (ex-info "failed" {})))
+ (log/info "got payload" payload))))
+
+(yq/start!)
+
+@(d/transact conn [(yq/put :q {:work 123})])
+
+(comment
+ (yq/add-consumer! :q (fn [_] (throw (ex-info "always fail" {})))))
+
+(comment
+ @(d/transact conn [(yq/put :q {:work 123})]))
+
+(comment
+ (do
+ (yq/add-consumer! :q (fn [_] :ok))
+ nil))
diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj
new file mode 100644
index 0000000..0c1b2f0
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/test_utils.clj
@@ -0,0 +1,80 @@
+(ns com.github.ivarref.yoltq.test-utils
+ (:require [com.github.ivarref.yoltq.log-init :as logconfig]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.utils :as u]
+ [com.github.ivarref.yoltq :as yq]
+ [datomic.api :as d]
+ [clojure.string :as str]
+ [com.github.ivarref.yoltq.impl :as i]
+ [clojure.edn :as edn]
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [clojure.pprint :as pp])
+ (:import (java.util UUID)
+ (java.time Duration)))
+
+
+(logconfig/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"*"} (edn/read-string
+ (System/getProperty "TAOENSSO_TIMBRE_MIN_LEVEL_EDN" ":info"))]])
+
+
+(defn empty-conn []
+ (let [uri (str "datomic:mem://hello-world-" (UUID/randomUUID))]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (d/connect uri)))
+
+
+(defn break []
+ (log/info (str/join "*" (repeat 60 ""))))
+
+
+(defn clear []
+ (.print System/out "\033[H\033[2J")
+ (.flush System/out)
+ (break))
+
+
+(defn put-transact! [id payload]
+ @(d/transact (:conn @yq/*config*) [(i/put @yq/*config* id payload {})]))
+
+
+(defn advance! [tp]
+ (assert (some? ext/*now-ms-atom*) "Expected to be running in test-mode!")
+ (swap! ext/*now-ms-atom* + (if (number? tp)
+ tp
+ (.toMillis ^Duration tp))))
+
+
+(defn done-count []
+ (d/q '[:find (count ?e) .
+ :where
+ [?e :com.github.ivarref.yoltq/id _]
+ [?e :com.github.ivarref.yoltq/status :done]]
+ (d/db (:conn @yq/*config*))))
+
+
+(defn pp [x]
+ (pp/pprint x)
+ x)
+
+(defn get-init [& args]
+ (apply u/get-init @yq/*config* args))
+
+
+(defn get-error [& args]
+ (apply u/get-error @yq/*config* args))
+
+
+(defn get-hung [& args]
+ (apply u/get-hung @yq/*config* args))
+
+
+(defn take! [& args]
+ (apply i/take! @yq/*config* args))
+
+
+(defn execute! [& args]
+ (apply i/execute! @yq/*config* args))
+
diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj
new file mode 100644
index 0000000..a2ed269
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/virtual_test.clj
@@ -0,0 +1,474 @@
+(ns com.github.ivarref.yoltq.virtual-test
+ (:require
+ [clojure.string :as str]
+ [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq :as yq]
+ [com.github.ivarref.yoltq.error-poller :as error-poller]
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [com.github.ivarref.yoltq.impl :as i]
+ [com.github.ivarref.yoltq.migrate :as migrate]
+ [com.github.ivarref.yoltq.test-queue :as tq]
+ [com.github.ivarref.yoltq.test-utils :as u]
+ [com.github.ivarref.yoltq.utils :as uu]
+ [datomic-schema.core]
+ [datomic.api :as d]
+ [taoensso.nippy :as nippy]
+ [taoensso.timbre :as timbre])
+ (:import (java.time Duration LocalDateTime)))
+
+
+(use-fixtures :each tq/call-with-virtual-queue!)
+
+
+(deftest happy-case-1
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q identity)
+ @(d/transact conn [(yq/put :q {:work 123})])
+ (is (= {:work 123} (tq/consume! :q)))))
+
+(deftest happy-case-no-migration-for-new-entities
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q identity)
+ @(d/transact conn [(yq/put :q {:work 123})])
+ (is (= {:work 123} (tq/consume! :q)))
+ (is (= [] (migrate/migrate! @yq/*config*)))))
+
+(deftest happy-case-tx-report-q
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q identity)
+ @(d/transact conn [(yq/put :q {:work 123})])
+ (is (= {:work 123} (:retval (tq/run-one-report-queue!))))
+ (is (= 1 (u/done-count)))))
+
+
+(deftest happy-case-poller
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn
+ :handlers {:q {:f (fn [payload] payload)}}})
+ (u/put-transact! :q {:work 123})
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (= {:work 123} (some->> (u/get-init :q)
+ (u/take!)
+ (u/execute!)
+ :retval)))))
+
+
+(deftest happy-case-queue-fn-allow-cas-failure
+ (let [conn (u/empty-conn)
+ invoke-count (atom 0)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q
+ (fn [{:keys [id]}]
+ (swap! invoke-count inc)
+ @(d/transact conn [[:db/cas [:e/id id] :e/val nil "janei"]]))
+ {:allow-cas-failure? #{:e/val}})
+ @(d/transact conn #d/schema [[:e/id :one :string :id]
+ [:e/val :one :string]])
+ @(d/transact conn [{:e/id "demo"}
+ (yq/put :q {:id "demo"})])
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (swap! yq/*config* assoc :mark-status-fn! (fn [_ _ new-status]
+ (log/info "mark-status! doing nothing for new status" new-status)))
+ (is (nil? (some->> (u/get-init :q)
+ (u/take!)
+ (u/execute!))))
+ (swap! yq/*config* dissoc :mark-status-fn!)
+
+ ; (mark-status! :done) failed but f succeeded.
+ (is (nil? (u/get-hung :q)))
+ (u/advance! (:hung-backoff-time @yq/*config*))
+ (is (some? (u/get-hung :q)))
+ (is (true? (some->> (u/get-hung :q)
+ (u/take!)
+ (u/execute!)
+ :allow-cas-failure?)))
+ (is (= 2 @invoke-count))))
+
+
+(deftest backoff-test
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
+ (u/put-transact! :q {:work 123})
+ (is (nil? (u/get-init :q)))
+
+ (u/advance! (dec (:init-backoff-time yq/default-opts)))
+ (is (nil? (u/get-init :q)))
+ (u/advance! 1)
+ (is (some? (u/get-init :q)))
+
+ (is (some? (some->> (u/get-init :q)
+ (u/take!)
+ (u/execute!)
+ :exception)))
+
+ (u/advance! (dec (:error-backoff-time @yq/*config*)))
+ (is (nil? (u/get-error :q)))
+ (u/advance! 1)
+ (is (some? (u/get-error :q)))))
+
+
+(deftest get-hung-test
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
+ (u/put-transact! :q {:work 123})
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (some? (u/get-init :q)))
+
+ (is (= :processing (some->> (u/get-init :q)
+ (u/take!)
+ :com.github.ivarref.yoltq/status)))
+
+ (is (nil? (u/get-hung :q)))
+ (u/advance! (dec (:hung-backoff-time yq/default-opts)))
+ (is (nil? (u/get-hung :q)))
+ (u/advance! 1)
+ (is (some? (u/get-hung :q)))))
+
+
+(deftest basic-locking
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :cas-failures (atom 0)
+ :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
+ (u/put-transact! :q {:work 123})
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (some? (u/get-init :q)))
+
+ (let [job (u/get-init :q)]
+ (is (= :processing (some->> job (u/take!) :com.github.ivarref.yoltq/status)))
+ (u/take! job)
+ (is (= 1 @(:cas-failures @yq/*config*))))))
+
+
+(deftest retry-test
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :handlers {:q {:f
+ (let [c (atom 0)]
+ (fn [_]
+ (if (<= (swap! c inc) 2)
+ (throw (ex-info "janei" {}))
+ ::ok)))}}})
+ (u/put-transact! :q {:work 123})
+
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (some? (some->> (u/get-init :q) (u/take!) (u/execute!) :exception)))
+
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (is (some? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception)))
+
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (is (nil? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception)))))
+
+
+(deftest max-retries-test
+ (let [conn (u/empty-conn)
+ call-count (atom 0)]
+ (yq/init! {:conn conn
+ :error-backoff-time 0})
+ (yq/add-consumer! :q (fn [_]
+ (swap! call-count inc)
+ (throw (ex-info "janei" {})))
+ {:max-retries 1})
+ (tq/put! :q {:work 123})
+ (is (some? (:exception (tq/run-one-report-queue!))))
+
+ (dotimes [_ 10]
+ (tq/run-queue-once! :q :error))
+ (is (= 2 @call-count))))
+
+
+(deftest max-retries-test-two
+ (let [conn (u/empty-conn)
+ call-count (atom 0)]
+ (yq/init! {:conn conn
+ :error-backoff-time 0})
+ (yq/add-consumer! :q (fn [_]
+ (swap! call-count inc)
+ (throw (ex-info "janei" {})))
+ {:max-retries 3})
+ (tq/put! :q {:work 123})
+ (is (some? (:exception (tq/run-one-report-queue!))))
+
+ (timbre/with-level :fatal
+ (dotimes [_ 20]
+ (tq/run-queue-once! :q :error)))
+ (is (= 4 @call-count))))
+
+
+(deftest hung-to-error
+ (let [conn (u/empty-conn)
+ call-count (atom 0)
+ missed-mark-status (atom nil)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q
+ (fn [_]
+ (if (= 1 (swap! call-count inc))
+ (throw (ex-info "error" {}))
+ (log/info "return OK"))))
+ (tq/put! :q {:id "demo"})
+ (tq/run-one-report-queue!) ; now in status :error
+
+
+ (swap! yq/*config* assoc :mark-status-fn! (fn [_ _ new-status]
+ (reset! missed-mark-status new-status)
+ (log/info "mark-status! doing nothing for new status" new-status)))
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (tq/run-queue-once! :q :error)
+ (swap! yq/*config* dissoc :mark-status-fn!)
+ (is (= :done @missed-mark-status))
+
+ (is (nil? (uu/get-hung @yq/*config* :q)))
+ (u/advance! (:hung-backoff-time @yq/*config*))
+
+ (is (some? (uu/get-hung @yq/*config* :q)))
+
+ (is (= 2 @call-count))
+
+ (is (true? (some->> (uu/get-hung (assoc-in @yq/*config* [:handlers :q :max-retries] 1) :q)
+ (i/take! @yq/*config*)
+ (i/execute! @yq/*config*)
+ :failed?)))
+
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (is (some? (uu/get-error @yq/*config* :q)))
+ (is (nil? (uu/get-error (assoc-in @yq/*config* [:handlers :q :max-retries] 1) :q)))))
+
+
+(deftest consume-expect-test
+ (let [conn (u/empty-conn)
+ seen (atom #{})]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q (fn [payload]
+ (when (= #{1 2} (swap! seen conj payload))
+ (throw (ex-info "oops" {})))
+ payload))
+
+ @(d/transact conn [(yq/put :q 1)])
+ @(d/transact conn [(yq/put :q 2)])
+
+ (is (= 1 (tq/consume-expect! :q :done)))
+ (tq/consume-expect! :q :error)))
+
+
+(def ^:dynamic *some-binding* nil)
+
+
+(deftest binding-test
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn
+ :capture-bindings [#'*some-binding* #'timbre/*context*]})
+ (yq/add-consumer! :q (fn [_] *some-binding*))
+ (binding [timbre/*context* {:x-request-id "wooho"}]
+ (binding [*some-binding* 1]
+ @(d/transact conn [(yq/put :q nil)]))
+ (binding [*some-binding* 2]
+ @(d/transact conn [(yq/put :q nil)]))
+ @(d/transact conn [(yq/put :q nil)]))
+
+ (is (= 1 (tq/consume-expect! :q :done)))
+ (is (= 2 (tq/consume-expect! :q :done)))
+ (is (nil? (tq/consume-expect! :q :done)))))
+
+
+(deftest default-binding-test
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q (fn [_] (:x-request-id timbre/*context*)))
+ (binding [timbre/*context* {:x-request-id "123"}]
+ @(d/transact conn [(yq/put :q nil)]))
+ (is (= "123" (tq/consume-expect! :q :done)))))
+
+
+(deftest force-retry-test
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q (let [cnt (atom 0)]
+ (fn [_] (swap! cnt inc))))
+ @(d/transact conn [(yq/put :q nil)])
+ (is (= 1 (tq/consume! :q)))
+ (is (= 2 (tq/force-retry! :q)))
+ (is (= 3 (tq/force-retry! :q)))))
+
+
+(deftest ext-id-no-duplicates
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q identity)
+ @(d/transact conn [(yq/put :q nil {:id "123"})])
+ (is (thrown? Exception @(d/transact conn [(yq/put :q nil {:id "123"})])))))
+
+
+(deftest depends-on
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :a identity)
+ (yq/add-consumer! :b identity)
+ @(d/transact conn [(yq/put :a {:id "a1"} {:id "a1"})])
+ @(d/transact conn [(yq/put :b {:id "b1"} {:depends-on [:a "a1"]})])
+
+ ; can't consume :b yet:
+ (is (= {:depends-on [:a "a1"]} (tq/consume! :b)))
+
+ (is (= {:id "a1"} (tq/consume! :a)))
+ (is (= {:id "b1"} (tq/consume! :b)))))
+
+
+(deftest depends-on-queue-level
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :a identity)
+ (yq/add-consumer! :b identity {:depends-on (fn [{:keys [id]}] [:a id])})
+ @(d/transact conn [(yq/put :a {:id "1"} {:id "1"})])
+ @(d/transact conn [(yq/put :b {:id "1"})])
+
+ ; can't consume :b yet:
+ (is (= {:depends-on [:a "1"]} (tq/consume! :b)))
+
+ (is (= {:id "1"} (tq/consume! :a)))
+ (is (= {:id "1"} (tq/consume! :b)))))
+
+
+(deftest verify-can-read-string
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :a identity)
+ (timbre/with-level :fatal
+ (is (thrown? Exception @(d/transact conn [(yq/put :a {:broken #'=})]))))))
+
+
+(deftest payload-verifier
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q identity
+ {:valid-payload? (fn [{:keys [id]}]
+ (some? id))})
+ @(d/transact conn [(yq/put :q {:id "a"})])
+ (timbre/with-level :fatal
+ (is (thrown? Exception @(d/transact conn [(yq/put :q {})]))))))
+
+
+(defn my-consumer
+ {:yoltq/queue-id :some-q}
+ [state payload]
+ (swap! state conj payload))
+
+(deftest queue-id-can-be-var
+ (let [conn (u/empty-conn)
+ received (atom #{})]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! #'my-consumer (partial my-consumer received))
+ @(d/transact conn [(yq/put #'my-consumer {:id "a"})])
+ (tq/consume! :some-q)
+ (is (= #{{:id "a"}} @received))
+ #_(timbre/with-level :fatal
+ (is (thrown? Exception @(d/transact conn [(yq/put :q {})]))))))
+
+(deftest healthy-allowed-error-time-test
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q (fn [_] (throw (ex-info "" {}))))
+ @(d/transact conn [(yq/put :q {:work 123})])
+ (tq/consume-expect! :q :error)
+ (is (= 0 (error-poller/do-poll-errors @yq/*config* (ext/now-ms))))
+ (is (= 0 (error-poller/do-poll-errors @yq/*config* (+ (dec (.toMillis (Duration/ofMinutes 15))) (ext/now-ms)))))
+ (is (= 1 (error-poller/do-poll-errors @yq/*config* (+ (.toMillis (Duration/ofMinutes 15)) (ext/now-ms)))))))
+
+(deftest global-encode-decode
+ (let [conn (u/empty-conn)
+ ldt (LocalDateTime/now)
+ got-work (atom nil)]
+ (yq/init! {:conn conn
+ :encode nippy/freeze
+ :decode nippy/thaw})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work)))
+ @(d/transact conn [(yq/put :q {:work ldt})])
+ (tq/consume! :q)
+ (is (= @got-work {:work ldt}))))
+
+(deftest queue-encode-decode
+ (let [conn (u/empty-conn)
+ ldt (LocalDateTime/now)
+ got-work (atom nil)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work))
+ {:encode nippy/freeze
+ :decode nippy/thaw})
+ @(d/transact conn [(yq/put :q {:work ldt})])
+ (tq/consume! :q)
+ (is (= @got-work {:work ldt}))))
+
+(deftest global-partition
+ (let [conn (u/empty-conn)
+ got-work (atom nil)]
+ (yq/init! {:conn conn
+ :partition-fn (fn [_queue-name] :my-part)})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work)))
+ @(d/transact conn [(yq/put :q {:work 123})])
+ (tq/consume! :q)
+ (is (some? (d/q '[:find ?e .
+ :in $ ?part
+ :where
+ [?e :db/ident ?part]]
+ (d/db conn)
+ :my-part)))
+ (is (= @got-work {:work 123}))))
+
+(deftest partition-per-queue
+ (let [conn (u/empty-conn)
+ got-work (atom nil)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work))
+ {:partition-fn (fn [_queue-name] :my-part)})
+ @(d/transact conn [(yq/put :q {:work 123})])
+ (tq/consume! :q)
+ (is (some? (d/q '[:find ?e .
+ :in $ ?part
+ :where
+ [?e :db/ident ?part]]
+ (d/db conn)
+ :my-part)))
+ (is (= @got-work {:work 123}))))
+
+(deftest string-encode-decode
+ (let [conn (u/empty-conn)
+ got-work (atom nil)]
+ (yq/init! {:conn conn
+ :encode (fn [x] (str/join (reverse x)))
+ :decode (fn [x] (str/join (reverse x)))})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work)))
+ @(d/transact conn [(yq/put :q "asdf")])
+ (tq/consume! :q)
+ (is (= @got-work "asdf"))))
+
+(deftest job-group-test
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q1 identity)
+ (yq/add-consumer! :q2 identity)
+ @(d/transact conn [(yq/put :q1 {:work 123} {:job-group :group1})
+ (yq/put :q1 {:work 456} {:job-group :group2})
+ (yq/put :q2 {:work 789} {:job-group :group1})])
+ (is (= [{:qname :q1
+ :job-group :group1
+ :status :init
+ :count 1}]
+ (yq/job-group-progress :q1 :group1)))
+
+ (is (= {:work 123} (tq/consume! :q1)))
+
+ (is (= [{:qname :q1
+ :job-group :group1
+ :status :done
+ :count 1}]
+ (yq/job-group-progress :q1 :group1)))))