diff options
| -rw-r--r-- | .gitignore | 12 | ||||
| -rw-r--r-- | README.md | 105 | ||||
| -rw-r--r-- | deps.edn | 28 | ||||
| -rw-r--r-- | pom.xml | 36 | ||||
| -rwxr-xr-x | release.sh | 20 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq.clj | 175 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/error_poller.clj | 109 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/ext_sys.clj | 26 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/impl.clj | 147 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/poller.clj | 51 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/report_queue.clj | 54 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/slow_executor_detector.clj | 28 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/utils.clj | 154 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/virtual_queue.clj | 94 | ||||
| -rw-r--r-- | test/com/github/ivarref/yoltq/error_poller_test.clj | 35 | ||||
| -rw-r--r-- | test/com/github/ivarref/yoltq/http_hang_demo.clj | 45 | ||||
| -rw-r--r-- | test/com/github/ivarref/yoltq/log_init.clj | 61 | ||||
| -rw-r--r-- | test/com/github/ivarref/yoltq/readme_demo.clj | 48 | ||||
| -rw-r--r-- | test/com/github/ivarref/yoltq/test_utils.clj | 74 | ||||
| -rw-r--r-- | test/com/github/ivarref/yoltq/virtual_test.clj | 232 |
20 files changed, 1534 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cb9a7ca --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +.idea/ +*.iml +.cpcache/ +.nrepl-port +target/ +.connkey +tree.txt +.db.url +.stage-url.txt +*.pom.asc +*.pom +temp/
\ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..29c67da --- /dev/null +++ b/README.md @@ -0,0 +1,105 @@ +# yoltq + +An opinionated Datomic queue for building (more) reliable systems. +Implements the [transactional outbox](https://microservices.io/patterns/data/transactional-outbox.html) +pattern. +Supports retries, backoff and more. +On-prem only. + +## Installation + +... + +## 1-minute example + +```clojure +(require '[com.github.ivarref.yoltq :as yq]) + +(def conn (datomic.api/connect "...")) + +; Initialize system +(yq/init! {:conn conn}) + +; Add a queue consumer that will intentionally fail on the first attempt +(yq/add-consumer! :q + (let [cnt (atom 0)] + (fn [payload] + (when (= 1 (swap! cnt inc)) + ; A consumer throwing an exception is considered a queue job failure + (throw (ex-info "failed" {}))) + ; Anything else than a throwing exception is considered a queue job success + ; This includes nil, false and everything else. + (log/info "got payload" payload)))) + +; Start threadpool +(yq/start!) + +; Queue a job +@(d/transact conn [(yq/put :q {:work 123})]) + +; On your console you will see something like this: +; 17:29:54.598 DEBUG queue item 613... for queue :q is pending status :init +; 17:29:54.602 DEBUG queue item 613... for queue :q now has status :processing +; 17:29:54.603 DEBUG queue item 613... for queue :q is now processing +; 17:29:54.605 WARN queue-item 613... for queue :q now has status :error after 1 try in 4.8 ms +; 17:29:54.607 WARN error message was: "failed" for queue-item 613... +; 17:29:54.615 WARN ex-data was: {} for queue-item 613... +; The item is so far failed... + +; But after approximately 10 seconds have elapsed, the item will be retried: +; 17:30:05.596 DEBUG queue item 613... for queue :q now has status :processing +; 17:30:05.597 DEBUG queue item 613... for queue :q is now processing +; 17:30:05.597 INFO got payload {:work 123} +; 17:30:05.599 INFO queue-item 613... 
for queue :q now has status :done after 2 tries in 5999.3 ms +; And then it has succeeded. +``` + +## Rationale + +Integrating with external systems that may be unavailable can be tricky. +Imagine the following code: + +```clojure +(defn post-handler [user-input] + (let [db-item (process user-input) + ext-ref (clj-http.client/post ext-service {...})] ; may throw exception + @(d/transact conn [(assoc db-item :some/ext-ref ext-ref)]))) +``` + +What if the POST request fails? Should it be retried? For how long? +Should it be allowed to fail? How do you then process failures later? + +The queue way to solve this would be: + +```clojure +(defn get-ext-ref [{:keys [id]}] + (let [ext-ref (clj-http.client/post ext-service {...})] ; may throw exception + @(d/transact conn [[:db/cas [:some/id id] + :some/ext-ref + nil + ext-ref]]))) + +(yq/add-consumer! :get-ext-ref get-ext-ref {:allow-cas-failure? true}) + +(defn post-handler [user-input] + (let [{:some/keys [id] :as db-item} (process user-input)] + @(d/transact conn [db-item + (yq/put :get-ext-ref {:id id})]))) + +``` + +Here `post-handler` will always succeed as long as the transaction commits. + +`get-ext-ref` may fail multiple times if `ext-service` is down. +This is fine as long as it eventually succeeds. + +There is a special case where `get-ext-ref` succeeds, but +saving the new queue job status to the database fails. +Thus `get-ext-ref` and any queue consumer should tolerate +being executed successfully several times. + +For `get-ext-ref` this is solved by using +the database function [:db/cas (compare-and-swap)](https://docs.datomic.com/on-prem/transactions/transaction-functions.html#dbfn-cas) +to achieve a write-once behaviour. +The yoltq system treats cas failures as job successes +when a consumer has `:allow-cas-failure?` set to `true` in its options. 
diff --git a/deps.edn b/deps.edn new file mode 100644 index 0000000..cf8297c --- /dev/null +++ b/deps.edn @@ -0,0 +1,28 @@ +{:deps {org.clojure/tools.logging {:mvn/version "1.1.0"} + org.clojure/clojure {:mvn/version "1.10.3"}} + + :paths ["src"] + + :aliases {:test {:extra-paths ["test"] + :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"} + com.taoensso/timbre {:mvn/version "5.1.2"} + com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"} + clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"} + com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]} + org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"} + io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}} + :jvm-opts ["-DDISABLE_SPY=true" + "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"] + :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]} + + :jar {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git" + :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}} + :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]} + + :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}} + + :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.1.3"}} + :main-opts ["-m" "deps-deploy.deps-deploy" "deploy" + "target/out.jar" "true"]}} + + :mvn/repos {"my.datomic.com" {:url "https://my.datomic.com/repo"}}} @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <packaging>jar</packaging> + <groupId>com.github.ivarref</groupId> + <artifactId>yoltq</artifactId> + <version>0.1.3</version> + <name>yoltq</name> + <dependencies> + <dependency> + <groupId>org.clojure</groupId> + <artifactId>clojure</artifactId> + <version>1.10.3</version> + 
</dependency> + <dependency> + <groupId>org.clojure</groupId> + <artifactId>tools.logging</artifactId> + <version>1.1.0</version> + </dependency> + </dependencies> + <build> + <sourceDirectory>src</sourceDirectory> + </build> + <repositories> + <repository> + <id>clojars</id> + <url>https://repo.clojars.org/</url> + </repository> + </repositories> + <scm> + <connection>scm:git:git://github.com/ivarref/yoltq.git</connection> + <developerConnection>scm:git:ssh://git@github.com/ivarref/yoltq.git</developerConnection> + <tag>v0.1.3</tag> + <url>https://github.com/ivarref/yoltq</url> + </scm> +</project>
\ No newline at end of file
diff --git a/release.sh b/release.sh
new file mode 100755
index 0000000..70f67b5
--- /dev/null
+++ b/release.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+
+# Release script: regenerates pom.xml, runs the test suite, builds the jar,
+# bumps the patch version, commits/tags/pushes and finally deploys.
+
+# -e: stop at the first failing command. -x: echo every command as it runs.
+set -ex
+
+clojure -Spom
+clojure -M:test
+clojure -M:jar
+clojure -X:release ivarref.pom-patch/clojars-repo-only!
+VERSION=$(clojure -X:release ivarref.pom-patch/set-patch-version! :patch :commit-count+1)
+
+git add pom.xml
+git commit -m "Release $VERSION"
+# Quoted so an unexpected character in VERSION cannot split the tag name.
+git tag -a "v$VERSION" -m "Release v$VERSION"
+git push --follow-tags
+
+clojure -M:deploy
+
+echo "Released $VERSION"
+
+# -f: with `set -e`, a plain `rm` that matches no file would make the whole
+# script exit non-zero even though the release itself already succeeded.
+rm -f -- *.pom.asc
\ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
new file mode 100644
index 0000000..565c01d
--- /dev/null
+++ b/src/com/github/ivarref/yoltq.clj
@@ -0,0 +1,175 @@
+(ns com.github.ivarref.yoltq
+  ; NOTE: datomic-schema.core is intentionally NOT required here. It is unused
+  ; by this namespace and is only available on the :test classpath, so requiring
+  ; it made the library impossible to load for consumers.
+  (:require [datomic.api :as d]
+            [clojure.tools.logging :as log]
+            [com.github.ivarref.yoltq.impl :as i]
+            [com.github.ivarref.yoltq.report-queue :as rq]
+            [com.github.ivarref.yoltq.poller :as poller]
+            [com.github.ivarref.yoltq.error-poller :as errpoller]
+            [com.github.ivarref.yoltq.slow-executor-detector :as slow-executor]
+            [com.github.ivarref.yoltq.utils :as u])
+  (:import (datomic Connection)
+           (java.util.concurrent Executors TimeUnit ExecutorService)
+           (java.time Duration)))
+
+
+; Global state. *config* holds the merged configuration map (including the
+; registered :handlers), threadpool holds the scheduled executor while running.
+(defonce ^:dynamic *config* (atom nil))
+(defonce threadpool (atom nil))
+(defonce ^:dynamic *running?* (atom false))
+(defonce ^:dynamic *test-mode* false)
+
+
+(def default-opts
+  (-> {; Default number of times a queue job will be retried before giving up
+       ; Can be overridden on a per consumer basis with
+       ; (yq/add-consumer! :q (fn [payload] ...) {:max-retries 200})
+       :max-retries 100
+
+       ; Minimum amount of time to wait before a failed queue job is retried
+       :error-backoff-time (Duration/ofSeconds 5)
+
+       ; Max time a queue job can execute before an error is logged
+       :max-execute-time (Duration/ofMinutes 5)
+
+       ; Amount of time an in progress queue job can run before it is considered failed
+       ; and will be marked as such.
+       :hung-backoff-time (Duration/ofMinutes 30)
+
+       ; Most queue jobs in init state will be consumed by the tx-report-queue listener.
+       ; However in the case where a init job was added right before the application
+       ; was shut down and did not have time to be processed by the tx-report-queue listener,
+       ; it will be consumed by the init poller. This init poller backs off by
+       ; :init-backoff-time to avoid unnecessary compare-and-swap lock failures that could
+       ; otherwise occur if competing with the tx-report-queue listener.
+       :init-backoff-time (Duration/ofSeconds 60)
+
+       ; How frequent polling for init, error and hung jobs should be done.
+       :poll-delay (Duration/ofSeconds 10)
+
+       ; Specifies the number of threads available for executing queue and polling jobs.
+       ; The final thread pool will be this size + 2.
+       ;
+       ; One thread is permanently allocated for listening to the
+       ; tx-report-queue.
+       ;
+       ; Another thread is permanently allocated for checking :max-execute-time.
+       ; This means that if all executing queue jobs are stuck and the thread pool is unavailable
+       ; as such, at least an error will be logged about this. The log entry will
+       ; contain the stacktrace of the stuck threads.
+       :pool-size 4
+
+       ; How often should the system be polled for failed queue jobs
+       :system-error-poll-delay (Duration/ofMinutes 1)
+
+       ; Minimum amount of time between repeated invocations of the
+       ; :on-system-error callback while the system stays in the error state
+       :system-error-callback-backoff (Duration/ofHours 1)}
+
+      u/duration->nanos))
+
+
+(defn init!
+  "Installs the queue schema on `conn` and (re)builds the global config.
+
+  The config is merged from, in increasing priority: fresh bookkeeping atoms,
+  `default-opts`, the previous config and `cfg`. A nil value never overrides
+  a non-nil one. :system-error is always reset to a fresh atom and every
+  Duration value is converted to nanoseconds. Returns the new config map."
+  [{:keys [conn] :as cfg}]
+  (assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil")))
+  (locking threadpool
+    @(d/transact conn i/schema)
+    (let [new-cfg (swap! *config*
+                         (fn [old-conf]
+                           (-> (merge-with (fn [a b] (or b a))
+                                           {:running-queues (atom #{})
+                                            :start-execute-time (atom {})}
+                                           default-opts
+                                           old-conf
+                                           cfg)
+                               (assoc :system-error (atom {}))
+                               u/duration->nanos)))]
+      new-cfg)))
+
+
+(defn add-consumer!
+  "Registers `f` as the handler for `queue-id`. `opts` is merged into the
+  handler entry (for example :max-retries or :allow-cas-failure?)."
+  ([queue-id f]
+   (add-consumer! queue-id f {}))
+  ([queue-id f opts]
+   (swap! *config* (fn [old-config] (assoc-in old-config [:handlers queue-id] (merge opts {:f f}))))))
+
+
+(defn put
+  "Returns the transaction data for a new job on queue `id` carrying `payload`.
+  The caller is responsible for transacting the returned map. In test mode the
+  configured :bootstrap-poller! (if any) is invoked with the connection first."
+  [id payload]
+  (let [{:keys [bootstrap-poller! conn] :as cfg} @*config*]
+    (when (and *test-mode* bootstrap-poller!)
+      (bootstrap-poller! conn))
+    (i/put cfg id payload)))
+
+
+(defn- do-start!
+  "Creates the thread pool (:pool-size + 2 threads), schedules the pollers and
+  starts the tx-report-queue listener and the slow-thread detector. Blocks
+  until the listener signals readiness."
+  []
+  (let [{:keys [poll-delay pool-size system-error-poll-delay]} @*config*]
+    (reset! threadpool (Executors/newScheduledThreadPool (+ 2 pool-size)))
+    (let [pool @threadpool
+          queue-listener-ready (promise)]
+      (reset! *running?* true)
+      ; Delays are in nanoseconds because the config went through u/duration->nanos.
+      (.scheduleAtFixedRate pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/NANOSECONDS)
+      (.scheduleAtFixedRate pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/NANOSECONDS)
+      (.execute pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*)))
+      (.execute pool (fn [] (slow-executor/show-slow-threads *running?* *config*)))
+      @queue-listener-ready)))
+
+
+(defn start!
+  "Starts the queue system unless it is already running. Does nothing except
+  logging when *test-mode* is true."
+  []
+  (locking threadpool
+    (cond (true? *test-mode*)
+          (log/info "test mode enabled, doing nothing for start!")
+
+          (true? @*running?*)
+          nil
+
+          (false? @*running?*)
+          (do-start!))))
+
+
+(defn stop!
+  "Stops the queue system if it is running and waits for the thread pool to
+  terminate. Does nothing except logging when *test-mode* is true."
+  []
+  (locking threadpool
+    (cond (true? *test-mode*)
+          (log/info "test mode enabled, doing nothing for stop!")
+
+          (false? @*running?*)
+          nil
+
+          (true? @*running?*)
+          (do
+            (reset! *running?* false)
+            (when-let [^ExecutorService tp @threadpool]
+              (log/debug "shutting down old threadpool")
+              (.shutdown tp)
+              (while (not (.awaitTermination tp 1 TimeUnit/SECONDS))
+                (log/debug "waiting for threadpool to stop"))
+              (log/debug "stopped!")
+              (reset! threadpool nil))))))
+
+
+(comment
+  (do
+    (require 'com.github.ivarref.yoltq.log-init)
+    (com.github.ivarref.yoltq.log-init/init-logging!
+      [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+       [#{"ivarref.yoltq.report-queue"} :info]
+       [#{"ivarref.yoltq.poller"} :info]
+       [#{"ivarref.yoltq*"} :info]
+       [#{"*"} :info]])
+    (stop!)
+    (let [received (atom [])
+          uri (str "datomic:mem://demo")]
+      (d/delete-database uri)
+      (d/create-database uri)
+      (let [ok-items (atom [])
+            conn (d/connect uri)
+            n 100]
+        (init! {:conn conn
+                :error-backoff-time (Duration/ofSeconds 1)
+                :poll-delay (Duration/ofSeconds 1)})
+        (add-consumer! :q (fn [payload]
+                            (when (> (Math/random) 0.5)
+                              (throw (ex-info "oops" {})))
+                            (if (= n (count (swap! received conj (:work payload))))
+                              (log/info "... and we are done!")
+                              (log/info "got payload" payload "total ok:" (count @received)))))
+        (start!)
+        (dotimes [x n]
+          @(d/transact conn [(put :q {:work x})]))
+        nil))))
\ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj
new file mode 100644
index 0000000..77339f7
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/error_poller.clj
@@ -0,0 +1,109 @@
+(ns com.github.ivarref.yoltq.error-poller
+  (:require [datomic.api :as d]
+            [com.github.ivarref.yoltq.utils :as u]
+            [com.github.ivarref.yoltq.ext-sys :as ext]
+            [clojure.tools.logging :as log]))
+
+
+(defn get-state
+  "State transition table. `v` is a pair [old-state classification] where the
+  classification is :none, :some or :all. Returns the new state (:error or
+  :recovery), or nil for any other input (for example :missing)."
+  [v]
+  (case v
+    [:error :none] :recovery
+    [:error :some] :error
+    [:error :all] :error
+    [:recovery :none] :recovery
+    [:recovery :some] :recovery
+    [:recovery :all] :error
+    nil))
+
+
+(defn handle-error-count
+  "Pure state update for the system error tracker.
+
+  Appends `error-count` to the sliding window of the last
+  `system-error-min-count` (default 3) samples, classifies the window and
+  derives the new state through `get-state`. The returned map always contains
+  :errors and :last-notify. It contains :state only when the window is full,
+  and :run-callback (:error or :recovery) when a callback should be invoked:
+  on entering the error state, on leaving it, and again while staying in it
+  once more than `system-error-callback-backoff` ns have passed since
+  :last-notify."
+  [{:keys [errors last-notify state]
+    :or {errors []
+         last-notify 0
+         state :recovery}}
+   {:keys [system-error-min-count system-error-callback-backoff]
+    :or {system-error-min-count 3}}
+   now-ns
+   error-count]
+  (let [new-errors (->> (conj errors error-count)
+                        (take-last system-error-min-count)
+                        (vec))
+        classify (fn [coll]
+                   (cond
+                     ; Not enough samples collected yet
+                     (not= system-error-min-count (count coll))
+                     :missing
+
+                     (every? pos-int? coll)
+                     :all
+
+                     (every? zero? coll)
+                     :none
+
+                     :else
+                     :some))
+        old-state state]
+    (merge
+      {:errors new-errors
+       :last-notify last-notify}
+      (when-let [new-state (get-state [old-state (classify new-errors)])]
+        (merge
+          {:state new-state}
+          (when (and (= old-state :recovery)
+                     (= new-state :error))
+            {:run-callback :error
+             :last-notify now-ns})
+
+          (when (and (= new-state :error)
+                     (= old-state :error)
+                     (> now-ns
+                        (+ last-notify system-error-callback-backoff)))
+            {:run-callback :error
+             :last-notify now-ns})
+
+          (when (and (= new-state :recovery)
+                     (= old-state :error))
+            {:run-callback :recovery}))))))
+
+
+(defn do-poll-errors
+  "Counts the queue items currently in status :error, feeds the count through
+  `handle-error-count` (stored in the :system-error atom) and invokes
+  :on-system-error or :on-system-recovery when requested. Both callbacks
+  default to no-ops. Returns the new tracker state."
+  [{:keys [conn system-error
+           on-system-error
+           on-system-recovery]
+    :or {on-system-error (fn [] nil)
+         on-system-recovery (fn [] nil)}
+    :as config}]
+  (assert (some? conn) "expected :conn to be present")
+  (assert (some? system-error) "expected :system-error to be present")
+  ; The scalar (count ?e) query yields nil when nothing matches, hence the (or ... 0).
+  (let [error-count (or (d/q '[:find (count ?e) .
+                               :in $ ?status
+                               :where
+                               [?e :com.github.ivarref.yoltq/status ?status]]
+                             (d/db conn)
+                             u/status-error)
+                        0)]
+    (when (pos-int? error-count)
+      (log/debug "poll-errors found" error-count "errors in system"))
+    (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ns) error-count)]
+      (when run-callback
+        (cond (= run-callback :error)
+              (on-system-error)
+
+              (= run-callback :recovery)
+              (on-system-recovery)
+
+              :else
+              (log/error "unhandled callback-type" run-callback))
+        (log/debug "run-callback is" run-callback))
+      new-state)))
+
+
+(defn poll-errors
+  "Scheduler entry point: runs `do-poll-errors` while `running?` is true.
+  Never throws; any Throwable is logged and nil is returned."
+  [running? config-atom]
+  (try
+    (when @running?
+      (do-poll-errors @config-atom))
+    (catch Throwable t
+      (log/error t "unexpected error in poll-errors:" (ex-message t))
+      nil)))
+
+
+(comment
+  (do-poll-errors @com.github.ivarref.yoltq/*config*))
+
diff --git a/src/com/github/ivarref/yoltq/ext_sys.clj b/src/com/github/ivarref/yoltq/ext_sys.clj
new file mode 100644
index 0000000..3480475
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/ext_sys.clj
@@ -0,0 +1,26 @@
+(ns com.github.ivarref.yoltq.ext-sys
+  ; clojure.core gained its own random-uuid in Clojure 1.11. Excluding it keeps
+  ; this namespace free of "already refers to" warnings on newer versions and
+  ; is harmless on older ones.
+  (:refer-clojure :exclude [random-uuid])
+  (:require [datomic.api :as d])
+  (:import (java.util UUID)))
+
+
+; When bound to an atom these vars replace the real clock / id generators
+; with deterministic values.
+(def ^:dynamic *now-ns-atom* nil)
+(def ^:dynamic *squuid-atom* nil)
+(def ^:dynamic *random-atom* nil)
+
+
+(defn now-ns
+  "Current time in nanoseconds: the value of *now-ns-atom* when bound,
+  otherwise System/nanoTime."
+  []
+  (if *now-ns-atom*
+    @*now-ns-atom*
+    (System/nanoTime)))
+
+
+(defn squuid
+  "A Datomic squuid, or a predictable counter-based UUID when *squuid-atom* is bound."
+  []
+  (if *squuid-atom*
+    (UUID/fromString (str "00000000-0000-0000-0000-" (format "%012d" (swap! *squuid-atom* inc))))
+    (d/squuid)))
+
+
+(defn random-uuid
+  "A random UUID, or a predictable counter-based UUID when *random-atom* is bound."
+  []
+  (if *random-atom*
+    (UUID/fromString (str "00000000-0000-0000-0000-" (format "%012d" (swap! *random-atom* inc))))
+    (UUID/randomUUID)))
\ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj
new file mode 100644
index 0000000..2acc83d
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/impl.clj
@@ -0,0 +1,147 @@
+(ns com.github.ivarref.yoltq.impl
+  (:require [datomic.api :as d]
+            [clojure.tools.logging :as log]
+            [clojure.string :as str]
+            [com.github.ivarref.yoltq.utils :as u]
+            [com.github.ivarref.yoltq.ext-sys :as ext]))
+
+
+; Datomic attributes describing a queue item. :tries and :lock keep no history.
+(def schema
+  [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity}
+   #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
+   #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
+   #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string}
+   #:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string}
+   #:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true}
+   #:db{:ident :com.github.ivarref.yoltq/lock, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :noHistory true}
+   #:db{:ident :com.github.ivarref.yoltq/init-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
+   #:db{:ident :com.github.ivarref.yoltq/processing-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
+   #:db{:ident :com.github.ivarref.yoltq/done-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
+   #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}])
+
+
+; Builds (but does not transact) the entity map for a new queue item in
+; status :init. The payload is stored as a pr-str'ed EDN string.
+; Throws ex-info when no handler is registered for queue-name.
+(defn put [config queue-name payload]
+  (if-let [_ (get-in config [:handlers queue-name])]
+    (let [id (u/squuid)]
+      (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init)
+      {:com.github.ivarref.yoltq/id id
+       :com.github.ivarref.yoltq/queue-name queue-name
+       :com.github.ivarref.yoltq/status u/status-init
+       :com.github.ivarref.yoltq/payload (pr-str payload)
+       :com.github.ivarref.yoltq/bindings (pr-str {})
+       :com.github.ivarref.yoltq/lock (u/random-uuid)
+       :com.github.ivarref.yoltq/tries 0
+       :com.github.ivarref.yoltq/init-time (u/now-ns)})
+    (do
+      (log/error "Did not find registered handler for queue" queue-name)
+      (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name})))))
+
+
+; Tries to claim a queue item by transacting the :tx prepared for it.
+; Returns the queue item as read from db-after, or nil when the transaction
+; fails. A :db.error/cas-failed failure is logged at info level and counted in
+; the optional :cas-failures atom; any other failure is logged as an error.
+(defn take! [{:keys [conn cas-failures hung-log-level]
+              :or {hung-log-level :error}}
+             {:keys [tx id queue-name was-hung? to-error?] :as queue-item-info}]
+  (when queue-item-info
+    (try
+      (cond to-error?
+            (log/logp hung-log-level "queue-item" (str id) "was hung and retried too many times. Giving up!")
+
+            was-hung?
+            (log/logp hung-log-level "queue-item" (str id) "was hung, retrying ...")
+
+            :else
+            nil)
+      (let [{:keys [db-after]} @(d/transact conn tx)
+            {:com.github.ivarref.yoltq/keys [status] :as q-item} (u/get-queue-item db-after id)]
+        (log/debug "queue item" (str id) "for queue" queue-name "now has status" status)
+        q-item)
+      (catch Throwable t
+        (let [{:db/keys [error] :as m} (u/db-error-map t)]
+          (cond
+            (= :db.error/cas-failed error)
+            (do
+              (log/info ":db.error/cas-failed for queue item" (str id) "and attribute" (:a m))
+              (when cas-failures
+                (swap! cas-failures inc))
+              nil)
+
+            :else
+            (do
+              (log/error t "Unexpected failure for queue item" (str id) ":" (ex-message t))
+              nil)))))))
+
+
+; Moves an item from status :processing to new-status in one transaction that
+; also swaps the lock, increments :tries and stamps :done-time or :error-time.
+; All three changes are compare-and-swap, so the transaction fails if the item
+; changed in the meantime. Returns the updated item, or nil on any failure
+; (which is logged).
+(defn mark-status! [{:keys [conn]}
+                    {:com.github.ivarref.yoltq/keys [id lock tries]}
+                    new-status]
+  (try
+    (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)]
+              [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)]
+              [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing new-status]
+              (if (= new-status u/status-done)
+                {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ns)}
+                {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ns)})]
+          {:keys [db-after]} @(d/transact conn tx)]
+      (u/get-queue-item db-after id))
+    (catch Throwable t
+      (log/error t "unexpected error in mark-status!: " (ex-message t))
+      nil)))
+
+
+; Formats the status log line; spent-ns is printed in milliseconds with one decimal.
+(defn fmt [id queue-name new-status tries spent-ns]
+  (str/join " " ["queue-item" (str id)
+                 "for queue" queue-name
+                 "now has status" new-status
+                 "after" tries (if (= 1 tries)
+                                 "try"
+                                 "tries")
+                 "in" (format "%.1f" (double (/ spent-ns 1e6))) "ms"]))
+
+
+; Runs the registered handler for a claimed queue item and records the outcome.
+;
+; - An item that already has status :error is returned with :failed? true and
+;   is not executed.
+; - While the handler runs, the current thread is registered in the
+;   :start-execute-time atom and always removed again in the finally clause.
+; - A handler exception that is a :db.error/cas-failed is treated as success
+;   when the handler's :allow-cas-failure? is true, or is a set/function that
+;   accepts the failing attribute (:a of the error map).
+; - Any other exception marks the item as :error; logging escalates from :warn
+;   to :error once the item has been tried 3 or more times.
+; - Returns nil when the status update fails or no handler is registered.
+(defn execute! [{:keys [handlers mark-status-fn! start-execute-time]
+                 :or {mark-status-fn! mark-status!}
+                 :as cfg}
+                {:com.github.ivarref.yoltq/keys [status id queue-name payload] :as queue-item}]
+  (when queue-item
+    (if (= :error status)
+      (assoc queue-item :failed? true)
+      (if-let [queue (get handlers queue-name)]
+        (let [{:keys [f allow-cas-failure?]} queue]
+          (log/debug "queue item" (str id) "for queue" queue-name "is now processing")
+          (let [{:keys [retval exception]}
+                (try
+                  (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ns) id queue-name])
+                  (let [v (f payload)]
+                    {:retval v})
+                  (catch Throwable t
+                    {:exception t})
+                  (finally
+                    (swap! start-execute-time dissoc (Thread/currentThread))))
+                {:db/keys [error] :as m} (u/db-error-map exception)]
+            (cond
+              (and (some? exception)
+                   allow-cas-failure?
+                   (= :db.error/cas-failed error)
+                   (or (true? allow-cas-failure?)
+                       (allow-cas-failure? (:a m))))
+              (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)]
+                (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item]
+                  (log/info (fmt id queue-name u/status-done tries (- done-time init-time)))
+                  (assoc q-item :retval retval :success? true :allow-cas-failure? true)))
+
+              (some? exception)
+              (when-let [q-item (mark-status-fn! cfg queue-item u/status-error)]
+                (let [{:com.github.ivarref.yoltq/keys [init-time error-time tries]} q-item
+                      level (if (>= tries 3) :error :warn)]
+                  (log/logp level exception (fmt id queue-name u/status-error tries (- error-time init-time)))
+                  (log/logp level exception "error message was:" (str \" (ex-message exception) \") "for queue-item" (str id))
+                  (log/logp level exception "ex-data was:" (ex-data exception) "for queue-item" (str id))
+                  (assoc q-item :exception exception)))
+
+              :else
+              (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)]
+                (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item]
+                  (log/info (fmt id queue-name u/status-done tries (- done-time init-time)))
+                  (assoc q-item :retval retval :success? true))))))
+        (do
+          (log/error "no handler for queue" queue-name)
+          nil)))))
diff --git a/src/com/github/ivarref/yoltq/poller.clj b/src/com/github/ivarref/yoltq/poller.clj
new file mode 100644
index 0000000..ad9d32a
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/poller.clj
@@ -0,0 +1,51 @@
+(ns com.github.ivarref.yoltq.poller
+  (:require [com.github.ivarref.yoltq.utils :as u]
+            [com.github.ivarref.yoltq.impl :as i]
+            [clojure.tools.logging :as log]))
+
+
+; Finds one candidate item of the given status on queue q, claims it and
+; executes it. Returns the result of i/execute!, or nil as soon as any step
+; yields nil. A status other than :init, :error or :hung makes `case` throw.
+(defn poll-once! [cfg q status]
+  (case status
+    :init (some->> (u/get-init cfg q) (i/take! cfg) (i/execute! cfg))
+    :error (some->> (u/get-error cfg q) (i/take! cfg) (i/execute! cfg))
+    :hung (some->> (u/get-hung cfg q) (i/take! cfg) (i/execute! cfg))))
+
+
+; Drains one [queue-name status] pair: keeps calling poll-once! for as long as
+; the system is running and each call succeeds.
+;
+; The :running-queues atom acts as a guard so the same pair is never polled by
+; two threads at once; the pair is removed again in the inner finally.
+; Returns the last successful result (not the failing one), or nil.
+(defn poll-queue! [running?
+                   {:keys [running-queues] :as cfg}
+                   [queue-name status :as q]]
+  (try
+    ; swap-vals! returns [old new]; `old` tells whether q was already present.
+    (let [[old _] (swap-vals! running-queues conj q)]
+      (if-not (contains? old q)
+        (try
+          (log/debug "polling queue" queue-name "for status" status)
+          (let [start-time (u/now-ns)
+                last-res (loop [prev-res nil]
+                           (when @running?
+                             (let [res (poll-once! cfg queue-name status)]
+                               (if (and res (:success? res))
+                                 (recur res)
+                                 prev-res))))]
+            (let [spent-ns (- (u/now-ns) start-time)]
+              (log/trace "done polling queue" q "in"
+                         (format "%.1f" (double (/ spent-ns 1e6)))
+                         "ms"))
+            last-res)
+          (finally
+            (swap! running-queues disj q)))
+        (log/debug "queue" q "is already being polled, doing nothing...")))
+    (catch Throwable t
+      (log/error t "poll-queue! crashed:" (ex-message t)))
+    (finally)))
+
+
+; Submits one poll-queue! task to the pool for every combination of registered
+; queue and status (:init, :error, :hung), in random order. Never throws.
+(defn poll-all-queues! [running? config-atom pool]
+  (try
+    (when @running?
+      (let [{:keys [handlers]} @config-atom]
+        (doseq [q (shuffle (vec (for [q-name (keys handlers)
+                                      status [:init :error :hung]]
+                                  [q-name status])))]
+          (.execute pool (fn [] (poll-queue! running? @config-atom q))))))
+    (catch Throwable t
+      (log/error t "poll-all-queues! crashed:" (ex-message t)))))
\ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj
new file mode 100644
index 0000000..a40d29a
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/report_queue.clj
@@ -0,0 +1,54 @@
+(ns com.github.ivarref.yoltq.report-queue
+  (:require [com.github.ivarref.yoltq.utils :as u]
+            [com.github.ivarref.yoltq.impl :as i]
+            [datomic.api :as d]
+            [clojure.tools.logging :as log])
+  (:import (datomic Datom)
+           (java.util.concurrent ScheduledExecutorService BlockingQueue TimeUnit)))
+
+
+(defn process-poll-result!
+  "Handles one transaction report from the tx-report-queue.
+
+  Every datom in :tx-data that asserts the attribute `id-ident` is taken to
+  be a newly added queue item. For each such item a zero-argument function
+  that claims and executes it is handed to `consumer`. Errors inside that
+  function are logged and never propagate."
+  [cfg id-ident poll-result consumer]
+  (let [{:keys [tx-data db-after]} poll-result]
+    (when-let [new-ids (->> tx-data
+                            (filter (fn [^Datom datom] (and
+                                                         (= (.a datom) id-ident)
+                                                         (.added datom))))
+                            (mapv (fn [^Datom datom] (.v datom)))
+                            (not-empty))]
+      (doseq [id new-ids]
+        (consumer (fn []
+                    (try
+                      (let [{:com.github.ivarref.yoltq/keys [lock id status queue-name]} (u/get-queue-item db-after id)]
+                        (some->>
+                          (u/prepare-processing id queue-name lock status)
+                          (i/take! cfg)
+                          (i/execute! cfg)))
+                      (catch Throwable t
+                        (log/error t "unexpected error in process-poll-result!")))))))))
+
+
+(defn report-queue-listener
+  "Long-running loop that listens to the Datomic tx-report-queue while
+  `running?` is true and dispatches new queue items to `pool`.
+
+  `ready?` is a promise. It is delivered `true` after the first poll
+  iteration. If the listener terminates without ever getting that far (set-up
+  failure, or `running?` already false) it is delivered `false` instead, so
+  that a caller blocking on the promise can never hang forever. The
+  tx-report-queue is always removed on exit."
+  [running?
+   ready?
+   ^ScheduledExecutorService pool
+   config-atom]
+  (let [conn (:conn @config-atom)]
+    (try
+      ; Set-up happens inside the try so that a failure here is logged and
+      ; still reaches the finally clause below.
+      (let [^BlockingQueue q (d/tx-report-queue conn)
+            id-ident (d/q '[:find ?e .
+                            :where [?e :db/ident :com.github.ivarref.yoltq/id]]
+                          (d/db conn))]
+        (while @running?
+          (when-let [poll-result (.poll q 1 TimeUnit/SECONDS)]
+            (process-poll-result! @config-atom
+                                  id-ident
+                                  poll-result
+                                  (fn [f]
+                                    (when @running?
+                                      (.execute pool f)))))
+          (deliver ready? true)))
+      (catch Throwable t
+        (log/error t "unexpected error in report-queue-listener"))
+      (finally
+        ; No-op when the promise was already delivered inside the loop.
+        (deliver ready? false)
+        (log/debug "remove tx-report-queue")
+        (d/remove-tx-report-queue conn)))))
\ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq/slow_executor_detector.clj b/src/com/github/ivarref/yoltq/slow_executor_detector.clj
new file mode 100644
index 0000000..f15ef7d
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/slow_executor_detector.clj
@@ -0,0 +1,28 @@
+(ns com.github.ivarref.yoltq.slow-executor-detector
+  (:require [com.github.ivarref.yoltq.ext-sys :as ext]
+            [clojure.tools.logging :as log]
+            [clojure.string :as str]))
+
+
+(defn- do-show-slow-threads
+  "Logs an error, including the thread's current stack trace, for every entry
+  in the :start-execute-time atom whose start time is more than
+  :max-execute-time nanoseconds in the past. Entries have the shape
+  thread -> [start-time queue-item-id queue-name]."
+  [{:keys [start-execute-time
+           max-execute-time]}]
+  (doseq [[^Thread thread [start-time queue-id queue-name]] @start-execute-time]
+    (when (> (ext/now-ns) (+ start-time max-execute-time))
+      (log/error "thread" (.getName thread) "spent too much time on"
+                 "queue item" (str queue-id)
+                 "for queue" queue-name
+                 "stacktrace: \n"
+                 (str/join "\n" (mapv str (seq (.getStackTrace thread))))))))
+
+
+(defn show-slow-threads
+  "Long-running loop: while `running?` is true, checks for slow threads and
+  then sleeps for up to 3 seconds. The sleep is done in 1 second steps so the
+  loop notices a shutdown quickly. Never throws; failures are logged."
+  [running? config-atom]
+  (try
+    (while @running?
+      (try
+        (do-show-slow-threads @config-atom)
+        (catch Throwable t
+          (log/error t "do-show-slow-threads crashed:" (ex-message t))))
+      (dotimes [_ 3]
+        (when @running? (Thread/sleep 1000))))
+    (catch Throwable t
+      (log/error t "show-slow-threads crashed:" (ex-message t)))))
;; ===== src/com/github/ivarref/yoltq/utils.clj (new file, index 0000000..c96d1dc) =====
(ns com.github.ivarref.yoltq.utils
  ; `random-uuid` is defined below; excluding it avoids the "already refers to
  ; clojure.core/random-uuid" warning on Clojure versions that ship that var.
  (:refer-clojure :exclude [random-uuid])
  (:require [datomic.api :as d]
            [clojure.edn :as edn]
            [com.github.ivarref.yoltq.ext-sys :as ext]
            [clojure.tools.logging :as log])
  (:import (datomic Connection)
           (java.time Duration)))


(def status-init :init)
(def status-processing :processing)
(def status-done :done)
(def status-error :error)


(defn duration->nanos
  "Returns a copy of map `m` in which every `java.time.Duration` value is
  replaced by its length in nanoseconds. All other values are kept as-is."
  [m]
  (reduce-kv (fn [acc k v]
               (assoc acc k (if (instance? Duration v)
                              (.toNanos ^Duration v) ; hinted to avoid reflection
                              v)))
             {}
             m))


(defn squuid
  "Delegates to `ext/squuid`."
  []
  (ext/squuid))


(defn random-uuid
  "Delegates to `ext/random-uuid`."
  []
  (ext/random-uuid))


(defn now-ns
  "Delegates to `ext/now-ns`."
  []
  (ext/now-ns))


(defn root-cause
  "Follows the `ex-cause` chain of `e` and returns the innermost exception."
  [e]
  (if-let [root (ex-cause e)]
    (root-cause root)
    e))


(defn db-error-map
  "Walks `t` and its causes and returns the first `ex-data` map that contains
  `:db/error`, or nil when no such map exists in the chain."
  [^Throwable t]
  (loop [e t]
    (cond (nil? e) nil

          (and (map? (ex-data e))
               (contains? (ex-data e) :db/error))
          (ex-data e)

          :else
          (recur (ex-cause e)))))


(defn get-queue-item
  "Pulls the queue item with the given `id` from `db`, drops `:db/id`, and
  parses the payload and bindings attributes with `edn/read-string`."
  [db id]
  (-> (d/pull db '[:*] [:com.github.ivarref.yoltq/id id])
      (dissoc :db/id)
      (update :com.github.ivarref.yoltq/payload edn/read-string)
      (update :com.github.ivarref.yoltq/bindings edn/read-string)))


(defn prepare-processing
  "Builds the map describing a take of queue item `id`: a freshly generated
  lock plus the transaction data that compare-and-swaps the lock and the status
  (`old-status` -> :processing) and sets the processing time to `(now-ns)`."
  [id queue-name old-lock old-status]
  (let [new-lock (random-uuid)]
    {:id         id
     :lock       new-lock
     :queue-name queue-name
     :tx         [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock]
                  [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status old-status status-processing]
                  {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ns)}]}))


(defn- assert-connection
  "Asserts that `(:conn cfg)` is a `datomic.Connection`; the failure message
  includes the offending value and the whole config."
  [{:keys [conn] :as cfg}]
  (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: "
                                           (str (if (nil? conn) "nil" conn))
                                           "\nConfig was: " (str cfg))))


(defn get-init
  "Picks, at random, one item of `queue-name` that has status :init and whose
  init-time is at or before `(now-ns)` minus `init-backoff-time`.

  Queries `db` when present in the config, otherwise the current database of
  `conn`. Returns the `prepare-processing` map for the chosen item; when
  nothing matches, writes a trace log line and returns its result."
  [{:keys [conn db init-backoff-time] :as cfg} queue-name]
  (assert-connection cfg)
  (if-let [ids (->> (d/q '[:find ?id ?lock
                           :in $ ?queue-name ?backoff
                           :where
                           [?e :com.github.ivarref.yoltq/status :init]
                           [?e :com.github.ivarref.yoltq/queue-name ?queue-name]
                           [?e :com.github.ivarref.yoltq/init-time ?init-time]
                           [(>= ?backoff ?init-time)]
                           [?e :com.github.ivarref.yoltq/id ?id]
                           [?e :com.github.ivarref.yoltq/lock ?lock]]
                         (or db (d/db conn))
                         queue-name
                         (- (now-ns) init-backoff-time))
                    (not-empty))]
    (let [[id old-lock] (rand-nth (into [] ids))]
      (prepare-processing id queue-name old-lock :init))
    (log/trace "no new-items in :init status for queue" queue-name)))


(defn get-error
  "Picks, at random, one item of `queue-name` that has status :error, whose
  error-time is at or before `(now-ns)` minus `error-backoff-time`, and whose
  tries count is at most max-retries.

  A per-queue `[:handlers queue-name :max-retries]` entry in the config takes
  precedence over the top-level `:max-retries`. Returns the
  `prepare-processing` map for the chosen item, or nil when nothing matches."
  [{:keys [conn db error-backoff-time max-retries] :as cfg} queue-name]
  (assert-connection cfg)
  (let [max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)]
    (when-let [ids (->> (d/q '[:find ?id ?lock
                               :in $ ?queue-name ?backoff ?max-tries
                               :where
                               [?e :com.github.ivarref.yoltq/status :error]
                               [?e :com.github.ivarref.yoltq/queue-name ?queue-name]
                               [?e :com.github.ivarref.yoltq/error-time ?time]
                               [(>= ?backoff ?time)]
                               [?e :com.github.ivarref.yoltq/tries ?tries]
                               [(> ?max-tries ?tries)]
                               [?e :com.github.ivarref.yoltq/id ?id]
                               [?e :com.github.ivarref.yoltq/lock ?lock]]
                             (or db (d/db conn))
                             queue-name
                             (- (now-ns) error-backoff-time)
                             (inc max-retries))
                        (not-empty))]
      (let [[id old-lock] (rand-nth (into [] ids))]
        (prepare-processing id queue-name old-lock :error)))))


(defn get-hung
  "Picks, at random, one item of `queue-name` that has status :processing and
  whose processing-time is at or before `now` minus `hung-backoff-time`.

  `now` defaults to `(now-ns)`. Returns nil when nothing matches; otherwise a
  map with `:was-hung? true`, a fresh `:lock`, and `:tx` data that swaps the
  lock, increments tries and sets error-time. When the item's tries have
  reached max-retries (`:to-error? true`) the tx additionally moves the status
  from :processing to :error."
  [{:keys [conn db now hung-backoff-time max-retries] :as cfg} queue-name]
  (assert-connection cfg)
  (let [now (or now (now-ns))
        max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)]
    (when-let [ids (->> (d/q '[:find ?id ?lock ?tries
                               :in $ ?qname ?backoff
                               :where
                               [?e :com.github.ivarref.yoltq/status :processing]
                               [?e :com.github.ivarref.yoltq/queue-name ?qname]
                               [?e :com.github.ivarref.yoltq/processing-time ?time]
                               [(>= ?backoff ?time)]
                               [?e :com.github.ivarref.yoltq/tries ?tries]
                               [?e :com.github.ivarref.yoltq/id ?id]
                               [?e :com.github.ivarref.yoltq/lock ?lock]]
                             (or db (d/db conn))
                             queue-name
                             (- now hung-backoff-time))
                        (not-empty))]
      (let [new-lock (random-uuid)
            ; The query returns 3-tuples, so only three names are bound.
            [id old-lock tries] (rand-nth (into [] ids))
            to-error? (>= tries max-retries)
            item-ref [:com.github.ivarref.yoltq/id id]]
        {:id         id
         :lock       new-lock
         :queue-name queue-name
         :was-hung?  true
         :to-error?  to-error?
         ; Same statements, in the same order, as the two hand-written
         ; variants they replace; the status CAS is only present when the
         ; item is being moved to :error.
         :tx         (cond-> [[:db/cas item-ref :com.github.ivarref.yoltq/lock old-lock new-lock]
                              [:db/cas item-ref :com.github.ivarref.yoltq/tries tries (inc tries)]]
                       to-error? (conj [:db/cas item-ref :com.github.ivarref.yoltq/status status-processing status-error])
                       true (conj {:db/id item-ref :com.github.ivarref.yoltq/error-time now}))}))))


;; ===== src/com/github/ivarref/yoltq/virtual_queue.clj (new file, index 0000000..e49aca3) =====
(ns com.github.ivarref.yoltq.virtual-queue
  (:require [clojure.tools.logging :as log]
            [com.github.ivarref.yoltq.report-queue :as rq]
            [com.github.ivarref.yoltq.ext-sys :as ext]
            [com.github.ivarref.yoltq :as dq]
            [datomic.api :as d]
            [com.github.ivarref.yoltq.poller :as poller])
  (:import (java.util.concurrent BlockingQueue TimeUnit)))


(defn bootstrap-poller!
  "Starts a background future that drains `conn`'s tx-report-queue into the
  `txq` atom while `@running?` is truthy, and blocks until the poller has
  completed its first poll.

  Returns true once the poller is up. Returns false instead of blocking forever
  when the poller stops before its first completed poll (for example because
  obtaining the tx-report-queue threw, or `running?` was already false).

  `bootstrapped?` (atom) is set to true before the caller is released, so
  teardown code that checks it will reliably wait for `poller-exited?`
  (promise), which is delivered when the future finishes."
  [txq running? poller-exited? bootstrapped? conn]
  (let [ready? (promise)]
    (future
      (try
        ; Obtained inside the try so that a failure here is logged and still
        ; releases the caller through the finally clause.
        (let [q (d/tx-report-queue conn)]
          (while @running?
            (when-let [poll-result (.poll ^BlockingQueue q 10 TimeUnit/MILLISECONDS)]
              (swap! txq conj poll-result))
            (reset! bootstrapped? true)
            (deliver ready? true)))
        (catch Throwable t
          (log/error t "test-poller crashed: " (ex-message t)))
        (finally
          (try
            (d/remove-tx-report-queue conn)
            (catch Throwable t
              (log/warn t "remove-tx-report-queue failed:" (ex-message t))))
          (deliver poller-exited? true)
          ; No-op when `ready?` was already delivered with true.
          (deliver ready? false))))
    @ready?))


(defmacro with-virtual-queue!
  "Evaluates `body` with the queue's dynamic vars rebound for testing: a fresh
  config atom (zero init backoff, :warn hung log level, an in-memory tx queue
  and a `bootstrap-poller!` partial), `*running?*` false, `*test-mode*` true,
  and the ext-sys clock/random/squuid atoms reset to 0.

  On exit the poller is told to stop and, if it was bootstrapped, awaited."
  [& body]
  `(let [txq# (atom [])
         poller-exited?# (promise)
         bootstrapped?# (atom false)
         running?# (atom true)
         config# (atom {:bootstrap-poller! (partial bootstrap-poller! txq# running?# poller-exited?# bootstrapped?#)
                        :init-backoff-time 0
                        :hung-log-level    :warn
                        :tx-queue          txq#})]
     (with-bindings {#'dq/*config*       config#
                     #'dq/*running?*     (atom false)
                     #'dq/*test-mode*    true
                     #'ext/*now-ns-atom* (atom 0)
                     #'ext/*random-atom* (atom 0)
                     #'ext/*squuid-atom* (atom 0)}
       (try
         ~@body
         (finally
           (reset! running?# false)
           (when @bootstrapped?#
             @poller-exited?#))))))


(defn call-with-virtual-queue!
  "Function form of `with-virtual-queue!`, usable as a clojure.test fixture."
  [f]
  (with-virtual-queue!
    (f)))


(defn run-report-queue!
  "Waits up to 3 seconds for at least `min-items` entries in the config's
  `:tx-queue`, then atomically empties it and hands every entry to
  `rq/process-poll-result!`, invoking each resulting thunk synchronously.

  Returns a vector of the thunks' return values. Logs and throws an `ex-info`
  when the timeout expires first."
  [min-items]
  (let [{:keys [tx-queue conn]} @dq/*config*
        id-ident (d/q '[:find ?e .
                        :where [?e :db/ident :com.github.ivarref.yoltq/id]]
                      (d/db conn))]
    (let [timeout (+ 3000 (System/currentTimeMillis))]
      (while (and (< (System/currentTimeMillis) timeout)
                  (< (count @tx-queue) min-items))
        (Thread/sleep 10)))
    (when (< (count @tx-queue) min-items)
      (let [msg (str "run-report-queue: timeout waiting for " min-items " items")]
        (log/error msg)
        (throw (ex-info msg {}))))
    (let [res (atom [])]
      ; swap-vals! returns [old new]; `first` is the content that was drained.
      (doseq [itm (first (swap-vals! tx-queue (constantly [])))]
        (rq/process-poll-result!
          @dq/*config*
          id-ident
          itm
          (fn [f] (swap! res conj (f)))))
      @res)))


(defn run-one-report-queue!
  "Runs `run-report-queue!` for at least one item and returns the first result."
  []
  (first (run-report-queue! 1)))


(defn run-queue-once!
  "Calls `poller/poll-once!` for queue `q` and `status` with the current config."
  [q status]
  (poller/poll-once! @dq/*config* q status))


(defn put!
  "Transacts a `dq/put` of `payload` on queue `q` using the config's connection
  and waits for the transaction to complete."
  [q payload]
  @(d/transact (:conn @dq/*config*) [(dq/put q payload)]))
;; ===== test/com/github/ivarref/yoltq/error_poller_test.clj (new file, index 0000000..2e0873e) =====
(ns com.github.ivarref.yoltq.error-poller-test
  (:require [clojure.test :refer :all]
            [com.github.ivarref.yoltq.error-poller :as ep]
            [clojure.tools.logging :as log]
            [com.github.ivarref.yoltq.log-init :as logconfig]
            [clojure.edn :as edn]))


; Feeds a sequence of (time, error-count) observations through
; `ep/handle-error-count`, threading the returned state into the next call and
; checking which callback (nil, :error or :recovery) each step requests.
(deftest error-poller
  (logconfig/init-logging!
    [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
     [#{"*"} (edn/read-string
               (System/getProperty "TAOENSSO_TIMBRE_MIN_LEVEL_EDN" ":info"))]])
  (let [cfg {:system-error-callback-backoff 100}
        ; Named `clock` rather than `time` so clojure.core/time is not shadowed.
        clock (atom 0)
        tick! (fn [& [amount]]
                (swap! clock + (or amount 1)))
        verify (fn [state now-ns error-count expected-callback]
                 (let [{:keys [errors state run-callback] :as res} (ep/handle-error-count state cfg now-ns error-count)]
                   (log/info errors "=>" state "::" run-callback)
                   (is (= expected-callback run-callback))
                   res))]
    (-> {}
        (verify (tick!) 0 nil)
        (verify (tick!) 1 nil)
        (verify (tick!) 1 nil)
        (verify (tick!) 1 :error)
        (verify (tick! 100) 0 nil)
        (verify (tick!) 0 :error)
        (verify (tick!) 0 :recovery)
        (verify (tick!) 1 nil)
        (verify (tick!) 1 nil)
        (verify (tick!) 1 :error)
        (verify (tick! 100) 1 nil)
        (verify (tick!) 1 :error))))


;; ===== test/com/github/ivarref/yoltq/http_hang_demo.clj (new file, index 0000000..06d877b) =====
(ns com.github.ivarref.yoltq.http-hang-demo
  ; `log`, `Duration` and `TimeUnit` are used inside the comment form below;
  ; without these entries the form cannot be evaluated from this namespace.
  (:require [datomic.api :as d]
            [clojure.tools.logging :as log]
            [com.github.ivarref.yoltq :as yq]
            [com.github.ivarref.yoltq.log-init])
  (:import (java.net.http HttpClient HttpRequest HttpResponse$BodyHandlers)
           (java.time Duration)
           (java.util.concurrent TimeUnit)))

; REPL demo: a consumer that performs a blocking HTTP GET, intended to be run
; while the network is artificially delayed (see the `tc` commands below).
(comment
  (do
    (com.github.ivarref.yoltq.log-init/init-logging!
      [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
       [#{"com.github.ivarref.yoltq.report-queue"} :debug]
       [#{"com.github.ivarref.yoltq.poller"} :info]
       [#{"com.github.ivarref.yoltq*"} :debug]
       [#{"*"} :info]])
    (yq/stop!)
    (let [uri (str "datomic:mem://hello-world-" (java.util.UUID/randomUUID))]
      (d/delete-database uri)
      (d/create-database uri)
      (let [conn (d/connect uri)]
        (yq/init! {:conn                          conn
                   :error-backoff-time            (Duration/ofSeconds 5)
                   :poll-delay                    5
                   :system-error-poll-interval    [1 TimeUnit/MINUTES]
                   :system-error-callback-backoff (Duration/ofHours 1)
                   :max-execute-time              (Duration/ofSeconds 3)
                   :on-system-error               (fn [] (log/error "system in error state"))
                   :on-system-recovery            (fn [] (log/info "system recovered"))})
        (yq/add-consumer! :slow (fn [_payload]
                                  (log/info "start slow handler...")
                                  ; sudo tc qdisc del dev wlp3s0 root netem
                                  ; sudo tc qdisc add dev wlp3s0 root netem delay 10000ms
                                  ; https://jvns.ca/blog/2017/04/01/slow-down-your-internet-with-tc/
                                  (let [request (-> (HttpRequest/newBuilder)
                                                    (.uri (java.net.URI. "https://postman-echo.com/get"))
                                                    (.timeout (Duration/ofSeconds 10))
                                                    (.GET)
                                                    (.build))]
                                    (log/info "body is:" (-> (.send (HttpClient/newHttpClient) request (HttpResponse$BodyHandlers/ofString))
                                                             (.body))))
                                  (log/info "slow handler is done!")))
        (yq/start!)
        @(d/transact conn [(yq/put :slow {:work 123})])
        #_(dotimes [x 1] @(d/transact conn [(yq/put :q {:work x})]))
        nil))))
;; ===== test/com/github/ivarref/yoltq/log_init.clj (new file, index 0000000..cf69e55) =====
(ns com.github.ivarref.yoltq.log-init
  (:require [clojure.term.colors :as colors]
            [taoensso.timbre :as timbre]
            [clojure.string :as str]))

; Colour function per log level; levels without an entry are printed uncoloured.
(def level-colors
  {;:warn colors/red
   :error colors/red})

; When bound to any non-nil value, every log line is coloured green.
(def ^:dynamic *override-color* nil)

(defn min-length
  "Right-pads `s` with spaces until it is at least `n` characters long."
  [n s]
  (loop [s s]
    (if (>= (count s) n)
      s
      (recur (str s " ")))))

(defn local-console-format-fn
  "A simpler log format, suitable for readable logs during development.

  Produces `<timestamp> <LEVEL padded to 5> <message>`, with level and message
  coloured via `level-colors` (or green when `*override-color*` is set).
  Stack traces are not part of the output. Any failure while formatting is
  printed to stdout and nil is returned."
  [data]
  (try
    ; Only the keys that end up in the output are destructured. The previous
    ; version also rendered `(timbre/stacktrace ?err)` for every event that
    ; carried an error and then discarded the result.
    (let [{:keys [level msg_ timestamp_]} data
          ts (force timestamp_)
          color-f (if (nil? *override-color*)
                    (get level-colors level str)
                    colors/green)]
      (str ts
           " "
           (color-f (min-length 5 (str/upper-case (name level))))
           " "
           (color-f (force msg_))))
    (catch Throwable t
      (println "error in local-console-format-fn:" (ex-message t))
      nil)))


(defn init-logging!
  "Merges a development logging setup into timbre's config: the given
  `min-levels`, an HH:mm:ss.SSS timestamp in the JVM's default timezone,
  `local-console-format-fn` as output function and a stdout println appender."
  [min-levels]
  (timbre/merge-config!
    {:min-level      min-levels
     :timestamp-opts {:pattern  "HH:mm:ss.SSS"
                      :timezone :jvm-default}
     :output-fn      (fn [data] (local-console-format-fn data))
     :appenders      {:println (timbre/println-appender {:stream :std-out})}}))


;; ===== test/com/github/ivarref/yoltq/readme_demo.clj (new file, index 0000000..eae0a3e) =====
(ns com.github.ivarref.yoltq.readme-demo
  ; log-init is required explicitly: the top-level `init-logging!` call below
  ; fails on a fresh load if that namespace has not been loaded.
  (:require [clojure.tools.logging :as log]
            [datomic.api :as d]
            [com.github.ivarref.yoltq :as yq]
            [com.github.ivarref.yoltq.log-init :as logconfig])
  (:import (java.util UUID)))


; A fresh in-memory database, created once per REPL session.
(defonce conn
  (let [uri (str "datomic:mem://hello-world-" (UUID/randomUUID))]
    (d/delete-database uri)
    (d/create-database uri)
    (d/connect uri)))


(logconfig/init-logging!
  [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
   [#{"com.github.ivarref.yoltq.report-queue"} :debug]
   [#{"com.github.ivarref.yoltq.poller"} :info]
   [#{"com.github.ivarref.yoltq*"} :debug]
   [#{"*"} :info]])


(yq/stop!)

(yq/init! {:conn conn})


; Consumer that throws on its first invocation only and logs the payload on
; every later one.
(yq/add-consumer! :q
                  (let [cnt (atom 0)]
                    (fn [payload]
                      (when (= 1 (swap! cnt inc))
                        (throw (ex-info "failed" {})))
                      (log/info "got payload" payload))))

(yq/start!)

@(d/transact conn [(yq/put :q {:work 123})])

(comment
  (yq/add-consumer! :q (fn [_] (throw (ex-info "always fail" {})))))

(comment
  @(d/transact conn [(yq/put :q {:work 123})]))

(comment
  (do
    (yq/add-consumer! :q (fn [_] :ok))
    nil))


;; ===== test/com/github/ivarref/yoltq/test_utils.clj (new file, index 0000000..dacba68) =====
(ns com.github.ivarref.yoltq.test-utils
  (:require [com.github.ivarref.yoltq.log-init :as logconfig]
            [clojure.tools.logging :as log]
            [com.github.ivarref.yoltq.utils :as u]
            [com.github.ivarref.yoltq :as dq]
            [datomic.api :as d]
            [clojure.string :as str]
            [com.github.ivarref.yoltq.impl :as i]
            [clojure.edn :as edn]
            [com.github.ivarref.yoltq.ext-sys :as ext])
  (:import (java.util UUID)))


(logconfig/init-logging!
  [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
   [#{"*"} (edn/read-string
             (System/getProperty "TAOENSSO_TIMBRE_MIN_LEVEL_EDN" ":info"))]])


(defn empty-conn
  "Creates a new in-memory Datomic database under a random name and returns a
  connection to it."
  []
  (let [uri (str "datomic:mem://hello-world-" (UUID/randomUUID))]
    (d/delete-database uri)
    (d/create-database uri)
    (d/connect uri)))


(defn break
  "Logs a separator line of 59 asterisks (60 empty strings joined by \"*\")."
  []
  (log/info (str/join "*" (repeat 60 ""))))


(defn clear
  "Writes the ANSI cursor-home and clear-screen sequences to stdout, then logs
  a separator line."
  []
  (.print System/out "\033[H\033[2J")
  (.flush System/out)
  (break))


(defn put-transact!
  "Transacts `(i/put config id payload)` on the current config's connection and
  waits for the result."
  [id payload]
  @(d/transact (:conn @dq/*config*) [(i/put @dq/*config* id payload)]))


(defn advance!
  "Moves the test clock forward by `tp`, which is either a number (added as-is)
  or an object responding to `.toNanos`. Requires `ext/*now-ns-atom*` to be
  bound."
  [tp]
  (assert (some? ext/*now-ns-atom*) "Expected to be running in test-mode!")
  (swap! ext/*now-ns-atom* + (if (number? tp)
                               tp
                               (.toNanos tp))))


(defn done-count
  "Returns the number of queue items with status :done in the current database
  (nil when there are none, as the aggregate has no rows to count)."
  []
  (d/q '[:find (count ?e) .
         :where
         [?e :com.github.ivarref.yoltq/id _]
         [?e :com.github.ivarref.yoltq/status :done]]
       (d/db (:conn @dq/*config*))))


; The wrappers below call the corresponding function with the current value of
; `dq/*config*` as first argument.

(defn get-init [& args]
  (apply u/get-init @dq/*config* args))


(defn get-error [& args]
  (apply u/get-error @dq/*config* args))


(defn get-hung [& args]
  (apply u/get-hung @dq/*config* args))


(defn take! [& args]
  (apply i/take! @dq/*config* args))


(defn execute! [& args]
  (apply i/execute! @dq/*config* args))


;; ===== test/com/github/ivarref/yoltq/virtual_test.clj (new file, index 0000000..41d2461) =====
(ns com.github.ivarref.yoltq.virtual-test
  ; Note: com.github.ivarref.yoltq is aliased both as `dq` and as `yq`; the
  ; tests below use the two aliases interchangeably.
  (:require [datomic-schema.core]
            [clojure.test :refer :all]
            [com.github.ivarref.yoltq.virtual-queue :as vq]
            [com.github.ivarref.yoltq :as dq]
            [com.github.ivarref.yoltq.test-utils :as u]
            [datomic.api :as d]
            [com.github.ivarref.yoltq.utils :as uu]
            [clojure.tools.logging :as log]
            [com.github.ivarref.yoltq.impl :as i]
            [com.github.ivarref.yoltq :as yq]))


; Every test runs inside a fresh virtual queue (test clock, in-memory tx queue).
(use-fixtures :each vq/call-with-virtual-queue!)


(deftest happy-case-1
  (let [conn (u/empty-conn)]
    (dq/init! {:conn conn})
    (dq/add-consumer! :q identity)
    @(d/transact conn [(dq/put :q {:work 123})])
    (is (= {:work 123} (:retval (vq/run-queue-once! :q :init))))))


(deftest happy-case-tx-report-q
  (let [conn (u/empty-conn)]
    (dq/init! {:conn conn})
    (dq/add-consumer! :q identity)
    @(d/transact conn [(dq/put :q {:work 123})])
    (is (= {:work 123} (:retval (vq/run-one-report-queue!))))
    (is (= 1 (u/done-count)))))


(deftest happy-case-poller
  (let [conn (u/empty-conn)]
    (dq/init! {:conn     conn
               :handlers {:q {:f (fn [payload] payload)}}})
    (u/put-transact! :q {:work 123})
    (u/advance! (:init-backoff-time yq/default-opts))
    (is (= {:work 123} (some->> (u/get-init :q)
                                (u/take!)
                                (u/execute!)
                                :retval)))))


(deftest happy-case-queue-fn-allow-cas-failure
  (let [conn (u/empty-conn)
        invoke-count (atom 0)]
    (dq/init! {:conn conn})
    (dq/add-consumer! :q
                      (fn [{:keys [id]}]
                        (swap! invoke-count inc)
                        @(d/transact conn [[:db/cas [:e/id id] :e/val nil "janei"]]))
                      {:allow-cas-failure? #{:e/val}})
    @(d/transact conn #d/schema [[:e/id :one :string :id]
                                 [:e/val :one :string]])
    @(d/transact conn [{:e/id "demo"}
                       (dq/put :q {:id "demo"})])
    (u/advance! (:init-backoff-time yq/default-opts))
    (swap! dq/*config* assoc :mark-status-fn! (fn [_ _ new-status]
                                                (log/info "mark-status! doing nothing for new status" new-status)))
    (is (nil? (some->> (u/get-init :q)
                       (u/take!)
                       (u/execute!))))
    (swap! dq/*config* dissoc :mark-status-fn!)

    ; (mark-status! :done) failed but f succeeded.
    (is (nil? (u/get-hung :q)))
    (u/advance! (:hung-backoff-time @yq/*config*))
    (is (some? (u/get-hung :q)))
    (is (true? (some->> (u/get-hung :q)
                        (u/take!)
                        (u/execute!)
                        :allow-cas-failure?)))
    (is (= 2 @invoke-count))))


(deftest backoff-test
  (let [conn (u/empty-conn)]
    (dq/init! {:conn              conn
               :init-backoff-time (:init-backoff-time yq/default-opts)
               :handlers          {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
    (u/put-transact! :q {:work 123})
    (is (nil? (u/get-init :q)))

    (u/advance! (dec (:init-backoff-time yq/default-opts)))
    (is (nil? (u/get-init :q)))
    (u/advance! 1)
    (is (some? (u/get-init :q)))

    (is (some? (some->> (u/get-init :q)
                        (u/take!)
                        (u/execute!)
                        :exception)))

    (u/advance! (dec (:error-backoff-time @yq/*config*)))
    (is (nil? (u/get-error :q)))
    (u/advance! 1)
    (is (some? (u/get-error :q)))))


(deftest get-hung-test
  (let [conn (u/empty-conn)]
    (dq/init! {:conn              conn
               :init-backoff-time (:init-backoff-time yq/default-opts)
               :handlers          {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
    (u/put-transact! :q {:work 123})
    (u/advance! (:init-backoff-time yq/default-opts))
    (is (some? (u/get-init :q)))

    (is (= :processing (some->> (u/get-init :q)
                                (u/take!)
                                :com.github.ivarref.yoltq/status)))

    (is (nil? (u/get-hung :q)))
    (u/advance! (dec (:hung-backoff-time yq/default-opts)))
    (is (nil? (u/get-hung :q)))
    (u/advance! 1)
    (is (some? (u/get-hung :q)))))


(deftest basic-locking
  (let [conn (u/empty-conn)]
    (dq/init! {:conn              conn
               :init-backoff-time (:init-backoff-time yq/default-opts)
               :cas-failures      (atom 0)
               :handlers          {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
    (u/put-transact! :q {:work 123})
    (u/advance! (:init-backoff-time yq/default-opts))
    (is (some? (u/get-init :q)))

    ; Taking the same job description twice: the second take must lose the CAS.
    (let [job (u/get-init :q)]
      (is (= :processing (some->> job (u/take!) :com.github.ivarref.yoltq/status)))
      (u/take! job)
      (is (= 1 @(:cas-failures @dq/*config*))))))


(deftest retry-test
  (let [conn (u/empty-conn)]
    (dq/init! {:conn              conn
               :init-backoff-time (:init-backoff-time yq/default-opts)
               :handlers          {:q {:f
                                       (let [c (atom 0)]
                                         (fn [_]
                                           (if (<= (swap! c inc) 2)
                                             (throw (ex-info "janei" {}))
                                             ::ok)))}}})
    (u/put-transact! :q {:work 123})

    (u/advance! (:init-backoff-time yq/default-opts))
    (is (some? (some->> (u/get-init :q) (u/take!) (u/execute!) :exception)))

    (u/advance! (:error-backoff-time @yq/*config*))
    (is (some? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception)))

    (u/advance! (:error-backoff-time @yq/*config*))
    (is (nil? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception)))))


(deftest max-retries-test
  (let [conn (u/empty-conn)
        call-count (atom 0)]
    (dq/init! {:conn               conn
               :error-backoff-time 0})
    (dq/add-consumer! :q (fn [_]
                           (swap! call-count inc)
                           (throw (ex-info "janei" {})))
                      {:max-retries 1})
    (vq/put! :q {:work 123})
    (is (some? (:exception (vq/run-one-report-queue!))))

    (dotimes [_ 10]
      (vq/run-queue-once! :q :error))
    (is (= 2 @call-count))))


(deftest max-retries-test-two
  (let [conn (u/empty-conn)
        call-count (atom 0)]
    (dq/init! {:conn               conn
               :error-backoff-time 0})
    (dq/add-consumer! :q (fn [_]
                           (swap! call-count inc)
                           (throw (ex-info "janei" {})))
                      {:max-retries 3})
    (vq/put! :q {:work 123})
    (is (some? (:exception (vq/run-one-report-queue!))))

    (dotimes [_ 20]
      (vq/run-queue-once! :q :error))
    (is (= 4 @call-count))))


(deftest hung-to-error
  (let [conn (u/empty-conn)
        call-count (atom 0)
        missed-mark-status (atom nil)]
    (dq/init! {:conn conn})
    (dq/add-consumer! :q
                      (fn [_]
                        (if (= 1 (swap! call-count inc))
                          (throw (ex-info "error" {}))
                          (log/info "return OK"))))
    (vq/put! :q {:id "demo"})
    (vq/run-one-report-queue!)                              ; now in status :error


    (swap! dq/*config* assoc :mark-status-fn! (fn [_ _ new-status]
                                                (reset! missed-mark-status new-status)
                                                (log/info "mark-status! doing nothing for new status" new-status)))
    (u/advance! (:error-backoff-time @yq/*config*))
    (vq/run-queue-once! :q :error)
    (swap! dq/*config* dissoc :mark-status-fn!)
    (is (= :done @missed-mark-status))

    (is (nil? (uu/get-hung @dq/*config* :q)))
    (u/advance! (:hung-backoff-time @yq/*config*))

    (is (some? (uu/get-hung @dq/*config* :q)))

    (is (= 2 @call-count))

    (is (true? (some->> (uu/get-hung (assoc-in @dq/*config* [:handlers :q :max-retries] 1) :q)
                        (i/take! @dq/*config*)
                        (i/execute! @dq/*config*)
                        :failed?)))

    (u/advance! (:error-backoff-time @yq/*config*))
    (is (some? (uu/get-error @dq/*config* :q)))
    (is (nil? (uu/get-error (assoc-in @dq/*config* [:handlers :q :max-retries] 1) :q)))))
