aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq/utils.clj
blob: ad2444afa11595c7347f8b80fb5807312c37ec12 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
(ns com.github.ivarref.yoltq.utils
  (:require [datomic.api :as d]
            [clojure.edn :as edn]
            [com.github.ivarref.yoltq.ext-sys :as ext]
            [clojure.tools.logging :as log])
  (:refer-clojure :exclude [random-uuid])
  (:import (datomic Connection)
           (java.time Duration)))


;; Keyword values used for a queue item's status. In this file
;; status-processing and status-error are written to the
;; :com.github.ivarref.yoltq/status attribute via :db/cas; the datalog queries
;; in get-init, get-error and get-hung match the literal keywords :init,
;; :error and :processing directly.
(def status-init :init)
(def status-processing :processing)
(def status-done :done)
(def status-error :error)


(defn duration->millis
  "Returns a map with the same keys as `m` in which every value that is a
  java.time.Duration has been replaced by its length in milliseconds
  (Duration.toMillis). All other values are carried over unchanged."
  [m]
  (letfn [(millis-or-same [v]
            (if (instance? Duration v)
              (.toMillis v)
              v))]
    (reduce-kv (fn [acc k v]
                 (assoc acc k (millis-or-same v)))
               {}
               m)))


(defn squuid
  "Returns the result of calling ext/squuid (from
  com.github.ivarref.yoltq.ext-sys). The call goes through the ext var on
  every invocation."
  []
  (ext/squuid))


(defn random-uuid
  "Returns the result of calling ext/random-uuid (from
  com.github.ivarref.yoltq.ext-sys). The namespace declaration excludes
  clojure.core/random-uuid, so this definition does not clash with it."
  []
  (ext/random-uuid))


(defn now-ms
  "Returns the result of calling ext/now-ms (from
  com.github.ivarref.yoltq.ext-sys). Callers in this file treat the value as a
  number (it is used in subtraction); presumably epoch milliseconds - confirm
  against ext-sys."
  []
  (ext/now-ms))


(defn root-cause
  "Follows the cause chain of `e` (via ex-cause) and returns the innermost
  element, i.e. the first one whose ex-cause is nil. Returns `e` itself when
  it has no cause."
  [e]
  (loop [current e]
    (let [cause (ex-cause current)]
      (if cause
        (recur cause)
        current))))


(defn db-error-map
  "Searches `t` and then each of its causes, outermost first, for an exception
  whose ex-data is a map containing the key :db/error. Returns that ex-data
  map, or nil when the chain (possibly empty, if `t` is nil) has no such
  exception."
  [^Throwable t]
  (->> (iterate ex-cause t)
       ;; the chain ends at the first nil cause
       (take-while some?)
       (map ex-data)
       (filter (fn [data]
                 (and (map? data)
                      (contains? data :db/error))))
       (first)))


