aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq/impl.clj
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/github/ivarref/yoltq/impl.clj')
-rw-r--r--src/com/github/ivarref/yoltq/impl.clj51
1 files changed, 41 insertions, 10 deletions
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj
index 9c95cff..9811c93 100644
--- a/src/com/github/ivarref/yoltq/impl.clj
+++ b/src/com/github/ivarref/yoltq/impl.clj
@@ -8,9 +8,11 @@
(def schema
[#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity}
+ #:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value}
#:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
#:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
#:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string}
+ #:db{:ident :com.github.ivarref.yoltq/opts, :cardinality :db.cardinality/one, :valueType :db.type/string}
#:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string}
#:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true}
#:db{:ident :com.github.ivarref.yoltq/lock, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :noHistory true}
@@ -20,8 +22,10 @@
#:db{:ident :com.github.ivarref.yoltq/error-time, :cardinality :db.cardinality/one, :valueType :db.type/long}])
-(defn put [{:keys [capture-bindings] :as config}
- queue-name payload]
+(defn put [{:keys [capture-bindings conn] :as config}
+ queue-name
+ payload
+ opts]
(if-let [_ (get-in config [:handlers queue-name])]
(let [id (u/squuid)
str-bindings (->> (reduce (fn [o k]
@@ -30,19 +34,46 @@
(or capture-bindings []))
(pr-str))]
(log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init)
- {:com.github.ivarref.yoltq/id id
- :com.github.ivarref.yoltq/queue-name queue-name
- :com.github.ivarref.yoltq/status u/status-init
- :com.github.ivarref.yoltq/payload (pr-str payload)
- :com.github.ivarref.yoltq/bindings str-bindings
- :com.github.ivarref.yoltq/lock (u/random-uuid)
- :com.github.ivarref.yoltq/tries 0
- :com.github.ivarref.yoltq/init-time (u/now-ns)})
+ (merge
+ {:com.github.ivarref.yoltq/id id
+ :com.github.ivarref.yoltq/queue-name queue-name
+ :com.github.ivarref.yoltq/status u/status-init
+ :com.github.ivarref.yoltq/payload (pr-str payload)
+ :com.github.ivarref.yoltq/bindings str-bindings
+ :com.github.ivarref.yoltq/opts (pr-str (or opts {}))
+ :com.github.ivarref.yoltq/lock (u/random-uuid)
+ :com.github.ivarref.yoltq/tries 0
+ :com.github.ivarref.yoltq/init-time (u/now-ns)}
+ (when-let [[q ext-id] (:depends-on opts)]
+ (when-not (d/q '[:find ?e .
+ :in $ ?ext-id
+ :where
+ [?e :com.github.ivarref.yoltq/ext-id ?ext-id]]
+ (d/db conn)
+ (pr-str [q ext-id]))
+ (throw (ex-info ":depends-on not found in database" opts))))
+ (when-let [ext-id (:id opts)]
+ {:com.github.ivarref.yoltq/ext-id (pr-str [queue-name ext-id])})))
(do
(log/error "Did not find registered handler for queue" queue-name)
(throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name})))))
+(defn depends-on-waiting? [{:keys [conn]}
+ {:keys [id]}]
+ (let [db (d/db conn)]
+ (when-let [{:com.github.ivarref.yoltq/keys [opts]} (u/get-queue-item db id)]
+ (when-let [[q id :as depends-on] (:depends-on opts)]
+ (when-not (d/q '[:find ?e .
+ :in $ ?ext-id
+ :where
+ [?e :com.github.ivarref.yoltq/ext-id ?ext-id]
+ [?e :com.github.ivarref.yoltq/status :done]]
+ db
+ (pr-str [q id]))
+ {:depends-on depends-on})))))
+
+
(defn take! [{:keys [conn cas-failures hung-log-level tx-spent-time!]
:or {hung-log-level :error}}
{:keys [tx id queue-name was-hung? to-error?] :as queue-item-info}]