diff options
Diffstat (limited to 'tests/functional/consumer-with-deadletter/q.go')
-rw-r--r-- | tests/functional/consumer-with-deadletter/q.go | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/tests/functional/consumer-with-deadletter/q.go b/tests/functional/consumer-with-deadletter/q.go new file mode 100644 index 0000000..e1462d7 --- /dev/null +++ b/tests/functional/consumer-with-deadletter/q.go @@ -0,0 +1,97 @@ +package q + +import ( + "database/sql" + "errors" + "os" + "runtime" + + _ "acudego" + g "gobang" + "guuid" +) + + + +const ( + topicX = "new-event-x" + topicY = "new-event-y" +) + +var forbidden3Err = errors.New("we don't like 3") + + + +func processNewEventXToY(message Message) (UnsentMessage, error) { + payload := string(message.Payload) + if payload == "event 3" { + return UnsentMessage{}, forbidden3Err + } + + newPayload := []byte("processed " + payload) + unsent := UnsentMessage{ + Topic: topicY, + FlowID: message.FlowID, + Payload: newPayload, + } + return unsent, nil +} + + + +func MainTest() { + g.SetLevel(g.LevelNone) + + _, file, _, ok := runtime.Caller(0) + g.TAssertEqualS(ok, true, "can't get filename") + + databasePath := file + ".db" + os.Remove(databasePath) + os.Remove(databasePath + "-shm") + os.Remove(databasePath + "-wal") + + db, err := sql.Open("acude", databasePath) + g.TErrorIf(err) + defer db.Close() + + queue, err := New(db) + g.TErrorIf(err) + defer queue.Close() + + + pub := func(payload []byte, flowID guuid.UUID) { + unsent := UnsentMessage{ + Topic: topicX, + FlowID: flowID, + Payload: payload, + } + _, err := queue.Publish(unsent) + g.TErrorIf(err) + } + + + g.Testing("we can WaitFor() a message after a deadletter", func() { + flowID := guuid.New() + + handlerFn := func(message Message) error { + messageY, err := processNewEventXToY(message) + if err != nil { + return err + } + + _, err = queue.Publish(messageY) + return err + } + queue.Subscribe(topicX, "main-worker", handlerFn) + defer queue.Unsubscribe(topicX, "main-worker") + + pub([]byte("event 1"), guuid.New()) + pub([]byte("event 2"), guuid.New()) + pub([]byte("event 3"), guuid.New()) + pub([]byte("event 4"), guuid.New()) + pub([]byte("event 5"), flowID) + + given := <- queue.WaitFor(topicY, flowID, "waiter").Channel + g.TAssertEqual(given, []byte("processed event 5")) + }) +} |