(defn get-queue-item
  "Pulls every attribute of the entity addressed by the lookup ref
  [:com.github.ivarref.yoltq/id id] from `db` and returns it as a map with
  :db/id removed and the EDN-encoded attributes decoded:

  - payload:  read with edn/read-string
  - opts:     read with edn/read-string; {} when absent or when it reads as nil
  - bindings: read with edn/read-string, then every key is passed through
              resolve; nil when the attribute is absent

  Because `update` is used, these three keys are present in the result even
  when the pulled entity does not carry them."
  [db id]
  (letfn [(decode-opts [s]
            (if s
              (or (edn/read-string s) {})
              {}))
          (decode-bindings [s]
            (when s
              (reduce-kv (fn [acc sym value]
                           (assoc acc (resolve sym) value))
                         {}
                         (edn/read-string s))))]
    (let [pulled (d/pull db '[:*] [:com.github.ivarref.yoltq/id id])]
      (-> pulled
          (dissoc :db/id)
          (update :com.github.ivarref.yoltq/payload edn/read-string)
          (update :com.github.ivarref.yoltq/opts decode-opts)
          (update :com.github.ivarref.yoltq/bindings decode-bindings)))))


(defn prepare-processing
  "Builds the description of an attempt to take queue item `id` into
  processing. Returns a map with:

  :id, :queue-name - as given
  :lock            - a freshly generated lock value (random-uuid)
  :bindings        - the item's bindings as returned by get-queue-item
  :tx              - transaction data that compare-and-swaps the lock from
                     `old-lock` to the new lock, compare-and-swaps the status
                     from `old-status` to status-processing, and sets
                     processing-time to (now-ms)

  Nothing is transacted here; the caller is expected to submit :tx."
  [db id queue-name old-lock old-status]
  (let [new-lock   (random-uuid)
        entity-ref [:com.github.ivarref.yoltq/id id]
        item       (get-queue-item db id)
        ;; NOTE(review): get-queue-item always sets the bindings key (nil when
        ;; the attribute is absent), so the {} default looks unreachable - confirm.
        bindings   (get item :com.github.ivarref.yoltq/bindings {})
        lock-cas   [:db/cas entity-ref :com.github.ivarref.yoltq/lock old-lock new-lock]
        status-cas [:db/cas entity-ref :com.github.ivarref.yoltq/status old-status status-processing]
        stamp      {:db/id entity-ref :com.github.ivarref.yoltq/processing-time (now-ms)}]
    {:id         id
     :lock       new-lock
     :queue-name queue-name
     :bindings   bindings
     :tx         [lock-cas status-cas stamp]}))


(defn get-init
  "Picks one random queue item of `queue-name` that is in :init status and
  whose init-time is at or before (now-ms) minus `init-backoff-time`, and
  returns the prepare-processing map for it (old status :init).

  Uses `db` from the config when given, otherwise the current database of
  `conn`. `conn` must be a datomic.Connection either way (asserted).
  When no item matches, a debug message is logged and the result of that
  log call is returned (presumably nil)."
  [{:keys [conn db init-backoff-time] :as cfg} queue-name]
  (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: "
                                           (if (nil? conn) "nil" conn)
                                           "\nConfig was: " cfg))
  (let [snapshot   (or db (d/db conn))
        cutoff     (- (now-ms) init-backoff-time)
        candidates (d/q '[:find ?id ?lock
                          :in $ ?queue-name ?backoff
                          :where
                          [?e :com.github.ivarref.yoltq/status :init]
                          [?e :com.github.ivarref.yoltq/queue-name ?queue-name]
                          [?e :com.github.ivarref.yoltq/init-time ?init-time]
                          [(>= ?backoff ?init-time)]
                          [?e :com.github.ivarref.yoltq/id ?id]
                          [?e :com.github.ivarref.yoltq/lock ?lock]]
                        snapshot
                        queue-name
                        cutoff)]
    (if (seq candidates)
      (let [[id old-lock] (rand-nth (into [] candidates))]
        (prepare-processing snapshot id queue-name old-lock :init))
      (log/debug "no new-items in :init status for queue" queue-name))))


(defn get-error
  "Picks one random queue item of `queue-name` that is in :error status, whose
  error-time is at or before (now-ms) minus `error-backoff-time`, and whose
  tries count is at most max-retries (the query requires tries to be less
  than max-retries + 1). Returns the prepare-processing map for it (old
  status :error), or nil when no item matches.

  max-retries is taken from [:handlers queue-name :max-retries] in the config
  when present, otherwise from the top-level :max-retries. Uses `db` from the
  config when given, otherwise the current database of `conn`. `conn` must be
  a datomic.Connection either way (asserted)."
  [{:keys [conn db error-backoff-time max-retries] :as cfg} queue-name]
  (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: "
                                           (if (nil? conn) "nil" conn)
                                           "\nConfig was: " cfg))
  (let [snapshot    (or db (d/db conn))
        retry-limit (get-in cfg [:handlers queue-name :max-retries] max-retries)
        cutoff      (- (now-ms) error-backoff-time)
        candidates  (d/q '[:find ?id ?lock
                           :in $ ?queue-name ?backoff ?max-tries
                           :where
                           [?e :com.github.ivarref.yoltq/status :error]
                           [?e :com.github.ivarref.yoltq/queue-name ?queue-name]
                           [?e :com.github.ivarref.yoltq/error-time ?time]
                           [(>= ?backoff ?time)]
                           [?e :com.github.ivarref.yoltq/tries ?tries]
                           [(> ?max-tries ?tries)]
                           [?e :com.github.ivarref.yoltq/id ?id]
                           [?e :com.github.ivarref.yoltq/lock ?lock]]
                         snapshot
                         queue-name
                         cutoff
                         (inc retry-limit))]
    (when (seq candidates)
      (let [[id old-lock] (rand-nth (into [] candidates))]
        (prepare-processing snapshot id queue-name old-lock :error)))))


