aboutsummaryrefslogtreecommitdiff
path: root/tests/functional/consumer-with-deadletter/fiinha.go
diff options
context:
space:
mode:
Diffstat (limited to 'tests/functional/consumer-with-deadletter/fiinha.go')
-rw-r--r--tests/functional/consumer-with-deadletter/fiinha.go85
1 files changed, 0 insertions, 85 deletions
diff --git a/tests/functional/consumer-with-deadletter/fiinha.go b/tests/functional/consumer-with-deadletter/fiinha.go
deleted file mode 100644
index 292d327..0000000
--- a/tests/functional/consumer-with-deadletter/fiinha.go
+++ /dev/null
@@ -1,85 +0,0 @@
-package fiinha
-
-import (
- "errors"
- "runtime"
-
- "uuid"
- g "gobang"
-)
-
-
-
-const (
- topicX = "new-event-x"
- topicY = "new-event-y"
-)
-
-var forbidden3Err = errors.New("we don't like 3")
-
-
-
-func processNewEventXToY(message Message) (UnsentMessage, error) {
- payload := string(message.Payload)
- if payload == "event 3" {
- return UnsentMessage{}, forbidden3Err
- }
-
- newPayload := []byte("processed " + payload)
- unsent := UnsentMessage{
- Topic: topicY,
- FlowID: message.FlowID,
- Payload: newPayload,
- }
- return unsent, nil
-}
-
-
-
-func MainTest() {
- g.SetLevel(g.LogLevel_None)
-
- _, file, _, ok := runtime.Caller(0)
- g.TAssertEqualS(ok, true, "can't get filename")
-
- databasePath := file + ".db"
- queue, err := New(databasePath)
- g.TErrorIf(err)
- defer queue.Close()
-
- pub := func(payload []byte, flowID uuid.UUID) {
- unsent := UnsentMessage{
- Topic: topicX,
- FlowID: flowID,
- Payload: payload,
- }
- _, err := queue.Publish(unsent)
- g.TErrorIf(err)
- }
-
-
- g.Testing("we can WaitFor() a message after a deadletter", func() {
- flowID := uuid.New()
-
- handlerFn := func(message Message) error {
- messageY, err := processNewEventXToY(message)
- if err != nil {
- return err
- }
-
- _, err = queue.Publish(messageY)
- return err
- }
- queue.Subscribe(topicX, "main-worker", handlerFn)
- defer queue.Unsubscribe(topicX, "main-worker")
-
- pub([]byte("event 1"), uuid.New())
- pub([]byte("event 2"), uuid.New())
- pub([]byte("event 3"), uuid.New())
- pub([]byte("event 4"), uuid.New())
- pub([]byte("event 5"), flowID)
-
- given := <- queue.WaitFor(topicY, flowID, "waiter").Channel
- g.TAssertEqual(given, []byte("processed event 5"))
- })
-}