diff options
Diffstat (limited to 'src/com/github/ivarref/yoltq/impl.clj')
| -rw-r--r-- | src/com/github/ivarref/yoltq/impl.clj | 248 |
1 files changed, 248 insertions, 0 deletions
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj new file mode 100644 index 0000000..ffb1ad8 --- /dev/null +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -0,0 +1,248 @@ +(ns com.github.ivarref.yoltq.impl + (:require [clojure.edn :as edn] + [clojure.string :as str] + [clojure.tools.logging :as log] + [com.github.ivarref.double-trouble :as dt] + [com.github.ivarref.yoltq.ext-sys :as ext] + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d]) + (:import (java.time Year))) + +(def schema + [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity} + #:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value} + #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} + #:db{:ident :com.github.ivarref.yoltq/job-group, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} + #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} + #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string} + #:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes} + #:db{:ident :com.github.ivarref.yoltq/opts, :cardinality :db.cardinality/one, :valueType :db.type/string} + #:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string} + #:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true} + #:db{:ident :com.github.ivarref.yoltq/lock, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :noHistory true} + #:db{:ident :com.github.ivarref.yoltq/init-time, :cardinality :db.cardinality/one, :valueType :db.type/long} + #:db{:ident :com.github.ivarref.yoltq/processing-time, :cardinality :db.cardinality/one, :valueType :db.type/long} + #:db{:ident :com.github.ivarref.yoltq/done-time, :cardinality :db.cardinality/one, :valueType :db.type/long} + #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long} + #:db{:ident :com.github.ivarref.yoltq/version, :cardinality :db.cardinality/one, :valueType :db.type/string, :index true}]) + +(defn pr-str-inner [x] + (binding [*print-dup* false + *print-meta* false + *print-readably* true + *print-length* nil + *print-level* nil + *print-namespace-maps* false] + (pr-str x))) + +(defn pr-str-safe [what x] + (try + (if (= x (edn/read-string (pr-str-inner x))) + (pr-str-inner x) + (throw (ex-info (str "Could not read-string " what) {:input x}))) + (catch Exception e + (log/error "could not read-string" what ":" (ex-message e)) + (throw e)))) + +(defn default-partition-fn [_queue-keyword] + (keyword "yoltq" (str "queue_" (.getValue (Year/now))))) + +(defn put [{:keys [capture-bindings conn encode partition-fn] + :or {partition-fn default-partition-fn + encode (partial pr-str-safe :payload)} + :as config} + queue-name + payload + opts] + (if-let [q-config (get-in config [:handlers queue-name])] + (let [id (u/squuid) + encode (get q-config :encode encode) + partition-fn (get q-config :partition-fn partition-fn) + partition (partition-fn queue-name) + _ (assert (keyword? partition) "Partition must be a keyword") + depends-on (get q-config :depends-on (fn [_] nil)) + valid-payload? (get q-config :valid-payload? (fn [_] true)) + opts (merge + (when-let [deps (depends-on payload)] + {:depends-on deps}) + (or opts {})) + str-bindings (->> (reduce (fn [o k] + (assoc o (symbol k) (deref k))) + {} + (or capture-bindings [])) + (pr-str-safe :capture-bindings)) + _ (when-not (valid-payload? payload) + (log/error "Payload was not valid. Payload was:" payload) + (throw (ex-info (str "Payload was not valid: " payload) {:payload payload}))) + encoded (encode payload) + _ (when (not (or (bytes? encoded) (string? encoded))) + (log/error "Payload must be encoded to either a string or a byte array") + (throw (ex-info (str "Payload must be encoded to a string or a byte array. Payload: " payload) {:payload payload})))] + (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init) + (do + (dt/ensure-partition! conn partition) + (merge + (if (bytes? encoded) + {:com.github.ivarref.yoltq/payload-bytes encoded} + {:com.github.ivarref.yoltq/payload encoded}) + {:db/id (d/tempid partition) + :com.github.ivarref.yoltq/id id + :com.github.ivarref.yoltq/queue-name queue-name + :com.github.ivarref.yoltq/status u/status-init + :com.github.ivarref.yoltq/bindings str-bindings + :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts) + :com.github.ivarref.yoltq/lock (u/random-uuid) + :com.github.ivarref.yoltq/tries 0 + :com.github.ivarref.yoltq/init-time (u/now-ms) + :com.github.ivarref.yoltq/version "2"} + (when-let [[q ext-id] (:depends-on opts)] + (when-not (d/q '[:find ?e . + :in $ ?ext-id + :where + [?e :com.github.ivarref.yoltq/ext-id ?ext-id]] + (d/db conn) + (pr-str-safe :depends-on [q ext-id])) + (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts)))) + (when-let [ext-id (:id opts)] + {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}) + (when-let [job-group (:job-group opts)] + {:com.github.ivarref.yoltq/job-group job-group})))) + (do + (log/error "Did not find registered handler for queue" queue-name) + (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) + + +(defn depends-on-waiting? [{:keys [conn]} + q-item] + (let [db (d/db conn)] + (when-let [{:com.github.ivarref.yoltq/keys [opts]} (u/get-queue-item db (:id q-item))] + (when-let [[q id :as depends-on] (:depends-on opts)] + (when-not (d/q '[:find ?e . + :in $ ?ext-id + :where + [?e :com.github.ivarref.yoltq/ext-id ?ext-id] + [?e :com.github.ivarref.yoltq/status :done]] + db + (pr-str [q id])) + (log/info "queue item" (str (:id q-item)) "is waiting on" depends-on) + {:depends-on depends-on}))))) + + +(defn take! [{:keys [conn cas-failures hung-log-level tx-spent-time!] + :or {hung-log-level :error}} + {:keys [tx id queue-name was-hung? to-error?] :as queue-item-info}] + (when queue-item-info + (try + (cond to-error? + (log/logp hung-log-level "queue-item" (str id) "was hung and retried too many times. Giving up!") + + was-hung? + (log/logp hung-log-level "queue-item" (str id) "was hung, retrying ...") + + :else + nil) + (let [start-time (System/nanoTime) + {:keys [db-after]} @(d/transact conn tx) + _ (when tx-spent-time! (tx-spent-time! (- (System/nanoTime) start-time))) + {:com.github.ivarref.yoltq/keys [status] :as q-item} (u/get-queue-item db-after id)] + (log/debug "queue item" (str id) "for queue" queue-name "now has status" status) + q-item) + (catch Throwable t + (let [{:db/keys [error] :as m} (u/db-error-map t)] + (cond + (= :db.error/cas-failed error) + (do + (log/info "take! :db.error/cas-failed for queue item" (str id) "and attribute" (:a m)) + (when cas-failures + (swap! cas-failures inc)) + nil) + + :else + (do + (log/error t "Unexpected failure for queue item" (str id) ":" (ex-message t)) + nil))))))) + + +(defn mark-status! [{:keys [conn tx-spent-time!]} + {:com.github.ivarref.yoltq/keys [id lock tries]} + new-status] + (try + (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)] + [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing new-status] + (if (= new-status u/status-done) + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ms)} + {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ms)})] + start-time (System/nanoTime) + {:keys [db-after]} @(d/transact conn tx)] + (when tx-spent-time! (tx-spent-time! (- (System/nanoTime) start-time))) + (u/get-queue-item db-after id)) + (catch Throwable t + (log/error t "unexpected error in mark-status!: " (ex-message t)) + nil))) + + +(defn fmt [id queue-name new-status tries spent-ns] + (str/join " " ["queue-item" (str id) + "for queue" queue-name + "now has status" new-status + "after" tries (if (= 1 tries) + "try" + "tries") + "in" (format "%.1f" (double (/ spent-ns 1e6))) "ms"])) + + +(defn execute! [{:keys [decode handlers mark-status-fn! start-execute-time collect-spent-time!] + :or {mark-status-fn! mark-status! + decode edn/read-string} + :as cfg} + {:com.github.ivarref.yoltq/keys [status id queue-name payload payload-bytes] :as queue-item}] + (when queue-item + (if (= :error status) + (assoc queue-item :failed? true) + (if-let [queue (get handlers queue-name)] + (let [{:keys [f allow-cas-failure?]} queue + decode (get queue :decode decode)] + (log/debug "queue item" (str id) "for queue" queue-name "is now processing") + (let [{:keys [retval exception]} + (try + (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ms) id queue-name]) + (let [payload (decode (or payload payload-bytes)) + v (f payload)] + {:retval v}) + (catch Throwable t + {:exception t}) + (finally + (swap! start-execute-time dissoc (Thread/currentThread)))) + {:db/keys [error] :as m} (u/db-error-map exception)] + (cond + (and (some? exception) + allow-cas-failure? + (= :db.error/cas-failed error) + (or (true? allow-cas-failure?) + (allow-cas-failure? (:a m)))) + (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)] + (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item] + (log/info (fmt id queue-name u/status-done tries (- done-time init-time))) + (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time))) + (assoc q-item :retval retval :success? true :allow-cas-failure? true))) + + (some? exception) + (when-let [q-item (mark-status-fn! cfg queue-item u/status-error)] + (let [{:com.github.ivarref.yoltq/keys [init-time error-time tries]} q-item + level (if (>= tries 3) :error :warn)] + (log/logp level exception (fmt id queue-name u/status-error tries (- error-time init-time))) + (log/logp level exception "error message was:" (str \" (ex-message exception) \") "for queue-item" (str id)) + (log/logp level exception "ex-data was:" (ex-data exception) "for queue-item" (str id)) + (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time))) + (assoc q-item :exception exception))) + + :else + (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)] + (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item] + (log/info (fmt id queue-name u/status-done tries (- done-time init-time))) + (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time))) + (assoc q-item :retval retval :success? true)))))) + (do + (log/error "no handler for queue" queue-name) + nil))))) |
