diff options
| -rw-r--r-- | README.md | 34 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq.clj | 23 | ||||
| -rw-r--r-- | src/com/github/ivarref/yoltq/impl.clj | 5 | ||||
| -rw-r--r-- | test/com/github/ivarref/yoltq/virtual_test.clj | 22 |
4 files changed, 83 insertions, 1 deletions
@@ -277,6 +277,40 @@ For an exhaustive list of all configuration options, see [yq/default-opts](https://github.com/ivarref/yoltq/blob/main/src/com/github/ivarref/yoltq.clj#L21). +# Groups of Jobs + +Yoltq supports grouping jobs in a queue, and tracking the progress of such a +group of jobs. Consider this example: your system is used by the marketing +department to send emails to groups of users. Multiple colleagues in the +marketing department could potentially do this at the same time, but they want +to see the progress of their _own_ campagne, not that of _all_ emails being +sent. When adding the jobs to the queue, you can specify the `job-group` +parameter, in this case indicate which marketeer is running the jobs: + +```clojure +(doseq [uid user-ids] + @(d/transact conn [(yq/put :send-mail + ; Payload: + {:user-id uid :from ... :to ... :body ...} + ; Job options: + {:job-group :mail-campagne/for-marketeer-42})])) +``` + +When you want to know the progress of that specific job group, and display it in +your user interface, you can use `job-group-progress`, which returns a structure +similar to `queue-stats`: + +```clojure +(yq/job-group-progress :send-mail :mail-campagne/for-marketeer-42) +;; => [{:qname :send-mail +;; :job-group :mail-campagne/for-marketeer-42 +;; :status :init +;; :count 78} +;; {:qname :send-mail +;; :job-group :mail-campagne/for-marketeer-42 +;; :status :done +;; :count 24}] +``` # Regular and REPL usage diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj index 0f63e25..8c8ca7a 100644 --- a/src/com/github/ivarref/yoltq.clj +++ b/src/com/github/ivarref/yoltq.clj @@ -257,6 +257,29 @@ (sort-by (juxt :qname :status)) (vec)))) +(defn job-group-progress [queue-name job-group] + (let [{:keys [conn]} @*config* + db (d/db conn)] + (->> (d/q '[:find ?e ?qname ?job-group ?status + :keys :e :qname :job-group :status + :in $ ?qname ?job-group + :where + [?e :com.github.ivarref.yoltq/queue-name ?qname] + [?e :com.github.ivarref.yoltq/job-group ?job-group] + [?e :com.github.ivarref.yoltq/status ?status]] + db queue-name job-group) + (mapv #(select-keys % [:qname :job-group :status])) + (mapv (fn [qitem] {qitem 1})) + (reduce (partial merge-with +) {}) + (mapv (fn [[{:keys [qname job-group status]} v]] + (array-map + :qname qname + :job-group job-group + :status status + :count v))) + (sort-by (juxt :qname :job-group :status)) + (vec)))) + (defn get-errors [qname] (let [{:keys [conn]} @*config* db (d/db conn)] diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj index ac573d1..ffb1ad8 100644 --- a/src/com/github/ivarref/yoltq/impl.clj +++ b/src/com/github/ivarref/yoltq/impl.clj @@ -12,6 +12,7 @@ [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity} #:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value} #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} + #:db{:ident :com.github.ivarref.yoltq/job-group, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true} #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string} #:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes} @@ -103,7 +104,9 @@ (pr-str-safe :depends-on [q ext-id])) (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts)))) (when-let [ext-id (:id opts)] - {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])})))) + {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}) + (when-let [job-group (:job-group opts)] + {:com.github.ivarref.yoltq/job-group job-group})))) (do (log/error "Did not find registered handler for queue" queue-name) (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name}))))) diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj index 2800c21..a2ed269 100644 --- a/test/com/github/ivarref/yoltq/virtual_test.clj +++ b/test/com/github/ivarref/yoltq/virtual_test.clj @@ -450,3 +450,25 @@ @(d/transact conn [(yq/put :q "asdf")]) (tq/consume! :q) (is (= @got-work "asdf")))) + +(deftest job-group-test + (let [conn (u/empty-conn)] + (yq/init! {:conn conn}) + (yq/add-consumer! :q1 identity) + (yq/add-consumer! :q2 identity) + @(d/transact conn [(yq/put :q1 {:work 123} {:job-group :group1}) + (yq/put :q1 {:work 456} {:job-group :group2}) + (yq/put :q2 {:work 789} {:job-group :group1})]) + (is (= [{:qname :q1 + :job-group :group1 + :status :init + :count 1}] + (yq/job-group-progress :q1 :group1))) + + (is (= {:work 123} (tq/consume! :q1))) + + (is (= [{:qname :q1 + :job-group :group1 + :status :done + :count 1}] + (yq/job-group-progress :q1 :group1))))) |
