1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
package fiinha
import (
"errors"
"runtime"
"guuid"
g "gobang"
)
// Topic names used by the test pipeline in this file: MainTest publishes and
// subscribes on topicX, and processNewEventXToY addresses its output to topicY.
const (
topicX = "new-event-x"
topicY = "new-event-y"
)
// forbidden3Err is the sentinel error returned by processNewEventXToY when the
// incoming payload is exactly "event 3".
var forbidden3Err = errors.New("we don't like 3")
// processNewEventXToY converts a message into its follow-up message for topicY.
//
// The payload is read as text. A payload equal to "event 3" is rejected with
// forbidden3Err and a zero-value UnsentMessage. Any other payload yields an
// UnsentMessage addressed to topicY that keeps the incoming FlowID and carries
// the original text prefixed with "processed ".
func processNewEventXToY(message Message) (UnsentMessage, error) {
	switch text := string(message.Payload); text {
	case "event 3":
		// The one payload this handler refuses to transform.
		return UnsentMessage{}, forbidden3Err
	default:
		return UnsentMessage{
			Topic:   topicY,
			FlowID:  message.FlowID,
			Payload: []byte("processed " + text),
		}, nil
	}
}
// MainTest drives the functional test of the queue.
//
// It creates a queue via New, passing the path of this source file with ".db"
// appended, and closes the queue on return. The single scenario subscribes a
// worker on topicX that turns each message into a topicY message through
// processNewEventXToY and publishes the result. Five events are then published
// on topicX; the handler returns forbidden3Err for "event 3". The test finally
// waits on topicY for the flow ID used by "event 5" and asserts that the
// received value equals "processed event 5".
func MainTest() {
	// NOTE(review): presumably silences the gobang logger for the test run — confirm.
	g.SetLevel(g.LevelNone)

	_, sourcePath, _, found := runtime.Caller(0)
	g.TAssertEqualS(found, true, "can't get filename")

	queue, err := New(sourcePath + ".db")
	g.TErrorIf(err)
	defer queue.Close()

	// publishX sends payload on topicX under the given flow ID and reports any
	// publish error through g.TErrorIf.
	publishX := func(payload []byte, flowID guuid.UUID) {
		_, pubErr := queue.Publish(UnsentMessage{
			Topic:   topicX,
			FlowID:  flowID,
			Payload: payload,
		})
		g.TErrorIf(pubErr)
	}

	g.Testing("we can WaitFor() a message after a deadletter", func() {
		awaitedFlow := guuid.New()

		// forward republishes the converted message; a conversion error is
		// returned to the queue untouched and nothing is published.
		forward := func(message Message) error {
			converted, err := processNewEventXToY(message)
			if err == nil {
				_, err = queue.Publish(converted)
			}
			return err
		}
		queue.Subscribe(topicX, "main-worker", forward)
		defer queue.Unsubscribe(topicX, "main-worker")

		// Four events with throwaway flow IDs; "event 3" is the one the
		// handler rejects.
		for _, body := range []string{"event 1", "event 2", "event 3", "event 4"} {
			publishX([]byte(body), guuid.New())
		}
		// The only event whose flow ID is waited on below.
		publishX([]byte("event 5"), awaitedFlow)

		received := <-queue.WaitFor(topicY, awaitedFlow, "waiter").Channel
		g.TAssertEqual(received, []byte("processed event 5"))
	})
}
|