aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq/impl.clj
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/github/ivarref/yoltq/impl.clj')
-rw-r--r--src/com/github/ivarref/yoltq/impl.clj248
1 files changed, 248 insertions, 0 deletions
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj
new file mode 100644
index 0000000..ffb1ad8
--- /dev/null
+++ b/src/com/github/ivarref/yoltq/impl.clj
@@ -0,0 +1,248 @@
+(ns com.github.ivarref.yoltq.impl
+ (:require [clojure.edn :as edn]
+ [clojure.string :as str]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.double-trouble :as dt]
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [com.github.ivarref.yoltq.utils :as u]
+ [datomic.api :as d])
+ (:import (java.time Year)))
+
+(def schema
+ [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity}
+ #:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value}
+ #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
+ #:db{:ident :com.github.ivarref.yoltq/job-group, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
+ #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
+ #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string}
+ #:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes}
+ #:db{:ident :com.github.ivarref.yoltq/opts, :cardinality :db.cardinality/one, :valueType :db.type/string}
+ #:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string}
+ #:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true}
+ #:db{:ident :com.github.ivarref.yoltq/lock, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :noHistory true}
+ #:db{:ident :com.github.ivarref.yoltq/init-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
+ #:db{:ident :com.github.ivarref.yoltq/processing-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
+ #:db{:ident :com.github.ivarref.yoltq/done-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
+ #:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}
+ #:db{:ident :com.github.ivarref.yoltq/version, :cardinality :db.cardinality/one, :valueType :db.type/string, :index true}])
+
+(defn pr-str-inner [x]
+ (binding [*print-dup* false
+ *print-meta* false
+ *print-readably* true
+ *print-length* nil
+ *print-level* nil
+ *print-namespace-maps* false]
+ (pr-str x)))
+
+(defn pr-str-safe [what x]
+ (try
+ (if (= x (edn/read-string (pr-str-inner x)))
+ (pr-str-inner x)
+ (throw (ex-info (str "Could not read-string " what) {:input x})))
+ (catch Exception e
+ (log/error "could not read-string" what ":" (ex-message e))
+ (throw e))))
+
+(defn default-partition-fn [_queue-keyword]
+ (keyword "yoltq" (str "queue_" (.getValue (Year/now)))))
+
+(defn put [{:keys [capture-bindings conn encode partition-fn]
+ :or {partition-fn default-partition-fn
+ encode (partial pr-str-safe :payload)}
+ :as config}
+ queue-name
+ payload
+ opts]
+ (if-let [q-config (get-in config [:handlers queue-name])]
+ (let [id (u/squuid)
+ encode (get q-config :encode encode)
+ partition-fn (get q-config :partition-fn partition-fn)
+ partition (partition-fn queue-name)
+ _ (assert (keyword? partition) "Partition must be a keyword")
+ depends-on (get q-config :depends-on (fn [_] nil))
+ valid-payload? (get q-config :valid-payload? (fn [_] true))
+ opts (merge
+ (when-let [deps (depends-on payload)]
+ {:depends-on deps})
+ (or opts {}))
+ str-bindings (->> (reduce (fn [o k]
+ (assoc o (symbol k) (deref k)))
+ {}
+ (or capture-bindings []))
+ (pr-str-safe :capture-bindings))
+ _ (when-not (valid-payload? payload)
+ (log/error "Payload was not valid. Payload was:" payload)
+ (throw (ex-info (str "Payload was not valid: " payload) {:payload payload})))
+ encoded (encode payload)
+ _ (when (not (or (bytes? encoded) (string? encoded)))
+ (log/error "Payload must be encoded to either a string or a byte array")
+ (throw (ex-info (str "Payload must be encoded to a string or a byte array. Payload: " payload) {:payload payload})))]
+ (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init)
+ (do
+ (dt/ensure-partition! conn partition)
+ (merge
+ (if (bytes? encoded)
+ {:com.github.ivarref.yoltq/payload-bytes encoded}
+ {:com.github.ivarref.yoltq/payload encoded})
+ {:db/id (d/tempid partition)
+ :com.github.ivarref.yoltq/id id
+ :com.github.ivarref.yoltq/queue-name queue-name
+ :com.github.ivarref.yoltq/status u/status-init
+ :com.github.ivarref.yoltq/bindings str-bindings
+ :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts)
+ :com.github.ivarref.yoltq/lock (u/random-uuid)
+ :com.github.ivarref.yoltq/tries 0
+ :com.github.ivarref.yoltq/init-time (u/now-ms)
+ :com.github.ivarref.yoltq/version "2"}
+ (when-let [[q ext-id] (:depends-on opts)]
+ (when-not (d/q '[:find ?e .
+ :in $ ?ext-id
+ :where
+ [?e :com.github.ivarref.yoltq/ext-id ?ext-id]]
+ (d/db conn)
+ (pr-str-safe :depends-on [q ext-id]))
+ (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts))))
+ (when-let [ext-id (:id opts)]
+ {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])})
+ (when-let [job-group (:job-group opts)]
+ {:com.github.ivarref.yoltq/job-group job-group}))))
+ (do
+ (log/error "Did not find registered handler for queue" queue-name)
+ (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name})))))
+
+
+(defn depends-on-waiting? [{:keys [conn]}
+ q-item]
+ (let [db (d/db conn)]
+ (when-let [{:com.github.ivarref.yoltq/keys [opts]} (u/get-queue-item db (:id q-item))]
+ (when-let [[q id :as depends-on] (:depends-on opts)]
+ (when-not (d/q '[:find ?e .
+ :in $ ?ext-id
+ :where
+ [?e :com.github.ivarref.yoltq/ext-id ?ext-id]
+ [?e :com.github.ivarref.yoltq/status :done]]
+ db
+ (pr-str [q id]))
+ (log/info "queue item" (str (:id q-item)) "is waiting on" depends-on)
+ {:depends-on depends-on})))))
+
+
+(defn take! [{:keys [conn cas-failures hung-log-level tx-spent-time!]
+ :or {hung-log-level :error}}
+ {:keys [tx id queue-name was-hung? to-error?] :as queue-item-info}]
+ (when queue-item-info
+ (try
+ (cond to-error?
+ (log/logp hung-log-level "queue-item" (str id) "was hung and retried too many times. Giving up!")
+
+ was-hung?
+ (log/logp hung-log-level "queue-item" (str id) "was hung, retrying ...")
+
+ :else
+ nil)
+ (let [start-time (System/nanoTime)
+ {:keys [db-after]} @(d/transact conn tx)
+ _ (when tx-spent-time! (tx-spent-time! (- (System/nanoTime) start-time)))
+ {:com.github.ivarref.yoltq/keys [status] :as q-item} (u/get-queue-item db-after id)]
+ (log/debug "queue item" (str id) "for queue" queue-name "now has status" status)
+ q-item)
+ (catch Throwable t
+ (let [{:db/keys [error] :as m} (u/db-error-map t)]
+ (cond
+ (= :db.error/cas-failed error)
+ (do
+ (log/info "take! :db.error/cas-failed for queue item" (str id) "and attribute" (:a m))
+ (when cas-failures
+ (swap! cas-failures inc))
+ nil)
+
+ :else
+ (do
+ (log/error t "Unexpected failure for queue item" (str id) ":" (ex-message t))
+ nil)))))))
+
+
+(defn mark-status! [{:keys [conn tx-spent-time!]}
+ {:com.github.ivarref.yoltq/keys [id lock tries]}
+ new-status]
+ (try
+ (let [tx [[:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/lock lock (u/random-uuid)]
+ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/tries tries (inc tries)]
+ [:db/cas [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/status u/status-processing new-status]
+ (if (= new-status u/status-done)
+ {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/done-time (u/now-ms)}
+ {:db/id [:com.github.ivarref.yoltq/id id] :com.github.ivarref.yoltq/error-time (u/now-ms)})]
+ start-time (System/nanoTime)
+ {:keys [db-after]} @(d/transact conn tx)]
+ (when tx-spent-time! (tx-spent-time! (- (System/nanoTime) start-time)))
+ (u/get-queue-item db-after id))
+ (catch Throwable t
+ (log/error t "unexpected error in mark-status!: " (ex-message t))
+ nil)))
+
+
+(defn fmt [id queue-name new-status tries spent-ns]
+ (str/join " " ["queue-item" (str id)
+ "for queue" queue-name
+ "now has status" new-status
+ "after" tries (if (= 1 tries)
+ "try"
+ "tries")
+ "in" (format "%.1f" (double (/ spent-ns 1e6))) "ms"]))
+
+
+(defn execute! [{:keys [decode handlers mark-status-fn! start-execute-time collect-spent-time!]
+ :or {mark-status-fn! mark-status!
+ decode edn/read-string}
+ :as cfg}
+ {:com.github.ivarref.yoltq/keys [status id queue-name payload payload-bytes] :as queue-item}]
+ (when queue-item
+ (if (= :error status)
+ (assoc queue-item :failed? true)
+ (if-let [queue (get handlers queue-name)]
+ (let [{:keys [f allow-cas-failure?]} queue
+ decode (get queue :decode decode)]
+ (log/debug "queue item" (str id) "for queue" queue-name "is now processing")
+ (let [{:keys [retval exception]}
+ (try
+ (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ms) id queue-name])
+ (let [payload (decode (or payload payload-bytes))
+ v (f payload)]
+ {:retval v})
+ (catch Throwable t
+ {:exception t})
+ (finally
+ (swap! start-execute-time dissoc (Thread/currentThread))))
+ {:db/keys [error] :as m} (u/db-error-map exception)]
+ (cond
+ (and (some? exception)
+ allow-cas-failure?
+ (= :db.error/cas-failed error)
+ (or (true? allow-cas-failure?)
+ (allow-cas-failure? (:a m))))
+ (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)]
+ (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item]
+ (log/info (fmt id queue-name u/status-done tries (- done-time init-time)))
+ (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time)))
+ (assoc q-item :retval retval :success? true :allow-cas-failure? true)))
+
+ (some? exception)
+ (when-let [q-item (mark-status-fn! cfg queue-item u/status-error)]
+ (let [{:com.github.ivarref.yoltq/keys [init-time error-time tries]} q-item
+ level (if (>= tries 3) :error :warn)]
+ (log/logp level exception (fmt id queue-name u/status-error tries (- error-time init-time)))
+ (log/logp level exception "error message was:" (str \" (ex-message exception) \") "for queue-item" (str id))
+ (log/logp level exception "ex-data was:" (ex-data exception) "for queue-item" (str id))
+ (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time)))
+ (assoc q-item :exception exception)))
+
+ :else
+ (when-let [q-item (mark-status-fn! cfg queue-item u/status-done)]
+ (let [{:com.github.ivarref.yoltq/keys [init-time done-time tries]} q-item]
+ (log/info (fmt id queue-name u/status-done tries (- done-time init-time)))
+ (when collect-spent-time! (collect-spent-time! (- (u/now-ms) init-time)))
+ (assoc q-item :retval retval :success? true))))))
+ (do
+ (log/error "no handler for queue" queue-name)
+ nil)))))