(defn get-hung
  "Picks one random queue item of `queue-name` that is in :processing status
  and whose processing-time is at or before `now` minus `hung-backoff-time`.
  Returns nil when no item matches, otherwise a map with:

  :id, :queue-name - the item id and the given queue name
  :lock            - a freshly generated lock value (random-uuid)
  :was-hung?       - always true
  :to-error?       - true when the item's tries count is >= max-retries
  :bindings        - the item's bindings as returned by get-queue-item
  :tx              - transaction data that compare-and-swaps the lock to the
                     new lock, compare-and-swaps tries to tries + 1 and sets
                     error-time to `now`; when :to-error? is true it also
                     compare-and-swaps the status from status-processing to
                     status-error, otherwise the status is not touched

  `now` defaults to (now-ms). max-retries is taken from
  [:handlers queue-name :max-retries] in the config when present, otherwise
  from the top-level :max-retries. Uses `db` from the config when given,
  otherwise the current database of `conn`. `conn` must be a
  datomic.Connection either way (asserted). Nothing is transacted here."
  [{:keys [conn db now hung-backoff-time max-retries] :as cfg} queue-name]
  (assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: "
                                           (if (nil? conn) "nil" conn)
                                           "\nConfig was: " cfg))
  (let [now         (or now (now-ms))
        retry-limit (get-in cfg [:handlers queue-name :max-retries] max-retries)
        snapshot    (or db (d/db conn))
        candidates  (d/q '[:find ?id ?lock ?tries
                           :in $ ?qname ?backoff
                           :where
                           [?e :com.github.ivarref.yoltq/status :processing]
                           [?e :com.github.ivarref.yoltq/queue-name ?qname]
                           [?e :com.github.ivarref.yoltq/processing-time ?time]
                           [(>= ?backoff ?time)]
                           [?e :com.github.ivarref.yoltq/tries ?tries]
                           [?e :com.github.ivarref.yoltq/id ?id]
                           [?e :com.github.ivarref.yoltq/lock ?lock]]
                         snapshot
                         queue-name
                         (- now hung-backoff-time))]
    (when (seq candidates)
      (let [new-lock            (random-uuid)
            ;; each result row has exactly the three columns named in :find
            [id old-lock tries] (rand-nth (into [] candidates))
            to-error?           (>= tries retry-limit)
            entity-ref          [:com.github.ivarref.yoltq/id id]
            lock-cas            [:db/cas entity-ref :com.github.ivarref.yoltq/lock old-lock new-lock]
            tries-cas           [:db/cas entity-ref :com.github.ivarref.yoltq/tries tries (inc tries)]
            stamp               {:db/id entity-ref :com.github.ivarref.yoltq/error-time now}]
        {:id         id
         :lock       new-lock
         :queue-name queue-name
         :was-hung?  true
         :to-error?  to-error?
         ;; NOTE(review): get-queue-item always sets the bindings key (nil when
         ;; the attribute is absent), so the {} default looks unreachable - confirm.
         :bindings   (get (get-queue-item snapshot id) :com.github.ivarref.yoltq/bindings {})
         :tx         (if to-error?
                       [lock-cas
                        tries-cas
                        [:db/cas entity-ref :com.github.ivarref.yoltq/status status-processing status-error]
                        stamp]
                       [lock-cas
                        tries-cas
                        stamp])}))))