aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore3
-rw-r--r--README.md38
-rw-r--r--deps.edn2
-rw-r--r--src/com/github/ivarref/yoltq.clj8
-rw-r--r--src/com/github/ivarref/yoltq/error_poller.clj10
-rw-r--r--src/com/github/ivarref/yoltq/ext_sys.clj13
-rw-r--r--src/com/github/ivarref/yoltq/impl.clj14
-rw-r--r--src/com/github/ivarref/yoltq/poller.clj19
-rw-r--r--src/com/github/ivarref/yoltq/slow_executor_detector.clj4
-rw-r--r--src/com/github/ivarref/yoltq/test_queue.clj2
-rw-r--r--src/com/github/ivarref/yoltq/utils.clj19
-rw-r--r--test/com/github/ivarref/yoltq/test_utils.clj9
12 files changed, 81 insertions, 60 deletions
diff --git a/.gitignore b/.gitignore
index cb9a7ca..c82fdd7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,4 +9,5 @@ tree.txt
.stage-url.txt
*.pom.asc
*.pom
-temp/ \ No newline at end of file
+temp/
+.clj-kondo/
diff --git a/README.md b/README.md
index 45ba8c4..9c5669c 100644
--- a/README.md
+++ b/README.md
@@ -333,12 +333,21 @@ easier.
## Change log
-### 2022-03-27 [v0.2.41](https://github.com/ivarref/yoltq/compare/v0.2.39...v0.2.41)
+### 20..-..-.. vHEAD [diff](https://github.com/ivarref/yoltq/compare/v0.2.41...HEAD)
+* Critical bugfix.
+```
+Started using (System/currentTimeMillis) and not (System/nanoTime)
+when storing time in the database.
+```
+
+* Bump Clojure to `1.11.0`.
+
+### 2022-03-27 v0.2.41 [diff](https://github.com/ivarref/yoltq/compare/v0.2.39...v0.2.41)
* Added function `healthy?` that returns:
```
- true if no errors
- false if one or more errors
- nil if error-poller is yet to be executed.
+ true if no errors
+ false if one or more errors
+ nil if error-poller is yet to be executed.
```
* Added default functions for `:on-system-error` and `:on-system-recovery`
@@ -348,22 +357,23 @@ easier.
* Added function `queue-stats` that returns a nicely "formatted"
vector of queue stats, for example:
```
-(queue-stats)
-=>
-[{:qname :add-message-thread, :status :done, :count 10274}
- {:qname :add-message-thread, :status :init, :count 30}
- {:qname :add-message-thread, :status :processing, :count 1}
- {:qname :send-message, :status :done, :count 21106}
- {:qname :send-message, :status :init, :count 56}]
+ (queue-stats)
+ =>
+ [{:qname :add-message-thread, :status :done, :count 10274}
+ {:qname :add-message-thread, :status :init, :count 30}
+ {:qname :add-message-thread, :status :processing, :count 1}
+ {:qname :send-message, :status :done, :count 21106}
+ {:qname :send-message, :status :init, :count 56}]
```
-### 2021-09-27 [v0.2.39](https://github.com/ivarref/yoltq/compare/v0.2.37...v0.2.39)
+### 2021-09-27 v0.2.39 [diff](https://github.com/ivarref/yoltq/compare/v0.2.37...v0.2.39)
Added `:valid-payload?` option for queue consumers.
-### 2021-09-27 [v0.2.37](https://github.com/ivarref/yoltq/compare/v0.2.33...v0.2.37)
+### 2021-09-27 v0.2.37 [diff](https://github.com/ivarref/yoltq/compare/v0.2.33...v0.2.37)
Improved error reporting.
-### 2021-09-24 v0.2.33: First publicly announced release.
+### 2021-09-24 v0.2.33
+First publicly announced release.
## License
diff --git a/deps.edn b/deps.edn
index d0f0a26..8e769e1 100644
--- a/deps.edn
+++ b/deps.edn
@@ -1,5 +1,5 @@
{:deps {org.clojure/tools.logging {:mvn/version "1.1.0"}
- org.clojure/clojure {:mvn/version "1.10.3"}}
+ org.clojure/clojure {:mvn/version "1.11.0"}}
:paths ["src"]
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index 03a364f..17aa40a 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -19,7 +19,7 @@
(def default-opts
(-> {; Default number of times a queue job will be retried before giving up
- ; Can be overridden on a per consumer basis with
+ ; Can be overridden on a per-consumer basis with
; (yq/add-consumer! :q (fn [payload] ...) {:max-retries 200})
:max-retries 100
@@ -34,7 +34,7 @@
:hung-backoff-time (Duration/ofMinutes 30)
; Most queue jobs in init state will be consumed by the tx-report-queue listener.
- ; However in the case where a init job was added right before the application
+ ; However, in the case where an init job was added right before the application
; was shut down and did not have time to be processed by the tx-report-queue listener,
; it will be consumer by the init poller. This init poller backs off by
; :init-backoff-time to avoid unnecessary compare-and-swap lock failures that could
@@ -66,7 +66,7 @@
; How often should the system invoke
:system-error-callback-backoff (Duration/ofHours 1)}
- u/duration->nanos))
+ u/duration->millis))
(defn init! [{:keys [conn] :as cfg}]
@@ -83,7 +83,7 @@
default-opts
(if *test-mode* old-conf (select-keys old-conf [:handlers]))
cfg)
- u/duration->nanos)))]
+ u/duration->millis)))]
new-cfg)))
diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj
index 1268482..ee6359e 100644
--- a/src/com/github/ivarref/yoltq/error_poller.clj
+++ b/src/com/github/ivarref/yoltq/error_poller.clj
@@ -22,7 +22,7 @@
state :recovery}}
{:keys [system-error-min-count system-error-callback-backoff]
:or {system-error-min-count 3}}
- now-ns
+ now-ms
error-count]
(let [new-errors (->> (conj errors error-count)
(take-last system-error-min-count)
@@ -50,14 +50,14 @@
(when (and (= old-state :recovery)
(= new-state :error))
{:run-callback :error
- :last-notify now-ns})
+ :last-notify now-ms})
(when (and (= new-state :error)
(= old-state :error)
- (> now-ns
+ (> now-ms
(+ last-notify system-error-callback-backoff)))
{:run-callback :error
- :last-notify now-ns})
+ :last-notify now-ms})
(when (and (= new-state :recovery)
(= old-state :error))
@@ -88,7 +88,7 @@
(log/debug "poll-errors found" error-count "errors in system")
(reset! healthy? false))
(reset! healthy? true))
- (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ns) error-count)]
+ (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ms) error-count)]
(when run-callback
(cond (= run-callback :error)
(on-system-error)
diff --git a/src/com/github/ivarref/yoltq/ext_sys.clj b/src/com/github/ivarref/yoltq/ext_sys.clj
index 3480475..692b934 100644
--- a/src/com/github/ivarref/yoltq/ext_sys.clj
+++ b/src/com/github/ivarref/yoltq/ext_sys.clj
@@ -1,17 +1,18 @@
(ns com.github.ivarref.yoltq.ext-sys
(:require [datomic.api :as d])
+ (:refer-clojure :exclude [random-uuid])
(:import (java.util UUID)))
-(def ^:dynamic *now-ns-atom* nil)
+(def ^:dynamic *now-ms-atom* nil)
(def ^:dynamic *squuid-atom* nil)
(def ^:dynamic *random-atom* nil)
-(defn now-ns []
- (if *now-ns-atom*
- @*now-ns-atom*
- (System/nanoTime)))
+(defn now-ms []
+ (if *now-ms-atom*
+ @*now-ms-atom*
+ (System/currentTimeMillis)))
(defn squuid []
@@ -23,4 +24,4 @@
(defn random-uuid []
(if *random-atom*
(UUID/fromString (str "00000000-0000-0000-0000-" (format "%012d" (swap! *random-atom* inc))))
- (UUID/randomUUID))) \ No newline at end of file
+ (UUID/randomUUID)))
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj
index 8b75fc3..b4eef8d 100644
--- a/src/com/github/ivarref/yoltq/impl.clj
+++ b/src/com/github/ivarref/yoltq/impl.clj
@@ -63,7 +63,7 @@
:com.github.ivarref.yoltq/opts (pr-str-safe :opts opts)
:com.github.ivarref.yoltq/lock (u/random-uuid)
:com.github.ivarref.yoltq/tries 0
- :com.github.ivarref.yoltq/init-time (u/now-ns)}
+ :com.github.ivarref.yoltq/init-time (u/now-ms)}
(when-let [[q ext-id] (:depends-on opts)]
(when-not (d/q '[:find ?e .
:in $ ?ext-id
@@ -138,8 +138,8 @@
[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)]
[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing new-status]
(if (= new-status u/status-done)
- {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ns)}
- {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ns)})]
+ {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ms)}
+ {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ms)})]
start-time (System/nanoTime)
{:keys [db-after]} @(d/transact conn tx)]
(when tx-spent-time! (tx-spent-time! (- (System/nanoTime) start-time)))
@@ -171,7 +171,7 @@
(log/debug "queue item" (str id) "for queue" queue-name "is now processing")
(let [{:keys [retval exception]}
(try
- (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ns) id queue-name])
+ (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ms) id queue-name])
(let [v (f payload)]
{:retval v})
(catch Throwable t
@@ -188,7 +188,7 @@
(when-let [q-item (mark-status-fn! cfg queue-item u/status-done)]
(let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item]
(log/info (fmt id queue-name u/status-done tries (- done-time init-time)))
- (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time)))
+ (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time)))
(assoc q-item :retval retval :success? true :allow-cas-failure? true)))
(some? exception)
@@ -198,14 +198,14 @@
(log/logp level exception (fmt id queue-name u/status-error tries (- error-time init-time)))
(log/logp level exception "error message was:" (str \" (ex-message exception) \") "for queue-item" (str id))
(log/logp level exception "ex-data was:" (ex-data exception) "for queue-item" (str id))
- (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time)))
+ (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time)))
(assoc q-item :exception exception)))
:else
(when-let [q-item (mark-status-fn! cfg queue-item u/status-done)]
(let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item]
(log/info (fmt id queue-name u/status-done tries (- done-time init-time)))
- (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time)))
+ (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time)))
(assoc q-item :retval retval :success? true))))))
(do
(log/error "no handler for queue" queue-name)
diff --git a/src/com/github/ivarref/yoltq/poller.clj b/src/com/github/ivarref/yoltq/poller.clj
index 28b158f..9cf81c7 100644
--- a/src/com/github/ivarref/yoltq/poller.clj
+++ b/src/com/github/ivarref/yoltq/poller.clj
@@ -25,17 +25,16 @@
(if-not (contains? old q)
(try
(log/debug "polling queue" queue-name "for status" status)
- (let [start-time (u/now-ns)
+ (let [start-time (u/now-ms)
last-res (loop [prev-res nil]
(when @running?
(let [res (poll-once! cfg queue-name status)]
+ (log/debug "poll-once! returned" res)
(if (and res (:success? res))
(recur res)
prev-res))))]
- (let [spent-ns (- (u/now-ns) start-time)]
- (log/trace "done polling queue" q "in"
- (format "%.1f" (double (/ spent-ns 1e6)))
- "ms"))
+ (let [spent-ms (- (u/now-ms) start-time)]
+ (log/trace "done polling queue" q "in" spent-ms "ms"))
last-res)
(finally
(swap! running-queues disj q)))
@@ -44,6 +43,14 @@
(log/error t "poll-queue! crashed:" (ex-message t)))
(finally)))
+(comment
+ (def cfg @com.github.ivarref.yoltq/*config*))
+
+(comment
+ (poll-queue!
+ (atom true)
+ @com.github.ivarref.yoltq/*config*
+ [:add-message-thread :init]))
(defn poll-all-queues! [running? config-atom pool]
(try
@@ -54,4 +61,4 @@
[q-name status])))]
(.execute pool (fn [] (poll-queue! running? @config-atom q))))))
(catch Throwable t
- (log/error t "poll-all-queues! crashed:" (ex-message t))))) \ No newline at end of file
+ (log/error t "poll-all-queues! crashed:" (ex-message t)))))
diff --git a/src/com/github/ivarref/yoltq/slow_executor_detector.clj b/src/com/github/ivarref/yoltq/slow_executor_detector.clj
index f15ef7d..80d3718 100644
--- a/src/com/github/ivarref/yoltq/slow_executor_detector.clj
+++ b/src/com/github/ivarref/yoltq/slow_executor_detector.clj
@@ -7,7 +7,7 @@
(defn- do-show-slow-threads [{:keys [start-execute-time
max-execute-time]}]
(doseq [[^Thread thread [start-time queue-id queue-name]] @start-execute-time]
- (when (> (ext/now-ns) (+ start-time max-execute-time))
+ (when (> (ext/now-ms) (+ start-time max-execute-time))
(log/error "thread" (.getName thread) "spent too much time on"
"queue item" (str queue-id)
"for queue" queue-name
@@ -25,4 +25,4 @@
(dotimes [_ 3]
(when @running? (Thread/sleep 1000))))
(catch Throwable t
- (log/error t "reap! crashed:" (ex-message t))))) \ No newline at end of file
+ (log/error t "reap! crashed:" (ex-message t)))))
diff --git a/src/com/github/ivarref/yoltq/test_queue.clj b/src/com/github/ivarref/yoltq/test_queue.clj
index 6183216..ee9cd54 100644
--- a/src/com/github/ivarref/yoltq/test_queue.clj
+++ b/src/com/github/ivarref/yoltq/test_queue.clj
@@ -47,7 +47,7 @@
(with-bindings {#'yq/*config* config#
#'yq/*running?* (atom false)
#'yq/*test-mode* true
- #'ext/*now-ns-atom* (atom 0)
+ #'ext/*now-ms-atom* (atom 0)
#'ext/*random-atom* (atom 0)
#'ext/*squuid-atom* (atom 0)}
(try
diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj
index d551510..ad2444a 100644
--- a/src/com/github/ivarref/yoltq/utils.clj
+++ b/src/com/github/ivarref/yoltq/utils.clj
@@ -3,6 +3,7 @@
[clojure.edn :as edn]
[com.github.ivarref.yoltq.ext-sys :as ext]
[clojure.tools.logging :as log])
+ (:refer-clojure :exclude [random-uuid])
(:import (datomic Connection)
(java.time Duration)))
@@ -13,10 +14,10 @@
(def status-error :error)
-(defn duration->nanos [m]
+(defn duration->millis [m]
(reduce-kv (fn [o k v]
(if (instance? Duration v)
- (assoc o k (.toNanos v))
+ (assoc o k (.toMillis v))
(assoc o k v)))
{}
m))
@@ -30,8 +31,8 @@
(ext/random-uuid))
-(defn now-ns []
- (ext/now-ns))
+(defn now-ms []
+ (ext/now-ms))
(defn root-cause [e]
@@ -75,7 +76,7 @@
:bindings (get (get-queue-item db id) :com.github.ivarref.yoltq/bindings {})
:tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock]
[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status old-status status-processing]
- {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ns)}]}))
+ {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ms)}]}))
(defn get-init [{:keys [conn db init-backoff-time] :as cfg} queue-name]
@@ -94,11 +95,11 @@
[?e :com.github.ivarref.yoltq/lock ?lock]]
db
queue-name
- (- (now-ns) init-backoff-time))
+ (- (now-ms) init-backoff-time))
(not-empty))]
(let [[id old-lock] (rand-nth (into [] ids))]
(prepare-processing db id queue-name old-lock :init))
- (log/trace "no new-items in :init status for queue" queue-name))))
+ (log/debug "no new-items in :init status for queue" queue-name))))
(defn get-error [{:keys [conn db error-backoff-time max-retries] :as cfg} queue-name]
@@ -120,7 +121,7 @@
[?e :com.github.ivarref.yoltq/lock ?lock]]
db
queue-name
- (- (now-ns) error-backoff-time)
+ (- (now-ms) error-backoff-time)
(inc max-retries))
(not-empty))]
(let [[id old-lock] (rand-nth (into [] ids))]
@@ -131,7 +132,7 @@
(assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: "
(str (if (nil? conn) "nil" conn))
"\nConfig was: " (str cfg)))
- (let [now (or now (now-ns))
+ (let [now (or now (now-ms))
max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)
db (or db (d/db conn))]
(when-let [ids (->> (d/q '[:find ?id ?lock ?tries
diff --git a/test/com/github/ivarref/yoltq/test_utils.clj b/test/com/github/ivarref/yoltq/test_utils.clj
index 5427ff5..e4151c2 100644
--- a/test/com/github/ivarref/yoltq/test_utils.clj
+++ b/test/com/github/ivarref/yoltq/test_utils.clj
@@ -8,7 +8,8 @@
[com.github.ivarref.yoltq.impl :as i]
[clojure.edn :as edn]
[com.github.ivarref.yoltq.ext-sys :as ext])
- (:import (java.util UUID)))
+ (:import (java.util UUID)
+ (java.time Duration)))
(logconfig/init-logging!
@@ -39,10 +40,10 @@
(defn advance! [tp]
- (assert (some? ext/*now-ns-atom*) "Expected to be running in test-mode!")
- (swap! ext/*now-ns-atom* + (if (number? tp)
+ (assert (some? ext/*now-ms-atom*) "Expected to be running in test-mode!")
+ (swap! ext/*now-ms-atom* + (if (number? tp)
tp
- (.toNanos tp))))
+ (.toMillis ^Duration tp))))
(defn done-count []