blob: 29c67dad981009d3dceab42242b864722f54c546 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
# yoltq
An opinionated Datomic queue for building (more) reliable systems.
Implements the [transactional outbox](https://microservices.io/patterns/data/transactional-outbox.html)
pattern.
Supports retries, backoff and more.
On-prem only.
## Installation
...
## 1-minute example
```clojure
(require '[com.github.ivarref.yoltq :as yq])
(def conn (datomic.api/connect "..."))
; Initialize system
(yq/init! {:conn conn})
; Add a queue consumer that will intentionally fail on the first attempt
(yq/add-consumer! :q
(let [cnt (atom 0)]
(fn [payload]
(when (= 1 (swap! cnt inc))
; A consumer throwing an exception is considered a queue job failure
(throw (ex-info "failed" {})))
; Anything else than a throwing exception is considered a queue job success
; This includes nil, false and everything else.
(log/info "got payload" payload))))
; Start threadpool
(yq/start!)
; Queue a job
@(d/transact conn [(yq/put :q {:work 123})])
; On your console you will see something like this:
; 17:29:54.598 DEBUG queue item 613... for queue :q is pending status :init
; 17:29:54.602 DEBUG queue item 613... for queue :q now has status :processing
; 17:29:54.603 DEBUG queue item 613... for queue :q is now processing
; 17:29:54.605 WARN queue-item 613... for queue :q now has status :error after 1 try in 4.8 ms
; 17:29:54.607 WARN error message was: "failed" for queue-item 613...
; 17:29:54.615 WARN ex-data was: {} for queue-item 613...
; The item is so far failed...
; But after approximately 10 seconds have elapsed, the item will be retried:
; 17:30:05.596 DEBUG queue item 613... for queue :q now has status :processing
; 17:30:05.597 DEBUG queue item 613... for queue :q is now processing
; 17:30:05.597 INFO got payload {:work 123}
; 17:30:05.599 INFO queue-item 613... for queue :q now has status :done after 2 tries in 5999.3 ms
; And then it has succeeded.
```
## Rationale
Integrating with external systems that may be unavailable can be tricky.
Imagine the following code:
```clojure
(defn post-handler [user-input]
(let [db-item (process user-input)
ext-ref (clj-http.client/post ext-service {...})] ; may throw exception
@(d/transact conn [(assoc db-item :some/ext-ref ext-ref)])))
```
What if the POST request fails? Should it be retried? For how long?
Should it be allowed to fail? How do you then process failures later?
The queue way to solve this would be:
```clojure
(defn get-ext-ref [{:keys [id]}]
(let [ext-ref (clj-http.client/post ext-service {...})] ; may throw exception
@(d/transact conn [[:db/cas [:some/id id]
:some/ext-ref
nil
ext-ref]])))
(yq/add-consumer! :get-ext-ref get-ext-ref {:allow-cas-failure? true})
(defn post-handler [user-input]
(let [{:some/keys [id] :as db-item} (process user-input)
@(d/transact conn [db-item
(yq/put :get-ext-ref {:id id})])))
```
Here `post-handler` will always succeed as long as the transaction commits.
`get-ext-ref` may fail multiple times if `ext-service` is down.
This is fine as long as it eventually succeeds.
There is a special case where `get-ext-ref` succeeds, but
saving the new queue job status to the database fails.
Thus `get-ext-ref` and any queue consumer should tolerate to
be executed successfully several times.
For `get-ext-ref` this is solved by using
the database function [:db/cas (compare-and-swap)](https://docs.datomic.com/on-prem/transactions/transaction-functions.html#dbfn-cas)
to achieve a write-once behaviour.
The yoltq system treats cas failures as job successes
when a consumer has `:allow-cas-failure?` set to `true` in its options.
|