aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore12
-rw-r--r--README.md105
-rw-r--r--deps.edn28
-rw-r--r--pom.xml36
-rwxr-xr-xrelease.sh20
-rw-r--r--src/com/github/ivarref/yoltq.clj175
-rw-r--r--src/com/github/ivarref/yoltq/error_poller.clj109
-rw-r--r--src/com/github/ivarref/yoltq/ext_sys.clj26
-rw-r--r--src/com/github/ivarref/yoltq/impl.clj147
-rw-r--r--src/com/github/ivarref/yoltq/poller.clj51
-rw-r--r--src/com/github/ivarref/yoltq/report_queue.clj54
-rw-r--r--src/com/github/ivarref/yoltq/slow_executor_detector.clj28
-rw-r--r--src/com/github/ivarref/yoltq/utils.clj154
-rw-r--r--src/com/github/ivarref/yoltq/virtual_queue.clj94
-rw-r--r--test/com/github/ivarref/yoltq/error_poller_test.clj35
-rw-r--r--test/com/github/ivarref/yoltq/http_hang_demo.clj45
-rw-r--r--test/com/github/ivarref/yoltq/log_init.clj61
-rw-r--r--test/com/github/ivarref/yoltq/readme_demo.clj48
-rw-r--r--test/com/github/ivarref/yoltq/test_utils.clj74
-rw-r--r--test/com/github/ivarref/yoltq/virtual_test.clj232
20 files changed, 1534 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..cb9a7ca
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,12 @@
+.idea/
+*.iml
+.cpcache/
+.nrepl-port
+target/
+.connkey
+tree.txt
+.db.url
+.stage-url.txt
+*.pom.asc
+*.pom
+temp/ \ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..29c67da
--- /dev/null
+++ b/README.md
@@ -0,0 +1,105 @@
+# yoltq
+
+An opinionated Datomic queue for building (more) reliable systems.
+Implements the [transactional outbox](https://microservices.io/patterns/data/transactional-outbox.html)
+pattern.
+Supports retries, backoff and more.
+On-prem only.
+
+## Installation
+
+...
+
+## 1-minute example
+
+```clojure
+(require '[com.github.ivarref.yoltq :as yq])
+
+(def conn (datomic.api/connect "..."))
+
+; Initialize system
+(yq/init! {:conn conn})
+
+; Add a queue consumer that will intentionally fail on the first attempt
+(yq/add-consumer! :q
+ (let [cnt (atom 0)]
+ (fn [payload]
+ (when (= 1 (swap! cnt inc))
+ ; A consumer throwing an exception is considered a queue job failure
+ (throw (ex-info "failed" {})))
+ ; Anything else than a throwing exception is considered a queue job success
+ ; This includes nil, false and everything else.
+ (log/info "got payload" payload))))
+
+; Start threadpool
+(yq/start!)
+
+; Queue a job
+@(d/transact conn [(yq/put :q {:work 123})])
+
+; On your console you will see something like this:
+; 17:29:54.598 DEBUG queue item 613... for queue :q is pending status :init
+; 17:29:54.602 DEBUG queue item 613... for queue :q now has status :processing
+; 17:29:54.603 DEBUG queue item 613... for queue :q is now processing
+; 17:29:54.605 WARN queue-item 613... for queue :q now has status :error after 1 try in 4.8 ms
+; 17:29:54.607 WARN error message was: "failed" for queue-item 613...
+; 17:29:54.615 WARN ex-data was: {} for queue-item 613...
+; The item is so far failed...
+
+; But after approximately 10 seconds have elapsed, the item will be retried:
+; 17:30:05.596 DEBUG queue item 613... for queue :q now has status :processing
+; 17:30:05.597 DEBUG queue item 613... for queue :q is now processing
+; 17:30:05.597 INFO got payload {:work 123}
+; 17:30:05.599 INFO queue-item 613... for queue :q now has status :done after 2 tries in 5999.3 ms
+; And then it has succeeded.
+```
+
+## Rationale
+
+Integrating with external systems that may be unavailable can be tricky.
+Imagine the following code:
+
+```clojure
+(defn post-handler [user-input]
+ (let [db-item (process user-input)
+ ext-ref (clj-http.client/post ext-service {...})] ; may throw exception
+ @(d/transact conn [(assoc db-item :some/ext-ref ext-ref)])))
+```
+
+What if the POST request fails? Should it be retried? For how long?
+Should it be allowed to fail? How do you then process failures later?
+
+The queue way to solve this would be:
+
+```clojure
+(defn get-ext-ref [{:keys [id]}]
+ (let [ext-ref (clj-http.client/post ext-service {...})] ; may throw exception
+ @(d/transact conn [[:db/cas [:some/id id]
+ :some/ext-ref
+ nil
+ ext-ref]])))
+
+(yq/add-consumer! :get-ext-ref get-ext-ref {:allow-cas-failure? true})
+
+(defn post-handler [user-input]
+ (let [{:some/keys [id] :as db-item} (process user-input)
+ @(d/transact conn [db-item
+ (yq/put :get-ext-ref {:id id})])))
+
+```
+
+Here `post-handler` will always succeed as long as the transaction commits.
+
+`get-ext-ref` may fail multiple times if `ext-service` is down.
+This is fine as long as it eventually succeeds.
+
+There is a special case where `get-ext-ref` succeeds, but
+saving the new queue job status to the database fails.
+Thus `get-ext-ref` and any queue consumer should tolerate to
+be executed successfully several times.
+
+For `get-ext-ref` this is solved by using
+the database function [:db/cas (compare-and-swap)](https://docs.datomic.com/on-prem/transactions/transaction-functions.html#dbfn-cas)
+to achieve a write-once behaviour.
+The yoltq system treats cas failures as job successes
+when a consumer has `:allow-cas-failure?` set to `true` in its options.
diff --git a/deps.edn b/deps.edn
new file mode 100644
index 0000000..cf8297c
--- /dev/null
+++ b/deps.edn
@@ -0,0 +1,28 @@
+{:deps {org.clojure/tools.logging {:mvn/version "1.1.0"}
+ org.clojure/clojure {:mvn/version "1.10.3"}}
+
+ :paths ["src"]
+
+ :aliases {:test {:extra-paths ["test"]
+ :extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"}
+ com.taoensso/timbre {:mvn/version "5.1.2"}
+ com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"}
+ clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"}
+ com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]}
+ org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"}
+ io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}}
+ :jvm-opts ["-DDISABLE_SPY=true"
+ "-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"]
+ :main-opts ["--report" "stderr" "-m" "cognitect.test-runner"]}
+
+ :jar {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git"
+ :sha "0e8731e0f24db05b74769e219051b0e92b50624a"}}
+ :main-opts ["-m" "mach.pack.alpha.skinny" "--no-libs" "--project-path" "target/out.jar"]}
+
+ :release {:extra-deps {ivarref/pom-patch {:mvn/version "0.1.16"}}}
+
+ :deploy {:extra-deps {slipset/deps-deploy {:mvn/version "0.1.3"}}
+ :main-opts ["-m" "deps-deploy.deps-deploy" "deploy"
+ "target/out.jar" "true"]}}
+
+ :mvn/repos {"my.datomic.com" {:url "https://my.datomic.com/repo"}}}
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..b1c2691
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>jar</packaging>
+ <groupId>com.github.ivarref</groupId>
+ <artifactId>yoltq</artifactId>
+ <version>0.1.3</version>
+ <name>yoltq</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.clojure</groupId>
+ <artifactId>clojure</artifactId>
+ <version>1.10.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.clojure</groupId>
+ <artifactId>tools.logging</artifactId>
+ <version>1.1.0</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <sourceDirectory>src</sourceDirectory>
+ </build>
+ <repositories>
+ <repository>
+ <id>clojars</id>
+ <url>https://repo.clojars.org/</url>
+ </repository>
+ </repositories>
+ <scm>
+ <connection>scm:git:git://github.com/ivarref/yoltq.git</connection>
+ <developerConnection>scm:git:ssh://git@github.com/ivarref/yoltq.git</developerConnection>
+ <tag>v0.1.3</tag>
+ <url>https://github.com/ivarref/yoltq</url>
+ </scm>
+</project> \ No newline at end of file
diff --git a/release.sh b/release.sh
new file mode 100755
index 0000000..70f67b5
--- /dev/null
+++ b/release.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+
+set -ex
+
+clojure -Spom
+clojure -M:test
+clojure -M:jar
+clojure -X:release ivarref.pom-patch/clojars-repo-only!
+VERSION=$(clojure -X:release ivarref.pom-patch/set-patch-version! :patch :commit-count+1)
+
+git add pom.xml
+git commit -m "Release $VERSION"
+git tag -a v$VERSION -m "Release v$VERSION"
+git push --follow-tags
+
+clojure -M:deploy
+
+echo "Released $VERSION"
+
+rm *.pom.asc \ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
new file mode 100644
index 0000000..565c01d
--- /dev/null
+++ b/src/com/github/ivarref/yoltq.clj
@@ -0,0 +1,175 @@
+(ns com.github.ivarref.yoltq
+ (:require [datomic-schema.core]
+ [datomic.api :as d]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.impl :as i]
+ [com.github.ivarref.yoltq.report-queue :as rq]
+ [com.github.ivarref.yoltq.poller :as poller]
+ [com.github.ivarref.yoltq.error-poller :as errpoller]
+ [com.github.ivarref.yoltq.slow-executor-detector :as slow-executor]
+ [com.github.ivarref.yoltq.utils :as u])
+ (:import (datomic Connection)
+ (java.util.concurrent Executors TimeUnit ExecutorService)
+ (java.time Duration)))
+
+
+(defonce ^:dynamic *config* (atom nil))
+(defonce threadpool (atom nil))
+(defonce ^:dynamic *running?* (atom false))
+(defonce ^:dynamic *test-mode* false)
+
+
+(def default-opts
+ (-> {; Default number of times a queue job will be retried before giving up
+ ; Can be overridden on a per consumer basis with
+ ; (yq/add-consumer! :q (fn [payload] ...) {:max-retries 200})
+ :max-retries 100
+
+ ; Minimum amount of time to wait before a failed queue job is retried
+ :error-backoff-time (Duration/ofSeconds 5)
+
+ ; Max time a queue job can execute before an error is logged
+ :max-execute-time (Duration/ofMinutes 5)
+
+ ; Amount of time an in progress queue job can run before it is considered failed
+ ; and will be marked as such.
+ :hung-backoff-time (Duration/ofMinutes 30)
+
+ ; Most queue jobs in init state will be consumed by the tx-report-queue listener.
+ ; However in the case where a init job was added right before the application
+ ; was shut down and did not have time to be processed by the tx-report-queue listener,
+ ; it will be consumer by the init poller. This init poller backs off by
+ ; :init-backoff-time to avoid unnecessary compare-and-swap lock failures that could
+ ; otherwise occur if competing with the tx-report-queue listener.
+ :init-backoff-time (Duration/ofSeconds 60)
+
+ ; How frequent polling for init, error and hung jobs should be done.
+ :poll-delay (Duration/ofSeconds 10)
+
+ ; Specifies the number of threads available for executing queue and polling jobs.
+ ; The final thread pool will be this size + 2.
+ ;
+ ; One thread is permanently allocated for listening to the
+ ; tx-report-queue.
+ ;
+ ; Another thread is permanently allocated for checking :max-execute-time.
+ ; This means that if all executing queue jobs are stuck and the thread pool is unavailable
+ ; as such, at least an error will be logged about this. The log entry will
+ ; contain the stacktrace of the stuck threads.
+ :pool-size 4
+
+ ; How often should the system be polled for failed queue jobs
+ :system-error-poll-delay (Duration/ofMinutes 1)
+
+ ; How often should the system invoke
+ :system-error-callback-backoff (Duration/ofHours 1)}
+
+ u/duration->nanos))
+
+
+(defn init! [{:keys [conn] :as cfg}]
+ (assert (instance? Connection conn) (str "Expected :conn to be of type datomic Connection. Was: " (or (some-> conn class str) "nil")))
+ (locking threadpool
+ @(d/transact conn i/schema)
+ (let [new-cfg (swap! *config*
+ (fn [old-conf]
+ (-> (merge-with (fn [a b] (or b a))
+ {:running-queues (atom #{})
+ :start-execute-time (atom {})}
+ default-opts
+ old-conf
+ cfg)
+ (assoc :system-error (atom {}))
+ u/duration->nanos)))]
+ new-cfg)))
+
+
+(defn add-consumer!
+ ([queue-id f]
+ (add-consumer! queue-id f {}))
+ ([queue-id f opts]
+ (swap! *config* (fn [old-config] (assoc-in old-config [:handlers queue-id] (merge opts {:f f}))))))
+
+
+(defn put [id payload]
+ (let [{:keys [bootstrap-poller! conn] :as cfg} @*config*]
+ (when (and *test-mode* bootstrap-poller!)
+ (bootstrap-poller! conn))
+ (i/put cfg id payload)))
+
+
+(defn- do-start! []
+ (let [{:keys [poll-delay pool-size system-error-poll-delay]} @*config*]
+ (reset! threadpool (Executors/newScheduledThreadPool (+ 2 pool-size)))
+ (let [pool @threadpool
+ queue-listener-ready (promise)]
+ (reset! *running?* true)
+ (.scheduleAtFixedRate pool (fn [] (poller/poll-all-queues! *running?* *config* pool)) 0 poll-delay TimeUnit/NANOSECONDS)
+ (.scheduleAtFixedRate pool (fn [] (errpoller/poll-errors *running?* *config*)) 0 system-error-poll-delay TimeUnit/NANOSECONDS)
+ (.execute pool (fn [] (rq/report-queue-listener *running?* queue-listener-ready pool *config*)))
+ (.execute pool (fn [] (slow-executor/show-slow-threads *running?* *config*)))
+ @queue-listener-ready)))
+
+
+(defn start! []
+ (locking threadpool
+ (cond (true? *test-mode*)
+ (log/info "test mode enabled, doing nothing for start!")
+
+ (true? @*running?*)
+ nil
+
+ (false? @*running?*)
+ (do-start!))))
+
+
+(defn stop! []
+ (locking threadpool
+ (cond (true? *test-mode*)
+ (log/info "test mode enabled, doing nothing for stop!")
+
+ (false? @*running?*)
+ nil
+
+ (true? @*running?*)
+ (do
+ (reset! *running?* false)
+ (when-let [^ExecutorService tp @threadpool]
+ (log/debug "shutting down old threadpool")
+ (.shutdown tp)
+ (while (not (.awaitTermination tp 1 TimeUnit/SECONDS))
+ (log/debug "waiting for threadpool to stop"))
+ (log/debug "stopped!")
+ (reset! threadpool nil))))))
+
+
+(comment
+ (do
+ (require 'com.github.ivarref.yoltq.log-init)
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"ivarref.yoltq.report-queue"} :info]
+ [#{"ivarref.yoltq.poller"} :info]
+ [#{"ivarref.yoltq*"} :info]
+ [#{"*"} :info]])
+ (stop!)
+ (let [received (atom [])
+ uri (str "datomic:mem://demo")]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (let [ok-items (atom [])
+ conn (d/connect uri)
+ n 100]
+ (init! {:conn conn
+ :error-backoff-time (Duration/ofSeconds 1)
+ :poll-delay (Duration/ofSeconds 1)})
+ (add-consumer! :q (fn [payload]
+ (when (> (Math/random) 0.5)
+ (throw (ex-info "oops" {})))
+ (if (= n (count (swap! received conj (:work payload))))
+ (log/info "... and we are done!")
+ (log/info "got payload" payload "total ok:" (count @received)))))
+ (start!)
+ (dotimes [x n]
+ @(d/transact conn [(put :q {:work x})]))
+ nil)))) \ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj
new file mode 100644
index 0000000..77339f7
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/error_poller.clj
@@ -0,0 +1,109 @@
+(ns com.github.ivarref.yoltq.error-poller
+ (:require [datomic.api :as d]
+ [com.github.ivarref.yoltq.utils :as u]
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [clojure.tools.logging :as log]))
+
+
+(defn get-state [v]
+ (case v
+ [:error :none] :recovery
+ [:error :some] :error
+ [:error :all] :error
+ [:recovery :none] :recovery
+ [:recovery :some] :recovery
+ [:recovery :all] :error
+ nil))
+
+
+(defn handle-error-count [{:keys [errors last-notify state]
+ :or {errors []
+ last-notify 0
+ state :recovery}}
+ {:keys [system-error-min-count system-error-callback-backoff]
+ :or {system-error-min-count 3}}
+ now-ns
+ error-count]
+ (let [new-errors (->> (conj errors error-count)
+ (take-last system-error-min-count)
+ (vec))
+ classify (fn [coll]
+ (cond
+ (not= system-error-min-count (count coll))
+ :missing
+
+ (every? pos-int? coll)
+ :all
+
+ (every? zero? coll)
+ :none
+
+ :else
+ :some))
+ old-state state]
+ (merge
+ {:errors new-errors
+ :last-notify last-notify}
+ (when-let [new-state (get-state [old-state (classify new-errors)])]
+ (merge
+ {:state new-state}
+ (when (and (= old-state :recovery)
+ (= new-state :error))
+ {:run-callback :error
+ :last-notify now-ns})
+
+ (when (and (= new-state :error)
+ (= old-state :error)
+ (> now-ns
+ (+ last-notify system-error-callback-backoff)))
+ {:run-callback :error
+ :last-notify now-ns})
+
+ (when (and (= new-state :recovery)
+ (= old-state :error))
+ {:run-callback :recovery}))))))
+
+
+(defn do-poll-errors [{:keys [conn system-error
+ on-system-error
+ on-system-recovery]
+ :or {on-system-error (fn [] nil)
+ on-system-recovery (fn [] nil)}
+ :as config}]
+ (assert (some? conn) "expected :conn to be present")
+ (assert (some? system-error) "expected :system-error to be present")
+ (let [error-count (or (d/q '[:find (count ?e) .
+ :in $ ?status
+ :where
+ [?e :com.github.ivarref.yoltq/status ?status]]
+ (d/db conn)
+ u/status-error)
+ 0)]
+ (when (pos-int? error-count)
+ (log/debug "poll-errors found" error-count "errors in system"))
+ (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ns) error-count)]
+ (when run-callback
+ (cond (= run-callback :error)
+ (on-system-error)
+
+ (= run-callback :recovery)
+ (on-system-recovery)
+
+ :else
+ (log/error "unhandled callback-type" run-callback))
+ (log/debug "run-callback is" run-callback))
+ new-state)))
+
+
+(defn poll-errors [running? config-atom]
+ (try
+ (when @running?
+ (do-poll-errors @config-atom))
+ (catch Throwable t
+ (log/error t "unexpected error in poll-erros:" (ex-message t))
+ nil)))
+
+
+(comment
+ (do-poll-errors @com.github.ivarref.yoltq/*config*))
+
diff --git a/src/com/github/ivarref/yoltq/ext_sys.clj b/src/com/github/ivarref/yoltq/ext_sys.clj
new file mode 100644
index 0000000..3480475
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/ext_sys.clj
@@ -0,0 +1,26 @@
+(ns com.github.ivarref.yoltq.ext-sys
+ (:require [datomic.api :as d])
+ (:import (java.util UUID)))
+
+
+(def ^:dynamic *now-ns-atom* nil)
+(def ^:dynamic *squuid-atom* nil)
+(def ^:dynamic *random-atom* nil)
+
+
+(defn now-ns []
+ (if *now-ns-atom*
+ @*now-ns-atom*
+ (System/nanoTime)))
+
+
+(defn squuid []
+ (if *squuid-atom*
+ (UUID/fromString (str "00000000-0000-0000-0000-" (format "%012d" (swap! *squuid-atom* inc))))
+ (d/squuid)))
+
+
+(defn random-uuid []
+ (if *random-atom*
+ (UUID/fromString (str "00000000-0000-0000-0000-" (format "%012d" (swap! *random-atom* inc))))
+ (UUID/randomUUID))) \ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj
new file mode 100644
index 0000000..2acc83d
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/impl.clj
@@ -0,0 +1,147 @@
+(ns com.github.ivarref.yoltq.impl
+ (:require [datomic.api :as d]
+ [clojure.tools.logging :as log]
+ [clojure.string :as str]
+ [com.github.ivarref.yoltq.utils :as u]
+ [com.github.ivarref.yoltq.ext-sys :as ext]))
+
+
+(def schema
+ [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity}
+ #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
+ #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
+ #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string}
+ #:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string}
+ #:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true}
+ #:db{:ident :com.github.ivarref.yoltq/lock, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :noHistory true}
+ #:db{:ident :com.github.ivarref.yoltq/init-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
+ #:db{:ident :com.github.ivarref.yoltq/processing-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
+ #:db{:ident :com.github.ivarref.yoltq/done-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
+ #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}])
+
+
+(defn put [config queue-name payload]
+ (if-let [_ (get-in config [:handlers queue-name])]
+ (let [id (u/squuid)]
+ (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init)
+ {:com.github.ivarref.yoltq/id id
+ :com.github.ivarref.yoltq/queue-name queue-name
+ :com.github.ivarref.yoltq/status u/status-init
+ :com.github.ivarref.yoltq/payload (pr-str payload)
+ :com.github.ivarref.yoltq/bindings (pr-str {})
+ :com.github.ivarref.yoltq/lock (u/random-uuid)
+ :com.github.ivarref.yoltq/tries 0
+ :com.github.ivarref.yoltq/init-time (u/now-ns)})
+ (do
+ (log/error "Did not find registered handler for queue" queue-name)
+ (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name})))))
+
+
+(defn take! [{:keys [conn cas-failures hung-log-level]
+ :or {hung-log-level :error}}
+ {:keys [tx id queue-name was-hung? to-error?] :as queue-item-info}]
+ (when queue-item-info
+ (try
+ (cond to-error?
+ (log/logp hung-log-level "queue-item" (str id) "was hung and retried too many times. Giving up!")
+
+ was-hung?
+ (log/logp hung-log-level "queue-item" (str id) "was hung, retrying ...")
+
+ :else
+ nil)
+ (let [{:keys [db-after]} @(d/transact conn tx)
+ {:com.github.ivarref.yoltq/keys [status] :as q-item} (u/get-queue-item db-after id)]
+ (log/debug "queue item" (str id) "for queue" queue-name "now has status" status)
+ q-item)
+ (catch Throwable t
+ (let [{:db/keys [error] :as m} (u/db-error-map t)]
+ (cond
+ (= :db.error/cas-failed error)
+ (do
+ (log/info ":db.error/cas-failed for queue item" (str id) "and attribute" (:a m))
+ (when cas-failures
+ (swap! cas-failures inc))
+ nil)
+
+ :else
+ (do
+ (log/error t "Unexpected failure for queue item" (str id) ":" (ex-message t))
+ nil)))))))
+
+
+(defn mark-status! [{:keys [conn]}
+ {:com.github.ivarref.yoltq/keys [id lock tries]}
+ new-status]
+ (try
+ (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)]
+ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)]
+ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing new-status]
+ (if (= new-status u/status-done)
+ {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ns)}
+ {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ns)})]
+ {:keys [db-after]} @(d/transact conn tx)]
+ (u/get-queue-item db-after id))
+ (catch Throwable t
+ (log/error t "unexpected error in mark-status!: " (ex-message t))
+ nil)))
+
+
+(defn fmt [id queue-name new-status tries spent-ns]
+ (str/join " " ["queue-item" (str id)
+ "for queue" queue-name
+ "now has status" new-status
+ "after" tries (if (= 1 tries)
+ "try"
+ "tries")
+ "in" (format "%.1f" (double (/ spent-ns 1e6))) "ms"]))
+
+
+(defn execute! [{:keys [handlers mark-status-fn! start-execute-time]
+ :or {mark-status-fn! mark-status!}
+ :as cfg}
+ {:com.github.ivarref.yoltq/keys [status id queue-name payload] :as queue-item}]
+ (when queue-item
+ (if (= :error status)
+ (assoc queue-item :failed? true)
+ (if-let [queue (get handlers queue-name)]
+ (let [{:keys [f allow-cas-failure?]} queue]
+ (log/debug "queue item" (str id) "for queue" queue-name "is now processing")
+ (let [{:keys [retval exception]}
+ (try
+ (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ns) id queue-name])
+ (let [v (f payload)]
+ {:retval v})
+ (catch Throwable t
+ {:exception t})
+ (finally
+ (swap! start-execute-time dissoc (Thread/currentThread))))
+ {:db/keys [error] :as m} (u/db-error-map exception)]
+ (cond
+ (and (some? exception)
+ allow-cas-failure?
+ (= :db.error/cas-failed error)
+ (or (true? allow-cas-failure?)
+ (allow-cas-failure? (:a m))))
+ (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)]
+ (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item]
+ (log/info (fmt id queue-name u/status-done tries (- done-time init-time)))
+ (assoc q-item :retval retval :success? true :allow-cas-failure? true)))
+
+ (some? exception)
+ (when-let [q-item (mark-status-fn! cfg queue-item u/status-error)]
+ (let [{:com.github.ivarref.yoltq/keys [init-time error-time tries]} q-item
+ level (if (>= tries 3) :error :warn)]
+ (log/logp level exception (fmt id queue-name u/status-error tries (- error-time init-time)))
+ (log/logp level exception "error message was:" (str \" (ex-message exception) \") "for queue-item" (str id))
+ (log/logp level exception "ex-data was:" (ex-data exception) "for queue-item" (str id))
+ (assoc q-item :exception exception)))
+
+ :else
+ (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)]
+ (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item]
+ (log/info (fmt id queue-name u/status-done tries (- done-time init-time)))
+ (assoc q-item :retval retval :success? true))))))
+ (do
+ (log/error "no handler for queue" queue-name)
+ nil)))))
diff --git a/src/com/github/ivarref/yoltq/poller.clj b/src/com/github/ivarref/yoltq/poller.clj
new file mode 100644
index 0000000..ad9d32a
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/poller.clj
@@ -0,0 +1,51 @@
+(ns com.github.ivarref.yoltq.poller
+ (:require [com.github.ivarref.yoltq.utils :as u]
+ [com.github.ivarref.yoltq.impl :as i]
+ [clojure.tools.logging :as log]))
+
+
+(defn poll-once! [cfg q status]
+ (case status
+ :init (some->> (u/get-init cfg q) (i/take! cfg) (i/execute! cfg))
+ :error (some->> (u/get-error cfg q) (i/take! cfg) (i/execute! cfg))
+ :hung (some->> (u/get-hung cfg q) (i/take! cfg) (i/execute! cfg))))
+
+
+(defn poll-queue! [running?
+ {:keys [running-queues] :as cfg}
+ [queue-name status :as q]]
+ (try
+ (let [[old _] (swap-vals! running-queues conj q)]
+ (if-not (contains? old q)
+ (try
+ (log/debug "polling queue" queue-name "for status" status)
+ (let [start-time (u/now-ns)
+ last-res (loop [prev-res nil]
+ (when @running?
+ (let [res (poll-once! cfg queue-name status)]
+ (if (and res (:success? res))
+ (recur res)
+ prev-res))))]
+ (let [spent-ns (- (u/now-ns) start-time)]
+ (log/trace "done polling queue" q "in"
+ (format "%.1f" (double (/ spent-ns 1e6)))
+ "ms"))
+ last-res)
+ (finally
+ (swap! running-queues disj q)))
+ (log/debug "queue" q "is already being polled, doing nothing...")))
+ (catch Throwable t
+ (log/error t "poll-queue! crashed:" (ex-message t)))
+ (finally)))
+
+
+(defn poll-all-queues! [running? config-atom pool]
+ (try
+ (when @running?
+ (let [{:keys [handlers]} @config-atom]
+ (doseq [q (shuffle (vec (for [q-name (keys handlers)
+ status [:init :error :hung]]
+ [q-name status])))]
+ (.execute pool (fn [] (poll-queue! running? @config-atom q))))))
+ (catch Throwable t
+ (log/error t "poll-all-queues! crashed:" (ex-message t))))) \ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq/report_queue.clj b/src/com/github/ivarref/yoltq/report_queue.clj
new file mode 100644
index 0000000..a40d29a
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/report_queue.clj
@@ -0,0 +1,54 @@
+(ns com.github.ivarref.yoltq.report-queue
+ (:require [com.github.ivarref.yoltq.utils :as u]
+ [com.github.ivarref.yoltq.impl :as i]
+ [datomic.api :as d]
+ [clojure.tools.logging :as log])
+ (:import (datomic Datom)
+ (java.util.concurrent ScheduledExecutorService BlockingQueue TimeUnit)))
+
+
+(defn process-poll-result! [cfg id-ident poll-result consumer]
+ (let [{:keys [tx-data db-after]} poll-result]
+ (when-let [new-ids (->> tx-data
+ (filter (fn [^Datom datom] (and
+ (= (.a datom) id-ident)
+ (.added datom))))
+ (mapv (fn [^Datom datom] (.v datom)))
+ (into [])
+ (not-empty))]
+ (doseq [id new-ids]
+ (consumer (fn []
+ (try
+ (let [{:com.github.ivarref.yoltq/keys [lock id status queue-name]} (u/get-queue-item db-after id)]
+ (some->>
+ (u/prepare-processing id queue-name lock status)
+ (i/take! cfg)
+ (i/execute! cfg)))
+ (catch Throwable t
+ (log/error t "unexpected error in process-poll-result!")))))))))
+
+
+(defn report-queue-listener [running?
+ ready?
+ ^ScheduledExecutorService pool
+ config-atom]
+ (let [conn (:conn @config-atom)
+ ^BlockingQueue q (d/tx-report-queue conn)
+ id-ident (d/q '[:find ?e .
+ :where [?e :db/ident :com.github.ivarref.yoltq/id]]
+ (d/db conn))]
+ (try
+ (while @running?
+ (when-let [poll-result (.poll ^BlockingQueue q 1 TimeUnit/SECONDS)]
+ (process-poll-result! @config-atom
+ id-ident
+ poll-result
+ (fn [f]
+ (when @running?
+ (.execute ^ScheduledExecutorService pool f)))))
+ (deliver ready? true))
+ (catch Throwable t
+ (log/error t "unexpected error in report-queue-listener"))
+ (finally
+ (log/debug "remove tx-report-queue")
+ (d/remove-tx-report-queue conn))))) \ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq/slow_executor_detector.clj b/src/com/github/ivarref/yoltq/slow_executor_detector.clj
new file mode 100644
index 0000000..f15ef7d
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/slow_executor_detector.clj
@@ -0,0 +1,28 @@
+(ns com.github.ivarref.yoltq.slow-executor-detector
+ (:require [com.github.ivarref.yoltq.ext-sys :as ext]
+ [clojure.tools.logging :as log]
+ [clojure.string :as str]))
+
+
+(defn- do-show-slow-threads [{:keys [start-execute-time
+ max-execute-time]}]
+ (doseq [[^Thread thread [start-time queue-id queue-name]] @start-execute-time]
+ (when (> (ext/now-ns) (+ start-time max-execute-time))
+ (log/error "thread" (.getName thread) "spent too much time on"
+ "queue item" (str queue-id)
+ "for queue" queue-name
+ "stacktrace: \n"
+ (str/join "\n" (mapv str (seq (.getStackTrace thread))))))))
+
+
+(defn show-slow-threads [running? config-atom]
+ (try
+ (while @running?
+ (try
+ (do-show-slow-threads @config-atom)
+ (catch Throwable t
+ (log/error t "do-show-slow-threads crashed:" (ex-message t))))
+ (dotimes [_ 3]
+ (when @running? (Thread/sleep 1000))))
+ (catch Throwable t
+ (log/error t "reap! crashed:" (ex-message t))))) \ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj
new file mode 100644
index 0000000..c96d1dc
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/utils.clj
@@ -0,0 +1,154 @@
+(ns com.github.ivarref.yoltq.utils
+ (:require [datomic.api :as d]
+ [clojure.edn :as edn]
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [clojure.tools.logging :as log])
+ (:import (datomic Connection)
+ (java.time Duration)))
+
+
+(def status-init :init)
+(def status-processing :processing)
+(def status-done :done)
+(def status-error :error)
+
+
+(defn duration->nanos [m]
+ (reduce-kv (fn [o k v]
+ (if (instance? Duration v)
+ (assoc o k (.toNanos v))
+ (assoc o k v)))
+ {}
+ m))
+
+
+(defn squuid []
+ (ext/squuid))
+
+
+(defn random-uuid []
+ (ext/random-uuid))
+
+
+(defn now-ns []
+ (ext/now-ns))
+
+
+(defn root-cause [e]
+ (if-let [root (ex-cause e)]
+ (root-cause root)
+ e))
+
+
+(defn db-error-map [^Throwable t]
+ (loop [e t]
+ (cond (nil? e) nil
+
+ (and (map? (ex-data e))
+ (contains? (ex-data e) :db/error))
+ (ex-data e)
+
+ :else
+ (recur (ex-cause e)))))
+
+
+(defn get-queue-item [db id]
+ (-> (d/pull db '[:*] [:com.github.ivarref.yoltq/id id])
+ (dissoc :db/id)
+ (update :com.github.ivarref.yoltq/payload edn/read-string)
+ (update :com.github.ivarref.yoltq/bindings edn/read-string)))
+
+
+(defn prepare-processing [id queue-name old-lock old-status]
+ (let [new-lock (random-uuid)]
+ {:id id
+ :lock new-lock
+ :queue-name queue-name
+ :tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock]
+ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status old-status status-processing]
+ {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ns)}]}))
+
+
+(defn get-init [{:keys [conn db init-backoff-time] :as cfg} queue-name]
+ (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: "
+ (str (if (nil? conn) "nil" conn))
+ "\nConfig was: " (str cfg)))
+ (if-let [ids (->> (d/q '[:find ?id ?lock
+ :in $ ?queue-name ?backoff
+ :where
+ [?e :com.github.ivarref.yoltq/status :init]
+ [?e :com.github.ivarref.yoltq/queue-name ?queue-name]
+ [?e :com.github.ivarref.yoltq/init-time ?init-time]
+ [(>= ?backoff ?init-time)]
+ [?e :com.github.ivarref.yoltq/id ?id]
+ [?e :com.github.ivarref.yoltq/lock ?lock]]
+ (or db (d/db conn))
+ queue-name
+ (- (now-ns) init-backoff-time))
+ (not-empty))]
+ (let [[id old-lock] (rand-nth (into [] ids))]
+ (prepare-processing id queue-name old-lock :init))
+ (log/trace "no new-items in :init status for queue" queue-name)))
+
+
+(defn get-error [{:keys [conn db error-backoff-time max-retries] :as cfg} queue-name]
+ (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: "
+ (str (if (nil? conn) "nil" conn))
+ "\nConfig was: " (str cfg)))
+ (let [max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)]
+ (when-let [ids (->> (d/q '[:find ?id ?lock
+ :in $ ?queue-name ?backoff ?max-tries
+ :where
+ [?e :com.github.ivarref.yoltq/status :error]
+ [?e :com.github.ivarref.yoltq/queue-name ?queue-name]
+ [?e :com.github.ivarref.yoltq/error-time ?time]
+ [(>= ?backoff ?time)]
+ [?e :com.github.ivarref.yoltq/tries ?tries]
+ [(> ?max-tries ?tries)]
+ [?e :com.github.ivarref.yoltq/id ?id]
+ [?e :com.github.ivarref.yoltq/lock ?lock]]
+ (or db (d/db conn))
+ queue-name
+ (- (now-ns) error-backoff-time)
+ (inc max-retries))
+ (not-empty))]
+ (let [[id old-lock] (rand-nth (into [] ids))]
+ (prepare-processing id queue-name old-lock :error)))))
+
+
+(defn get-hung [{:keys [conn db now hung-backoff-time max-retries] :as cfg} queue-name]
+ (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: "
+ (str (if (nil? conn) "nil" conn))
+ "\nConfig was: " (str cfg)))
+ (let [now (or now (now-ns))
+ max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)]
+ (when-let [ids (->> (d/q '[:find ?id ?lock ?tries
+ :in $ ?qname ?backoff
+ :where
+ [?e :com.github.ivarref.yoltq/status :processing]
+ [?e :com.github.ivarref.yoltq/queue-name ?qname]
+ [?e :com.github.ivarref.yoltq/processing-time ?time]
+ [(>= ?backoff ?time)]
+ [?e :com.github.ivarref.yoltq/tries ?tries]
+ [?e :com.github.ivarref.yoltq/id ?id]
+ [?e :com.github.ivarref.yoltq/lock ?lock]]
+ (or db (d/db conn))
+ queue-name
+ (- now hung-backoff-time))
+ (not-empty))]
+ (let [new-lock (random-uuid)
+ [id old-lock tries _t] (rand-nth (into [] ids))
+ to-error? (>= tries max-retries)]
+ {:id id
+ :lock new-lock
+ :queue-name queue-name
+ :was-hung? true
+ :to-error? to-error?
+ :tx (if (not to-error?)
+ [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock]
+ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)]
+ {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time now}]
+ [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock]
+ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)]
+ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status status-processing status-error]
+ {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time now}])}))))
diff --git a/src/com/github/ivarref/yoltq/virtual_queue.clj b/src/com/github/ivarref/yoltq/virtual_queue.clj
new file mode 100644
index 0000000..e49aca3
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/virtual_queue.clj
@@ -0,0 +1,94 @@
+(ns com.github.ivarref.yoltq.virtual-queue
+ (:require [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.report-queue :as rq]
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [com.github.ivarref.yoltq :as dq]
+ [datomic.api :as d]
+ [com.github.ivarref.yoltq.poller :as poller])
+ (:import (java.util.concurrent BlockingQueue TimeUnit)))
+
+
+(defn bootstrap-poller! [txq running? poller-exited? bootstrapped? conn]
+ (let [ready? (promise)]
+ (future
+ (let [q (d/tx-report-queue conn)]
+ (try
+ (while @running?
+ (when-let [poll-result (.poll ^BlockingQueue q 10 TimeUnit/MILLISECONDS)]
+ (swap! txq conj poll-result))
+ (deliver ready? true)
+ (reset! bootstrapped? true))
+ (catch Throwable t
+ (log/error t "test-poller crashed: " (ex-message t)))
+ (finally
+ (try
+ (d/remove-tx-report-queue conn)
+ (catch Throwable t
+ (log/warn t "remove-tx-report-queue failed:" (ex-message t))))
+ (deliver poller-exited? true)))))
+ @ready?))
+
+
+(defmacro with-virtual-queue!
+ [& body]
+ `(let [txq# (atom [])
+ poller-exited?# (promise)
+ bootstrapped?# (atom false)
+ running?# (atom true)
+ config# (atom {:bootstrap-poller! (partial bootstrap-poller! txq# running?# poller-exited?# bootstrapped?#)
+ :init-backoff-time 0
+ :hung-log-level :warn
+ :tx-queue txq#})]
+ (with-bindings {#'dq/*config* config#
+ #'dq/*running?* (atom false)
+ #'dq/*test-mode* true
+ #'ext/*now-ns-atom* (atom 0)
+ #'ext/*random-atom* (atom 0)
+ #'ext/*squuid-atom* (atom 0)}
+ (try
+ ~@body
+ (finally
+ (reset! running?# false)
+ (when @bootstrapped?#
+ @poller-exited?#))))))
+
+
+(defn call-with-virtual-queue!
+ [f]
+ (with-virtual-queue!
+ (f)))
+
+
+(defn run-report-queue! [min-items]
+ (let [{:keys [tx-queue conn]} @dq/*config*
+ id-ident (d/q '[:find ?e .
+ :where [?e :db/ident :com.github.ivarref.yoltq/id]]
+ (d/db conn))]
+ (let [timeout (+ 3000 (System/currentTimeMillis))]
+ (while (and (< (System/currentTimeMillis) timeout)
+ (< (count @tx-queue) min-items))
+ (Thread/sleep 10)))
+ (when (< (count @tx-queue) min-items)
+ (let [msg (str "run-report-queue: timeout waiting for " min-items " items")]
+ (log/error msg)
+ (throw (ex-info msg {}))))
+ (let [res (atom [])]
+ (doseq [itm (first (swap-vals! tx-queue (constantly [])))]
+ (rq/process-poll-result!
+ @dq/*config*
+ id-ident
+ itm
+ (fn [f] (swap! res conj (f)))))
+ @res)))
+
+
+(defn run-one-report-queue! []
+ (first (run-report-queue! 1)))
+
+
+(defn run-queue-once! [q status]
+ (poller/poll-once! @dq/*config* q status))
+
+
+(defn put! [q payload]
+ @(d/transact (:conn @dq/*config*) [(dq/put q payload)])) \ No newline at end of file
diff --git a/test/com/github/ivarref/yoltq/error_poller_test.clj b/test/com/github/ivarref/yoltq/error_poller_test.clj
new file mode 100644
index 0000000..2e0873e
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/error_poller_test.clj
@@ -0,0 +1,35 @@
+(ns com.github.ivarref.yoltq.error-poller-test
+ (:require [clojure.test :refer :all]
+ [com.github.ivarref.yoltq.error-poller :as ep]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.log-init :as logconfig]
+ [clojure.edn :as edn]))
+
+
+(deftest error-poller
+ (logconfig/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"*"} (edn/read-string
+ (System/getProperty "TAOENSSO_TIMBRE_MIN_LEVEL_EDN" ":info"))]])
+ (let [cfg {:system-error-callback-backoff 100}
+ time (atom 0)
+ tick! (fn [& [amount]]
+ (swap! time + (or amount 1)))
+ verify (fn [state now-ns error-count expected-callback]
+ (let [{:keys [errors state run-callback] :as res} (ep/handle-error-count state cfg now-ns error-count)]
+ (log/info errors "=>" state "::" run-callback)
+ (is (= expected-callback run-callback))
+ res))]
+ (-> {}
+ (verify (tick!) 0 nil)
+ (verify (tick!) 1 nil)
+ (verify (tick!) 1 nil)
+ (verify (tick!) 1 :error)
+ (verify (tick! 100) 0 nil)
+ (verify (tick!) 0 :error)
+ (verify (tick!) 0 :recovery)
+ (verify (tick!) 1 nil)
+ (verify (tick!) 1 nil)
+ (verify (tick!) 1 :error)
+ (verify (tick! 100) 1 nil)
+ (verify (tick!) 1 :error))))
diff --git a/test/com/github/ivarref/yoltq/http_hang_demo.clj b/test/com/github/ivarref/yoltq/http_hang_demo.clj
new file mode 100644
index 0000000..06d877b
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/http_hang_demo.clj
@@ -0,0 +1,45 @@
+(ns com.github.ivarref.yoltq.http-hang-demo
+ (:require [datomic.api :as d]
+ [com.github.ivarref.yoltq :as yq]
+ [com.github.ivarref.yoltq.log-init])
+ (:import (java.net.http HttpClient HttpRequest HttpResponse$BodyHandlers)))
+
+(comment
+ (do
+ (com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq*"} :debug]
+ [#{"*"} :info]])
+ (yq/stop!)
+ (let [received (atom [])
+ uri (str "datomic:mem://hello-world-" (java.util.UUID/randomUUID))]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (let [conn (d/connect uri)]
+ (init! {:conn conn
+ :error-backoff-time (Duration/ofSeconds 5)
+ :poll-delay 5
+ :system-error-poll-interval [1 TimeUnit/MINUTES]
+ :system-error-callback-backoff (Duration/ofHours 1)
+ :max-execute-time (Duration/ofSeconds 3)
+ :on-system-error (fn [] (log/error "system in error state"))
+ :on-system-recovery (fn [] (log/info "system recovered"))})
+ (yq/add-consumer! :slow (fn [_payload]
+ (log/info "start slow handler...")
+ ; sudo tc qdisc del dev wlp3s0 root netem
+ ; sudo tc qdisc add dev wlp3s0 root netem delay 10000ms
+ ; https://jvns.ca/blog/2017/04/01/slow-down-your-internet-with-tc/
+ (let [request (-> (HttpRequest/newBuilder)
+ (.uri (java.net.URI. "https://postman-echo.com/get"))
+ (.timeout (Duration/ofSeconds 10))
+ (.GET)
+ (.build))]
+ (log/info "body is:" (-> (.send (HttpClient/newHttpClient) request (HttpResponse$BodyHandlers/ofString))
+ (.body))))
+ (log/info "slow handler is done!")))
+ (yq/start!)
+ @(d/transact conn [(put :slow {:work 123})])
+ #_(dotimes [x 1] @(d/transact conn [(put :q {:work x})]))
+ nil)))) \ No newline at end of file
diff --git a/test/com/github/ivarref/yoltq/log_init.clj b/test/com/github/ivarref/yoltq/log_init.clj
new file mode 100644
index 0000000..cf69e55
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/log_init.clj
@@ -0,0 +1,61 @@
+(ns com.github.ivarref.yoltq.log-init
+ (:require [clojure.term.colors :as colors]
+ [taoensso.timbre :as timbre]
+ [clojure.string :as str]))
+
+(def level-colors
+ {;:warn colors/red
+ :error colors/red})
+
+(def ^:dynamic *override-color* nil)
+
+(defn min-length [n s]
+ (loop [s s]
+ (if (>= (count s) n)
+ s
+ (recur (str s " ")))))
+
+(defn local-console-format-fn
+ "A simpler log format, suitable for readable logs during development. colorized stacktraces"
+ [data]
+ (try
+ (let [{:keys [level ?err msg_ ?ns-str ?file hostname_
+ timestamp_ ?line context]} data
+ ts (force timestamp_)]
+ (let [color-f (if (nil? *override-color*)
+ (get level-colors level str)
+ colors/green)
+ maybe-stacktrace (when ?err
+ (str "\n" (timbre/stacktrace ?err)))]
+ (cond-> (str #_(str ?ns-str ":" ?line)
+ #_(min-length (count "[yoltq:326] ")
+ (str
+ "["
+ (some-> ?ns-str
+ (str/split #"\.")
+ (last))
+ ":" ?line))
+ ts
+ " "
+ (color-f (min-length 5 (str/upper-case (name level))))
+ " "
+ #_(.getName ^Thread (Thread/currentThread))
+
+ (color-f (force msg_))
+
+ #_maybe-stacktrace))))
+
+
+ (catch Throwable t
+ (println "error in local-console-format-fn:" (ex-message t))
+ nil)))
+
+
+(defn init-logging! [min-levels]
+ (timbre/merge-config!
+ {:min-level min-levels
+ :timestamp-opts {:pattern "HH:mm:ss.SSS"
+ :timezone :jvm-default}
+ :output-fn (fn [data] (local-console-format-fn data))
+ :appenders {:println (timbre/println-appender {:stream :std-out})}}))
+
diff --git a/test/com/github/ivarref/yoltq/readme_demo.clj b/test/com/github/ivarref/yoltq/readme_demo.clj
new file mode 100644
index 0000000..eae0a3e
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/readme_demo.clj
@@ -0,0 +1,48 @@
+(ns com.github.ivarref.yoltq.readme-demo
+ (:require [clojure.tools.logging :as log]
+ [datomic.api :as d]
+ [com.github.ivarref.yoltq :as yq])
+ (:import (java.util UUID)))
+
+
+(defonce conn
+ (let [uri (str "datomic:mem://hello-world-" (UUID/randomUUID))]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (d/connect uri)))
+
+
+(com.github.ivarref.yoltq.log-init/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"com.github.ivarref.yoltq.report-queue"} :debug]
+ [#{"com.github.ivarref.yoltq.poller"} :info]
+ [#{"com.github.ivarref.yoltq*"} :debug]
+ [#{"*"} :info]])
+
+
+(yq/stop!)
+
+(yq/init! {:conn conn})
+
+
+(yq/add-consumer! :q
+ (let [cnt (atom 0)]
+ (fn [payload]
+ (when (= 1 (swap! cnt inc))
+ (throw (ex-info "failed" {})))
+ (log/info "got payload" payload))))
+
+(yq/start!)
+
+@(d/transact conn [(yq/put :q {:work 123})])
+
+(comment
+ (yq/add-consumer! :q (fn [_] (throw (ex-info "always fail" {})))))
+
+(comment
+ @(d/transact conn [(yq/put :q {:work 123})]))
+
+(comment
+ (do
+ (yq/add-consumer! :q (fn [_] :ok))
+ nil))
diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj
new file mode 100644
index 0000000..dacba68
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/test_utils.clj
@@ -0,0 +1,74 @@
+(ns com.github.ivarref.yoltq.test-utils
+ (:require [com.github.ivarref.yoltq.log-init :as logconfig]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.utils :as u]
+ [com.github.ivarref.yoltq :as dq]
+ [datomic.api :as d]
+ [clojure.string :as str]
+ [com.github.ivarref.yoltq.impl :as i]
+ [clojure.edn :as edn]
+ [com.github.ivarref.yoltq.ext-sys :as ext])
+ (:import (java.util UUID)))
+
+
+(logconfig/init-logging!
+ [[#{"datomic.*" "com.datomic.*" "org.apache.*"} :warn]
+ [#{"*"} (edn/read-string
+ (System/getProperty "TAOENSSO_TIMBRE_MIN_LEVEL_EDN" ":info"))]])
+
+
+(defn empty-conn []
+ (let [uri (str "datomic:mem://hello-world-" (UUID/randomUUID))]
+ (d/delete-database uri)
+ (d/create-database uri)
+ (d/connect uri)))
+
+
+(defn break []
+ (log/info (str/join "*" (repeat 60 ""))))
+
+
+(defn clear []
+ (.print System/out "\033[H\033[2J")
+ (.flush System/out)
+ (break))
+
+
+(defn put-transact! [id payload]
+ @(d/transact (:conn @dq/*config*) [(i/put @dq/*config* id payload)]))
+
+
+(defn advance! [tp]
+ (assert (some? ext/*now-ns-atom*) "Expected to be running in test-mode!")
+ (swap! ext/*now-ns-atom* + (if (number? tp)
+ tp
+ (.toNanos tp))))
+
+
+(defn done-count []
+ (d/q '[:find (count ?e) .
+ :where
+ [?e :com.github.ivarref.yoltq/id _]
+ [?e :com.github.ivarref.yoltq/status :done]]
+ (d/db (:conn @dq/*config*))))
+
+
+(defn get-init [& args]
+ (apply u/get-init @dq/*config* args))
+
+
+(defn get-error [& args]
+ (apply u/get-error @dq/*config* args))
+
+
+(defn get-hung [& args]
+ (apply u/get-hung @dq/*config* args))
+
+
+(defn take! [& args]
+ (apply i/take! @dq/*config* args))
+
+
+(defn execute! [& args]
+ (apply i/execute! @dq/*config* args))
+
diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj
new file mode 100644
index 0000000..41d2461
--- /dev/null
+++ b/test/com/github/ivarref/yoltq/virtual_test.clj
@@ -0,0 +1,232 @@
+(ns com.github.ivarref.yoltq.virtual-test
+ (:require [datomic-schema.core]
+ [clojure.test :refer :all]
+ [com.github.ivarref.yoltq.virtual-queue :as vq]
+ [com.github.ivarref.yoltq :as dq]
+ [com.github.ivarref.yoltq.test-utils :as u]
+ [datomic.api :as d]
+ [com.github.ivarref.yoltq.utils :as uu]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.impl :as i]
+ [com.github.ivarref.yoltq :as yq]))
+
+
+(use-fixtures :each vq/call-with-virtual-queue!)
+
+
+(deftest happy-case-1
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn})
+ (dq/add-consumer! :q identity)
+ @(d/transact conn [(dq/put :q {:work 123})])
+ (is (= {:work 123} (:retval (vq/run-queue-once! :q :init))))))
+
+
+(deftest happy-case-tx-report-q
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn})
+ (dq/add-consumer! :q identity)
+ @(d/transact conn [(dq/put :q {:work 123})])
+ (is (= {:work 123} (:retval (vq/run-one-report-queue!))))
+ (is (= 1 (u/done-count)))))
+
+
+(deftest happy-case-poller
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn
+ :handlers {:q {:f (fn [payload] payload)}}})
+ (u/put-transact! :q {:work 123})
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (= {:work 123} (some->> (u/get-init :q)
+ (u/take!)
+ (u/execute!)
+ :retval)))))
+
+
+(deftest happy-case-queue-fn-allow-cas-failure
+ (let [conn (u/empty-conn)
+ invoke-count (atom 0)]
+ (dq/init! {:conn conn})
+ (dq/add-consumer! :q
+ (fn [{:keys [id]}]
+ (swap! invoke-count inc)
+ @(d/transact conn [[:db/cas [:e/id id] :e/val nil "janei"]]))
+ {:allow-cas-failure? #{:e/val}})
+ @(d/transact conn #d/schema [[:e/id :one :string :id]
+ [:e/val :one :string]])
+ @(d/transact conn [{:e/id "demo"}
+ (dq/put :q {:id "demo"})])
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (swap! dq/*config* assoc :mark-status-fn! (fn [_ _ new-status]
+ (log/info "mark-status! doing nothing for new status" new-status)))
+ (is (nil? (some->> (u/get-init :q)
+ (u/take!)
+ (u/execute!))))
+ (swap! dq/*config* dissoc :mark-status-fn!)
+
+ ; (mark-status! :done) failed but f succeeded.
+ (is (nil? (u/get-hung :q)))
+ (u/advance! (:hung-backoff-time @yq/*config*))
+ (is (some? (u/get-hung :q)))
+ (is (true? (some->> (u/get-hung :q)
+ (u/take!)
+ (u/execute!)
+ :allow-cas-failure?)))
+ (is (= 2 @invoke-count))))
+
+
+(deftest backoff-test
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
+ (u/put-transact! :q {:work 123})
+ (is (nil? (u/get-init :q)))
+
+ (u/advance! (dec (:init-backoff-time yq/default-opts)))
+ (is (nil? (u/get-init :q)))
+ (u/advance! 1)
+ (is (some? (u/get-init :q)))
+
+ (is (some? (some->> (u/get-init :q)
+ (u/take!)
+ (u/execute!)
+ :exception)))
+
+ (u/advance! (dec (:error-backoff-time @yq/*config*)))
+ (is (nil? (u/get-error :q)))
+ (u/advance! 1)
+ (is (some? (u/get-error :q)))))
+
+
+(deftest get-hung-test
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
+ (u/put-transact! :q {:work 123})
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (some? (u/get-init :q)))
+
+ (is (= :processing (some->> (u/get-init :q)
+ (u/take!)
+ :com.github.ivarref.yoltq/status)))
+
+ (is (nil? (u/get-hung :q)))
+ (u/advance! (dec (:hung-backoff-time yq/default-opts)))
+ (is (nil? (u/get-hung :q)))
+ (u/advance! 1)
+ (is (some? (u/get-hung :q)))))
+
+
+(deftest basic-locking
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :cas-failures (atom 0)
+ :handlers {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
+ (u/put-transact! :q {:work 123})
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (some? (u/get-init :q)))
+
+ (let [job (u/get-init :q)]
+ (is (= :processing (some->> job (u/take!) :com.github.ivarref.yoltq/status)))
+ (u/take! job)
+ (is (= 1 @(:cas-failures @dq/*config*))))))
+
+
+(deftest retry-test
+ (let [conn (u/empty-conn)]
+ (dq/init! {:conn conn
+ :init-backoff-time (:init-backoff-time yq/default-opts)
+ :handlers {:q {:f
+ (let [c (atom 0)]
+ (fn [_]
+ (if (<= (swap! c inc) 2)
+ (throw (ex-info "janei" {}))
+ ::ok)))}}})
+ (u/put-transact! :q {:work 123})
+
+ (u/advance! (:init-backoff-time yq/default-opts))
+ (is (some? (some->> (u/get-init :q) (u/take!) (u/execute!) :exception)))
+
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (is (some? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception)))
+
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (is (nil? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception)))))
+
+
+(deftest max-retries-test
+ (let [conn (u/empty-conn)
+ call-count (atom 0)]
+ (dq/init! {:conn conn
+ :error-backoff-time 0})
+ (dq/add-consumer! :q (fn [_]
+ (swap! call-count inc)
+ (throw (ex-info "janei" {})))
+ {:max-retries 1})
+ (vq/put! :q {:work 123})
+ (is (some? (:exception (vq/run-one-report-queue!))))
+
+ (dotimes [_ 10]
+ (vq/run-queue-once! :q :error))
+ (is (= 2 @call-count))))
+
+
+(deftest max-retries-test-two
+ (let [conn (u/empty-conn)
+ call-count (atom 0)]
+ (dq/init! {:conn conn
+ :error-backoff-time 0})
+ (dq/add-consumer! :q (fn [_]
+ (swap! call-count inc)
+ (throw (ex-info "janei" {})))
+ {:max-retries 3})
+ (vq/put! :q {:work 123})
+ (is (some? (:exception (vq/run-one-report-queue!))))
+
+ (dotimes [_ 20]
+ (vq/run-queue-once! :q :error))
+ (is (= 4 @call-count))))
+
+
+(deftest hung-to-error
+ (let [conn (u/empty-conn)
+ call-count (atom 0)
+ missed-mark-status (atom nil)]
+ (dq/init! {:conn conn})
+ (dq/add-consumer! :q
+ (fn [_]
+ (if (= 1 (swap! call-count inc))
+ (throw (ex-info "error" {}))
+ (log/info "return OK"))))
+ (vq/put! :q {:id "demo"})
+ (vq/run-one-report-queue!) ; now in status :error
+
+
+ (swap! dq/*config* assoc :mark-status-fn! (fn [_ _ new-status]
+ (reset! missed-mark-status new-status)
+ (log/info "mark-status! doing nothing for new status" new-status)))
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (vq/run-queue-once! :q :error)
+ (swap! dq/*config* dissoc :mark-status-fn!)
+ (is (= :done @missed-mark-status))
+
+ (is (nil? (uu/get-hung @dq/*config* :q)))
+ (u/advance! (:hung-backoff-time @yq/*config*))
+
+ (is (some? (uu/get-hung @dq/*config* :q)))
+
+ (is (= 2 @call-count))
+
+ (is (true? (some->> (uu/get-hung (assoc-in @dq/*config* [:handlers :q :max-retries] 1) :q)
+ (i/take! @dq/*config*)
+ (i/execute! @dq/*config*)
+ :failed?)))
+
+ (u/advance! (:error-backoff-time @yq/*config*))
+ (is (some? (uu/get-error @dq/*config* :q)))
+ (is (nil? (uu/get-error (assoc-in @dq/*config* [:handlers :q :max-retries] 1) :q)))))
+
+