diff options
Diffstat (limited to 'src/com/github/ivarref/yoltq/impl.clj')
| -rw-r--r-- | src/com/github/ivarref/yoltq/impl.clj | 95 |
1 files changed, 59 insertions, 36 deletions
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index c37b0e6..ac573d1 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -1,11 +1,12 @@ (ns com.github.ivarref.yoltq.impl - (:require [datomic.api :as d] - [clojure.tools.logging :as log] + (:require [clojure.edn :as edn] [clojure.string :as str] - [com.github.ivarref.yoltq.utils :as u] + [clojure.tools.logging :as log] + [com.github.ivarref.double-trouble :as dt] [com.github.ivarref.yoltq.ext-sys :as ext] - [clojure.edn :as edn])) - + [com.github.ivarref.yoltq.utils :as u] + [datomic.api :as d]) + (:import (java.time Year))) (def schema [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity} @@ -13,6 +14,7 @@ #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string} + #:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes} #:db{:ident :com.github.ivarref.yoltq/opts, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true} @@ -41,13 +43,22 @@ (log/error "could not read-string" what ":" (ex-message e)) (throw e)))) +(defn default-partition-fn [_queue-keyword] + (keyword "yoltq" (str "queue_" (.getValue (Year/now))))) -(defn put [{:keys [capture-bindings conn] :as config} +(defn put [{:keys [capture-bindings conn encode partition-fn] + :or {partition-fn default-partition-fn + encode (partial pr-str-safe :payload)} + :as config} queue-name payload opts] (if-let [q-config (get-in config [:handlers queue-name])] (let [id (u/squuid) + encode (get q-config :encode encode) + partition-fn (get q-config :partition-fn partition-fn) + partition (partition-fn queue-name) + _ (assert (keyword? partition) "Partition must be a keyword") depends-on (get q-config :depends-on (fn [_] nil)) valid-payload? (get q-config :valid-payload? (fn [_] true)) opts (merge @@ -58,32 +69,41 @@ (assoc o (symbol k) (deref k))) {} (or capture-bindings [])) - (pr-str-safe :capture-bindings))] - (when-not (valid-payload? payload) - (log/error "Payload was not valid. Payload was:" payload) - (throw (ex-info (str "Payload was not valid: " payload) {:payload payload}))) + (pr-str-safe :capture-bindings)) + _ (when-not (valid-payload? payload) + (log/error "Payload was not valid. Payload was:" payload) + (throw (ex-info (str "Payload was not valid: " payload) {:payload payload}))) + encoded (encode payload) + _ (when (not (or (bytes? encoded) (string? encoded))) + (log/error "Payload must be encoded to either a string or a byte array") + (throw (ex-info (str "Payload must be encoded to a string or a byte array. Payload: " payload) {:payload payload})))] (log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init) - (merge - {:com.github.ivarref.yoltq/id id - :com.github.ivarref.yoltq/queue-name queue-name - :com.github.ivarref.yoltq/status u/status-init - :com.github.ivarref.yoltq/payload (pr-str-safe :payload payload) - :com.github.ivarref.yoltq/bindings str-bindings - :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts) - :com.github.ivarref.yoltq/lock (u/random-uuid) - :com.github.ivarref.yoltq/tries 0 - :com.github.ivarref.yoltq/init-time (u/now-ms) - :com.github.ivarref.yoltq/version "2"} - (when-let [[q ext-id] (:depends-on opts)] - (when-not (d/q '[:find ?e . - :in $ ?ext-id - :where - [?e :com.github.ivarref.yoltq/ext-id ?ext-id]] - (d/db conn) - (pr-str-safe :depends-on [q ext-id])) - (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts)))) - (when-let [ext-id (:id opts)] - {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}))) + (do + (dt/ensure-partition! conn partition) + (merge + (if (bytes? encoded) + {:com.github.ivarref.yoltq/payload-bytes encoded} + {:com.github.ivarref.yoltq/payload encoded}) + {:db/id (d/tempid partition) + :com.github.ivarref.yoltq/id id + :com.github.ivarref.yoltq/queue-name queue-name + :com.github.ivarref.yoltq/status u/status-init + :com.github.ivarref.yoltq/bindings str-bindings + :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts) + :com.github.ivarref.yoltq/lock (u/random-uuid) + :com.github.ivarref.yoltq/tries 0 + :com.github.ivarref.yoltq/init-time (u/now-ms) + :com.github.ivarref.yoltq/version "2"} + (when-let [[q ext-id] (:depends-on opts)] + (when-not (d/q '[:find ?e . + :in $ ?ext-id + :where + [?e :com.github.ivarref.yoltq/ext-id ?ext-id]] + (d/db conn) + (pr-str-safe :depends-on [q ext-id])) + (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts)))) + (when-let [ext-id (:id opts)] + {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])})))) (do (log/error "Did not find registered handler for queue" queue-name) (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) @@ -169,20 +189,23 @@ "in" (format "%.1f" (double (/ spent-ns 1e6))) "ms"])) -(defn execute! [{:keys [handlers mark-status-fn! start-execute-time collect-spent-time!] - :or {mark-status-fn! mark-status!} +(defn execute! [{:keys [decode handlers mark-status-fn! start-execute-time collect-spent-time!] + :or {mark-status-fn! mark-status! + decode edn/read-string} :as cfg} - {:com.github.ivarref.yoltq/keys [status id queue-name payload] :as queue-item}] + {:com.github.ivarref.yoltq/keys [status id queue-name payload payload-bytes] :as queue-item}] (when queue-item (if (= :error status) (assoc queue-item :failed? true) (if-let [queue (get handlers queue-name)] - (let [{:keys [f allow-cas-failure?]} queue] + (let [{:keys [f allow-cas-failure?]} queue + decode (get queue :decode decode)] (log/debug "queue item" (str id) "for queue" queue-name "is now processing") (let [{:keys [retval exception]} (try (swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ms) id queue-name]) - (let [v (f payload)] + (let [payload (decode (or payload payload-bytes)) + v (f payload)] {:retval v}) (catch Throwable t {:exception t}) |
