aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorIvar Refsdal <refsdal.ivar@gmail.com>2022-11-18 14:12:50 +0100
committerIvar Refsdal <refsdal.ivar@gmail.com>2022-11-18 14:12:50 +0100
commit8f945d8c0189ad73d862c988faa511e0a7b017df (patch)
treecc207a5fddedd9726bee43ef0ad7d74066e88ca6 /src
parentRelease 0.2.62: Add function processing-time-stats (diff)
downloadfiinha-8f945d8c0189ad73d862c988faa511e0a7b017df.tar.gz
fiinha-8f945d8c0189ad73d862c988faa511e0a7b017df.tar.xz
Release 0.2.63: Add support for :encode and :decode function. Add :partition-fn. Fixes #1
Diffstat (limited to 'src')
-rw-r--r--src/com/github/ivarref/yoltq/impl.clj95
-rw-r--r--src/com/github/ivarref/yoltq/utils.clj1
2 files changed, 59 insertions, 37 deletions
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj
index c37b0e6..ac573d1 100644
--- a/src/com/github/ivarref/yoltq/impl.clj
+++ b/src/com/github/ivarref/yoltq/impl.clj
@@ -1,11 +1,12 @@
(ns com.github.ivarref.yoltq.impl
- (:require [datomic.api :as d]
- [clojure.tools.logging :as log]
+ (:require [clojure.edn :as edn]
[clojure.string :as str]
- [com.github.ivarref.yoltq.utils :as u]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.double-trouble :as dt]
[com.github.ivarref.yoltq.ext-sys :as ext]
- [clojure.edn :as edn]))
-
+ [com.github.ivarref.yoltq.utils :as u]
+ [datomic.api :as d])
+ (:import (java.time Year)))
(def schema
[#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity}
@@ -13,6 +14,7 @@
#:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
#:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
#:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string}
+ #:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes}
#:db{:ident :com.github.ivarref.yoltq/opts, :cardinality :db.cardinality/one, :valueType :db.type/string}
#:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string}
#:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true}
@@ -41,13 +43,22 @@
(log/error "could not read-string" what ":" (ex-message e))
(throw e))))
+(defn default-partition-fn [_queue-keyword]
+ (keyword "yoltq" (str "queue_" (.getValue (Year/now)))))
-(defn put [{:keys [capture-bindings conn] :as config}
+(defn put [{:keys [capture-bindings conn encode partition-fn]
+ :or {partition-fn default-partition-fn
+ encode (partial pr-str-safe :payload)}
+ :as config}
queue-name
payload
opts]
(if-let [q-config (get-in config [:handlers queue-name])]
(let [id (u/squuid)
+ encode (get q-config :encode encode)
+ partition-fn (get q-config :partition-fn partition-fn)
+ partition (partition-fn queue-name)
+ _ (assert (keyword? partition) "Partition must be a keyword")
depends-on (get q-config :depends-on (fn [_] nil))
valid-payload? (get q-config :valid-payload? (fn [_] true))
opts (merge
@@ -58,32 +69,41 @@
(assoc o (symbol k) (deref k)))
{}
(or capture-bindings []))
- (pr-str-safe :capture-bindings))]
- (when-not (valid-payload? payload)
- (log/error "Payload was not valid. Payload was:" payload)
- (throw (ex-info (str "Payload was not valid: " payload) {:payload payload})))
+ (pr-str-safe :capture-bindings))
+ _ (when-not (valid-payload? payload)
+ (log/error "Payload was not valid. Payload was:" payload)
+ (throw (ex-info (str "Payload was not valid: " payload) {:payload payload})))
+ encoded (encode payload)
+ _ (when (not (or (bytes? encoded) (string? encoded)))
+ (log/error "Payload must be encoded to either a string or a byte array")
+ (throw (ex-info (str "Payload must be encoded to a string or a byte array. Payload: " payload) {:payload payload})))]
(log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init)
- (merge
- {:com.github.ivarref.yoltq/id id
- :com.github.ivarref.yoltq/queue-name queue-name
- :com.github.ivarref.yoltq/status u/status-init
- :com.github.ivarref.yoltq/payload (pr-str-safe :payload payload)
- :com.github.ivarref.yoltq/bindings str-bindings
- :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts)
- :com.github.ivarref.yoltq/lock (u/random-uuid)
- :com.github.ivarref.yoltq/tries 0
- :com.github.ivarref.yoltq/init-time (u/now-ms)
- :com.github.ivarref.yoltq/version "2"}
- (when-let [[q ext-id] (:depends-on opts)]
- (when-not (d/q '[:find ?e .
- :in $ ?ext-id
- :where
- [?e :com.github.ivarref.yoltq/ext-id ?ext-id]]
- (d/db conn)
- (pr-str-safe :depends-on [q ext-id]))
- (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts))))
- (when-let [ext-id (:id opts)]
- {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])})))
+ (do
+ (dt/ensure-partition! conn partition)
+ (merge
+ (if (bytes? encoded)
+ {:com.github.ivarref.yoltq/payload-bytes encoded}
+ {:com.github.ivarref.yoltq/payload encoded})
+ {:db/id (d/tempid partition)
+ :com.github.ivarref.yoltq/id id
+ :com.github.ivarref.yoltq/queue-name queue-name
+ :com.github.ivarref.yoltq/status u/status-init
+ :com.github.ivarref.yoltq/bindings str-bindings
+ :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts)
+ :com.github.ivarref.yoltq/lock (u/random-uuid)
+ :com.github.ivarref.yoltq/tries 0
+ :com.github.ivarref.yoltq/init-time (u/now-ms)
+ :com.github.ivarref.yoltq/version "2"}
+ (when-let [[q ext-id] (:depends-on opts)]
+ (when-not (d/q '[:find ?e .
+ :in $ ?ext-id
+ :where
+ [?e :com.github.ivarref.yoltq/ext-id ?ext-id]]
+ (d/db conn)
+ (pr-str-safe :depends-on [q ext-id]))
+ (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts))))
+ (when-let [ext-id (:id opts)]
+ {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}))))
(do
(log/error "Did not find registered handler for queue" queue-name)
(throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name})))))
@@ -169,20 +189,23 @@
"in" (format "%.1f" (double (/ spent-ns 1e6))) "ms"]))
-(defn execute! [{:keys [handlers mark-status-fn! start-execute-time collect-spent-time!]
- :or {mark-status-fn! mark-status!}
+(defn execute! [{:keys [decode handlers mark-status-fn! start-execute-time collect-spent-time!]
+ :or {mark-status-fn! mark-status!
+ decode edn/read-string}
:as cfg}
- {:com.github.ivarref.yoltq/keys [status id queue-name payload] :as queue-item}]
+ {:com.github.ivarref.yoltq/keys [status id queue-name payload payload-bytes] :as queue-item}]
(when queue-item
(if (= :error status)
(assoc queue-item :failed? true)
(if-let [queue (get handlers queue-name)]
- (let [{:keys [f allow-cas-failure?]} queue]
+ (let [{:keys [f allow-cas-failure?]} queue
+ decode (get queue :decode decode)]
(log/debug "queue item" (str id) "for queue" queue-name "is now processing")
(let [{:keys [retval exception]}
(try
(swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ms) id queue-name])
- (let [v (f payload)]
+ (let [payload (decode (or payload payload-bytes))
+ v (f payload)]
{:retval v})
(catch Throwable t
{:exception t})
diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj
index 39572a9..7665b6d 100644
--- a/src/com/github/ivarref/yoltq/utils.clj
+++ b/src/com/github/ivarref/yoltq/utils.clj
@@ -57,7 +57,6 @@
(defn get-queue-item [db id]
(-> (d/pull db '[:*] [:com.github.ivarref.yoltq/id id])
(dissoc :db/id)
- (update :com.github.ivarref.yoltq/payload edn/read-string)
(update :com.github.ivarref.yoltq/opts (fn [s] (or (when s (edn/read-string s)) {})))
(update :com.github.ivarref.yoltq/bindings
(fn [s]