diff options
Diffstat (limited to 'tests/functional/consumer-with-deadletter/fiinha.go')
| -rw-r--r-- | tests/functional/consumer-with-deadletter/fiinha.go | 85 |
1 files changed, 0 insertions, 85 deletions
diff --git a/tests/functional/consumer-with-deadletter/fiinha.go b/tests/functional/consumer-with-deadletter/fiinha.go deleted file mode 100644 index 292d327..0000000 --- a/tests/functional/consumer-with-deadletter/fiinha.go +++ /dev/null @@ -1,85 +0,0 @@ -package fiinha - -import ( - "errors" - "runtime" - - "uuid" - g "gobang" -) - - - -const ( - topicX = "new-event-x" - topicY = "new-event-y" -) - -var forbidden3Err = errors.New("we don't like 3") - - - -func processNewEventXToY(message Message) (UnsentMessage, error) { - payload := string(message.Payload) - if payload == "event 3" { - return UnsentMessage{}, forbidden3Err - } - - newPayload := []byte("processed " + payload) - unsent := UnsentMessage{ - Topic: topicY, - FlowID: message.FlowID, - Payload: newPayload, - } - return unsent, nil -} - - - -func MainTest() { - g.SetLevel(g.LogLevel_None) - - _, file, _, ok := runtime.Caller(0) - g.TAssertEqualS(ok, true, "can't get filename") - - databasePath := file + ".db" - queue, err := New(databasePath) - g.TErrorIf(err) - defer queue.Close() - - pub := func(payload []byte, flowID uuid.UUID) { - unsent := UnsentMessage{ - Topic: topicX, - FlowID: flowID, - Payload: payload, - } - _, err := queue.Publish(unsent) - g.TErrorIf(err) - } - - - g.Testing("we can WaitFor() a message after a deadletter", func() { - flowID := uuid.New() - - handlerFn := func(message Message) error { - messageY, err := processNewEventXToY(message) - if err != nil { - return err - } - - _, err = queue.Publish(messageY) - return err - } - queue.Subscribe(topicX, "main-worker", handlerFn) - defer queue.Unsubscribe(topicX, "main-worker") - - pub([]byte("event 1"), uuid.New()) - pub([]byte("event 2"), uuid.New()) - pub([]byte("event 3"), uuid.New()) - pub([]byte("event 4"), uuid.New()) - pub([]byte("event 5"), flowID) - - given := <- queue.WaitFor(topicY, flowID, "waiter").Channel - g.TAssertEqual(given, []byte("processed event 5")) - }) -} |
