aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/github/ivarref/yoltq')
-rw-r--r--src/com/github/ivarref/yoltq/error_poller.clj10
-rw-r--r--src/com/github/ivarref/yoltq/ext_sys.clj13
-rw-r--r--src/com/github/ivarref/yoltq/impl.clj14
-rw-r--r--src/com/github/ivarref/yoltq/poller.clj19
-rw-r--r--src/com/github/ivarref/yoltq/slow_executor_detector.clj4
-rw-r--r--src/com/github/ivarref/yoltq/test_queue.clj2
-rw-r--r--src/com/github/ivarref/yoltq/utils.clj19
7 files changed, 45 insertions, 36 deletions
diff --git a/src/com/github/ivarref/yoltq/error_poller.clj b/src/com/github/ivarref/yoltq/error_poller.clj
index 1268482..ee6359e 100644
--- a/src/com/github/ivarref/yoltq/error_poller.clj
+++ b/src/com/github/ivarref/yoltq/error_poller.clj
@@ -22,7 +22,7 @@
state :recovery}}
{:keys [system-error-min-count system-error-callback-backoff]
:or {system-error-min-count 3}}
- now-ns
+ now-ms
error-count]
(let [new-errors (->> (conj errors error-count)
(take-last system-error-min-count)
@@ -50,14 +50,14 @@
(when (and (= old-state :recovery)
(= new-state :error))
{:run-callback :error
- :last-notify now-ns})
+ :last-notify now-ms})
(when (and (= new-state :error)
(= old-state :error)
- (> now-ns
+ (> now-ms
(+ last-notify system-error-callback-backoff)))
{:run-callback :error
- :last-notify now-ns})
+ :last-notify now-ms})
(when (and (= new-state :recovery)
(= old-state :error))
@@ -88,7 +88,7 @@
(log/debug "poll-errors found" error-count "errors in system")
(reset! healthy? false))
(reset! healthy? true))
- (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ns) error-count)]
+ (let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ms) error-count)]
(when run-callback
(cond (= run-callback :error)
(on-system-error)
diff --git a/src/com/github/ivarref/yoltq/ext_sys.clj b/src/com/github/ivarref/yoltq/ext_sys.clj
index 3480475..692b934 100644
--- a/src/com/github/ivarref/yoltq/ext_sys.clj
+++ b/src/com/github/ivarref/yoltq/ext_sys.clj
@@ -1,17 +1,18 @@
(ns com.github.ivarref.yoltq.ext-sys
(:require [datomic.api :as d])
+ (:refer-clojure :exclude [random-uuid])
(:import (java.util UUID)))
-(def ^:dynamic *now-ns-atom* nil)
+(def ^:dynamic *now-ms-atom* nil)
(def ^:dynamic *squuid-atom* nil)
(def ^:dynamic *random-atom* nil)
-(defn now-ns []
- (if *now-ns-atom*
- @*now-ns-atom*
- (System/nanoTime)))
+(defn now-ms []
+ (if *now-ms-atom*
+ @*now-ms-atom*
+ (System/currentTimeMillis)))
(defn squuid []
@@ -23,4 +24,4 @@
(defn random-uuid []
(if *random-atom*
(UUID/fromString (str "00000000-0000-0000-0000-" (format "%012d" (swap! *random-atom* inc))))
- (UUID/randomUUID))) \ No newline at end of file
+ (UUID/randomUUID)))
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj
index 8b75fc3..b4eef8d 100644
--- a/src/com/github/ivarref/yoltq/impl.clj
+++ b/src/com/github/ivarref/yoltq/impl.clj
@@ -63,7 +63,7 @@
:com.github.ivarref.yoltq/opts (pr-str-safe :opts opts)
:com.github.ivarref.yoltq/lock (u/random-uuid)
:com.github.ivarref.yoltq/tries 0
- :com.github.ivarref.yoltq/init-time (u/now-ns)}
+ :com.github.ivarref.yoltq/init-time (u/now-ms)}
(when-let [[q ext-id] (:depends-on opts)]
(when-not (d/q '[:find ?e .
:in $ ?ext-id
@@ -138,8 +138,8 @@
[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)]
[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing new-status]
(if (= new-status u/status-done)
- {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ns)}
- {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ns)})]
+ {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ms)}
+ {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ms)})]
start-time (System/nanoTime)
{:keys [db-after]} @(d/transact conn tx)]
(when tx-spent-time! (tx-spent-time! (- (System/nanoTime) start-time)))
@@ -171,7 +171,7 @@
(log/debug "queue item" (str id) "for queue" queue-name "is now processing")
(let [{:keys [retval exception]}
(try
- (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ns) id queue-name])
+ (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ms) id queue-name])
(let [v (f payload)]
{:retval v})
(catch Throwable t
@@ -188,7 +188,7 @@
(when-let [q-item (mark-status-fn! cfg queue-item u/status-done)]
(let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item]
(log/info (fmt id queue-name u/status-done tries (- done-time init-time)))
- (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time)))
+ (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time)))
(assoc q-item :retval retval :success? true :allow-cas-failure? true)))
(some? exception)
@@ -198,14 +198,14 @@
(log/logp level exception (fmt id queue-name u/status-error tries (- error-time init-time)))
(log/logp level exception "error message was:" (str \" (ex-message exception) \") "for queue-item" (str id))
(log/logp level exception "ex-data was:" (ex-data exception) "for queue-item" (str id))
- (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time)))
+ (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time)))
(assoc q-item :exception exception)))
:else
(when-let [q-item (mark-status-fn! cfg queue-item u/status-done)]
(let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item]
(log/info (fmt id queue-name u/status-done tries (- done-time init-time)))
- (when collect-spent-time! (collect-spent-time! (- (u/now-ns) init-time)))
+ (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time)))
(assoc q-item :retval retval :success? true))))))
(do
(log/error "no handler for queue" queue-name)
diff --git a/src/com/github/ivarref/yoltq/poller.clj b/src/com/github/ivarref/yoltq/poller.clj
index 28b158f..9cf81c7 100644
--- a/src/com/github/ivarref/yoltq/poller.clj
+++ b/src/com/github/ivarref/yoltq/poller.clj
@@ -25,17 +25,16 @@
(if-not (contains? old q)
(try
(log/debug "polling queue" queue-name "for status" status)
- (let [start-time (u/now-ns)
+ (let [start-time (u/now-ms)
last-res (loop [prev-res nil]
(when @running?
(let [res (poll-once! cfg queue-name status)]
+ (log/debug "poll-once! returned" res)
(if (and res (:success? res))
(recur res)
prev-res))))]
- (let [spent-ns (- (u/now-ns) start-time)]
- (log/trace "done polling queue" q "in"
- (format "%.1f" (double (/ spent-ns 1e6)))
- "ms"))
+ (let [spent-ms (- (u/now-ms) start-time)]
+ (log/trace "done polling queue" q "in" spent-ms "ms"))
last-res)
(finally
(swap! running-queues disj q)))
@@ -44,6 +43,14 @@
(log/error t "poll-queue! crashed:" (ex-message t)))
(finally)))
+(comment
+ (def cfg @com.github.ivarref.yoltq/*config*))
+
+(comment
+ (poll-queue!
+ (atom true)
+ @com.github.ivarref.yoltq/*config*
+ [:add-message-thread :init]))
(defn poll-all-queues! [running? config-atom pool]
(try
@@ -54,4 +61,4 @@
[q-name status])))]
(.execute pool (fn [] (poll-queue! running? @config-atom q))))))
(catch Throwable t
- (log/error t "poll-all-queues! crashed:" (ex-message t))))) \ No newline at end of file
+ (log/error t "poll-all-queues! crashed:" (ex-message t)))))
diff --git a/src/com/github/ivarref/yoltq/slow_executor_detector.clj b/src/com/github/ivarref/yoltq/slow_executor_detector.clj
index f15ef7d..80d3718 100644
--- a/src/com/github/ivarref/yoltq/slow_executor_detector.clj
+++ b/src/com/github/ivarref/yoltq/slow_executor_detector.clj
@@ -7,7 +7,7 @@
(defn- do-show-slow-threads [{:keys [start-execute-time
max-execute-time]}]
(doseq [[^Thread thread [start-time queue-id queue-name]] @start-execute-time]
- (when (> (ext/now-ns) (+ start-time max-execute-time))
+ (when (> (ext/now-ms) (+ start-time max-execute-time))
(log/error "thread" (.getName thread) "spent too much time on"
"queue item" (str queue-id)
"for queue" queue-name
@@ -25,4 +25,4 @@
(dotimes [_ 3]
(when @running? (Thread/sleep 1000))))
(catch Throwable t
- (log/error t "reap! crashed:" (ex-message t))))) \ No newline at end of file
+ (log/error t "reap! crashed:" (ex-message t)))))
diff --git a/src/com/github/ivarref/yoltq/test_queue.clj b/src/com/github/ivarref/yoltq/test_queue.clj
index 6183216..ee9cd54 100644
--- a/src/com/github/ivarref/yoltq/test_queue.clj
+++ b/src/com/github/ivarref/yoltq/test_queue.clj
@@ -47,7 +47,7 @@
(with-bindings {#'yq/*config* config#
#'yq/*running?* (atom false)
#'yq/*test-mode* true
- #'ext/*now-ns-atom* (atom 0)
+ #'ext/*now-ms-atom* (atom 0)
#'ext/*random-atom* (atom 0)
#'ext/*squuid-atom* (atom 0)}
(try
diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj
index d551510..ad2444a 100644
--- a/src/com/github/ivarref/yoltq/utils.clj
+++ b/src/com/github/ivarref/yoltq/utils.clj
@@ -3,6 +3,7 @@
[clojure.edn :as edn]
[com.github.ivarref.yoltq.ext-sys :as ext]
[clojure.tools.logging :as log])
+ (:refer-clojure :exclude [random-uuid])
(:import (datomic Connection)
(java.time Duration)))
@@ -13,10 +14,10 @@
(def status-error :error)
-(defn duration->nanos [m]
+(defn duration->millis [m]
(reduce-kv (fn [o k v]
(if (instance? Duration v)
- (assoc o k (.toNanos v))
+ (assoc o k (.toMillis v))
(assoc o k v)))
{}
m))
@@ -30,8 +31,8 @@
(ext/random-uuid))
-(defn now-ns []
- (ext/now-ns))
+(defn now-ms []
+ (ext/now-ms))
(defn root-cause [e]
@@ -75,7 +76,7 @@
:bindings (get (get-queue-item db id) :com.github.ivarref.yoltq/bindings {})
:tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock]
[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status old-status status-processing]
- {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ns)}]}))
+ {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ms)}]}))
(defn get-init [{:keys [conn db init-backoff-time] :as cfg} queue-name]
@@ -94,11 +95,11 @@
[?e :com.github.ivarref.yoltq/lock ?lock]]
db
queue-name
- (- (now-ns) init-backoff-time))
+ (- (now-ms) init-backoff-time))
(not-empty))]
(let [[id old-lock] (rand-nth (into [] ids))]
(prepare-processing db id queue-name old-lock :init))
- (log/trace "no new-items in :init status for queue" queue-name))))
+ (log/debug "no new-items in :init status for queue" queue-name))))
(defn get-error [{:keys [conn db error-backoff-time max-retries] :as cfg} queue-name]
@@ -120,7 +121,7 @@
[?e :com.github.ivarref.yoltq/lock ?lock]]
db
queue-name
- (- (now-ns) error-backoff-time)
+ (- (now-ms) error-backoff-time)
(inc max-retries))
(not-empty))]
(let [[id old-lock] (rand-nth (into [] ids))]
@@ -131,7 +132,7 @@
(assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: "
(str (if (nil? conn) "nil" conn))
"\nConfig was: " (str cfg)))
- (let [now (or now (now-ns))
+ (let [now (or now (now-ms))
max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)
db (or db (d/db conn))]
(when-let [ids (->> (d/q '[:find ?id ?lock ?tries