aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorire <refsdal.ivar@gmail.com>2025-09-22 11:20:50 +0200
committerire <refsdal.ivar@gmail.com>2025-09-22 11:20:50 +0200
commitbfd0d662f1ef212ab194ce3f78b04dc527f42d95 (patch)
treec2f19875e64c3363d29c57b060ba61dd2ef53e20
parentUpdate for release (diff)
parentAdded documentation for job-group feature (diff)
downloadfiinha-bfd0d662f1ef212ab194ce3f78b04dc527f42d95.tar.gz
fiinha-bfd0d662f1ef212ab194ce3f78b04dc527f42d95.tar.xz
Merge branch 'batches-of-jobs'
-rw-r--r--README.md34
-rw-r--r--src/com/github/ivarref/yoltq.clj23
-rw-r--r--src/com/github/ivarref/yoltq/impl.clj5
-rw-r--r--test/com/github/ivarref/yoltq/virtual_test.clj22
4 files changed, 83 insertions, 1 deletions
diff --git a/README.md b/README.md
index 8747162..5fca23d 100644
--- a/README.md
+++ b/README.md
@@ -277,6 +277,40 @@ For an exhaustive list of all configuration options,
see
[yq/default-opts](https://github.com/ivarref/yoltq/blob/main/src/com/github/ivarref/yoltq.clj#L21).
+# Groups of Jobs
+
+Yoltq supports grouping jobs in a queue, and tracking the progress of such a
+group of jobs. Consider this example: your system is used by the marketing
+department to send emails to groups of users. Multiple colleagues in the
+marketing department could potentially do this at the same time, but they want
+to see the progress of their _own_ campagne, not that of _all_ emails being
+sent. When adding the jobs to the queue, you can specify the `job-group`
+parameter, in this case indicate which marketeer is running the jobs:
+
+```clojure
+(doseq [uid user-ids]
+ @(d/transact conn [(yq/put :send-mail
+ ; Payload:
+ {:user-id uid :from ... :to ... :body ...}
+ ; Job options:
+ {:job-group :mail-campagne/for-marketeer-42})]))
+```
+
+When you want to know the progress of that specific job group, and display it in
+your user interface, you can use `job-group-progress`, which returns a structure
+similar to `queue-stats`:
+
+```clojure
+(yq/job-group-progress :send-mail :mail-campagne/for-marketeer-42)
+;; => [{:qname :send-mail
+;; :job-group :mail-campagne/for-marketeer-42
+;; :status :init
+;; :count 78}
+;; {:qname :send-mail
+;; :job-group :mail-campagne/for-marketeer-42
+;; :status :done
+;; :count 24}]
+```
# Regular and REPL usage
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index 0f63e25..8c8ca7a 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -257,6 +257,29 @@
(sort-by (juxt :qname :status))
(vec))))
+(defn job-group-progress [queue-name job-group]
+ (let [{:keys [conn]} @*config*
+ db (d/db conn)]
+ (->> (d/q '[:find ?e ?qname ?job-group ?status
+ :keys :e :qname :job-group :status
+ :in $ ?qname ?job-group
+ :where
+ [?e :com.github.ivarref.yoltq/queue-name ?qname]
+ [?e :com.github.ivarref.yoltq/job-group ?job-group]
+ [?e :com.github.ivarref.yoltq/status ?status]]
+ db queue-name job-group)
+ (mapv #(select-keys % [:qname :job-group :status]))
+ (mapv (fn [qitem] {qitem 1}))
+ (reduce (partial merge-with +) {})
+ (mapv (fn [[{:keys [qname job-group status]} v]]
+ (array-map
+ :qname qname
+ :job-group job-group
+ :status status
+ :count v)))
+ (sort-by (juxt :qname :job-group :status))
+ (vec))))
+
(defn get-errors [qname]
(let [{:keys [conn]} @*config*
db (d/db conn)]
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj
index ac573d1..ffb1ad8 100644
--- a/src/com/github/ivarref/yoltq/impl.clj
+++ b/src/com/github/ivarref/yoltq/impl.clj
@@ -12,6 +12,7 @@
[#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity}
#:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value}
#:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
+ #:db{:ident :com.github.ivarref.yoltq/job-group, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
#:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
#:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string}
#:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes}
@@ -103,7 +104,9 @@
(pr-str-safe :depends-on [q ext-id]))
(throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts))))
(when-let [ext-id (:id opts)]
- {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}))))
+ {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])})
+ (when-let [job-group (:job-group opts)]
+ {:com.github.ivarref.yoltq/job-group job-group}))))
(do
(log/error "Did not find registered handler for queue" queue-name)
(throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name})))))
diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj
index 2800c21..a2ed269 100644
--- a/test/com/github/ivarref/yoltq/virtual_test.clj
+++ b/test/com/github/ivarref/yoltq/virtual_test.clj
@@ -450,3 +450,25 @@
@(d/transact conn [(yq/put :q "asdf")])
(tq/consume! :q)
(is (= @got-work "asdf"))))
+
+(deftest job-group-test
+ (let [conn (u/empty-conn)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q1 identity)
+ (yq/add-consumer! :q2 identity)
+ @(d/transact conn [(yq/put :q1 {:work 123} {:job-group :group1})
+ (yq/put :q1 {:work 456} {:job-group :group2})
+ (yq/put :q2 {:work 789} {:job-group :group1})])
+ (is (= [{:qname :q1
+ :job-group :group1
+ :status :init
+ :count 1}]
+ (yq/job-group-progress :q1 :group1)))
+
+ (is (= {:work 123} (tq/consume! :q1)))
+
+ (is (= [{:qname :q1
+ :job-group :group1
+ :status :done
+ :count 1}]
+ (yq/job-group-progress :q1 :group1)))))