diff options
23 files changed, 3954 insertions, 0 deletions
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..1350854 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,52 @@ +name: Tag and Release +on: workflow_dispatch + +jobs: + tag-and-release: + runs-on: ubuntu-22.04 + steps: + - uses: actions/checkout@v3 + with: + # NOTE: Fetch all for counting commits + fetch-depth: 0 + - uses: actions/setup-java@v3 + with: + distribution: 'adopt' + java-version: 21 + - uses: DeLaGuardo/setup-clojure@13.4 + with: + cli: 1.12.0.1530 + + - name: Show versions + run: | + java -version + clojure --version + + - name: deploy to clojars + # NOTE: Specify ID to refer outputs from other steps + id: deploy + run: | + clojure -T:build deploy + env: + CLOJARS_PASSWORD: ${{secrets.CLOJARS_PASSWORD}} + CLOJARS_USERNAME: ${{secrets.CLOJARS_USERNAME}} + + - uses: actions/create-release@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + # NOTE: Refer outputs + tag_name: ${{ steps.deploy.outputs.version }} + release_name: ${{ steps.deploy.outputs.version }} + body: released + draft: false + prerelease: false + + - run: | + clojure -T:build update-documents + git diff + git config --global user.email "github-actions@example.com" + git config --global user.name "github-actions" + git add -A + git commit -m "Update for release" + git push
\ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..707e1be --- /dev/null +++ b/.gitignore @@ -0,0 +1,14 @@ +.idea/ +*.iml +.cpcache/ +.nrepl-port +target/ +.connkey +tree.txt +.db.url +.stage-url.txt +*.pom.asc +*.pom +temp/ +.clj-kondo/ +.rebel_readline_history
\ No newline at end of file @@ -0,0 +1,277 @@ +Eclipse Public License - v 2.0 + + THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE + PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION + OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT. + +1. DEFINITIONS + +"Contribution" means: + + a) in the case of the initial Contributor, the initial content + Distributed under this Agreement, and + + b) in the case of each subsequent Contributor: + i) changes to the Program, and + ii) additions to the Program; + where such changes and/or additions to the Program originate from + and are Distributed by that particular Contributor. A Contribution + "originates" from a Contributor if it was added to the Program by + such Contributor itself or anyone acting on such Contributor's behalf. + Contributions do not include changes or additions to the Program that + are not Modified Works. + +"Contributor" means any person or entity that Distributes the Program. + +"Licensed Patents" mean patent claims licensable by a Contributor which +are necessarily infringed by the use or sale of its Contribution alone +or when combined with the Program. + +"Program" means the Contributions Distributed in accordance with this +Agreement. + +"Recipient" means anyone who receives the Program under this Agreement +or any Secondary License (as applicable), including Contributors. + +"Derivative Works" shall mean any work, whether in Source Code or other +form, that is based on (or derived from) the Program and for which the +editorial revisions, annotations, elaborations, or other modifications +represent, as a whole, an original work of authorship. + +"Modified Works" shall mean any work in Source Code or other form that +results from an addition to, deletion from, or modification of the +contents of the Program, including, for purposes of clarity any new file +in Source Code form that contains any contents of the Program. Modified +Works shall not include works that contain only declarations, +interfaces, types, classes, structures, or files of the Program solely +in each case in order to link to, bind by name, or subclass the Program +or Modified Works thereof. + +"Distribute" means the acts of a) distributing or b) making available +in any manner that enables the transfer of a copy. + +"Source Code" means the form of a Program preferred for making +modifications, including but not limited to software source code, +documentation source, and configuration files. + +"Secondary License" means either the GNU General Public License, +Version 2.0, or any later versions of that license, including any +exceptions or additional permissions as identified by the initial +Contributor. + +2. GRANT OF RIGHTS + + a) Subject to the terms of this Agreement, each Contributor hereby + grants Recipient a non-exclusive, worldwide, royalty-free copyright + license to reproduce, prepare Derivative Works of, publicly display, + publicly perform, Distribute and sublicense the Contribution of such + Contributor, if any, and such Derivative Works. + + b) Subject to the terms of this Agreement, each Contributor hereby + grants Recipient a non-exclusive, worldwide, royalty-free patent + license under Licensed Patents to make, use, sell, offer to sell, + import and otherwise transfer the Contribution of such Contributor, + if any, in Source Code or other form. This patent license shall + apply to the combination of the Contribution and the Program if, at + the time the Contribution is added by the Contributor, such addition + of the Contribution causes such combination to be covered by the + Licensed Patents. The patent license shall not apply to any other + combinations which include the Contribution. No hardware per se is + licensed hereunder. + + c) Recipient understands that although each Contributor grants the + licenses to its Contributions set forth herein, no assurances are + provided by any Contributor that the Program does not infringe the + patent or other intellectual property rights of any other entity. + Each Contributor disclaims any liability to Recipient for claims + brought by any other entity based on infringement of intellectual + property rights or otherwise. As a condition to exercising the + rights and licenses granted hereunder, each Recipient hereby + assumes sole responsibility to secure any other intellectual + property rights needed, if any. For example, if a third party + patent license is required to allow Recipient to Distribute the + Program, it is Recipient's responsibility to acquire that license + before distributing the Program. + + d) Each Contributor represents that to its knowledge it has + sufficient copyright rights in its Contribution, if any, to grant + the copyright license set forth in this Agreement. + + e) Notwithstanding the terms of any Secondary License, no + Contributor makes additional grants to any Recipient (other than + those set forth in this Agreement) as a result of such Recipient's + receipt of the Program under the terms of a Secondary License + (if permitted under the terms of Section 3). + +3. REQUIREMENTS + +3.1 If a Contributor Distributes the Program in any form, then: + + a) the Program must also be made available as Source Code, in + accordance with section 3.2, and the Contributor must accompany + the Program with a statement that the Source Code for the Program + is available under this Agreement, and informs Recipients how to + obtain it in a reasonable manner on or through a medium customarily + used for software exchange; and + + b) the Contributor may Distribute the Program under a license + different than this Agreement, provided that such license: + i) effectively disclaims on behalf of all other Contributors all + warranties and conditions, express and implied, including + warranties or conditions of title and non-infringement, and + implied warranties or conditions of merchantability and fitness + for a particular purpose; + + ii) effectively excludes on behalf of all other Contributors all + liability for damages, including direct, indirect, special, + incidental and consequential damages, such as lost profits; + + iii) does not attempt to limit or alter the recipients' rights + in the Source Code under section 3.2; and + + iv) requires any subsequent distribution of the Program by any + party to be under a license that satisfies the requirements + of this section 3. + +3.2 When the Program is Distributed as Source Code: + + a) it must be made available under this Agreement, or if the + Program (i) is combined with other material in a separate file or + files made available under a Secondary License, and (ii) the initial + Contributor attached to the Source Code the notice described in + Exhibit A of this Agreement, then the Program may be made available + under the terms of such Secondary Licenses, and + + b) a copy of this Agreement must be included with each copy of + the Program. + +3.3 Contributors may not remove or alter any copyright, patent, +trademark, attribution notices, disclaimers of warranty, or limitations +of liability ("notices") contained within the Program from any copy of +the Program which they Distribute, provided that Contributors may add +their own appropriate notices. + +4. COMMERCIAL DISTRIBUTION + +Commercial distributors of software may accept certain responsibilities +with respect to end users, business partners and the like. While this +license is intended to facilitate the commercial use of the Program, +the Contributor who includes the Program in a commercial product +offering should do so in a manner which does not create potential +liability for other Contributors. Therefore, if a Contributor includes +the Program in a commercial product offering, such Contributor +("Commercial Contributor") hereby agrees to defend and indemnify every +other Contributor ("Indemnified Contributor") against any losses, +damages and costs (collectively "Losses") arising from claims, lawsuits +and other legal actions brought by a third party against the Indemnified +Contributor to the extent caused by the acts or omissions of such +Commercial Contributor in connection with its distribution of the Program +in a commercial product offering. The obligations in this section do not +apply to any claims or Losses relating to any actual or alleged +intellectual property infringement. In order to qualify, an Indemnified +Contributor must: a) promptly notify the Commercial Contributor in +writing of such claim, and b) allow the Commercial Contributor to control, +and cooperate with the Commercial Contributor in, the defense and any +related settlement negotiations. The Indemnified Contributor may +participate in any such claim at its own expense. + +For example, a Contributor might include the Program in a commercial +product offering, Product X. That Contributor is then a Commercial +Contributor. If that Commercial Contributor then makes performance +claims, or offers warranties related to Product X, those performance +claims and warranties are such Commercial Contributor's responsibility +alone. Under this section, the Commercial Contributor would have to +defend claims against the other Contributors related to those performance +claims and warranties, and if a court requires any other Contributor to +pay any damages as a result, the Commercial Contributor must pay +those damages. + +5. NO WARRANTY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT +PERMITTED BY APPLICABLE LAW, THE PROGRAM IS PROVIDED ON AN "AS IS" +BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR +IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF +TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR +PURPOSE. Each Recipient is solely responsible for determining the +appropriateness of using and distributing the Program and assumes all +risks associated with its exercise of rights under this Agreement, +including but not limited to the risks and costs of program errors, +compliance with applicable laws, damage to or loss of data, programs +or equipment, and unavailability or interruption of operations. + +6. DISCLAIMER OF LIABILITY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT +PERMITTED BY APPLICABLE LAW, NEITHER RECIPIENT NOR ANY CONTRIBUTORS +SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST +PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE +EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + +7. GENERAL + +If any provision of this Agreement is invalid or unenforceable under +applicable law, it shall not affect the validity or enforceability of +the remainder of the terms of this Agreement, and without further +action by the parties hereto, such provision shall be reformed to the +minimum extent necessary to make such provision valid and enforceable. + +If Recipient institutes patent litigation against any entity +(including a cross-claim or counterclaim in a lawsuit) alleging that the +Program itself (excluding combinations of the Program with other software +or hardware) infringes such Recipient's patent(s), then such Recipient's +rights granted under Section 2(b) shall terminate as of the date such +litigation is filed. + +All Recipient's rights under this Agreement shall terminate if it +fails to comply with any of the material terms or conditions of this +Agreement and does not cure such failure in a reasonable period of +time after becoming aware of such noncompliance. If all Recipient's +rights under this Agreement terminate, Recipient agrees to cease use +and distribution of the Program as soon as reasonably practicable. +However, Recipient's obligations under this Agreement and any licenses +granted by Recipient relating to the Program shall continue and survive. + +Everyone is permitted to copy and distribute copies of this Agreement, +but in order to avoid inconsistency the Agreement is copyrighted and +may only be modified in the following manner. The Agreement Steward +reserves the right to publish new versions (including revisions) of +this Agreement from time to time. No one other than the Agreement +Steward has the right to modify this Agreement. The Eclipse Foundation +is the initial Agreement Steward. The Eclipse Foundation may assign the +responsibility to serve as the Agreement Steward to a suitable separate +entity. Each new version of the Agreement will be given a distinguishing +version number. The Program (including Contributions) may always be +Distributed subject to the version of the Agreement under which it was +received. In addition, after a new version of the Agreement is published, +Contributor may elect to Distribute the Program (including its +Contributions) under the new version. + +Except as expressly stated in Sections 2(a) and 2(b) above, Recipient +receives no rights or licenses to the intellectual property of any +Contributor under this Agreement, whether expressly, by implication, +estoppel or otherwise. All rights in the Program not expressly granted +under this Agreement are reserved. Nothing in this Agreement is intended +to be enforceable by any entity that is not a Contributor or Recipient. +No third-party beneficiary rights are created under this Agreement. + +Exhibit A - Form of Secondary Licenses Notice + +"This Source Code may also be made available under the following +Secondary Licenses when the conditions for such availability set forth +in the Eclipse Public License, v. 2.0 are satisfied: {name license(s), +version(s), and exceptions or additional permissions here}." + + Simply including a copy of this Agreement, including this Exhibit A + is not sufficient to license the Source Code under Secondary Licenses. + + If it is not possible or desirable to put the notice in a particular + file, then You may include the notice in a location (such as a LICENSE + file in a relevant directory) where a recipient would be likely to + look for such a notice. + + You may add additional accurate notices of copyright ownership. diff --git a/README.md b/README.md new file mode 100644 index 0000000..b64ed12 --- /dev/null +++ b/README.md @@ -0,0 +1,769 @@ +# yoltq + +An opinionated Datomic queue for building (more) reliable systems. +Implements the +[transactional outbox](https://microservices.io/patterns/data/transactional-outbox.html) +pattern. +Supports retries, backoff, ordering and more. +On-prem only. + +## Installation + +[](https://clojars.org/com.github.ivarref/yoltq) + +## 1-minute example + +```clojure +(require '[com.github.ivarref.yoltq :as yq]) + +(def conn (datomic.api/connect "...")) + +; Initialize system +(yq/init! {:conn conn}) + +; Add a queue consumer that will intentionally fail on the first attempt +(yq/add-consumer! :q + (let [cnt (atom 0)] + (fn [payload] + (when (= 1 (swap! cnt inc)) + ; A consumer throwing an exception is considered a queue job failure + (throw (ex-info "failed" {}))) + ; Anything else than a throwing exception is considered a queue job success + ; This includes nil, false and everything else. + (log/info "got payload" payload)))) + +; Start threadpool that picks up queue jobs +(yq/start!) + +; Queue a job +@(d/transact conn [(yq/put :q {:work 123})]) + +; On your console you will see something like this: +; 17:29:54.598 DEBUG queue item 613... for queue :q is pending status :init +; 17:29:54.602 DEBUG queue item 613... for queue :q now has status :processing +; 17:29:54.603 DEBUG queue item 613... for queue :q is now processing +; 17:29:54.605 WARN queue-item 613... for queue :q now has status :error after 1 try in 4.8 ms +; 17:29:54.607 WARN error message was: "failed" for queue-item 613... +; 17:29:54.615 WARN ex-data was: {} for queue-item 613... +; The item is so far failed... + +; But after approximately 10 seconds have elapsed, the item will be retried: +; 17:30:05.596 DEBUG queue item 613... for queue :q now has status :processing +; 17:30:05.597 DEBUG queue item 613... for queue :q is now processing +; 17:30:05.597 INFO got payload {:work 123} +; 17:30:05.599 INFO queue-item 613... for queue :q now has status :done after 2 tries in 5999.3 ms +; And then it has succeeded. +``` + +## Rationale + +Integrating with external systems that may be unavailable can be tricky. +Imagine the following code: + +```clojure +(defn post-handler [user-input] + (let [db-item (process user-input) + ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; milliseconds + :socket-timeout 10000 ; milliseconds + ...})] ; may throw exception + @(d/transact conn [(assoc db-item :some/ext-ref ext-ref)]))) +``` + +What if the POST request fails? Should it be retried? For how long? +Should it be allowed to fail? How do you then process failures later? + +PS: If you do not set connection/socket-timeout, there is a chance that +clj-http/client will wait for all eternity in the case of a dropped TCP connection. + +The queue way to solve this would be: + +```clojure +(defn get-ext-ref [{:keys [id]}] + (let [ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; milliseconds + :socket-timeout 10000 ; milliseconds + ...})] ; may throw exception + @(d/transact conn [[:db/cas [:some/id id] + :some/ext-ref + nil + ext-ref]]))) + +(yq/add-consumer! :get-ext-ref get-ext-ref {:allow-cas-failure? true}) + +(defn post-handler [user-input] + (let [{:some/keys [id] :as db-item} (process user-input)] + @(d/transact conn [db-item + (yq/put :get-ext-ref {:id id})]))) +``` + +Here `post-handler` will always succeed as long as the transaction commits. + +`get-ext-ref` may fail multiple times if `ext-service` is down. +This is fine as long as it eventually succeeds. + +There is a special case where `get-ext-ref` succeeds, but +saving the new queue job status to the database fails. +Thus `get-ext-ref` and any queue consumer should tolerate to +be executed successfully several times. + +For `get-ext-ref` this is solved by using +the database function +[:db/cas (compare-and-swap)](https://docs.datomic.com/on-prem/transactions/transaction-functions.html#dbfn-cas) +to achieve a write-once behaviour. +The yoltq system treats cas failures as job successes +when a consumer has `:allow-cas-failure?` set to `true` in its options. + +## How it works + +### Queue jobs + +Creating queue jobs is done by `@(d/transact conn [...other data... (yq/put :q {:work 123})])`. +Inspecting `(yq/put :q {:work 123})]` you will see something like this: + +```clojure +#:com.github.ivarref.yoltq{:id #uuid"614232a8-e031-45bb-8660-be146eaa32a2", ; Queue job id + :queue-name :q, ; Destination queue + :status :init, ; Status + :payload "{:work 123}", ; Payload persisted to the database with pr-str + :bindings "{}", ; Bindings that will be applied before executing consumer function + :lock #uuid"037d7da1-5158-4243-8f72-feb1e47e15ca", ; Lock to protect from multiple consumers + :tries 0, ; How many times the job has been executed + :init-time 4305758012289 ; Time of initialization (System/nanoTime) + } +``` + +This is the queue job as it will be stored into the database. +You can see that the payload, i.e. the second argument of `yq/put`, +is persisted into the database. Thus the payload must be `pr-str`-able (unless you have specified +custom `:encode` and `:decode` functions that override this). + + +A queue job will initially have status `:init`. +It will then transition to the following statuses: + +* `:processing`: When the queue job begins processing in the queue consumer function. +* `:done`: If the queue consumer function returns normally. +* `:error`: If the queue consumer function throws an exception. + +### Queue consumers + +Queue jobs will be consumed by queue consumers. A consumer is a function taking a single argument, +the payload. It can be added like this: + +```clojure +(yq/add-consumer! + :q ; Queue to consume + (fn [payload] (println "got payload:" payload)) ; Queue consumer function + ; An optional map of queue opts + {:allow-cas-failure? true ; Treat [:db.cas ...] failures as success. This is one way for the + ; consumer function to ensure idempotence. + :valid-payload? (fn [payload] (some? (:id payload))) ; Function that verifies payload. + ; Should return truthy for valid payloads. + ; The default function always returns true. + :max-retries 10}) ; Specify maximum number of times an item will be retried. Default: 10000. + ; If :max-retries is given as 0, the job will ~always be retried, i.e. + ; 9223372036854775807 times (Long/MAX_VALUE). +``` + +The `payload` will be deserialized from the database using `clojure.edn/read-string` before +invocation, i.e. you will get back what you put into `yq/put`. + +The yoltq system treats a queue consumer function invocation as successful if it does not throw +an exception. Any return value, be it `nil`, `false`, `true`, etc. is considered a success. + +### Listening for queue jobs + +When `(yq/start!)` is invoked, a threadpool is started. + +One thread is permanently allocated for listening to the +[tx-report-queue](https://docs.datomic.com/on-prem/clojure/index.html#datomic.api/tx-report-queue) +and responding to changes. This means that yoltq will respond +and process newly created queue jobs fairly quickly. +This also means that queue jobs in status `:init` will almost always be processed without +any type of backoff. + +The threadpool also schedules polling jobs that will check for various statuses regularly: + +* Jobs in status `:error` that have waited for at least `:error-backoff-time` (default: 5 seconds) will be retried. +* Jobs that have been in `:processing` for at least `:hung-backoff-time` (default: 30 minutes) will be considered hung and retried. +* Old `:init-backoff-time` (default: 1 minute) `:init` jobs that have not been processed. Queue jobs can be left in status `:init` during application restart/upgrade, and thus the need for this strategy. + + +### Retry and backoff strategy + +Yoltq assumes that if a queue consumer throws an exception for one item, it +will also do the same for another item in the immediate future, +assuming the remote system that the queue consumer represents is still down. +Thus if there are ten failures for queue `:q`, it does not make sense to +retry all of them at once. + +The retry polling job that runs regularly (`:poll-delay`, default: every 10 seconds) +thus stops at the first failure. +Each queue have their own polling job, so if one queue is down, it will *not* stop +other queues from retrying. + +The retry polling job will continue to eagerly process queue jobs as long as it +encounters only successes. + +While the `:error-backoff-time` of default 5 seconds may seem short, in practice +if there is a lot of failed items and the external system is still down, +the actual backoff time will be longer. + + +### Stuck threads and stale jobs + +A single thread is dedicated to monitoring how much time a queue consumer +spends on a single job. If this exceeds `:max-execute-time` (default: 5 minutes) +the stack trace of the offending consumer will be logged as `:ERROR`. + +If a job is found stale, that is if the database spent time exceeds +`:hung-backoff-time` (default: 30 minutes), +the job will either be retried or marked as `:error`. This case may happen if the application +is shut down abruptly during processing of queue jobs. + + +### Giving up + +A queue job will remain in status `:error` once `:max-retries` (default: 10000) have been reached. +If `:max-retries` is given as `0`, the job will be retried 9223372036854775807 times before +giving up. +Ideally this should not happen. ¯\\\_(ツ)\_/¯ + +### Custom encoding and decoding + +Yoltq will use `pr-str` and `clojure.edn/read-string` by default to encode and decode data. +You may specify `:encode` and `:decode` either globally or per queue to override this behaviour. +The `:encode` function must return a byte array or a string. + +For example if you want to use [nippy](https://github.com/ptaoussanis/nippy): +```clojure +(require '[taoensso.nippy :as nippy]) + +; Globally for all queues: +(yq/init! + {:conn conn + :encode nippy/freeze + :decode nippy/thaw}) + +; Or per queue: +(yq/add-consumer! + :q ; Queue to consume + (fn [payload] (println "got payload:" payload)) ; Queue consumer function + {:encode nippy/freeze + :decode nippy/thaw}) ; Queue options, here with :encode and :decode +``` + +### Partitions + +Yoltq supports specifying which +[partition](https://docs.datomic.com/on-prem/schema/schema.html#partitions) +queue entities should belong to. +The default function is: +```clojure +(defn default-partition-fn [_queue-name] + (keyword "yoltq" (str "queue_" (.getValue (java.time.Year/now))))) +``` +This is to say that there will be a single partition per year for yoltq. +Yoltq will take care of creating the partition if it does not exist. + +You may override this function, either globally or per queue, with the keyword `:partition-fn`. +E.g.: +```clojure +(yq/init! {:conn conn :partition-fn (fn [_queue-name] :my-partition)}) +``` + +### All configuration options + +For an exhaustive list of all configuration options, +see +[yq/default-opts](https://github.com/ivarref/yoltq/blob/main/src/com/github/ivarref/yoltq.clj#L21). + +# Groups of Jobs + +Yoltq supports grouping jobs in a queue, and tracking the progress of such a +group of jobs. Consider this example: your system is used by the marketing +department to send emails to groups of users. Multiple colleagues in the +marketing department could potentially do this at the same time, but they want +to see the progress of their _own_ campagne, not that of _all_ emails being +sent. When adding the jobs to the queue, you can specify the `job-group` +parameter, in this case indicate which marketeer is running the jobs: + +```clojure +(doseq [uid user-ids] + @(d/transact conn [(yq/put :send-mail + ; Payload: + {:user-id uid :from ... :to ... :body ...} + ; Job options: + {:job-group :mail-campagne/for-marketeer-42})])) +``` + +When you want to know the progress of that specific job group, and display it in +your user interface, you can use `job-group-progress`, which returns a structure +similar to `queue-stats`: + +```clojure +(yq/job-group-progress :send-mail :mail-campagne/for-marketeer-42) +;; => [{:qname :send-mail +;; :job-group :mail-campagne/for-marketeer-42 +;; :status :init +;; :count 78} +;; {:qname :send-mail +;; :job-group :mail-campagne/for-marketeer-42 +;; :status :done +;; :count 24}] +``` + +# Regular and REPL usage + +For a regular system and/or REPL session you'll want to do: + +```clojure +(require '[com.github.ivarref.yoltq :as yq]) + +(yq/init! {:conn conn}) + +(yq/add-consumer! :q-one ...) +(yq/add-consumer! :q-two ...) + +; Start yoltq system +(yq/start!) + +; Oops I need another consumer. This works fine: +(yq/add-consumer! :q-three ...) + +; When the application is shutting down: +(yq/stop!) +``` + +You may invoke `yq/add-consumer!` and `yq/init!` on a live system as you like. + +If you change `:pool-size` or `:poll-delay` you will have to `(yq/stop!)` and +`(yq/start!)` to make changes take effect. + +## Queue job dependencies and ordering + +It is possible to specify that one queue job must wait for another queue +job to complete before it will be executed: + +```clojure +@(d/transact conn [(yq/put :a + ; Payload: + {:id "a1"} + ; Job options: + {:id "a1"})]) + +@(d/transact conn [(yq/put :b + ; Payload: + {:id "b1" :a-ref "a1"} + ; Jobs options: + {:depends-on [:a "a1"]})]) + +; depends-on may also be specified as a function of the payload when +; adding the consumer: +(yq/add-consumer! :b + (fn [payload] ...) + {:depends-on (fn [payload] + [:a (:a-ref payload)])}) +``` + +Here queue job `b1` will not execute before `a1` is `:done`. + +Note that queue-name plus `:id` in job options must be an unique value. +In the example above that means `:a` plus `a1` must be unique. + +When specifying `:depends-on`, the referred job must at least exist in the database, +otherwise `yq/put` will throw an exception. + +Other than this there is no attempt at ordering the execution of queue jobs. +In fact the opposite is done in the poller to guard against the case that a single failing queue job +could effectively take down the entire retry polling job. + +## Retrying jobs in the REPL + +```clojure +(require '[com.github.ivarref.yoltq :as yq]) + +; List jobs that are in state error: +(yq/get-errors :q) + +; This will retry a single job that is in error, regardless +; of how many times it has been retried earlier. +; If the job fails, you will get the full stacktrace on the REPL. +(yq/retry-one-error! :q) +; Returns a map containing the new state of the job. +; Returns nil if there are no (more) jobs in state error for this queue. +``` + +# Testing + +For testing you will probably want determinism over an extra threadpool +by using the test queue: + +```clojure +... +(:require [clojure.test :refer :all] + [com.github.ivarref.yoltq :as yq] + [com.github.ivarref.yoltq.test-queue :as tq]) + +; Enables the test queue and disables the threadpool for each test. +; yq/start! and yq/stop! becomes a no-op. +(use-fixtures :each tq/call-with-virtual-queue!) + +(deftest demo + (let [conn ...] + (yq/init! {:conn conn}) ; Setup + (yq/add-consumer! :q identity) + + @(d/transact conn [(yq/put :q {:work 123})]) ; Add work + + ; tq/consume! consumes one job and asserts that it succeeds. + ; It returns the return value of the consumer function + (is (= {:work 123} (tq/consume! :q))) + + ; If you want to test the idempotence of your function, + ; you may force retry a consumer function: + ; This may for example be useful to verify that the + ; :db.cas logic is correct. + (is (= {:work 123} (tq/force-retry! :q))))) +``` + +## Logging and capturing bindings + +Yoltq can capture and restore dynamic bindings. +It will capture during `yq/put` and restore them when the consumer function +is invoked. This is specified in the `:capture-bindings` setting. +It defaults to `['#taoensso.timbre/*context*]`, +i.e. the [timbre](https://github.com/ptaoussanis/timbre) log context, +if available, otherwise an empty vector. + +These dynamic bindings will be in place when yoltq logs errors, warnings +etc. about failing consumer functions, possibly making troubleshooting +easier. + +## Limitations + +Datomic does not have anything like `for update skip locked`. +Thus consuming a queue should be limited to a single JVM process. +This library will take queue jobs by compare-and-swapping a lock+state, +process the item and then compare-and-swapping the lock+new-state. +It does so eagerly, thus if you have multiple JVM consumers you will +most likely get many locking conflicts. It should work, but it's far +from optimal. + +## Alternatives + +I did not find any alternatives for Datomic. + +If I were using PostgreSQL or any other database that supports +`for update skip locked`, I'd use a queue that uses this. +For Clojure there is [proletarian](https://github.com/msolli/proletarian). + +For Redis there is [carmine](https://github.com/ptaoussanis/carmine). + +Note: I have not tried these libraries myself. + +## Other stuff + +If you liked this library, you may also like: + +* [conformity](https://github.com/avescodes/conformity): + A Clojure/Datomic library for idempotently transacting norms into your database – be they schema, + data, or otherwise. +* [datomic-schema](https://github.com/ivarref/datomic-schema): + Simplified writing of Datomic schemas (works with conformity). +* [double-trouble](https://github.com/ivarref/double-trouble): + Handle duplicate Datomic transactions with ease. +* [gen-fn](https://github.com/ivarref/gen-fn): + Generate Datomic function literals from regular Clojure namespaces. +* [rewriting-history](https://github.com/ivarref/rewriting-history): + A library to rewrite Datomic history. + +## Change log + +#### [Unreleased] + +#### [0.2.94] - 2025-09-22 + +Added support for [groups of jobs](#groups-of-jobs). +Thanks [Stefan van den Oord](https://github.com/svdo)! + +#### [0.2.85] - 2025-07-29 + +Same as v0.2.82, but without the `v` prefix. + +#### [v0.2.82] - 2025-06-18 + +Added support for specifying `tx-report-queue` as a keyword in `init!`. Yoltq will +then not grab the datomic report queue, but use the one provided: + +```clojure +(require '[com.github.ivarref.yoltq :as yq]) +(yq/init! {:conn conn + :tx-report-queue (yq/get-tx-report-queue-multicast! conn :yoltq) + ; ^^ can be any `java.util.concurrent.BlockingQueue` value + }) + +(another-tx-report-consumer! (yq/get-tx-report-queue-multicast! conn :another-consumer-id)) + +``` + +Added multicast support for `datomic.api/tx-report-queue`: +```clojure +(require '[com.github.ivarref.yoltq :as yq]) +(def my-q1 (yq/get-tx-report-queue-multicast! conn :q-id-1)) +; ^^ consume my-q1 just like you would do `datomic.api/tx-report-queue` + +(def my-q2 (yq/get-tx-report-queue-multicast! conn :q-id-2)) +; Both my-q1 and my-q2 will receive everything from `datomic.api/tx-report-queue` +; for the given `conn` + +(def my-q3 (yq/get-tx-report-queue-multicast! conn :q-id-3 true)) +; my-q3 sets the optional third argument, `send-end-token?`, to true. +; The queue will then receive `:end` if the queue is stopped. +; This can enable simpler consuming of queues: +(future + (loop [] + (let [q-item (.take ^java.util.concurrent.BlockingQueue my-q3)] + (if (= q-item :end) + (println "Time to exit. Goodbye!") + (do + (println "Processing q-item" q-item) + (recur)))))) + +; The default value for `send-end-token?` is `false`, i.e. the behaviour will be +; identical to that of datomic.api/tx-report-queue. + +@(d/transact conn [{:db/doc "new-data"}]) + +; Stop the queue: +(yq/stop-multicast-consumer-id! conn :q-id-3) +=> true +; The multicaster thread will send `:end`. +; The consumer thread will then print "Time to exit. Goodbye!". + +; if the queue is already stopped (or never was started), the `stop-multicaster...` +; functions will return false: +(yq/stop-multicast-consumer-id! conn :already-stopped-queue-or-typo) +=> false + +; Stop all queues for all connections: +(yq/stop-all-multicasters!) +``` + +`yq/get-tx-report-queue-multicast!` returns, like +`datomic.api/tx-report-queue`, +`java.util.concurrent.BlockingQueue` and starts a background thread that does +the multicasting as needed. Identical calls to `yq/get-tx-report-queue-multicast!` +returns the same `BlockingQueue`. + +Changed the default for `max-retries` from `10000` to `9223372036854775807`. + +Fixed reflection warnings. + +#### 2023-03-20 v0.2.64 [diff](https://github.com/ivarref/yoltq/compare/v0.2.63...v0.2.64) + +Added support for `max-retries` being `0`, meaning the job should be retried forever +(or at least 9223372036854775807 times). + +Changed the default for `max-retries` from `100` to `10000`. + +#### 2022-11-18 v0.2.63 [diff](https://github.com/ivarref/yoltq/compare/v0.2.62...v0.2.63) +Added custom `:encode` and `:decode` support. + +Added support for specifying `:partifion-fn` to specify +which partition a queue item should belong to. +It defaults to: +```clojure +(defn default-partition-fn [_queue-name] + (keyword "yoltq" (str "queue_" (.getValue (Year/now))))) +``` +Yoltq takes care of creating the partition if it does not exist. + +#### 2022-11-15 v0.2.62 [diff](https://github.com/ivarref/yoltq/compare/v0.2.61...v0.2.62) +Added function `processing-time-stats`: + +```clojure +(ns com.github.ivarref.yoltq) + +(defn processing-time-stats + "Gather processing time statistics. + + Optional keyword arguments: + * :age-days — last number of days to look at data from. Defaults to 30. + Use nil to have no limit. + + * :queue-name — only gather statistics for this queue name. Defaults to nil, meaning all queues. + + * :duration->long - Specify what unit should be used for values. + Must take a java.time.Duration as input and return a long. + + Defaults to (fn [duration] (.toSeconds duration). + I.e. the default unit is seconds. + + Example return value: + {:queue-a {:avg 1 + :max 10 + :min 0 + :p50 ... + :p90 ... + :p95 ... + :p99 ...}}" + [{:keys [age-days queue-name now db duration->long] + :or {age-days 30 + now (ZonedDateTime/now ZoneOffset/UTC) + duration->long (fn [duration] (.toSeconds duration))}}] + ...) +``` + +#### 2022-09-07 v0.2.61 [diff](https://github.com/ivarref/yoltq/compare/v0.2.60...v0.2.61) +Added function `retry-stats`: + +```clojure +(ns com.github.ivarref.yoltq) + +(defn retry-stats + "Gather retry statistics. + + Optional keyword arguments: + * :age-days — last number of days to look at data from. Defaults to 30. + * :queue-name — only gather statistics for this queue name. Defaults to nil, meaning all queues. + + Example return value: + {:queue-a {:ok 100, :retries 2, :retry-percentage 2.0} + :queue-b {:ok 100, :retries 75, :retry-percentage 75.0}} + + From the example value above, we can see that :queue-b fails at a much higher rate than :queue-a. + Assuming that the queue consumers are correctly implemented, this means that the service representing :queue-b + is much more unstable than the one representing :queue-a. This again implies + that you will probably want to fix the downstream service of :queue-b, if that is possible. + " + [{:keys [age-days queue-name now] + :or {age-days 30 + now (ZonedDateTime/now ZoneOffset/UTC)}}] + ...) +``` + +#### 2022-08-18 v0.2.60 [diff](https://github.com/ivarref/yoltq/compare/v0.2.59...v0.2.60) +Improved: Added config option `:healthy-allowed-error-time`: +``` + ; If you are dealing with a flaky downstream service, you may not want + ; yoltq to mark itself as unhealthy on the first failure encounter with + ; the downstream service. Change this setting to let yoltq mark itself + ; as healthy even though a queue item has been failing for some time. + :healthy-allowed-error-time (Duration/ofMinutes 15) +``` + +#### 2022-08-15 v0.2.59 [diff](https://github.com/ivarref/yoltq/compare/v0.2.58...v0.2.59) +Fixed: +* Race condition that made the following possible: `stop!` would terminate the slow thread +watcher, and a stuck thread could keep `stop!` from completing! + +#### 2022-06-30 v0.2.58 [diff](https://github.com/ivarref/yoltq/compare/v0.2.57...v0.2.58) +Slightly more safe EDN printing and parsing. +Recommended reading: +[Pitfalls and bumps in Clojure's Extensible Data Notation (EDN)](https://nitor.com/en/articles/pitfalls-and-bumps-clojures-extensible-data-notation-edn) + +#### 2022-06-29 v0.2.57 [diff](https://github.com/ivarref/yoltq/compare/v0.2.56...v0.2.57) +Added `(get-errors qname)` and `(retry-one-error! qname)`. + +Improved: +`unhealthy?` will return `false` for the first 10 minutes of the application lifetime. +This was done in order to push new code while a queue was in error in an earlier +version of the code. In this way rolling upgrades are possible regardless if there +are queue errors. +Can you tell that this issue hit me? ¯\\\_(ツ)\_/¯ + +#### 2022-06-22 v0.2.56 [diff](https://github.com/ivarref/yoltq/compare/v0.2.55...v0.2.56) +Added support for `:yoltq/queue-id` metadata on functions. I.e. it's possible to write +the following: +```clojure +(defn my-consumer + {:yoltq/queue-id :some-queue} + [payload] + :work-work-work) + +(yq/add-consumer! #'my-consumer ; <-- will resolve to :some-queue + my-consumer) + +@(d/transact conn [(yq/put #'my-consumer ; <-- will resolve to :some-queue + {:id "a"})]) +``` + +The idea here is that it is simpler to jump to var definitions than going via keywords, +which essentially refers to a var/function anyway. + +#### 2022-03-29 v0.2.55 [diff](https://github.com/ivarref/yoltq/compare/v0.2.54...v0.2.55) +Added: `unhealthy?` function which returns `true` if there are queues in error, +or `false` otherwise. + +#### 2022-03-28 v0.2.54 [diff](https://github.com/ivarref/yoltq/compare/v0.2.51...v0.2.54) +Fixed: Schedules should now be using milliseconds and not nanoseconds. + +#### 2022-03-28 v0.2.51 [diff](https://github.com/ivarref/yoltq/compare/v0.2.48...v0.2.51) +* Don't OOM on migrating large amounts of data. +* Respect `:auto-migrate? false`. + +#### 2022-03-27 v0.2.48 [diff](https://github.com/ivarref/yoltq/compare/v0.2.46...v0.2.48) +* Auto migration is done in the background. +* Only poll for current version of jobs, thus no races for auto migration. + +#### 2022-03-27 v0.2.46 [diff](https://github.com/ivarref/yoltq/compare/v0.2.41...v0.2.46) +* Critical bugfix that in some cases can lead to stalled jobs. +``` +Started using (System/currentTimeMillis) and not (System/nanoTime) +when storing time in the database. +``` + +* Bump Clojure to `1.11.0`. + +#### 2022-03-27 v0.2.41 [diff](https://github.com/ivarref/yoltq/compare/v0.2.39...v0.2.41) +* Added function `healthy?` that returns: +``` + true if no errors + false if one or more errors + nil if error-poller is yet to be executed. +``` + +* Added default functions for `:on-system-error` and `:on-system-recovery` + that simply logs that the system is in error (ERROR level) or has + recovered (INFO level). + +* Added function `queue-stats` that returns a nicely "formatted" + vector of queue stats, for example: +``` + (queue-stats) + => + [{:qname :add-message-thread, :status :done, :count 10274} + {:qname :add-message-thread, :status :init, :count 30} + {:qname :add-message-thread, :status :processing, :count 1} + {:qname :send-message, :status :done, :count 21106} + {:qname :send-message, :status :init, :count 56}] +``` + +#### 2021-09-27 v0.2.39 [diff](https://github.com/ivarref/yoltq/compare/v0.2.37...v0.2.39) +Added `:valid-payload?` option for queue consumers. + +#### 2021-09-27 v0.2.37 [diff](https://github.com/ivarref/yoltq/compare/v0.2.33...v0.2.37) +Improved error reporting. + +#### 2021-09-24 v0.2.33 + +First publicly announced release. + +## Making a new release + +Go to https://github.com/ivarref/yoltq/actions/workflows/release.yml and press `Run workflow`. + +## License + +Copyright © 2021-2022 Ivar Refsdal + +This program and the accompanying materials are made available under the +terms of the Eclipse Public License 2.0 which is available at +http://www.eclipse.org/legal/epl-2.0. + +This Source Code may also be made available under the following Secondary +Licenses when the conditions for such availability set forth in the Eclipse +Public License, v. 2.0 are satisfied: GNU General Public License as published by +the Free Software Foundation, either version 2 of the License, or (at your +option) any later version, with the GNU Classpath Exception which is available +at https://www.gnu.org/software/classpath/license.html. diff --git a/build.edn b/build.edn new file mode 100644 index 0000000..3e1f016 --- /dev/null +++ b/build.edn @@ -0,0 +1,34 @@ +{:lib + com.github.ivarref/yoltq + + :version + "0.2.{{git/commit-count}}" + + :github-actions? + true + + :scm + {:connection "scm:git:git://github.com/ivarref/yoltq.git" + :developerConnection "scm:git:ssh://git@github.com/ivarref/yoltq.git" + :url "https://github.com/ivarref/yoltq"} + + :documents + [{:file "README.md" + :match-exactly "#### [Unreleased]" + :action :append-after + :text "\n#### [{{version}}] - {{now/yyyy}}-{{now/mm}}-{{now/dd}}"} + {:file "README.md" + :match-exactly "com.github.ivarref/yoltq {:git/tag" + :action :replace + :keep-indent? true + :text + "com.github.ivarref/yoltq {:git/tag \"{{version}}\" :git/sha \"{{git/head-long-sha}}\"}"} + {:file "README.md" + :match-exactly "com.github.ivarref/yoltq {:mvn/version" + :action :replace + :keep-indent? true + :text "com.github.ivarref/yoltq {:mvn/version \"{{version}}\"}"}] + + :licenses + [{:name "Eclipse Public License - v 2.0" + :url "https://www.eclipse.org/legal/epl-2.0/"}]}
\ No newline at end of file diff --git a/deps.edn b/deps.edn new file mode 100644 index 0000000..f0488fa --- /dev/null +++ b/deps.edn @@ -0,0 +1,40 @@ +{:deps + {com.github.ivarref/double-trouble {:mvn/version "0.1.102"} + org.clojure/tools.logging {:mvn/version "1.2.4"} + org.clojure/clojure {:mvn/version "1.11.1"} + com.datomic/peer {:mvn/version "1.0.7364"}} + + :paths + ["src"] + + :aliases + {:test + {:extra-paths ["test"] + :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} + com.taoensso/timbre {:mvn/version "5.2.1"} + com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} + clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} + org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"} + io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} + :exec-fn cognitect.test-runner.api/test + :jvm-opts ["-DDISABLE_SPY=true" + "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] + :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]} + + :repl + {:extra-paths ["test"] + :extra-deps {com.bhauman/rebel-readline {:mvn/version "0.1.5"} + ivarref/datomic-schema {:mvn/version "0.2.0"} + com.taoensso/timbre {:mvn/version "5.2.1"} + com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} + clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} + org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + com.taoensso/nippy {:mvn/version "3.2.0"}} + :exec-fn rebel-readline.tool/repl + :exec-args {} + :main-opts ["-m" "rebel-readline.main"]} + + :build + {:deps {com.github.liquidz/build.edn {:mvn/version "0.11.241"}} + :ns-default build-edn.main}}} diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj new file mode 100644 index 0000000..8c8ca7a --- /dev/null +++ b/src/com/github/ivarref/yoltq.clj @@ -0,0 +1,539 @@ +(ns com.github.ivarref.yoltq + (:require + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.error-poller :as errpoller] + [com.github.ivarref.yoltq.impl :as i] + [com.github.ivarref.yoltq.migrate :as migrate] + [com.github.ivarref.yoltq.poller :as poller] + [com.github.ivarref.yoltq.report-queue :as rq] + [com.github.ivarref.yoltq.slow-executor-detector :as slow-executor] + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d]) + (:import (datomic Connection) + (java.lang.management ManagementFactory) + (java.time Duration Instant ZoneOffset ZonedDateTime) + (java.util.concurrent BlockingQueue ExecutorService Executors ScheduledExecutorService TimeUnit))) + +(defonce ^:dynamic *config* (atom nil)) +(defonce threadpool (atom nil)) +(defonce ^:dynamic *running?* (atom false)) +(defonce ^:dynamic *test-mode* false) + +(def default-opts + (-> {; Default number of times a queue job will be retried before giving up + ; Can be overridden on a per-consumer basis with + ; (yq/add-consumer! :q (fn [payload] ...) {:max-retries 200}) + ; If you want no limit on the number of retries, specify + ; the value `0`. That will set the effective retry limit to + ; 9223372036854775807 times. + :max-retries 9223372036854775807 + + ; Minimum amount of time to wait before a failed queue job is retried + :error-backoff-time (Duration/ofSeconds 5) + + ; Max time a queue job can execute before an error is logged + :max-execute-time (Duration/ofMinutes 5) + + ; Amount of time an in progress queue job can run before it is considered failed + ; and will be marked as such. + :hung-backoff-time (Duration/ofMinutes 30) + + ; Most queue jobs in init state will be consumed by the tx-report-queue listener. + ; However, in the case where an init job was added right before the application + ; was shut down and did not have time to be processed by the tx-report-queue listener, + ; it will be consumer by the init poller. This init poller backs off by + ; :init-backoff-time to avoid unnecessary compare-and-swap lock failures that could + ; otherwise occur if competing with the tx-report-queue listener. + :init-backoff-time (Duration/ofSeconds 60) + + ; If you are dealing with a flaky downstream service, you may not want + ; yoltq to mark itself as unhealthy on the first failure encounter with + ; the downstream service. Change this setting to let yoltq mark itself + ; as healthy even though a queue item has been failing for some time. + :healthy-allowed-error-time (Duration/ofMinutes 15) + + ; How frequent polling for init, error and hung jobs should be done. + :poll-delay (Duration/ofSeconds 10) + + ; Specifies the number of threads available for executing queue and polling jobs. + ; The final thread pool will be this size + 2. + ; + ; One thread is permanently allocated for listening to the + ; tx-report-queue. + ; + ; Another thread is permanently allocated for checking :max-execute-time. + ; This means that if all executing queue jobs are stuck and the thread pool is unavailable + ; as such, at least an error will be logged about this. The log entry will + ; contain the stacktrace of the stuck threads. + :pool-size 4 + + :capture-bindings (if-let [s (resolve (symbol "taoensso.timbre/*context*"))] + [s] + []) + + ; How often should the system be polled for failed queue jobs + :system-error-poll-delay (Duration/ofMinutes 1) + + ; How often should the system invoke + :system-error-callback-backoff (Duration/ofHours 1) + + ; Should old, possibly stalled jobs be automatically be migrated + ; as part of `start!`? + :auto-migrate? true} + + u/duration->millis)) + + +(defn init! [{:keys [conn tx-report-queue] :as cfg}] + (assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil"))) + (when (some? tx-report-queue) + (assert (instance? BlockingQueue tx-report-queue) + (str "Expected :tx-report-queue to be of type java.util.concurrent.BlockingQueue"))) + (locking threadpool + @(d/transact conn i/schema) + (let [new-cfg (swap! *config* + (fn [old-conf] + (-> (merge-with (fn [_ b] b) + {:running-queues (atom #{}) + :start-execute-time (atom {}) + :system-error (atom {}) + :healthy? (atom nil) + :slow? (atom nil) + :slow-thread-watcher-done? (promise)} + default-opts + (if *test-mode* old-conf (select-keys old-conf [:handlers])) + cfg) + u/duration->millis)))] + new-cfg))) + + +(defn get-queue-id + [queue-id-or-var] + (cond (and (var? queue-id-or-var) + (keyword? (:yoltq/queue-id (meta queue-id-or-var)))) + (:yoltq/queue-id (meta queue-id-or-var)) + + (keyword? queue-id-or-var) + queue-id-or-var + + :else + (throw (ex-info (str "Could not get queue-id for " queue-id-or-var) {:queue-id queue-id-or-var})))) + +(defn add-consumer! + ([queue-id f] + (add-consumer! queue-id f {})) + ([queue-id f opts] + (swap! *config* (fn [old-config] (assoc-in old-config [:handlers (get-queue-id queue-id)] (merge opts {:f f})))))) + + +(defn put + ([queue-id payload] (put queue-id payload {})) + ([queue-id payload opts] + (let [{:keys [bootstrap-poller! conn] :as cfg} @*config*] + (when (and *test-mode* bootstrap-poller!) + (bootstrap-poller! conn)) + (i/put cfg (get-queue-id queue-id) payload opts)))) + + +(defn- do-start! [] + (let [{:keys [poll-delay pool-size system-error-poll-delay auto-migrate? slow-thread-watcher-done?] :as cfg} @*config*] + (when auto-migrate? + (future (migrate/migrate! cfg))) + (let [pool (reset! threadpool (Executors/newScheduledThreadPool (+ 1 pool-size))) + queue-listener-ready (promise)] + (reset! *running?* true) + (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/MILLISECONDS) + (.scheduleAtFixedRate ^ScheduledExecutorService pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/MILLISECONDS) + (.execute ^ScheduledExecutorService pool + (fn [] + (try + (log/debug "report-queue-listener starting") + (rq/report-queue-listener *running?* queue-listener-ready pool *config*) + (finally + (log/debug "report-queue-listener exiting") + (deliver queue-listener-ready :finally))))) + (future (try + (slow-executor/show-slow-threads pool *config*) + (finally + (deliver slow-thread-watcher-done? :done)))) + (let [q-listener-retval (deref queue-listener-ready 30000 :timeout)] + (cond (= :timeout q-listener-retval) + (do + (log/error "Timed out waiting for report-queue-listener to start") + (throw (IllegalStateException. "Timed out waiting for report-queue-listener to start"))) + + (= :finally q-listener-retval) + (do + (log/error "report-queue-listener did not start") + (throw (IllegalStateException. "report-queue-listener did not start"))) + + (= :ready q-listener-retval) + (do + (log/debug "report-queue-listener is ready")) + + :else + (do + (log/error (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval))) + (throw (IllegalStateException. (str "Unexpected queue-listener-retval: " (pr-str q-listener-retval)))))))))) + + +(defn start! [] + (locking threadpool + (cond (true? *test-mode*) + (log/info "test mode enabled, doing nothing for start!") + + (true? @*running?*) + nil + + (false? @*running?*) + (do-start!)))) + + +(defn stop! [] + (locking threadpool + (cond (true? *test-mode*) + (log/info "test mode enabled, doing nothing for stop!") + + (false? @*running?*) + nil + + (true? @*running?*) + (do + (reset! *running?* false) + (when-let [^ExecutorService tp @threadpool] + (log/debug "shutting down threadpool") + (.shutdown tp) + (while (not (.awaitTermination tp 1 TimeUnit/SECONDS)) + (log/trace "waiting for threadpool to stop")) + (log/debug "stopped!") + (reset! threadpool nil)) + (when-let [wait-slow-threads (some->> *config* deref :slow-thread-watcher-done?)] + (log/debug "waiting for slow-thread-watcher to stop ...") + @wait-slow-threads + (log/debug "waiting for slow-thread-watcher to stop ... OK")))))) + + +(defn healthy? [] + (cond + (< (.toMinutes (Duration/ofMillis (.getUptime (ManagementFactory/getRuntimeMXBean)))) 10) + true + + (false? (some->> @*config* + :healthy? + (deref))) + false + + (true? (some->> @*config* + :slow? + (deref))) + false + + :else + true)) + +(defn unhealthy? + "Returns `true` if there are queues in error or a thread is slow, and the application has been up for over 10 minutes, otherwise `false`." + [] + (false? (healthy?))) + +(defn queue-stats [] + (let [{:keys [conn]} @*config* + db (d/db conn)] + (->> (d/q '[:find ?e ?qname ?status + :in $ + :where + [?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/status ?status]] + db) + (mapv (partial zipmap [:e :qname :status])) + (mapv #(select-keys % [:qname :status])) + (mapv (fn [qitem] {qitem 1})) + (reduce (partial merge-with +) {}) + (mapv (fn [[{:keys [qname status]} v]] + (array-map + :qname qname + :status status + :count v))) + (sort-by (juxt :qname :status)) + (vec)))) + +(defn job-group-progress [queue-name job-group] + (let [{:keys [conn]} @*config* + db (d/db conn)] + (->> (d/q '[:find ?e ?qname ?job-group ?status + :keys :e :qname :job-group :status + :in $ ?qname ?job-group + :where + [?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/job-group ?job-group] + [?e :com.github.ivarref.yoltq/status ?status]] + db queue-name job-group) + (mapv #(select-keys % [:qname :job-group :status])) + (mapv (fn [qitem] {qitem 1})) + (reduce (partial merge-with +) {}) + (mapv (fn [[{:keys [qname job-group status]} v]] + (array-map + :qname qname + :job-group job-group + :status status + :count v))) + (sort-by (juxt :qname :job-group :status)) + (vec)))) + +(defn get-errors [qname] + (let [{:keys [conn]} @*config* + db (d/db conn)] + (->> (d/q '[:find [?id ...] + :in $ ?qname ?status + :where + [?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/status ?status] + [?e :com.github.ivarref.yoltq/id ?id]] + db + qname + :error) + (mapv (partial u/get-queue-item db))))) + +(defn retry-one-error! [qname] + (let [{:keys [handlers] :as cfg} @*config* + _ (assert (contains? handlers qname) "Queue not found") + cfg (assoc-in cfg [:handlers qname :max-retries] Long/MAX_VALUE)] + (poller/poll-once! cfg qname :error))) + +(defn retry-stats + "Gather retry statistics. + + Optional keyword arguments: + * :age-days — last number of days to look at data from. Defaults to 30. + * :queue-name — only gather statistics for this queue name. Defaults to nil, meaning all queues. + + Example return value: + {:queue-a {:ok 100, :retries 2, :retry-percentage 2.0} + :queue-b {:ok 100, :retries 75, :retry-percentage 75.0}} + + From the example value above, we can see that :queue-b fails at a much higher rate than :queue-a. + Assuming that the queue consumers are correctly implemented, this means that the service representing :queue-b + is much more unstable than the one representing :queue-a. This again implies + that you will probably want to fix the downstream service of :queue-b, if that is possible. + " + [{:keys [age-days queue-name now db] + :or {age-days 30 + now (ZonedDateTime/now ZoneOffset/UTC)}}] + (let [{:keys [conn]} @*config* + db (or db (d/db conn))] + (->> (d/query {:query {:find '[?qname ?status ?tries ?init-time] + :in (into '[$] (when queue-name '[?qname])) + :where '[[?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/status ?status] + [?e :com.github.ivarref.yoltq/tries ?tries] + [?e :com.github.ivarref.yoltq/init-time ?init-time]]} + :args (remove nil? [db queue-name])}) + (mapv (partial zipmap [:qname :status :tries :init-time])) + (mapv #(update % :init-time (fn [init-time] (.atZone (Instant/ofEpochMilli init-time) ZoneOffset/UTC)))) + (mapv #(assoc % :age-days (.toDays (Duration/between (:init-time %) now)))) + (filter #(<= (:age-days %) age-days)) + (group-by :qname) + (mapv (fn [[q values]] + {q (let [{:keys [ok retries] :as m} (->> values + (mapv (fn [{:keys [tries status]}] + (condp = status + u/status-init {} + u/status-processing {:processing 1 :retries (dec tries)} + u/status-done {:ok 1 :retries (dec tries)} + u/status-error {:error 1 :retries (dec tries)}))) + (reduce (partial merge-with +) {}))] + (into (sorted-map) (merge m + (when (pos-int? ok) + {:retry-percentage (double (* 100 (/ retries ok)))}))))})) + (into (sorted-map))))) + +(defn- percentile [n values] + (let [idx (int (Math/floor (* (count values) (/ n 100))))] + (nth values idx))) + +(defn processing-time-stats + "Gather processing time statistics. Default unit is seconds. + + Optional keyword arguments: + * :age-days — last number of days to look at data from. Defaults to 30. + Use nil to have no limit. + + * :queue-name — only gather statistics for this queue name. Defaults to nil, meaning all queues. + + * :duration->long - Specify what unit should be used for values. + Must take a java.time.Duration as input and return a long. + + Defaults to (fn [duration] (.toSeconds duration). + I.e. the default unit is seconds. + + Example return value: + {:queue-a {:avg 1 + :max 10 + :min 0 + :p50 ... + :p90 ... + :p95 ... + :p99 ...}}" + [{:keys [age-days queue-name now db duration->long] + :or {age-days 30 + now (ZonedDateTime/now ZoneOffset/UTC) + duration->long (fn [duration] (.toSeconds ^Duration duration))}}] + (let [{:keys [conn]} @*config* + db (or db (d/db conn)) + ->zdt #(.atZone (Instant/ofEpochMilli %) ZoneOffset/UTC)] + (->> (d/query {:query {:find '[?qname ?status ?init-time ?done-time] + :in (into '[$ ?status] (when queue-name '[?qname])) + :where '[[?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/status ?status] + [?e :com.github.ivarref.yoltq/init-time ?init-time] + [?e :com.github.ivarref.yoltq/done-time ?done-time]]} + :args (vec (remove nil? [db u/status-done queue-name]))}) + (mapv (partial zipmap [:qname :status :init-time :done-time])) + (mapv #(update % :init-time ->zdt)) + (mapv #(update % :done-time ->zdt)) + (mapv #(assoc % :age-days (.toDays (Duration/between (:init-time %) now)))) + (mapv #(assoc % :spent-time (duration->long (Duration/between (:init-time %) (:done-time %))))) + (filter #(or (nil? age-days) (<= (:age-days %) age-days))) + (group-by :qname) + (mapv (fn [[q values]] + (let [values (vec (sort (mapv :spent-time values)))] + {q (sorted-map + :max (apply max values) + :avg (int (Math/floor (/ (reduce + 0 values) (count values)))) + :p50 (percentile 50 values) + :p90 (percentile 90 values) + :p95 (percentile 95 values) + :p99 (percentile 99 values) + :min (apply min values))}))) + (into (sorted-map))))) + +(defn get-tx-report-queue-multicast! + "Multicast the datomic.api/tx-report-queue to different consumers. + A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer. + Repeated calls using the same `conn` and `id` returns the same queue. + + The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread + to send `:end` if the queue is stopped. + The default value for `send-end-token?` is `false`. + + A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`. + + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + ([conn id] + (get-tx-report-queue-multicast! conn id false)) + ([conn id send-end-token?] + (assert (instance? Connection conn)) + (assert (boolean? send-end-token?)) + (rq/get-tx-report-queue-multicast! conn id send-end-token?))) + +(defn stop-multicast-consumer-id! + "Stop forwarding reports from datomic.api/tx-report-queue to the queue identified by `conn` and `id`. + If this is the last report destination for the given `conn`, the multicaster thread will exit. + Repeated calls are no-op. + + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if the queue was stopped. + Return `false` if the queue does not exist." + [conn id] + (assert (instance? Connection conn)) + (rq/stop-multicast-consumer-id! conn id)) + +(defn stop-multicaster! + "Stop forwarding reports from datomic.api/tx-report-queue to any queues belonging to `conn`. + The multicaster thread will exit. + Repeated calls are no-op. + + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if any queue belonging to `conn` was stopped. + Returns `false` is `conn` did not have any associated queues." + [conn] + (assert (instance? Connection conn)) + (rq/stop-multicaster! conn)) + +(defn stop-all-multicasters! + "Stop forwarding all reports from datomic.api/tx-report-queue for any `conn`. + All multicaster threads will exit. + Repeated calls are no-op. + + The multicaster thread will send `:end` if `send-end-token?` was `true` when `get-tx-report-queue-multicast!` + was called. + + Returns `true` if any queue was stopped. + Returns `false` if no queues existed." + [] + (rq/stop-all-multicasters!)) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"ivarref.yoltq.report-queue"} :info] + [#{"ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (stop!) + (future (let [received (atom []) + uri (str "datomic:mem://demo")] + (d/delete-database uri) + (d/create-database uri) + (let [conn (d/connect uri) + started-consuming? (promise) + n 1] + (init! {:conn conn + :error-backoff-time (Duration/ofSeconds 1) + :poll-delay (Duration/ofSeconds 1) + :max-execute-time (Duration/ofSeconds 3) + :slow-thread-show-stacktrace? false}) + (add-consumer! :q (fn [_] + (deliver started-consuming? true) + (log/info "sleeping...") + (Thread/sleep (.toMillis (Duration/ofSeconds 60))) + (log/info "done sleeping"))) + (start!) + @(d/transact conn [(put :q {:work 123})]) + @started-consuming? + (stop!) + nil))))) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq.migrate"} :warn] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (stop!) + (let [received (atom []) + uri (str "datomic:mem://demo")] + (d/delete-database uri) + (d/create-database uri) + (let [conn (d/connect uri) + started-consuming? (promise) + n 1] + (init! {:conn conn + :tx-report-queue (get-tx-report-queue-multicast! conn :yoltq true) + :slow-thread-show-stacktrace? false}) + (add-consumer! :q (fn [_] + (deliver started-consuming? true))) + (log/info "begin start! ...") + (start!) + (log/info "begin start! ... Done") + (Thread/sleep 2000) + (log/info "*******************************************") + @(d/transact conn [(put :q {:work 123})]) + @started-consuming? + (stop-multicaster! conn) + (log/info "*******************************************") + (stop!) + (log/info "stop! done") + nil))))
\ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj new file mode 100644 index 0000000..dffff28 --- /dev/null +++ b/src/com/github/ivarref/yoltq/error_poller.clj @@ -0,0 +1,124 @@ +(ns com.github.ivarref.yoltq.error-poller + (:require [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.ext-sys :as ext] + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d])) + + +(defn get-state [v] + (case v + [:error :none] :recovery + [:error :some] :error + [:error :all] :error + [:recovery :none] :recovery + [:recovery :some] :recovery + [:recovery :all] :error + nil)) + + +(defn handle-error-count [{:keys [errors last-notify state] + :or {errors [] + last-notify 0 + state :recovery}} + {:keys [system-error-min-count system-error-callback-backoff] + :or {system-error-min-count 3}} + now-ms + error-count] + (let [new-errors (->> (conj errors error-count) + (take-last system-error-min-count) + (vec)) + classify (fn [coll] + (cond + (not= system-error-min-count (count coll)) + :missing + + (every? pos-int? coll) + :all + + (every? zero? coll) + :none + + :else + :some)) + old-state state] + (merge + {:errors new-errors + :last-notify last-notify} + (when-let [new-state (get-state [old-state (classify new-errors)])] + (merge + {:state new-state} + (when (and (= old-state :recovery) + (= new-state :error)) + {:run-callback :error + :last-notify now-ms}) + + (when (and (= new-state :error) + (= old-state :error) + (> now-ms + (+ last-notify system-error-callback-backoff))) + {:run-callback :error + :last-notify now-ms}) + + (when (and (= new-state :recovery) + (= old-state :error)) + {:run-callback :recovery})))))) + + +(defn do-poll-errors [{:keys [conn + system-error + on-system-error + on-system-recovery + healthy? + healthy-allowed-error-time] + :or {on-system-error (fn [] + (log/error "There are yoltq queues which have errors") + nil) + on-system-recovery (fn [] + (log/info "Yoltq recovered"))} + :as config} + now-ms] + (assert (some? conn) "expected :conn to be present") + (assert (some? system-error) "expected :system-error to be present") + (assert (nat-int? healthy-allowed-error-time) "expected :healthy-allowed-error-time to be present") + (let [max-init-time (- now-ms healthy-allowed-error-time) + error-count (or (d/q '[:find (count ?e) . + :in $ ?status ?max-init-time + :where + [?e :com.github.ivarref.yoltq/status ?status] + [?e :com.github.ivarref.yoltq/init-time ?init-time] + [(<= ?init-time ?max-init-time)]] + (d/db conn) + u/status-error + max-init-time) + 0)] + (if (pos-int? error-count) + (do + (log/debug "poll-errors found" error-count "errors in system") + (reset! healthy? false)) + (reset! healthy? true)) + (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config now-ms error-count)] + (when run-callback + (cond (= run-callback :error) + (on-system-error) + + (= run-callback :recovery) + (on-system-recovery) + + :else + (log/error "unhandled callback-type" run-callback)) + (log/debug "run-callback is" run-callback)) + error-count))) + + +(defn poll-errors [running? config-atom] + (try + (when @running? + (do-poll-errors @config-atom (ext/now-ms))) + (catch Throwable t + (log/error t "unexpected error in poll-errors:" (ex-message t)) + nil))) + + +(comment + (do-poll-errors @com.github.ivarref.yoltq/*config* (ext/now-ms))) + diff --git a/src/com/github/ivarref/yoltq/ext_sys.clj b/src/com/github/ivarref/yoltq/ext_sys.clj new file mode 100644 index 0000000..692b934 --- /dev/null +++ b/src/com/github/ivarref/yoltq/ext_sys.clj @@ -0,0 +1,27 @@ +(ns com.github.ivarref.yoltq.ext-sys + (:require [datomic.api :as d]) + (:refer-clojure :exclude [random-uuid]) + (:import (java.util UUID))) + + +(def ^:dynamic *now-ms-atom* nil) +(def ^:dynamic *squuid-atom* nil) +(def ^:dynamic *random-atom* nil) + + +(defn now-ms [] + (if *now-ms-atom* + @*now-ms-atom* + (System/currentTimeMillis))) + + +(defn squuid [] + (if *squuid-atom* + (UUID/fromString (str "00000000-0000-0000-0000-" (format "%012d" (swap! *squuid-atom* inc)))) + (d/squuid))) + + +(defn random-uuid [] + (if *random-atom* + (UUID/fromString (str "00000000-0000-0000-0000-" (format "%012d" (swap! *random-atom* inc)))) + (UUID/randomUUID))) diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj new file mode 100644 index 0000000..ffb1ad8 --- /dev/null +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -0,0 +1,248 @@ +(ns com.github.ivarref.yoltq.impl + (:require [clojure.edn :as edn] + [clojure.string :as str] + [clojure.tools.logging :as log] + [com.github.ivarref.double-trouble :as dt] + [com.github.ivarref.yoltq.ext-sys :as ext] + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d]) + (:import (java.time Year))) + +(def schema + [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity} + #:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value} + #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} + #:db{:ident :com.github.ivarref.yoltq/job-group, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} + #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} + #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string} + #:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes} + #:db{:ident :com.github.ivarref.yoltq/opts, :cardinality :db.cardinality/one, :valueType :db.type/string} + #:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string} + #:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true} + #:db{:ident :com.github.ivarref.yoltq/lock, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :noHistory true} + #:db{:ident :com.github.ivarref.yoltq/init-time, :cardinality :db.cardinality/one, :valueType :db.type/long} + #:db{:ident :com.github.ivarref.yoltq/processing-time, :cardinality :db.cardinality/one, :valueType :db.type/long} + #:db{:ident :com.github.ivarref.yoltq/done-time, :cardinality :db.cardinality/one, :valueType :db.type/long} + #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long} + #:db{:ident :com.github.ivarref.yoltq/version, :cardinality :db.cardinality/one, :valueType :db.type/string, :index true}]) + +(defn pr-str-inner [x] + (binding [*print-dup* false + *print-meta* false + *print-readably* true + *print-length* nil + *print-level* nil + *print-namespace-maps* false] + (pr-str x))) + +(defn pr-str-safe [what x] + (try + (if (= x (edn/read-string (pr-str-inner x))) + (pr-str-inner x) + (throw (ex-info (str "Could not read-string " what) {:input x}))) + (catch Exception e + (log/error "could not read-string" what ":" (ex-message e)) + (throw e)))) + +(defn default-partition-fn [_queue-keyword] + (keyword "yoltq" (str "queue_" (.getValue (Year/now))))) + +(defn put [{:keys [capture-bindings conn encode partition-fn] + :or {partition-fn default-partition-fn + encode (partial pr-str-safe :payload)} + :as config} + queue-name + payload + opts] + (if-let [q-config (get-in config [:handlers queue-name])] + (let [id (u/squuid) + encode (get q-config :encode encode) + partition-fn (get q-config :partition-fn partition-fn) + partition (partition-fn queue-name) + _ (assert (keyword? partition) "Partition must be a keyword") + depends-on (get q-config :depends-on (fn [_] nil)) + valid-payload? (get q-config :valid-payload? (fn [_] true)) + opts (merge + (when-let [deps (depends-on payload)] + {:depends-on deps}) + (or opts {})) + str-bindings (->> (reduce (fn [o k] + (assoc o (symbol k) (deref k))) + {} + (or capture-bindings [])) + (pr-str-safe :capture-bindings)) + _ (when-not (valid-payload? payload) + (log/error "Payload was not valid. Payload was:" payload) + (throw (ex-info (str "Payload was not valid: " payload) {:payload payload}))) + encoded (encode payload) + _ (when (not (or (bytes? encoded) (string? encoded))) + (log/error "Payload must be encoded to either a string or a byte array") + (throw (ex-info (str "Payload must be encoded to a string or a byte array. Payload: " payload) {:payload payload})))] + (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init) + (do + (dt/ensure-partition! conn partition) + (merge + (if (bytes? encoded) + {:com.github.ivarref.yoltq/payload-bytes encoded} + {:com.github.ivarref.yoltq/payload encoded}) + {:db/id (d/tempid partition) + :com.github.ivarref.yoltq/id id + :com.github.ivarref.yoltq/queue-name queue-name + :com.github.ivarref.yoltq/status u/status-init + :com.github.ivarref.yoltq/bindings str-bindings + :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts) + :com.github.ivarref.yoltq/lock (u/random-uuid) + :com.github.ivarref.yoltq/tries 0 + :com.github.ivarref.yoltq/init-time (u/now-ms) + :com.github.ivarref.yoltq/version "2"} + (when-let [[q ext-id] (:depends-on opts)] + (when-not (d/q '[:find ?e . + :in $ ?ext-id + :where + [?e :com.github.ivarref.yoltq/ext-id ?ext-id]] + (d/db conn) + (pr-str-safe :depends-on [q ext-id])) + (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts)))) + (when-let [ext-id (:id opts)] + {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}) + (when-let [job-group (:job-group opts)] + {:com.github.ivarref.yoltq/job-group job-group})))) + (do + (log/error "Did not find registered handler for queue" queue-name) + (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) + + +(defn depends-on-waiting? [{:keys [conn]} + q-item] + (let [db (d/db conn)] + (when-let [{:com.github.ivarref.yoltq/keys [opts]} (u/get-queue-item db (:id q-item))] + (when-let [[q id :as depends-on] (:depends-on opts)] + (when-not (d/q '[:find ?e . + :in $ ?ext-id + :where + [?e :com.github.ivarref.yoltq/ext-id ?ext-id] + [?e :com.github.ivarref.yoltq/status :done]] + db + (pr-str [q id])) + (log/info "queue item" (str (:id q-item)) "is waiting on" depends-on) + {:depends-on depends-on}))))) + + +(defn take! [{:keys [conn cas-failures hung-log-level tx-spent-time!] + :or {hung-log-level :error}} + {:keys [tx id queue-name was-hung? to-error?] :as queue-item-info}] + (when queue-item-info + (try + (cond to-error? + (log/logp hung-log-level "queue-item" (str id) "was hung and retried too many times. Giving up!") + + was-hung? + (log/logp hung-log-level "queue-item" (str id) "was hung, retrying ...") + + :else + nil) + (let [start-time (System/nanoTime) + {:keys [db-after]} @(d/transact conn tx) + _ (when tx-spent-time! (tx-spent-time! (- (System/nanoTime) start-time))) + {:com.github.ivarref.yoltq/keys [status] :as q-item} (u/get-queue-item db-after id)] + (log/debug "queue item" (str id) "for queue" queue-name "now has status" status) + q-item) + (catch Throwable t + (let [{:db/keys [error] :as m} (u/db-error-map t)] + (cond + (= :db.error/cas-failed error) + (do + (log/info "take! :db.error/cas-failed for queue item" (str id) "and attribute" (:a m)) + (when cas-failures + (swap! cas-failures inc)) + nil) + + :else + (do + (log/error t "Unexpected failure for queue item" (str id) ":" (ex-message t)) + nil))))))) + + +(defn mark-status! [{:keys [conn tx-spent-time!]} + {:com.github.ivarref.yoltq/keys [id lock tries]} + new-status] + (try + (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing new-status] + (if (= new-status u/status-done) + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ms)} + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ms)})] + start-time (System/nanoTime) + {:keys [db-after]} @(d/transact conn tx)] + (when tx-spent-time! (tx-spent-time! (- (System/nanoTime) start-time))) + (u/get-queue-item db-after id)) + (catch Throwable t + (log/error t "unexpected error in mark-status!: " (ex-message t)) + nil))) + + +(defn fmt [id queue-name new-status tries spent-ns] + (str/join " " ["queue-item" (str id) + "for queue" queue-name + "now has status" new-status + "after" tries (if (= 1 tries) + "try" + "tries") + "in" (format "%.1f" (double (/ spent-ns 1e6))) "ms"])) + + +(defn execute! [{:keys [decode handlers mark-status-fn! start-execute-time collect-spent-time!] + :or {mark-status-fn! mark-status! + decode edn/read-string} + :as cfg} + {:com.github.ivarref.yoltq/keys [status id queue-name payload payload-bytes] :as queue-item}] + (when queue-item + (if (= :error status) + (assoc queue-item :failed? true) + (if-let [queue (get handlers queue-name)] + (let [{:keys [f allow-cas-failure?]} queue + decode (get queue :decode decode)] + (log/debug "queue item" (str id) "for queue" queue-name "is now processing") + (let [{:keys [retval exception]} + (try + (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ms) id queue-name]) + (let [payload (decode (or payload payload-bytes)) + v (f payload)] + {:retval v}) + (catch Throwable t + {:exception t}) + (finally + (swap! start-execute-time dissoc (Thread/currentThread)))) + {:db/keys [error] :as m} (u/db-error-map exception)] + (cond + (and (some? exception) + allow-cas-failure? + (= :db.error/cas-failed error) + (or (true? allow-cas-failure?) + (allow-cas-failure? (:a m)))) + (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)] + (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item] + (log/info (fmt id queue-name u/status-done tries (- done-time init-time))) + (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time))) + (assoc q-item :retval retval :success? true :allow-cas-failure? true))) + + (some? exception) + (when-let [q-item (mark-status-fn! cfg queue-item u/status-error)] + (let [{:com.github.ivarref.yoltq/keys [init-time error-time tries]} q-item + level (if (>= tries 3) :error :warn)] + (log/logp level exception (fmt id queue-name u/status-error tries (- error-time init-time))) + (log/logp level exception "error message was:" (str \" (ex-message exception) \") "for queue-item" (str id)) + (log/logp level exception "ex-data was:" (ex-data exception) "for queue-item" (str id)) + (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time))) + (assoc q-item :exception exception))) + + :else + (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)] + (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item] + (log/info (fmt id queue-name u/status-done tries (- done-time init-time))) + (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time))) + (assoc q-item :retval retval :success? true)))))) + (do + (log/error "no handler for queue" queue-name) + nil))))) diff --git a/src/com/github/ivarref/yoltq/migrate.clj b/src/com/github/ivarref/yoltq/migrate.clj new file mode 100644 index 0000000..c97f679 --- /dev/null +++ b/src/com/github/ivarref/yoltq/migrate.clj @@ -0,0 +1,61 @@ +(ns com.github.ivarref.yoltq.migrate + (:require [datomic.api :as d] + [clojure.tools.logging :as log])) + +(defn to->v2-ent [{:keys [conn]} now-ms id] + (log/info "Migrating id" id) + (let [attr-val (fn [attr] + (when-let [old (d/q '[:find ?time . + :in $ ?e ?a + :where + [?e ?a ?time]] + (d/db conn) + [:com.github.ivarref.yoltq/id id] + attr)] + (let [now-ms (or now-ms + (.getTime (d/q '[:find (max ?txinst) . + :in $ ?e ?a + :where + [?e ?a _ ?tx true] + [?tx :db/txInstant ?txinst]] + (d/history (d/db conn)) + [:com.github.ivarref.yoltq/id id] + attr)))] + (log/info "Updating" id attr "to" now-ms) + [[:db/cas [:com.github.ivarref.yoltq/id id] + attr old now-ms]])))] + (vec (concat [[:db/cas [:com.github.ivarref.yoltq/id id] + :com.github.ivarref.yoltq/version nil "2"]] + (mapcat attr-val [:com.github.ivarref.yoltq/init-time + :com.github.ivarref.yoltq/processing-time + :com.github.ivarref.yoltq/done-time + :com.github.ivarref.yoltq/error-time]))))) + +(defn to->v2 [{:keys [conn loop? now-ms] + :or {loop? true} + :as cfg}] + (loop [tx-vec []] + (if-let [id (some->> (d/q '[:find [?id ...] + :in $ + :where + [?e :com.github.ivarref.yoltq/id ?id] + [(missing? $ ?e :com.github.ivarref.yoltq/version)]] + (d/db conn)) + (sort) + (not-empty) + (first))] + (let [tx (to->v2-ent cfg now-ms id)] + @(d/transact conn tx) + (if loop? + (recur (vec (take 10 (conj tx-vec tx)))) + tx)) + (do + (log/info "No items left to migrate") + tx-vec)))) + + +(defn migrate! [cfg] + (to->v2 cfg)) + +(comment + (migrate! @com.github.ivarref.yoltq/*config*)) diff --git a/src/com/github/ivarref/yoltq/poller.clj b/src/com/github/ivarref/yoltq/poller.clj new file mode 100644 index 0000000..9cf81c7 --- /dev/null +++ b/src/com/github/ivarref/yoltq/poller.clj @@ -0,0 +1,64 @@ +(ns com.github.ivarref.yoltq.poller + (:require [com.github.ivarref.yoltq.utils :as u] + [com.github.ivarref.yoltq.impl :as i] + [clojure.tools.logging :as log])) + + +(defn poll-once! [cfg q status] + (when-let [item (case status + :init (u/get-init cfg q) + :error (u/get-error cfg q) + :hung (u/get-hung cfg q))] + (with-bindings (get item :bindings {}) + (if (i/depends-on-waiting? cfg item) + nil + (some->> item + (i/take! cfg) + (i/execute! cfg)))))) + + +(defn poll-queue! [running? + {:keys [running-queues] :as cfg} + [queue-name status :as q]] + (try + (let [[old _] (swap-vals! running-queues conj q)] + (if-not (contains? old q) + (try + (log/debug "polling queue" queue-name "for status" status) + (let [start-time (u/now-ms) + last-res (loop [prev-res nil] + (when @running? + (let [res (poll-once! cfg queue-name status)] + (log/debug "poll-once! returned" res) + (if (and res (:success? res)) + (recur res) + prev-res))))] + (let [spent-ms (- (u/now-ms) start-time)] + (log/trace "done polling queue" q "in" spent-ms "ms")) + last-res) + (finally + (swap! running-queues disj q))) + (log/debug "queue" q "is already being polled, doing nothing..."))) + (catch Throwable t + (log/error t "poll-queue! crashed:" (ex-message t))) + (finally))) + +(comment + (def cfg @com.github.ivarref.yoltq/*config*)) + +(comment + (poll-queue! + (atom true) + @com.github.ivarref.yoltq/*config* + [:add-message-thread :init])) + +(defn poll-all-queues! [running? config-atom pool] + (try + (when @running? + (let [{:keys [handlers]} @config-atom] + (doseq [q (shuffle (vec (for [q-name (keys handlers) + status [:init :error :hung]] + [q-name status])))] + (.execute pool (fn [] (poll-queue! running? @config-atom q)))))) + (catch Throwable t + (log/error t "poll-all-queues! crashed:" (ex-message t))))) diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj new file mode 100644 index 0000000..f83e3ba --- /dev/null +++ b/src/com/github/ivarref/yoltq/report_queue.clj @@ -0,0 +1,459 @@ +(ns com.github.ivarref.yoltq.report-queue + (:require [com.github.ivarref.yoltq.utils :as u] + [com.github.ivarref.yoltq.impl :as i] + [datomic.api :as d] + [clojure.tools.logging :as log]) + (:import (datomic Connection Datom) + (java.util.concurrent LinkedBlockingQueue ScheduledExecutorService BlockingQueue TimeUnit))) + +; Private API, subject to change! + +(defn process-poll-result! [cfg id-ident poll-result consumer] + (let [{:keys [tx-data db-after]} poll-result] + (when-let [new-ids (->> tx-data + (filter (fn [^Datom datom] (and + (= (.a datom) id-ident) + (.added datom)))) + (mapv (fn [^Datom datom] (.v datom))) + (into []) + (not-empty))] + (doseq [id new-ids] + (consumer (fn [] + (try + (let [{:com.github.ivarref.yoltq/keys [lock id status queue-name bindings]} (u/get-queue-item db-after id)] + (with-bindings (or bindings {}) + (if (i/depends-on-waiting? cfg {:id id}) + nil + (some->> + (u/prepare-processing db-after id queue-name lock status) + (i/take! cfg) + (i/execute! cfg))))) + (catch Throwable t + (log/error t "Unexpected error in process-poll-result!"))))))))) + +(defn report-queue-listener [running? + ready? + ^ScheduledExecutorService pool + config-atom] + (let [cfg @config-atom + conn (:conn cfg) + tx-report-queue-given (contains? cfg :tx-report-queue) + ^BlockingQueue q (if tx-report-queue-given + (get cfg :tx-report-queue) + (d/tx-report-queue conn)) + id-ident (d/q '[:find ?e . + :where [?e :db/ident :com.github.ivarref.yoltq/id]] + (d/db conn))] + (assert (instance? BlockingQueue q)) + (log/debug "tx-report-queue-given:" tx-report-queue-given) + (try + (let [running-local? (atom true)] + (while (and @running? @running-local?) + (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)] + (if (= poll-result :end) + (do + (log/debug "report-queue-listener received :end token. Exiting") + (reset! running-local? false)) + ;(log/warn "yoltq report-queue-listener received :end token. If the rest of the system is kept running, it will result in a partially broken system.")) + (process-poll-result! @config-atom + id-ident + poll-result + (fn [f] + (when @running? + (.execute ^ScheduledExecutorService pool f)))))) + (deliver ready? :ready))) + (catch Throwable t + (log/error t "Unexpected error in report-queue-listener:" (.getMessage t))) + (finally + (if tx-report-queue-given + (log/debug "Remove tx-report-queue handled elsewhere") + (do + (log/debug "Remove tx-report-queue") + (d/remove-tx-report-queue conn))))))) + +; https://stackoverflow.com/a/14488425 +(defn- dissoc-in + "Dissociates an entry from a nested associative structure returning a new + nested structure. keys is a sequence of keys. Any empty maps that result + will not be present in the new structure." + [m [k & ks :as keys]] + (if ks + (if-let [nextmap (get m k)] + (let [newmap (dissoc-in nextmap ks)] + (if (seq newmap) + (assoc m k newmap) + (dissoc m k))) + m) + (dissoc m k))) + +(defn- queues-to-shutdown [old-state new-state] + (assert (map? old-state)) + (assert (map? new-state)) + (doseq [x (vals new-state)] + (assert (vector? x))) + (doseq [x (vals old-state)] + (assert (vector? x))) + (let [new-qs (into #{} (mapv second (vals new-state)))] + (reduce + (fn [o [send-end-token? old-q]] + ;(assert (boolean? send-end-token?)) + ;(assert (instance? BlockingQueue old-q)) + (if (contains? new-qs old-q) + o + (conj o [send-end-token? old-q]))) + [] + (vals old-state)))) + +(comment + (queues-to-shutdown {:a [true 999] :b [false 777]} + {:a [true 123] :b [true 777]})) + +(defn- multicast-once [conn work-item old-state new-state] + (assert (map? old-state)) + (assert (map? new-state)) + (doseq [[send-end-token? q-to-shutdown] (queues-to-shutdown old-state new-state)] + (if send-end-token? + (do + #_(log/debug "offering :end token") + (if (.offer ^BlockingQueue q-to-shutdown :end 1 TimeUnit/MICROSECONDS) + (log/debug "Multicaster sent :end token") + (log/debug "Multicaster failed to send :end token"))) + (do + (log/debug "Multicaster not sending :end token")))) + (when (seq new-state) + (if (some? work-item) + (reduce-kv + (fn [m id [send-end-token? q]] + (let [ok-offer (.offer ^BlockingQueue q work-item 1 TimeUnit/MICROSECONDS)] + (if (true? ok-offer) + (assoc m id [send-end-token? q]) + (log/error "Multicaster failed to offer item for connection" conn "and queue id" id)))) + {} + new-state) + new-state))) + +(defonce ^:private multicast-state-lock (Object.)) +(defonce ^:private consumer-state-lock (Object.)) +(defonce ^:private multicast-state (atom {})) +(defonce ^:private thread-count (atom 0)) + +(defn- multicaster-loop [init-state conn ready?] + (assert (instance? Connection conn)) + (let [input-queue (d/tx-report-queue conn)] + (deliver ready? true) + (loop [old-state init-state] + (let [work-item (.poll ^BlockingQueue input-queue 16 TimeUnit/MILLISECONDS) + new-state (locking multicast-state-lock + ; writer to `multicast-state` must be protected by `multicast-state-lock` + ; it should block minimally / spend minimum amount of time + (swap! multicast-state (fn [old-state] (update-in old-state [:iter-count conn] (fnil inc 0)))) + (if-let [new-state (multicast-once conn work-item old-state (get-in @multicast-state [:queues conn] {}))] + new-state + (do (swap! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn]))) + (swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] dec))) + (d/remove-tx-report-queue conn) + (log/debug "Multicaster removed tx-report-queue for conn" conn) + nil)))] + (if new-state + (recur new-state) + nil))))) + +(defn- start-multicaster! [conn] + (assert (instance? Connection conn)) + (let [ready? (promise)] + (future + (log/debug "Multicaster starting for conn" conn) + (try + (swap! thread-count inc) + (let [new-state (swap! multicast-state (fn [old-state] (update-in old-state [:thread-count conn] (fnil inc 0))))] + (assert (= 1 (get-in new-state [:thread-count conn]))) + ; "parent" thread holds `multicast-state-lock` and + ; waits for `ready?` promise, so effectively this new thread also holds + ; the lock until `ready?` is delivered. That is: it is safe + ; for this thread to modify multicast-state regardless of what other threads are doing + (multicaster-loop (get-in new-state [:queues conn]) conn ready?)) + (catch Throwable t + (log/error t "Unexpected error in multicaster:" (.getMessage t)) + (log/error "Multicaster exiting for conn")) + (finally + (swap! thread-count dec) + (log/debug "Multicaster exiting for conn" conn)))) + (when (= :timeout (deref ready? 30000 :timeout)) + (throw (RuntimeException. "Timed out waiting for multicaster to start"))))) + +(defn- wait-multicast-thread-step + [conn state] + ; `get-tx-report-queue-multicast!` should return only when the multicaster thread + ; has picked up the new queue. + ; + ; Otherwise the following could happen: + ; 1. multicast thread is sleeping + ; 2: user-thread calls get-tx-report-queue-multicast! with `send-end-token?` `true` + ; 3: user-thread (or somebody else) calls `stop-multicaster`. + ; The multicast-state atom is now identical as it was in step 1. + ; , Step 2 and 3 happened while the multicast thread was sleeping. + ; 4: The multicast thread is scheduled and does _not_ detect any state change. + ; Therefore the multicast thread does _not_ send out an :end token as one would expect. + ; + ; The new queue is written to memory at this point. No other thread can remove it because + ; we are still, and have been during the modification of multicast-state, holding consumer-state-lock. + ; This means that the multicast thread cannot exit at this point. Also, because we hold the consumer-state-lock, + ; we can be sure that no other thread changes or has changed the state. + ; + ; Once [:iter-count conn] has changed, we know that the multicaster thread + ; has seen the new queue. This means that we can be sure that the queue + ; will receive the `:end` token if the queue is stopped. + (let [start-ms (System/currentTimeMillis) + iter-count (get-in state [:iter-count conn] -1)] + (loop [spin-count 0] + (if (not= iter-count (locking multicast-state-lock + (get-in @multicast-state [:iter-count conn] -1))) + nil + (let [spent-ms (- (System/currentTimeMillis) start-ms)] + (if (> spent-ms 30000) + (throw (RuntimeException. "Timed out waiting for multicaster thread")) + (do + (Thread/sleep 16) + (recur (inc spin-count))))))))) + +(defn get-tx-report-queue-multicast! + "Multicast the datomic.api/tx-report-queue to different consumers. + A multicaster thread is started on demand per connection. `conn` and `id` identifies the consumer. + Repeated calls using the same `conn` and `id` returns the same queue. + + The optional third parameter, `send-end-token?`, if set to `true`, instructs the multicaster thread + to send `:end` if the queue is stopped. The default value is `false`. + + A queue may be stopped using `stop-multicaster-id!`, `stop-multicaster!` or `stop-all-multicasters!`. + + Returns a `java.util.concurrent.BlockingQueue` like `datomic.api/tx-report-queue`." + ([conn id] + (get-tx-report-queue-multicast! conn id false)) + ([conn id send-end-token?] + (assert (instance? Connection conn)) + (locking consumer-state-lock + (let [[new-state the-q] + (locking multicast-state-lock + (assert (map? @multicast-state)) + (if-let [existing-q (get-in @multicast-state [:queues conn id])] + (do + (let [new-state (swap! multicast-state + (fn [old-state] + (update-in old-state [:queues conn id] (fn [[end-token? q]] + (if (not= end-token? send-end-token?) + (log/debug "flipped `send-end-token?`") + (log/debug "identical `send-end-token?`")) + [send-end-token? q]))))] + (log/debug "Returning existing queue for id" id) + (assert (instance? BlockingQueue (second existing-q))) + [new-state (second existing-q)])) + (let [needs-multicaster? (nil? (get-in @multicast-state [:queues conn])) + new-q (LinkedBlockingQueue.) + new-state (swap! multicast-state (fn [old-state] (assoc-in old-state [:queues conn id] [send-end-token? new-q])))] + (if needs-multicaster? + (do + (start-multicaster! conn) + (log/debug "Returning new queue for id" id "(multicaster thread started)") + [new-state new-q]) + (do + (log/debug "Returning new queue for id" id "(multicaster thread already running)") + [new-state new-q])))))] + ; wait for multicaster thread to pick up current Queue + (wait-multicast-thread-step conn new-state) + the-q)))) + +(defn- wait-multicast-threads-exit [[old-state new-state]] + (assert (map? old-state)) + (assert (map? new-state)) + (assert (map? (get old-state :queues {}))) + (assert (map? (get new-state :queues {}))) + (assert (map? (get old-state :thread-count {}))) + (assert (map? (get new-state :thread-count {}))) + (locking consumer-state-lock + ; No new multicast threads will be launched inside this block. + ; The lock is already held by parent function. + ; + ; Why do we need to _wait_ for multicaster thread(s) to exit after + ; removing all queue ids for a given connection? + ; Otherwise the following could happen: + ; 1. multicaster thread is sleeping + ; 2. user calls stop-multicaster! + ; One would expect that multicaster thread would exit, but it is still sleeping + ; 3. user calls get-tx-report-queue-multicast! with the same conn + ; The state is now empty, so a new multicaster thread is spawned. + ; 4. Now there is two multicaster threads for the same connection! + ; ... and since the datomic report queue can be shared between threads + ; it will seemingly work, but when the end event is sent, it will be + ; sent by multiple threads. + (let [old-conns (into #{} (keys (get old-state :queues {}))) + new-conns (into #{} (keys (get new-state :queues {})))] + (assert (every? + (fn [x] (instance? Connection x)) + old-conns)) + (assert (every? + (fn [x] (instance? Connection x)) + new-conns)) + (doseq [old-conn old-conns] + (when-not (contains? new-conns old-conn) + (let [old-threadcount (get-in old-state [:thread-count old-conn] nil)] + (assert (= 1 old-threadcount)) + (let [start-ms (System/currentTimeMillis)] + (loop [] + (if (= 0 (get-in @multicast-state [:thread-count old-conn])) + :ok + (do + (let [spent-ms (- (System/currentTimeMillis) start-ms)] + (if (> spent-ms 30000) + (throw (RuntimeException. "Timed out waiting for multicaster thread to exit")) + (do + (Thread/sleep 16) + (recur)))))))))))))) + +(defn- all-queues [state] + (->> (mapcat (fn [[conn qmap]] + (mapv (fn [q-id] [conn q-id]) + (keys qmap))) + (seq (get state :queues {}))) + (into #{}))) + +(comment + (do + (assert (= #{} + (all-queues {}))) + (assert (= #{} + (all-queues {:queues {}}))) + (assert (= #{[:conn-a :q-id]} + (all-queues {:queues {:conn-a {:q-id 1}}}))) + (assert (= #{[:conn-a :q-id] [:conn-a :q-id-2]} + (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2}}}))) + (assert (= #{[:conn-a :q-id-2] [:conn-b :q-id-3] [:conn-a :q-id]} + (all-queues {:queues {:conn-a {:q-id 1 :q-id-2 2} + :conn-b {:q-id-3 3}}}))))) + +(defn- removed-queues? [old new] + (not= (all-queues old) + (all-queues new))) + +(defn stop-multicast-consumer-id! [conn id] + (assert (instance? Connection conn)) + (let [did-remove? (atom nil)] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (let [[old new] (swap-vals! multicast-state (fn [old-state] + (let [new-state (dissoc-in old-state [:queues conn id])] + (if (= {} (get-in new-state [:queues conn])) + (dissoc-in old-state [:queues conn]) + new-state))))] + (reset! did-remove? (removed-queues? old new)) + [old new])))) + @did-remove?)) + +(defn stop-multicaster! [conn] + (assert (instance? Connection conn)) + (let [did-remove? (atom nil)] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (let [[old new] (swap-vals! multicast-state (fn [old-state] (dissoc-in old-state [:queues conn])))] + (reset! did-remove? (removed-queues? old new)) + [old new])))) + @did-remove?)) + +(defn stop-all-multicasters! [] + (let [did-remove? (atom nil)] + (locking consumer-state-lock + (wait-multicast-threads-exit + (locking multicast-state-lock + (let [[old new] (swap-vals! multicast-state (fn [old-state] (assoc old-state :queues {})))] + (reset! did-remove? (removed-queues? old new)) + [old new])))) + @did-remove?)) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (require '[datomic.api :as d]) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (defonce conn (let [uri (str "datomic:mem://demo") + _ (d/delete-database uri) + _ (d/create-database uri) + conn (d/connect uri)] + conn)))) + +(comment + (do + (require 'com.github.ivarref.yoltq.log-init) + (defn drain! [^BlockingQueue q] + (loop [items []] + (if-let [elem (.poll q 100 TimeUnit/MILLISECONDS)] + (recur (conj items elem)) + items))) + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq"} :debug] + ;[#{"ivarref.yoltq*"} :info] + [#{"*"} :info]]) + (log/info "********************************") + (defonce conn (let [uri (str "datomic:mem://demo") + _ (d/delete-database uri) + _ (d/create-database uri) + conn (d/connect uri)] + conn)) + (log/info "stop-all!") + (stop-all-multicasters!) + (assert (= 0 @thread-count)) + (let [q1 (get-tx-report-queue-multicast! conn :q1 false) + q2 (get-tx-report-queue-multicast! conn :q2 false) + _ (get-tx-report-queue-multicast! conn :q1 true)] + @(d/transact conn [{:db/doc "demo"}]) + @(d/transact conn [{:db/doc "demo"}]) + @(d/transact conn [{:db/doc "demo"}]) + (log/info "begin drain q1") + (stop-multicast-consumer-id! conn :q1) + (stop-multicast-consumer-id! conn :q1) + (println "thread count" @thread-count) + (let [qitems-2 (drain! q2) + qitems-1 (drain! q1)] + (assert (= :end (last qitems-1))) + (println "drain count q1:" (count qitems-1)) + (println "drain count q2:" (count qitems-2)))))) + +(comment + (do + (let [q (get-tx-report-queue-multicast! conn :q1 true)] + (log/debug "stopping id :q1") + (stop-multicaster-id! conn :q1) + (let [drained (drain! q)] + (println "drained:" drained) + (assert (= [:end] drained))) + @multicast-state))) + +(comment + (stop-all-multicasters!)) + +(comment + (do + (let [q (get-tx-report-queue-multicast! conn :q2 false)] + (println "drain count:" (count (drain! q))) + @multicast-state + nil))) + +(comment + (get-tx-report-queue-multicast! conn :q1 false) + (get-tx-report-queue-multicast! conn :q1 true)) + +(comment + (do + @(d/transact conn [{:db/doc "demo"}]) + @(d/transact conn [{:db/doc "demo"}]) + @(d/transact conn [{:db/doc "demo"}]) + :yay))
\ No newline at end of file diff --git a/src/com/github/ivarref/yoltq/slow_executor_detector.clj b/src/com/github/ivarref/yoltq/slow_executor_detector.clj new file mode 100644 index 0000000..53dfe89 --- /dev/null +++ b/src/com/github/ivarref/yoltq/slow_executor_detector.clj @@ -0,0 +1,36 @@ +(ns com.github.ivarref.yoltq.slow-executor-detector + (:require [clojure.string :as str] + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.ext-sys :as ext]) + (:import (java.util.concurrent ExecutorService))) + +(defn- do-show-slow-threads [{:keys [start-execute-time + max-execute-time + slow? + slow-thread-show-stacktrace?] + :or {slow-thread-show-stacktrace? true}}] + (let [new-slow-val (atom false)] + (doseq [[^Thread thread [start-time queue-id queue-name]] @start-execute-time] + (when (> (ext/now-ms) (+ start-time max-execute-time)) + (reset! new-slow-val true) + (log/error "thread" (.getName thread) "spent too much time on" + "queue item" (str queue-id) + "for queue" queue-name + (if slow-thread-show-stacktrace? + (str "stacktrace: \n" (str/join "\n" (mapv str (seq (.getStackTrace thread))))) + "")))) + (reset! slow? @new-slow-val))) + +(defn show-slow-threads [^ExecutorService pool config-atom] + (try + (while (not (.isTerminated pool)) + (try + (do-show-slow-threads @config-atom) + (catch Throwable t + (log/error t "do-show-slow-threads crashed:" (ex-message t)))) + (dotimes [_ 3] + (when (not (.isTerminated pool)) + (Thread/sleep 1000)))) + (log/debug "show-slow-threads exiting") + (catch Throwable t + (log/error t "reap! crashed:" (ex-message t))))) diff --git a/src/com/github/ivarref/yoltq/test_queue.clj b/src/com/github/ivarref/yoltq/test_queue.clj new file mode 100644 index 0000000..ee9cd54 --- /dev/null +++ b/src/com/github/ivarref/yoltq/test_queue.clj @@ -0,0 +1,191 @@ +(ns com.github.ivarref.yoltq.test-queue + (:require [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.report-queue :as rq] + [com.github.ivarref.yoltq.ext-sys :as ext] + [com.github.ivarref.yoltq :as yq] + [datomic.api :as d] + [com.github.ivarref.yoltq.poller :as poller] + [clojure.test :as test] + [com.github.ivarref.yoltq.utils :as u] + [com.github.ivarref.yoltq.impl :as i]) + (:import (java.util.concurrent BlockingQueue TimeUnit) + (datomic Datom))) + + +(defn bootstrap-poller! [txq running? poller-exited? bootstrapped? conn] + (let [ready? (promise)] + (future + (let [q (d/tx-report-queue conn)] + (try + (while @running? + (when-let [poll-result (.poll ^BlockingQueue q 10 TimeUnit/MILLISECONDS)] + (swap! txq conj poll-result)) + (deliver ready? true) + (reset! bootstrapped? true)) + (catch Throwable t + (log/error t "test-poller crashed: " (ex-message t))) + (finally + (try + (d/remove-tx-report-queue conn) + (catch Throwable t + (log/warn t "remove-tx-report-queue failed:" (ex-message t)))) + (deliver poller-exited? true))))) + @ready?)) + + +(defmacro with-virtual-queue! + [& body] + `(let [txq# (atom []) + poller-exited?# (promise) + bootstrapped?# (atom false) + running?# (atom true) + config# (atom {:bootstrap-poller! (partial bootstrap-poller! txq# running?# poller-exited?# bootstrapped?#) + :init-backoff-time 0 + :hung-log-level :warn + :prev-consumed (atom {}) + :tx-queue txq#})] + (with-bindings {#'yq/*config* config# + #'yq/*running?* (atom false) + #'yq/*test-mode* true + #'ext/*now-ms-atom* (atom 0) + #'ext/*random-atom* (atom 0) + #'ext/*squuid-atom* (atom 0)} + (try + ~@body + (finally + (reset! running?# false) + (when @bootstrapped?# + @poller-exited?#)))))) + + +(defn call-with-virtual-queue! + [f] + (with-virtual-queue! + (f))) + + +(defn run-report-queue! [min-items] + (let [{:keys [tx-queue conn]} @yq/*config* + id-ident (d/q '[:find ?e . + :where [?e :db/ident :com.github.ivarref.yoltq/id]] + (d/db conn))] + (let [timeout (+ 3000 (System/currentTimeMillis))] + (while (and (< (System/currentTimeMillis) timeout) + (< (count @tx-queue) min-items)) + (Thread/sleep 10))) + (when (< (count @tx-queue) min-items) + (let [msg (str "run-report-queue: timeout waiting for " min-items " items")] + (log/error msg) + (throw (ex-info msg {})))) + (let [res (atom [])] + (doseq [itm (first (swap-vals! tx-queue (constantly [])))] + (rq/process-poll-result! + @yq/*config* + id-ident + itm + (fn [f] (swap! res conj (f))))) + @res))) + + +(defn run-one-report-queue! [] + (first (run-report-queue! 1))) + + +(defn run-queue-once! [q status] + (poller/poll-once! @yq/*config* q status)) + + +(defn put! [q payload] + @(d/transact (:conn @yq/*config*) [(yq/put q payload)])) + + +(defn transact-result->maps [{:keys [tx-data db-after]}] + (let [m (->> tx-data + (group-by (fn [^Datom d] (.e d))) + (vals) + (mapv (fn [datoms] + (reduce (fn [o ^Datom d] + (if (.added d) + (assoc o (d/q '[:find ?r . + :in $ ?e + :where [?e :db/ident ?r]] + db-after + (.a d)) + (.v d)) + o)) + {} + datoms))))] + m)) + +(defn contains-queue-job? + [queue-id conn {::yq/keys [id queue-name status] :as m}] + (when (and (= queue-id queue-name) + (= status :init) + (d/q '[:find ?e . + :in $ ?id + :where + [?e ::yq/id ?id] + [?e ::yq/status :init]] + (d/db conn) + id)) + m)) + + +(defn get-tx-q-job [q-id] + (let [{:keys [tx-queue conn]} @yq/*config*] + (loop [timeout (+ 3000 (System/currentTimeMillis))] + (if-let [job (->> @tx-queue + (mapcat transact-result->maps) + (filter (partial contains-queue-job? q-id conn)) + (first))] + (u/get-queue-item (d/db conn) (::yq/id job)) + (if (< (System/currentTimeMillis) timeout) + (do (Thread/sleep 10) + (recur timeout)) + nil))))) + +(defmacro consume-expect! [queue-name expected-status] + `(if-let [job# (get-tx-q-job ~queue-name)] + (try + (with-bindings (:com.github.ivarref.yoltq/bindings job#) + (let [prep# (u/prepare-processing (d/db (:conn @yq/*config*)) + (:com.github.ivarref.yoltq/id job#) + ~queue-name + (:com.github.ivarref.yoltq/lock job#) + (:com.github.ivarref.yoltq/status job#))] + (if-let [depends-on# (i/depends-on-waiting? @yq/*config* prep#)] + depends-on# + (let [res# (some->> prep# + (i/take! @yq/*config*) + (i/execute! @yq/*config*))] + (swap! (:prev-consumed @yq/*config*) assoc ~queue-name res#) + (test/is (= ~expected-status (:com.github.ivarref.yoltq/status res#))) + (if (:retval res#) + (:retval res#) + (:exception res#)))))) + (catch Throwable t# + (log/error t# "unexpected error in consume-expect:" (ex-message t#)) + (throw t#))) + (test/is false (str "No job found for queue " ~queue-name)))) + +(defmacro consume! [queue-name] + `(consume-expect! ~queue-name :done)) + + +(defmacro force-retry! [queue-name] + `(if-let [job# (some-> @yq/*config* :prev-consumed deref (get ~queue-name))] + (let [db-res# @(d/transact (:conn @yq/*config*) [{:com.github.ivarref.yoltq/id (:com.github.ivarref.yoltq/id job#) + :com.github.ivarref.yoltq/status :init}]) + res# (some->> (u/prepare-processing (:db-after db-res#) + (:com.github.ivarref.yoltq/id job#) + ~queue-name + (:com.github.ivarref.yoltq/lock job#) + :init) + (i/take! @yq/*config*) + (i/execute! @yq/*config*))] + (swap! (:prev-consumed @yq/*config*) assoc ~queue-name res#) + (test/is (= :done (:com.github.ivarref.yoltq/status res#))) + (if (:retval res#) + (:retval res#) + (:exception res#))) + (test/is false "Expected to have previously consumed something. Was nil."))) diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj new file mode 100644 index 0000000..9defd0e --- /dev/null +++ b/src/com/github/ivarref/yoltq/utils.clj @@ -0,0 +1,179 @@ +(ns com.github.ivarref.yoltq.utils + (:require [datomic.api :as d] + [clojure.edn :as edn] + [com.github.ivarref.yoltq.ext-sys :as ext] + [clojure.tools.logging :as log]) + (:refer-clojure :exclude [random-uuid]) + (:import (datomic Connection) + (java.time Duration))) + + +(def status-init :init) +(def status-processing :processing) +(def status-done :done) +(def status-error :error) + +(def current-version "2") + +(defn duration->millis [m] + (reduce-kv (fn [o k v] + (if (instance? Duration v) + (assoc o k (.toMillis v)) + (assoc o k v))) + {} + m)) + + +(defn squuid [] + (ext/squuid)) + + +(defn random-uuid [] + (ext/random-uuid)) + + +(defn now-ms [] + (ext/now-ms)) + + +(defn root-cause [e] + (if-let [root (ex-cause e)] + (root-cause root) + e)) + + +(defn db-error-map [^Throwable t] + (loop [e t] + (cond (nil? e) nil + + (and (map? (ex-data e)) + (contains? (ex-data e) :db/error)) + (ex-data e) + + :else + (recur (ex-cause e))))) + + +(defn get-queue-item [db id] + (-> (d/pull db '[:*] [:com.github.ivarref.yoltq/id id]) + (dissoc :db/id) + (update :com.github.ivarref.yoltq/opts (fn [s] (or (when s (edn/read-string s)) {}))) + (update :com.github.ivarref.yoltq/bindings + (fn [s] + (when s + (->> s + (edn/read-string) + (reduce-kv (fn [o k v] + (assoc o (resolve k) v)) + {}))))))) + + +(defn prepare-processing [db id queue-name old-lock old-status] + (let [new-lock (random-uuid)] + {:id id + :lock new-lock + :queue-name queue-name + :bindings (get (get-queue-item db id) :com.github.ivarref.yoltq/bindings {}) + :tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status old-status status-processing] + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ms)}]})) + + +(defn get-init [{:keys [conn db init-backoff-time] :as cfg} queue-name] + (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: " + (str (if (nil? conn) "nil" conn)) + "\nConfig was: " (str cfg))) + (let [db (or db (d/db conn))] + (if-let [ids (->> (d/q '[:find ?id ?lock + :in $ ?queue-name ?backoff ?current-version + :where + [?e :com.github.ivarref.yoltq/status :init] + [?e :com.github.ivarref.yoltq/queue-name ?queue-name] + [?e :com.github.ivarref.yoltq/init-time ?init-time] + [(>= ?backoff ?init-time)] + [?e :com.github.ivarref.yoltq/id ?id] + [?e :com.github.ivarref.yoltq/lock ?lock] + [?e :com.github.ivarref.yoltq/version ?current-version]] + db + queue-name + (- (now-ms) init-backoff-time) + current-version) + (not-empty))] + (let [[id old-lock] (rand-nth (into [] ids))] + (prepare-processing db id queue-name old-lock :init)) + (log/debug "no new-items in :init status for queue" queue-name)))) + +(defn- get-max-retries [cfg queue-name] + (let [v (get-in cfg [:handlers queue-name :max-retries] (:max-retries cfg))] + (if (and (number? v) (pos-int? v)) + v + Long/MAX_VALUE))) + +(defn get-error [{:keys [conn db error-backoff-time] :as cfg} queue-name] + (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: " + (str (if (nil? conn) "nil" conn)) + "\nConfig was: " (str cfg))) + (let [db (or db (d/db conn)) + max-retries (get-max-retries cfg queue-name)] + (when-let [ids (->> (d/q '[:find ?id ?lock + :in $ ?queue-name ?backoff ?max-tries ?current-version + :where + [?e :com.github.ivarref.yoltq/status :error] + [?e :com.github.ivarref.yoltq/queue-name ?queue-name] + [?e :com.github.ivarref.yoltq/error-time ?time] + [(>= ?backoff ?time)] + [?e :com.github.ivarref.yoltq/tries ?tries] + [(>= ?max-tries ?tries)] + [?e :com.github.ivarref.yoltq/id ?id] + [?e :com.github.ivarref.yoltq/lock ?lock] + [?e :com.github.ivarref.yoltq/version ?current-version]] + db + queue-name + (- (now-ms) error-backoff-time) + max-retries + current-version) + (not-empty))] + (let [[id old-lock] (rand-nth (into [] ids))] + (prepare-processing db id queue-name old-lock :error))))) + + +(defn get-hung [{:keys [conn db now hung-backoff-time] :as cfg} queue-name] + (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: " + (str (if (nil? conn) "nil" conn)) + "\nConfig was: " (str cfg))) + (let [now (or now (now-ms)) + max-retries (get-max-retries cfg queue-name) + db (or db (d/db conn))] + (when-let [ids (->> (d/q '[:find ?id ?lock ?tries + :in $ ?qname ?backoff ?current-version + :where + [?e :com.github.ivarref.yoltq/status :processing] + [?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/processing-time ?time] + [(>= ?backoff ?time)] + [?e :com.github.ivarref.yoltq/tries ?tries] + [?e :com.github.ivarref.yoltq/id ?id] + [?e :com.github.ivarref.yoltq/lock ?lock] + [?e :com.github.ivarref.yoltq/version ?current-version]] + db + queue-name + (- now hung-backoff-time) + current-version) + (not-empty))] + (let [new-lock (random-uuid) + [id old-lock tries _t] (rand-nth (into [] ids)) + to-error? (>= tries max-retries)] + {:id id + :lock new-lock + :queue-name queue-name + :was-hung? true + :to-error? to-error? + :bindings (get (get-queue-item db id) :com.github.ivarref.yoltq/bindings {}) + :tx (if (not to-error?) + [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time now}] + [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status status-processing status-error] + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time now}])})))) diff --git a/test/com/github/ivarref/yoltq/error_poller_test.clj b/test/com/github/ivarref/yoltq/error_poller_test.clj new file mode 100644 index 0000000..4d92b81 --- /dev/null +++ b/test/com/github/ivarref/yoltq/error_poller_test.clj @@ -0,0 +1,35 @@ +(ns com.github.ivarref.yoltq.error-poller-test + (:require [clojure.edn :as edn] + [clojure.test :refer [deftest is]] + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.error-poller :as ep] + [com.github.ivarref.yoltq.log-init :as logconfig])) + + +(deftest error-poller + (logconfig/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"*"} (edn/read-string + (System/getProperty "TAOENSSO_TIMBRE_MIN_LEVEL_EDN" ":info"))]]) + (let [cfg {:system-error-callback-backoff 100} + time (atom 0) + tick! (fn [& [amount]] + (swap! time + (or amount 1))) + verify (fn [state now-ns error-count expected-callback] + (let [{:keys [errors state run-callback] :as res} (ep/handle-error-count state cfg now-ns error-count)] + (log/info errors "=>" state "::" run-callback) + (is (= expected-callback run-callback)) + res))] + (-> {} + (verify (tick!) 0 nil) + (verify (tick!) 1 nil) + (verify (tick!) 1 nil) + (verify (tick!) 1 :error) + (verify (tick! 100) 0 nil) + (verify (tick!) 0 :error) + (verify (tick!) 0 :recovery) + (verify (tick!) 1 nil) + (verify (tick!) 1 nil) + (verify (tick!) 1 :error) + (verify (tick! 100) 1 nil) + (verify (tick!) 1 :error)))) diff --git a/test/com/github/ivarref/yoltq/http_hang_demo.clj b/test/com/github/ivarref/yoltq/http_hang_demo.clj new file mode 100644 index 0000000..06d877b --- /dev/null +++ b/test/com/github/ivarref/yoltq/http_hang_demo.clj @@ -0,0 +1,45 @@ +(ns com.github.ivarref.yoltq.http-hang-demo + (:require [datomic.api :as d] + [com.github.ivarref.yoltq :as yq] + [com.github.ivarref.yoltq.log-init]) + (:import (java.net.http HttpClient HttpRequest HttpResponse$BodyHandlers))) + +(comment + (do + (com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq*"} :debug] + [#{"*"} :info]]) + (yq/stop!) + (let [received (atom []) + uri (str "datomic:mem://hello-world-" (java.util.UUID/randomUUID))] + (d/delete-database uri) + (d/create-database uri) + (let [conn (d/connect uri)] + (init! {:conn conn + :error-backoff-time (Duration/ofSeconds 5) + :poll-delay 5 + :system-error-poll-interval [1 TimeUnit/MINUTES] + :system-error-callback-backoff (Duration/ofHours 1) + :max-execute-time (Duration/ofSeconds 3) + :on-system-error (fn [] (log/error "system in error state")) + :on-system-recovery (fn [] (log/info "system recovered"))}) + (yq/add-consumer! :slow (fn [_payload] + (log/info "start slow handler...") + ; sudo tc qdisc del dev wlp3s0 root netem + ; sudo tc qdisc add dev wlp3s0 root netem delay 10000ms + ; https://jvns.ca/blog/2017/04/01/slow-down-your-internet-with-tc/ + (let [request (-> (HttpRequest/newBuilder) + (.uri (java.net.URI. "https://postman-echo.com/get")) + (.timeout (Duration/ofSeconds 10)) + (.GET) + (.build))] + (log/info "body is:" (-> (.send (HttpClient/newHttpClient) request (HttpResponse$BodyHandlers/ofString)) + (.body)))) + (log/info "slow handler is done!"))) + (yq/start!) + @(d/transact conn [(put :slow {:work 123})]) + #_(dotimes [x 1] @(d/transact conn [(put :q {:work x})])) + nil))))
\ No newline at end of file diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj new file mode 100644 index 0000000..7eae557 --- /dev/null +++ b/test/com/github/ivarref/yoltq/log_init.clj @@ -0,0 +1,66 @@ +(ns com.github.ivarref.yoltq.log-init + (:require [clojure.term.colors :as colors] + [taoensso.timbre :as timbre] + [clojure.string :as str])) + +(set! *warn-on-reflection* true) + +(def level-colors + {;:warn colors/red + :error colors/red}) + +(def ^:dynamic *override-color* nil) + +(defn min-length [n s] + (loop [s s] + (if (>= (count s) n) + s + (recur (str s " "))))) + +(defn local-console-format-fn + "A simpler log format, suitable for readable logs during development. colorized stacktraces" + [data] + (try + (let [{:keys [level ?err msg_ ?ns-str ?file hostname_ + timestamp_ ?line context]} data + ts (force timestamp_)] + (let [color-f (if (nil? *override-color*) + (get level-colors level str) + colors/green) + maybe-stacktrace (when ?err + (str "\n" (timbre/stacktrace ?err)))] + (cond-> (str #_(str ?ns-str ":" ?line) + #_(min-length (count "[yoltq:326] ") + (str + "[" + (some-> ?ns-str + (str/split #"\.") + (last)) + ":" ?line)) + ts + " " + (color-f (min-length 5 (str/upper-case (name level)))) + " " + + (when-let [x-req-id (:x-request-id context)] + (str "[" x-req-id "] ")) + #_(.getName ^Thread (Thread/currentThread)) + + (color-f (force msg_)) + + maybe-stacktrace)))) + + + (catch Throwable t + (println "error in local-console-format-fn:" (ex-message t)) + nil))) + + +(defn init-logging! [min-levels] + (timbre/merge-config! + {:min-level min-levels + :timestamp-opts {:pattern "HH:mm:ss.SSS" + :timezone :jvm-default} + :output-fn (fn [data] (local-console-format-fn data)) + :appenders {:println (timbre/println-appender {:stream :std-out})}})) + diff --git a/test/com/github/ivarref/yoltq/migrate_test.clj b/test/com/github/ivarref/yoltq/migrate_test.clj new file mode 100644 index 0000000..0063631 --- /dev/null +++ b/test/com/github/ivarref/yoltq/migrate_test.clj @@ -0,0 +1,92 @@ +(ns com.github.ivarref.yoltq.migrate-test + (:require [clojure.test :refer [deftest is]] + [com.github.ivarref.yoltq.ext-sys :as ext] + [com.github.ivarref.yoltq.migrate :as m] + [com.github.ivarref.yoltq.impl :as impl] + [com.github.ivarref.yoltq.test-utils :as tu] + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d])) + + +(deftest to-v2-migration + (with-bindings {#'ext/*squuid-atom* (atom 0)} + (let [conn (tu/empty-conn)] + @(d/transact conn impl/schema) + @(d/transact conn [{:com.github.ivarref.yoltq/id (u/squuid) + :com.github.ivarref.yoltq/queue-name :dummy + :com.github.ivarref.yoltq/status u/status-processing + :com.github.ivarref.yoltq/init-time 1 + :com.github.ivarref.yoltq/processing-time 2}]) + @(d/transact conn [{:com.github.ivarref.yoltq/id (u/squuid) + :com.github.ivarref.yoltq/queue-name :dummy + :com.github.ivarref.yoltq/status u/status-init + :com.github.ivarref.yoltq/init-time 3}]) + (is (= [[[:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000001"] + :com.github.ivarref.yoltq/version + nil + "2"] + [:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000001"] + :com.github.ivarref.yoltq/init-time + 1 + 1000] + [:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000001"] + :com.github.ivarref.yoltq/processing-time + 2 + 1000]] + [[:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000002"] + :com.github.ivarref.yoltq/version + nil + "2"] + [:db/cas + [:com.github.ivarref.yoltq/id + #uuid "00000000-0000-0000-0000-000000000002"] + :com.github.ivarref.yoltq/init-time + 3 + 1000]]] + (m/migrate! {:conn conn + :now-ms 1000 + :loop? true}))) + (is (= [] + (m/migrate! {:conn conn + :now-ms 1000 + :loop? true})))))) + + +(deftest to-v2-migration-real-time + (with-bindings {#'ext/*squuid-atom* (atom 0)} + (let [conn (tu/empty-conn) + id (u/squuid)] + @(d/transact conn impl/schema) + @(d/transact conn [{:com.github.ivarref.yoltq/id id + :com.github.ivarref.yoltq/queue-name :dummy + :com.github.ivarref.yoltq/status u/status-init + :com.github.ivarref.yoltq/init-time 1}]) + (Thread/sleep 100) + @(d/transact conn [{:com.github.ivarref.yoltq/id id + :com.github.ivarref.yoltq/init-time 2}]) + (let [tx-times (->> (d/q '[:find [?txinst ...] + :in $ ?e + :where + [?e :com.github.ivarref.yoltq/init-time _ ?tx true] + [?tx :db/txInstant ?txinst]] + (d/history (d/db conn)) + [:com.github.ivarref.yoltq/id id]) + (sort) + (vec))] + (is (= 2 (count tx-times))) + (m/migrate! {:conn conn}) + (is (= (.getTime (last tx-times)) + (d/q '[:find ?init-time . + :in $ ?e + :where + [?e :com.github.ivarref.yoltq/init-time ?init-time]] + (d/db conn) + [:com.github.ivarref.yoltq/id id]))))))) diff --git a/test/com/github/ivarref/yoltq/readme_demo.clj b/test/com/github/ivarref/yoltq/readme_demo.clj new file mode 100644 index 0000000..eae0a3e --- /dev/null +++ b/test/com/github/ivarref/yoltq/readme_demo.clj @@ -0,0 +1,48 @@ +(ns com.github.ivarref.yoltq.readme-demo + (:require [clojure.tools.logging :as log] + [datomic.api :as d] + [com.github.ivarref.yoltq :as yq]) + (:import (java.util UUID))) + + +(defonce conn + (let [uri (str "datomic:mem://hello-world-" (UUID/randomUUID))] + (d/delete-database uri) + (d/create-database uri) + (d/connect uri))) + + +(com.github.ivarref.yoltq.log-init/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"com.github.ivarref.yoltq.report-queue"} :debug] + [#{"com.github.ivarref.yoltq.poller"} :info] + [#{"com.github.ivarref.yoltq*"} :debug] + [#{"*"} :info]]) + + +(yq/stop!) + +(yq/init! {:conn conn}) + + +(yq/add-consumer! :q + (let [cnt (atom 0)] + (fn [payload] + (when (= 1 (swap! cnt inc)) + (throw (ex-info "failed" {}))) + (log/info "got payload" payload)))) + +(yq/start!) + +@(d/transact conn [(yq/put :q {:work 123})]) + +(comment + (yq/add-consumer! :q (fn [_] (throw (ex-info "always fail" {}))))) + +(comment + @(d/transact conn [(yq/put :q {:work 123})])) + +(comment + (do + (yq/add-consumer! :q (fn [_] :ok)) + nil)) diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj new file mode 100644 index 0000000..0c1b2f0 --- /dev/null +++ b/test/com/github/ivarref/yoltq/test_utils.clj @@ -0,0 +1,80 @@ +(ns com.github.ivarref.yoltq.test-utils + (:require [com.github.ivarref.yoltq.log-init :as logconfig] + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq.utils :as u] + [com.github.ivarref.yoltq :as yq] + [datomic.api :as d] + [clojure.string :as str] + [com.github.ivarref.yoltq.impl :as i] + [clojure.edn :as edn] + [com.github.ivarref.yoltq.ext-sys :as ext] + [clojure.pprint :as pp]) + (:import (java.util UUID) + (java.time Duration))) + + +(logconfig/init-logging! + [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn] + [#{"*"} (edn/read-string + (System/getProperty "TAOENSSO_TIMBRE_MIN_LEVEL_EDN" ":info"))]]) + + +(defn empty-conn [] + (let [uri (str "datomic:mem://hello-world-" (UUID/randomUUID))] + (d/delete-database uri) + (d/create-database uri) + (d/connect uri))) + + +(defn break [] + (log/info (str/join "*" (repeat 60 "")))) + + +(defn clear [] + (.print System/out "\033[H\033[2J") + (.flush System/out) + (break)) + + +(defn put-transact! [id payload] + @(d/transact (:conn @yq/*config*) [(i/put @yq/*config* id payload {})])) + + +(defn advance! [tp] + (assert (some? ext/*now-ms-atom*) "Expected to be running in test-mode!") + (swap! ext/*now-ms-atom* + (if (number? tp) + tp + (.toMillis ^Duration tp)))) + + +(defn done-count [] + (d/q '[:find (count ?e) . + :where + [?e :com.github.ivarref.yoltq/id _] + [?e :com.github.ivarref.yoltq/status :done]] + (d/db (:conn @yq/*config*)))) + + +(defn pp [x] + (pp/pprint x) + x) + +(defn get-init [& args] + (apply u/get-init @yq/*config* args)) + + +(defn get-error [& args] + (apply u/get-error @yq/*config* args)) + + +(defn get-hung [& args] + (apply u/get-hung @yq/*config* args)) + + +(defn take! [& args] + (apply i/take! @yq/*config* args)) + + +(defn execute! [& args] + (apply i/execute! @yq/*config* args)) + diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj new file mode 100644 index 0000000..a2ed269 --- /dev/null +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -0,0 +1,474 @@ +(ns com.github.ivarref.yoltq.virtual-test + (:require + [clojure.string :as str] + [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]] + [clojure.tools.logging :as log] + [com.github.ivarref.yoltq :as yq] + [com.github.ivarref.yoltq.error-poller :as error-poller] + [com.github.ivarref.yoltq.ext-sys :as ext] + [com.github.ivarref.yoltq.impl :as i] + [com.github.ivarref.yoltq.migrate :as migrate] + [com.github.ivarref.yoltq.test-queue :as tq] + [com.github.ivarref.yoltq.test-utils :as u] + [com.github.ivarref.yoltq.utils :as uu] + [datomic-schema.core] + [datomic.api :as d] + [taoensso.nippy :as nippy] + [taoensso.timbre :as timbre]) + (:import (java.time Duration LocalDateTime))) + + +(use-fixtures :each tq/call-with-virtual-queue!) + + +(deftest happy-case-1 + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q {:work 123})]) + (is (= {:work 123} (tq/consume! :q))))) + +(deftest happy-case-no-migration-for-new-entities + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q {:work 123})]) + (is (= {:work 123} (tq/consume! :q))) + (is (= [] (migrate/migrate! @yq/*config*))))) + +(deftest happy-case-tx-report-q + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q {:work 123})]) + (is (= {:work 123} (:retval (tq/run-one-report-queue!)))) + (is (= 1 (u/done-count))))) + + +(deftest happy-case-poller + (let [conn (u/empty-conn)] + (yq/init! {:conn conn + :handlers {:q {:f (fn [payload] payload)}}}) + (u/put-transact! :q {:work 123}) + (u/advance! (:init-backoff-time yq/default-opts)) + (is (= {:work 123} (some->> (u/get-init :q) + (u/take!) + (u/execute!) + :retval))))) + + +(deftest happy-case-queue-fn-allow-cas-failure + (let [conn (u/empty-conn) + invoke-count (atom 0)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q + (fn [{:keys [id]}] + (swap! invoke-count inc) + @(d/transact conn [[:db/cas [:e/id id] :e/val nil "janei"]])) + {:allow-cas-failure? #{:e/val}}) + @(d/transact conn #d/schema [[:e/id :one :string :id] + [:e/val :one :string]]) + @(d/transact conn [{:e/id "demo"} + (yq/put :q {:id "demo"})]) + (u/advance! (:init-backoff-time yq/default-opts)) + (swap! yq/*config* assoc :mark-status-fn! (fn [_ _ new-status] + (log/info "mark-status! doing nothing for new status" new-status))) + (is (nil? (some->> (u/get-init :q) + (u/take!) + (u/execute!)))) + (swap! yq/*config* dissoc :mark-status-fn!) + + ; (mark-status! :done) failed but f succeeded. + (is (nil? (u/get-hung :q))) + (u/advance! (:hung-backoff-time @yq/*config*)) + (is (some? (u/get-hung :q))) + (is (true? (some->> (u/get-hung :q) + (u/take!) + (u/execute!) + :allow-cas-failure?))) + (is (= 2 @invoke-count)))) + + +(deftest backoff-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn + :init-backoff-time (:init-backoff-time yq/default-opts) + :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}}) + (u/put-transact! :q {:work 123}) + (is (nil? (u/get-init :q))) + + (u/advance! (dec (:init-backoff-time yq/default-opts))) + (is (nil? (u/get-init :q))) + (u/advance! 1) + (is (some? (u/get-init :q))) + + (is (some? (some->> (u/get-init :q) + (u/take!) + (u/execute!) + :exception))) + + (u/advance! (dec (:error-backoff-time @yq/*config*))) + (is (nil? (u/get-error :q))) + (u/advance! 1) + (is (some? (u/get-error :q))))) + + +(deftest get-hung-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn + :init-backoff-time (:init-backoff-time yq/default-opts) + :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}}) + (u/put-transact! :q {:work 123}) + (u/advance! (:init-backoff-time yq/default-opts)) + (is (some? (u/get-init :q))) + + (is (= :processing (some->> (u/get-init :q) + (u/take!) + :com.github.ivarref.yoltq/status))) + + (is (nil? (u/get-hung :q))) + (u/advance! (dec (:hung-backoff-time yq/default-opts))) + (is (nil? (u/get-hung :q))) + (u/advance! 1) + (is (some? (u/get-hung :q))))) + + +(deftest basic-locking + (let [conn (u/empty-conn)] + (yq/init! {:conn conn + :init-backoff-time (:init-backoff-time yq/default-opts) + :cas-failures (atom 0) + :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}}) + (u/put-transact! :q {:work 123}) + (u/advance! (:init-backoff-time yq/default-opts)) + (is (some? (u/get-init :q))) + + (let [job (u/get-init :q)] + (is (= :processing (some->> job (u/take!) :com.github.ivarref.yoltq/status))) + (u/take! job) + (is (= 1 @(:cas-failures @yq/*config*)))))) + + +(deftest retry-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn + :init-backoff-time (:init-backoff-time yq/default-opts) + :handlers {:q {:f + (let [c (atom 0)] + (fn [_] + (if (<= (swap! c inc) 2) + (throw (ex-info "janei" {})) + ::ok)))}}}) + (u/put-transact! :q {:work 123}) + + (u/advance! (:init-backoff-time yq/default-opts)) + (is (some? (some->> (u/get-init :q) (u/take!) (u/execute!) :exception))) + + (u/advance! (:error-backoff-time @yq/*config*)) + (is (some? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception))) + + (u/advance! (:error-backoff-time @yq/*config*)) + (is (nil? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception))))) + + +(deftest max-retries-test + (let [conn (u/empty-conn) + call-count (atom 0)] + (yq/init! {:conn conn + :error-backoff-time 0}) + (yq/add-consumer! :q (fn [_] + (swap! call-count inc) + (throw (ex-info "janei" {}))) + {:max-retries 1}) + (tq/put! :q {:work 123}) + (is (some? (:exception (tq/run-one-report-queue!)))) + + (dotimes [_ 10] + (tq/run-queue-once! :q :error)) + (is (= 2 @call-count)))) + + +(deftest max-retries-test-two + (let [conn (u/empty-conn) + call-count (atom 0)] + (yq/init! {:conn conn + :error-backoff-time 0}) + (yq/add-consumer! :q (fn [_] + (swap! call-count inc) + (throw (ex-info "janei" {}))) + {:max-retries 3}) + (tq/put! :q {:work 123}) + (is (some? (:exception (tq/run-one-report-queue!)))) + + (timbre/with-level :fatal + (dotimes [_ 20] + (tq/run-queue-once! :q :error))) + (is (= 4 @call-count)))) + + +(deftest hung-to-error + (let [conn (u/empty-conn) + call-count (atom 0) + missed-mark-status (atom nil)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q + (fn [_] + (if (= 1 (swap! call-count inc)) + (throw (ex-info "error" {})) + (log/info "return OK")))) + (tq/put! :q {:id "demo"}) + (tq/run-one-report-queue!) ; now in status :error + + + (swap! yq/*config* assoc :mark-status-fn! (fn [_ _ new-status] + (reset! missed-mark-status new-status) + (log/info "mark-status! doing nothing for new status" new-status))) + (u/advance! (:error-backoff-time @yq/*config*)) + (tq/run-queue-once! :q :error) + (swap! yq/*config* dissoc :mark-status-fn!) + (is (= :done @missed-mark-status)) + + (is (nil? (uu/get-hung @yq/*config* :q))) + (u/advance! (:hung-backoff-time @yq/*config*)) + + (is (some? (uu/get-hung @yq/*config* :q))) + + (is (= 2 @call-count)) + + (is (true? (some->> (uu/get-hung (assoc-in @yq/*config* [:handlers :q :max-retries] 1) :q) + (i/take! @yq/*config*) + (i/execute! @yq/*config*) + :failed?))) + + (u/advance! (:error-backoff-time @yq/*config*)) + (is (some? (uu/get-error @yq/*config* :q))) + (is (nil? (uu/get-error (assoc-in @yq/*config* [:handlers :q :max-retries] 1) :q))))) + + +(deftest consume-expect-test + (let [conn (u/empty-conn) + seen (atom #{})] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [payload] + (when (= #{1 2} (swap! seen conj payload)) + (throw (ex-info "oops" {}))) + payload)) + + @(d/transact conn [(yq/put :q 1)]) + @(d/transact conn [(yq/put :q 2)]) + + (is (= 1 (tq/consume-expect! :q :done))) + (tq/consume-expect! :q :error))) + + +(def ^:dynamic *some-binding* nil) + + +(deftest binding-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn + :capture-bindings [#'*some-binding* #'timbre/*context*]}) + (yq/add-consumer! :q (fn [_] *some-binding*)) + (binding [timbre/*context* {:x-request-id "wooho"}] + (binding [*some-binding* 1] + @(d/transact conn [(yq/put :q nil)])) + (binding [*some-binding* 2] + @(d/transact conn [(yq/put :q nil)])) + @(d/transact conn [(yq/put :q nil)])) + + (is (= 1 (tq/consume-expect! :q :done))) + (is (= 2 (tq/consume-expect! :q :done))) + (is (nil? (tq/consume-expect! :q :done))))) + + +(deftest default-binding-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [_] (:x-request-id timbre/*context*))) + (binding [timbre/*context* {:x-request-id "123"}] + @(d/transact conn [(yq/put :q nil)])) + (is (= "123" (tq/consume-expect! :q :done))))) + + +(deftest force-retry-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (let [cnt (atom 0)] + (fn [_] (swap! cnt inc)))) + @(d/transact conn [(yq/put :q nil)]) + (is (= 1 (tq/consume! :q))) + (is (= 2 (tq/force-retry! :q))) + (is (= 3 (tq/force-retry! :q))))) + + +(deftest ext-id-no-duplicates + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity) + @(d/transact conn [(yq/put :q nil {:id "123"})]) + (is (thrown? Exception @(d/transact conn [(yq/put :q nil {:id "123"})]))))) + + +(deftest depends-on + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :a identity) + (yq/add-consumer! :b identity) + @(d/transact conn [(yq/put :a {:id "a1"} {:id "a1"})]) + @(d/transact conn [(yq/put :b {:id "b1"} {:depends-on [:a "a1"]})]) + + ; can't consume :b yet: + (is (= {:depends-on [:a "a1"]} (tq/consume! :b))) + + (is (= {:id "a1"} (tq/consume! :a))) + (is (= {:id "b1"} (tq/consume! :b))))) + + +(deftest depends-on-queue-level + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :a identity) + (yq/add-consumer! :b identity {:depends-on (fn [{:keys [id]}] [:a id])}) + @(d/transact conn [(yq/put :a {:id "1"} {:id "1"})]) + @(d/transact conn [(yq/put :b {:id "1"})]) + + ; can't consume :b yet: + (is (= {:depends-on [:a "1"]} (tq/consume! :b))) + + (is (= {:id "1"} (tq/consume! :a))) + (is (= {:id "1"} (tq/consume! :b))))) + + +(deftest verify-can-read-string + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :a identity) + (timbre/with-level :fatal + (is (thrown? Exception @(d/transact conn [(yq/put :a {:broken #'=})])))))) + + +(deftest payload-verifier + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q identity + {:valid-payload? (fn [{:keys [id]}] + (some? id))}) + @(d/transact conn [(yq/put :q {:id "a"})]) + (timbre/with-level :fatal + (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) + + +(defn my-consumer + {:yoltq/queue-id :some-q} + [state payload] + (swap! state conj payload)) + +(deftest queue-id-can-be-var + (let [conn (u/empty-conn) + received (atom #{})] + (yq/init! {:conn conn}) + (yq/add-consumer! #'my-consumer (partial my-consumer received)) + @(d/transact conn [(yq/put #'my-consumer {:id "a"})]) + (tq/consume! :some-q) + (is (= #{{:id "a"}} @received)) + #_(timbre/with-level :fatal + (is (thrown? Exception @(d/transact conn [(yq/put :q {})])))))) + +(deftest healthy-allowed-error-time-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [_] (throw (ex-info "" {})))) + @(d/transact conn [(yq/put :q {:work 123})]) + (tq/consume-expect! :q :error) + (is (= 0 (error-poller/do-poll-errors @yq/*config* (ext/now-ms)))) + (is (= 0 (error-poller/do-poll-errors @yq/*config* (+ (dec (.toMillis (Duration/ofMinutes 15))) (ext/now-ms))))) + (is (= 1 (error-poller/do-poll-errors @yq/*config* (+ (.toMillis (Duration/ofMinutes 15)) (ext/now-ms))))))) + +(deftest global-encode-decode + (let [conn (u/empty-conn) + ldt (LocalDateTime/now) + got-work (atom nil)] + (yq/init! {:conn conn + :encode nippy/freeze + :decode nippy/thaw}) + (yq/add-consumer! :q (fn [work] (reset! got-work work))) + @(d/transact conn [(yq/put :q {:work ldt})]) + (tq/consume! :q) + (is (= @got-work {:work ldt})))) + +(deftest queue-encode-decode + (let [conn (u/empty-conn) + ldt (LocalDateTime/now) + got-work (atom nil)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [work] (reset! got-work work)) + {:encode nippy/freeze + :decode nippy/thaw}) + @(d/transact conn [(yq/put :q {:work ldt})]) + (tq/consume! :q) + (is (= @got-work {:work ldt})))) + +(deftest global-partition + (let [conn (u/empty-conn) + got-work (atom nil)] + (yq/init! {:conn conn + :partition-fn (fn [_queue-name] :my-part)}) + (yq/add-consumer! :q (fn [work] (reset! got-work work))) + @(d/transact conn [(yq/put :q {:work 123})]) + (tq/consume! :q) + (is (some? (d/q '[:find ?e . + :in $ ?part + :where + [?e :db/ident ?part]] + (d/db conn) + :my-part))) + (is (= @got-work {:work 123})))) + +(deftest partition-per-queue + (let [conn (u/empty-conn) + got-work (atom nil)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q (fn [work] (reset! got-work work)) + {:partition-fn (fn [_queue-name] :my-part)}) + @(d/transact conn [(yq/put :q {:work 123})]) + (tq/consume! :q) + (is (some? (d/q '[:find ?e . + :in $ ?part + :where + [?e :db/ident ?part]] + (d/db conn) + :my-part))) + (is (= @got-work {:work 123})))) + +(deftest string-encode-decode + (let [conn (u/empty-conn) + got-work (atom nil)] + (yq/init! {:conn conn + :encode (fn [x] (str/join (reverse x))) + :decode (fn [x] (str/join (reverse x)))}) + (yq/add-consumer! :q (fn [work] (reset! got-work work))) + @(d/transact conn [(yq/put :q "asdf")]) + (tq/consume! :q) + (is (= @got-work "asdf")))) + +(deftest job-group-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q1 identity) + (yq/add-consumer! :q2 identity) + @(d/transact conn [(yq/put :q1 {:work 123} {:job-group :group1}) + (yq/put :q1 {:work 456} {:job-group :group2}) + (yq/put :q2 {:work 789} {:job-group :group1})]) + (is (= [{:qname :q1 + :job-group :group1 + :status :init + :count 1}] + (yq/job-group-progress :q1 :group1))) + + (is (= {:work 123} (tq/consume! :q1))) + + (is (= [{:qname :q1 + :job-group :group1 + :status :done + :count 1}] + (yq/job-group-progress :q1 :group1))))) |
