aboutsummaryrefslogtreecommitdiff
path: root/tests/functional/consumer-with-deadletter/fiinha.go
blob: 7d88e0ea664c8eb8f979f0cbcfc434b649df85ed (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package fiinha

import (
	"errors"
	"runtime"

	"guuid"
	g "gobang"
)



const (
	topicX = "new-event-x"
	topicY = "new-event-y"
)

var forbidden3Err = errors.New("we don't like 3")



func processNewEventXToY(message Message) (UnsentMessage, error) {
	payload := string(message.Payload)
	if payload == "event 3" {
		return UnsentMessage{}, forbidden3Err
	}

	newPayload := []byte("processed " + payload)
	unsent := UnsentMessage{
		Topic:   topicY,
		FlowID:  message.FlowID,
		Payload: newPayload,
	}
	return unsent, nil
}



func MainTest() {
	g.SetLevel(g.LevelNone)

	_, file, _, ok := runtime.Caller(0)
	g.TAssertEqualS(ok, true, "can't get filename")

	databasePath := file + ".db"
	queue, err := New(databasePath)
	g.TErrorIf(err)
	defer queue.Close()

	pub := func(payload []byte, flowID guuid.UUID) {
		unsent := UnsentMessage{
			Topic:   topicX,
			FlowID:  flowID,
			Payload: payload,
		}
		_, err := queue.Publish(unsent)
		g.TErrorIf(err)
	}


	g.Testing("we can WaitFor() a message after a deadletter", func() {
		flowID := guuid.New()

		handlerFn := func(message Message) error {
			messageY, err := processNewEventXToY(message)
			if err != nil {
				return err
			}

			_, err = queue.Publish(messageY)
			return err
		}
		queue.Subscribe(topicX, "main-worker", handlerFn)
		defer queue.Unsubscribe(topicX, "main-worker")

		pub([]byte("event 1"), guuid.New())
		pub([]byte("event 2"), guuid.New())
		pub([]byte("event 3"), guuid.New())
		pub([]byte("event 4"), guuid.New())
		pub([]byte("event 5"), flowID)

		given := <- queue.WaitFor(topicY, flowID, "waiter").Channel
		g.TAssertEqual(given, []byte("processed event 5"))
	})
}