diff options
Diffstat (limited to 'src/com/github/ivarref/yoltq/utils.clj')
| -rw-r--r-- | src/com/github/ivarref/yoltq/utils.clj | 179 |
1 files changed, 179 insertions, 0 deletions
diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj new file mode 100644 index 0000000..9defd0e --- /dev/null +++ b/src/com/github/ivarref/yoltq/utils.clj @@ -0,0 +1,179 @@ +(ns com.github.ivarref.yoltq.utils + (:require [datomic.api :as d] + [clojure.edn :as edn] + [com.github.ivarref.yoltq.ext-sys :as ext] + [clojure.tools.logging :as log]) + (:refer-clojure :exclude [random-uuid]) + (:import (datomic Connection) + (java.time Duration))) + + +(def status-init :init) +(def status-processing :processing) +(def status-done :done) +(def status-error :error) + +(def current-version "2") + +(defn duration->millis [m] + (reduce-kv (fn [o k v] + (if (instance? Duration v) + (assoc o k (.toMillis v)) + (assoc o k v))) + {} + m)) + + +(defn squuid [] + (ext/squuid)) + + +(defn random-uuid [] + (ext/random-uuid)) + + +(defn now-ms [] + (ext/now-ms)) + + +(defn root-cause [e] + (if-let [root (ex-cause e)] + (root-cause root) + e)) + + +(defn db-error-map [^Throwable t] + (loop [e t] + (cond (nil? e) nil + + (and (map? (ex-data e)) + (contains? (ex-data e) :db/error)) + (ex-data e) + + :else + (recur (ex-cause e))))) + + +(defn get-queue-item [db id] + (-> (d/pull db '[:*] [:com.github.ivarref.yoltq/id id]) + (dissoc :db/id) + (update :com.github.ivarref.yoltq/opts (fn [s] (or (when s (edn/read-string s)) {}))) + (update :com.github.ivarref.yoltq/bindings + (fn [s] + (when s + (->> s + (edn/read-string) + (reduce-kv (fn [o k v] + (assoc o (resolve k) v)) + {}))))))) + + +(defn prepare-processing [db id queue-name old-lock old-status] + (let [new-lock (random-uuid)] + {:id id + :lock new-lock + :queue-name queue-name + :bindings (get (get-queue-item db id) :com.github.ivarref.yoltq/bindings {}) + :tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status old-status status-processing] + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/processing-time (now-ms)}]})) + + +(defn get-init [{:keys [conn db init-backoff-time] :as cfg} queue-name] + (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: " + (str (if (nil? conn) "nil" conn)) + "\nConfig was: " (str cfg))) + (let [db (or db (d/db conn))] + (if-let [ids (->> (d/q '[:find ?id ?lock + :in $ ?queue-name ?backoff ?current-version + :where + [?e :com.github.ivarref.yoltq/status :init] + [?e :com.github.ivarref.yoltq/queue-name ?queue-name] + [?e :com.github.ivarref.yoltq/init-time ?init-time] + [(>= ?backoff ?init-time)] + [?e :com.github.ivarref.yoltq/id ?id] + [?e :com.github.ivarref.yoltq/lock ?lock] + [?e :com.github.ivarref.yoltq/version ?current-version]] + db + queue-name + (- (now-ms) init-backoff-time) + current-version) + (not-empty))] + (let [[id old-lock] (rand-nth (into [] ids))] + (prepare-processing db id queue-name old-lock :init)) + (log/debug "no new-items in :init status for queue" queue-name)))) + +(defn- get-max-retries [cfg queue-name] + (let [v (get-in cfg [:handlers queue-name :max-retries] (:max-retries cfg))] + (if (and (number? v) (pos-int? v)) + v + Long/MAX_VALUE))) + +(defn get-error [{:keys [conn db error-backoff-time] :as cfg} queue-name] + (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: " + (str (if (nil? conn) "nil" conn)) + "\nConfig was: " (str cfg))) + (let [db (or db (d/db conn)) + max-retries (get-max-retries cfg queue-name)] + (when-let [ids (->> (d/q '[:find ?id ?lock + :in $ ?queue-name ?backoff ?max-tries ?current-version + :where + [?e :com.github.ivarref.yoltq/status :error] + [?e :com.github.ivarref.yoltq/queue-name ?queue-name] + [?e :com.github.ivarref.yoltq/error-time ?time] + [(>= ?backoff ?time)] + [?e :com.github.ivarref.yoltq/tries ?tries] + [(>= ?max-tries ?tries)] + [?e :com.github.ivarref.yoltq/id ?id] + [?e :com.github.ivarref.yoltq/lock ?lock] + [?e :com.github.ivarref.yoltq/version ?current-version]] + db + queue-name + (- (now-ms) error-backoff-time) + max-retries + current-version) + (not-empty))] + (let [[id old-lock] (rand-nth (into [] ids))] + (prepare-processing db id queue-name old-lock :error))))) + + +(defn get-hung [{:keys [conn db now hung-backoff-time] :as cfg} queue-name] + (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: " + (str (if (nil? conn) "nil" conn)) + "\nConfig was: " (str cfg))) + (let [now (or now (now-ms)) + max-retries (get-max-retries cfg queue-name) + db (or db (d/db conn))] + (when-let [ids (->> (d/q '[:find ?id ?lock ?tries + :in $ ?qname ?backoff ?current-version + :where + [?e :com.github.ivarref.yoltq/status :processing] + [?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/processing-time ?time] + [(>= ?backoff ?time)] + [?e :com.github.ivarref.yoltq/tries ?tries] + [?e :com.github.ivarref.yoltq/id ?id] + [?e :com.github.ivarref.yoltq/lock ?lock] + [?e :com.github.ivarref.yoltq/version ?current-version]] + db + queue-name + (- now hung-backoff-time) + current-version) + (not-empty))] + (let [new-lock (random-uuid) + [id old-lock tries _t] (rand-nth (into [] ids)) + to-error? (>= tries max-retries)] + {:id id + :lock new-lock + :queue-name queue-name + :was-hung? true + :to-error? to-error? + :bindings (get (get-queue-item db id) :com.github.ivarref.yoltq/bindings {}) + :tx (if (not to-error?) + [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time now}] + [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock old-lock new-lock] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status status-processing status-error] + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time now}])})))) |
