summaryrefslogtreecommitdiff
path: root/tests/functional/consumer-with-deadletter/q.go
diff options
context:
space:
mode:
Diffstat (limited to 'tests/functional/consumer-with-deadletter/q.go')
-rw-r--r--tests/functional/consumer-with-deadletter/q.go97
1 files changed, 97 insertions, 0 deletions
diff --git a/tests/functional/consumer-with-deadletter/q.go b/tests/functional/consumer-with-deadletter/q.go
new file mode 100644
index 0000000..e1462d7
--- /dev/null
+++ b/tests/functional/consumer-with-deadletter/q.go
@@ -0,0 +1,97 @@
+package q
+
+import (
+ "database/sql"
+ "errors"
+ "os"
+ "runtime"
+
+ _ "acudego"
+ g "gobang"
+ "guuid"
+)
+
+
+
+const (
+ topicX = "new-event-x"
+ topicY = "new-event-y"
+)
+
+var forbidden3Err = errors.New("we don't like 3")
+
+
+
+func processNewEventXToY(message Message) (UnsentMessage, error) {
+ payload := string(message.Payload)
+ if payload == "event 3" {
+ return UnsentMessage{}, forbidden3Err
+ }
+
+ newPayload := []byte("processed " + payload)
+ unsent := UnsentMessage{
+ Topic: topicY,
+ FlowID: message.FlowID,
+ Payload: newPayload,
+ }
+ return unsent, nil
+}
+
+
+
+func MainTest() {
+ g.SetLevel(g.LevelNone)
+
+ _, file, _, ok := runtime.Caller(0)
+ g.TAssertEqualS(ok, true, "can't get filename")
+
+ databasePath := file + ".db"
+ os.Remove(databasePath)
+ os.Remove(databasePath + "-shm")
+ os.Remove(databasePath + "-wal")
+
+ db, err := sql.Open("acude", databasePath)
+ g.TErrorIf(err)
+ defer db.Close()
+
+ queue, err := New(db)
+ g.TErrorIf(err)
+ defer queue.Close()
+
+
+ pub := func(payload []byte, flowID guuid.UUID) {
+ unsent := UnsentMessage{
+ Topic: topicX,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ _, err := queue.Publish(unsent)
+ g.TErrorIf(err)
+ }
+
+
+ g.Testing("we can WaitFor() a message after a deadletter", func() {
+ flowID := guuid.New()
+
+ handlerFn := func(message Message) error {
+ messageY, err := processNewEventXToY(message)
+ if err != nil {
+ return err
+ }
+
+ _, err = queue.Publish(messageY)
+ return err
+ }
+ queue.Subscribe(topicX, "main-worker", handlerFn)
+ defer queue.Unsubscribe(topicX, "main-worker")
+
+ pub([]byte("event 1"), guuid.New())
+ pub([]byte("event 2"), guuid.New())
+ pub([]byte("event 3"), guuid.New())
+ pub([]byte("event 4"), guuid.New())
+ pub([]byte("event 5"), flowID)
+
+ given := <- queue.WaitFor(topicY, flowID, "waiter").Channel
+ g.TAssertEqual(given, []byte("processed event 5"))
+ })
+}