aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan van den Oord <stefan@medicinemen.eu>2024-06-14 16:08:59 +0200
committerStefan van den Oord <stefan@medicinemen.eu>2024-06-14 16:08:59 +0200
commit85d13545275678a1077b9600fce136ae10dcb809 (patch)
tree66acd3b2d5933de18945dd8d34420199bb27046e /src
parentUpdate dependency to use the new free Datomic version (diff)
downloadfiinha-85d13545275678a1077b9600fce136ae10dcb809.tar.gz
fiinha-85d13545275678a1077b9600fce136ae10dcb809.tar.xz
#3 Add optional batch name to queue jobs
Diffstat (limited to 'src')
-rw-r--r--src/com/github/ivarref/yoltq.clj23
-rw-r--r--src/com/github/ivarref/yoltq/impl.clj5
2 files changed, 27 insertions, 1 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index 379d701..1ba286e 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -230,6 +230,29 @@
(sort-by (juxt :qname :status))
(vec))))
+(defn batch-progress [queue-name batch-name]
+ (let [{:keys [conn]} @*config*
+ db (d/db conn)]
+ (->> (d/q '[:find ?e ?qname ?bname ?status
+ :keys :e :qname :bname :status
+ :in $ ?qname ?bname
+ :where
+ [?e :com.github.ivarref.yoltq/queue-name ?qname]
+ [?e :com.github.ivarref.yoltq/batch-name ?bname]
+ [?e :com.github.ivarref.yoltq/status ?status]]
+ db queue-name batch-name)
+ (mapv #(select-keys % [:qname :bname :status]))
+ (mapv (fn [qitem] {qitem 1}))
+ (reduce (partial merge-with +) {})
+ (mapv (fn [[{:keys [qname bname status]} v]]
+ (array-map
+ :qname qname
+ :batch-name bname
+ :status status
+ :count v)))
+ (sort-by (juxt :qname :batch-name :status))
+ (vec))))
+
(defn get-errors [qname]
(let [{:keys [conn]} @*config*
db (d/db conn)]
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj
index ac573d1..6d2aa3d 100644
--- a/src/com/github/ivarref/yoltq/impl.clj
+++ b/src/com/github/ivarref/yoltq/impl.clj
@@ -12,6 +12,7 @@
[#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity}
#:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value}
#:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
+ #:db{:ident :com.github.ivarref.yoltq/batch-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
#:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
#:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string}
#:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes}
@@ -103,7 +104,9 @@
(pr-str-safe :depends-on [q ext-id]))
(throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts))))
(when-let [ext-id (:id opts)]
- {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}))))
+ {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])})
+ (when-let [batch-name (:batch-name opts)]
+ {:com.github.ivarref.yoltq/batch-name batch-name}))))
(do
(log/error "Did not find registered handler for queue" queue-name)
(throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name})))))