diff options
Diffstat (limited to 'tests/functional')
l--------- | tests/functional/consume-one-produce-many/main.go | 1 | ||||
-rw-r--r-- | tests/functional/consume-one-produce-many/q.go | 5 | ||||
l--------- | tests/functional/consumer-with-deadletter/main.go | 1 | ||||
-rw-r--r-- | tests/functional/consumer-with-deadletter/q.go | 97 | ||||
l--------- | tests/functional/custom-prefix/main.go | 1 | ||||
-rw-r--r-- | tests/functional/custom-prefix/q.go | 5 | ||||
l--------- | tests/functional/distinct-consumers-separate-instances/main.go | 1 | ||||
-rw-r--r-- | tests/functional/distinct-consumers-separate-instances/q.go | 5 | ||||
l--------- | tests/functional/flow-id/main.go | 1 | ||||
-rw-r--r-- | tests/functional/flow-id/q.go | 5 | ||||
l--------- | tests/functional/idempotency/main.go | 1 | ||||
-rw-r--r-- | tests/functional/idempotency/q.go | 5 | ||||
l--------- | tests/functional/new-instance-takeover/main.go | 1 | ||||
-rw-r--r-- | tests/functional/new-instance-takeover/q.go | 127 | ||||
l--------- | tests/functional/wait-after-publish/main.go | 1 | ||||
-rw-r--r-- | tests/functional/wait-after-publish/q.go | 64 | ||||
l--------- | tests/functional/waiter/main.go | 1 | ||||
-rw-r--r-- | tests/functional/waiter/q.go | 5 |
18 files changed, 327 insertions, 0 deletions
diff --git a/tests/functional/consume-one-produce-many/main.go b/tests/functional/consume-one-produce-many/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/functional/consume-one-produce-many/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/functional/consume-one-produce-many/q.go b/tests/functional/consume-one-produce-many/q.go new file mode 100644 index 0000000..48d66d3 --- /dev/null +++ b/tests/functional/consume-one-produce-many/q.go @@ -0,0 +1,5 @@ +package q + +func MainTest() { + // FIXME +} diff --git a/tests/functional/consumer-with-deadletter/main.go b/tests/functional/consumer-with-deadletter/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/functional/consumer-with-deadletter/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/functional/consumer-with-deadletter/q.go b/tests/functional/consumer-with-deadletter/q.go new file mode 100644 index 0000000..e1462d7 --- /dev/null +++ b/tests/functional/consumer-with-deadletter/q.go @@ -0,0 +1,97 @@ +package q + +import ( + "database/sql" + "errors" + "os" + "runtime" + + _ "acudego" + g "gobang" + "guuid" +) + + + +const ( + topicX = "new-event-x" + topicY = "new-event-y" +) + +var forbidden3Err = errors.New("we don't like 3") + + + +func processNewEventXToY(message Message) (UnsentMessage, error) { + payload := string(message.Payload) + if payload == "event 3" { + return UnsentMessage{}, forbidden3Err + } + + newPayload := []byte("processed " + payload) + unsent := UnsentMessage{ + Topic: topicY, + FlowID: message.FlowID, + Payload: newPayload, + } + return unsent, nil +} + + + +func MainTest() { + g.SetLevel(g.LevelNone) + + _, file, _, ok := runtime.Caller(0) + g.TAssertEqualS(ok, true, "can't get filename") + + databasePath := file + ".db" + os.Remove(databasePath) + os.Remove(databasePath + "-shm") + os.Remove(databasePath + "-wal") + + db, err := sql.Open("acude", databasePath) + g.TErrorIf(err) + defer db.Close() + + queue, err := New(db) + g.TErrorIf(err) + defer queue.Close() + + + pub := func(payload []byte, flowID guuid.UUID) { + unsent := UnsentMessage{ + Topic: topicX, + FlowID: flowID, + Payload: payload, + } + _, err := queue.Publish(unsent) + g.TErrorIf(err) + } + + + g.Testing("we can WaitFor() a message after a deadletter", func() { + flowID := guuid.New() + + handlerFn := func(message Message) error { + messageY, err := processNewEventXToY(message) + if err != nil { + return err + } + + _, err = queue.Publish(messageY) + return err + } + queue.Subscribe(topicX, "main-worker", handlerFn) + defer queue.Unsubscribe(topicX, "main-worker") + + pub([]byte("event 1"), guuid.New()) + pub([]byte("event 2"), guuid.New()) + pub([]byte("event 3"), guuid.New()) + pub([]byte("event 4"), guuid.New()) + pub([]byte("event 5"), flowID) + + given := <- queue.WaitFor(topicY, flowID, "waiter").Channel + g.TAssertEqual(given, []byte("processed event 5")) + }) +} diff --git a/tests/functional/custom-prefix/main.go b/tests/functional/custom-prefix/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/functional/custom-prefix/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/functional/custom-prefix/q.go b/tests/functional/custom-prefix/q.go new file mode 100644 index 0000000..48d66d3 --- /dev/null +++ b/tests/functional/custom-prefix/q.go @@ -0,0 +1,5 @@ +package q + +func MainTest() { + // FIXME +} diff --git a/tests/functional/distinct-consumers-separate-instances/main.go b/tests/functional/distinct-consumers-separate-instances/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/functional/distinct-consumers-separate-instances/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/functional/distinct-consumers-separate-instances/q.go b/tests/functional/distinct-consumers-separate-instances/q.go new file mode 100644 index 0000000..48d66d3 --- /dev/null +++ b/tests/functional/distinct-consumers-separate-instances/q.go @@ -0,0 +1,5 @@ +package q + +func MainTest() { + // FIXME +} diff --git a/tests/functional/flow-id/main.go b/tests/functional/flow-id/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/functional/flow-id/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/functional/flow-id/q.go b/tests/functional/flow-id/q.go new file mode 100644 index 0000000..48d66d3 --- /dev/null +++ b/tests/functional/flow-id/q.go @@ -0,0 +1,5 @@ +package q + +func MainTest() { + // FIXME +} diff --git a/tests/functional/idempotency/main.go b/tests/functional/idempotency/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/functional/idempotency/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/functional/idempotency/q.go b/tests/functional/idempotency/q.go new file mode 100644 index 0000000..48d66d3 --- /dev/null +++ b/tests/functional/idempotency/q.go @@ -0,0 +1,5 @@ +package q + +func MainTest() { + // FIXME +} diff --git a/tests/functional/new-instance-takeover/main.go b/tests/functional/new-instance-takeover/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/functional/new-instance-takeover/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/functional/new-instance-takeover/q.go b/tests/functional/new-instance-takeover/q.go new file mode 100644 index 0000000..a678415 --- /dev/null +++ b/tests/functional/new-instance-takeover/q.go @@ -0,0 +1,127 @@ +package q + +import ( +"fmt" + "database/sql" + "runtime" + "os" + + g "gobang" + "guuid" +) + + + +const topic = "topic" + + + +func pub(queue IQueue, topic string, flowID guuid.UUID) { + unsent := UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: []byte{}, + } + _, err := queue.Publish(unsent) + g.TErrorIf(err) +} + +func handlerFn(publish func(guuid.UUID)) func(Message) error { + return func(message Message) error { + publish(message.FlowID) + return nil + } +} + +func startInstance( + databasePath string, + instanceID int, + name string, +) (*sql.DB, IQueue, error) { + db, err := sql.Open("acude", databasePath) + g.TErrorIf(err) + + iqueue, err := New(db) + g.TErrorIf(err) + queue := iqueue.(queueT) + + notifyFn := makeNotifyFn(queue.subscriptions.read, queue.pinger) + queries, err := initDB(db, defaultPrefix, notifyFn, instanceID) + g.TErrorIf(err) + + err = queue.queries.close() + g.TErrorIf(err) + + queue.queries = queries + + pub_ := func(topic string) func(guuid.UUID) { + return func(flowID guuid.UUID) { + pub(queue, topic, flowID) + } + } + + individual := "individual-" + name + shared := "shared" + + queue.Subscribe(topic, individual, handlerFn(pub_(individual))) + queue.Subscribe(topic, shared, handlerFn(pub_(shared + "-" + name))) + + return db, queue, nil +} + + + +func MainTest() { + // https://sqlite.org/forum/forumpost/2507664507 + g.Init() + + _, file, _, ok := runtime.Caller(0) + g.TAssertEqualS(ok, true, "can't get filename") + + dbpath := file + ".db" + dbpath = "/mnt/dois/andreh/t.db" + os.Remove(dbpath) + os.Remove(dbpath + "-shm") + os.Remove(dbpath + "-wal") + + instanceID1 := os.Getpid() + instanceID2 := instanceID1 + 1 + + flowID1 := guuid.New() + flowID2 := guuid.New() + + g.Testing("new instances take ownership of topic+name combo", func() { + if false { + fmt.Fprintf(os.Stderr, "(PID %d + 1) ", instanceID1) + } + + db, q1, err := startInstance(dbpath, instanceID1, "first") + g.TErrorIf(err) + defer db.Close() + defer q1.Close() + + pub(q1, topic, guuid.New()) + pub(q1, topic, guuid.New()) + pub(q1, topic, flowID1) + + <- q1.WaitFor("individual-first", flowID1, "w").Channel + <- q1.WaitFor( "shared-first", flowID1, "w").Channel + // println("waited 1") + + db, q2, err := startInstance(dbpath, instanceID2, "second") + g.TErrorIf(err) + defer db.Close() + defer q2.Close() + + <- q2.WaitFor("individual-second", flowID1, "w").Channel + + pub(q2, topic, guuid.New()) + pub(q2, topic, guuid.New()) + pub(q2, topic, flowID2) + + // FIXME: notify multiple instances so we can add this: + // <- q2.WaitFor("individual-first", flowID2, "w").Channel + <- q2.WaitFor("individual-second", flowID2, "w").Channel + <- q2.WaitFor( "shared-second", flowID2, "w").Channel + }) +} diff --git a/tests/functional/wait-after-publish/main.go b/tests/functional/wait-after-publish/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/functional/wait-after-publish/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/functional/wait-after-publish/q.go b/tests/functional/wait-after-publish/q.go new file mode 100644 index 0000000..701258a --- /dev/null +++ b/tests/functional/wait-after-publish/q.go @@ -0,0 +1,64 @@ +package q + +import ( + "database/sql" + "os" + "runtime" + + g "gobang" + "guuid" +) + + + +const topic = "topic" + + + +func MainTest() { + _, file, _, ok := runtime.Caller(0) + g.TAssertEqualS(ok, true, "can't get filename") + + databasePath := file + ".db" + os.Remove(databasePath) + os.Remove(databasePath + "-shm") + os.Remove(databasePath + "-wal") + + db, err := sql.Open("acude", databasePath) + g.TErrorIf(err) + defer db.Close() + + queue, err := New(db) + g.TErrorIf(err) + defer queue.Close() + + pub := func(flowID guuid.UUID, payload []byte) { + unsent := UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + _, err := queue.Publish(unsent) + g.TErrorIf(err) + } + + + g.Testing("we can WaitFor() a message before its publishing", func() { + flowID := guuid.New() + waiter := queue.WaitFor(topic, flowID, "waiter").Channel + + pub(flowID, []byte("payload before")) + + given := <- waiter + g.TAssertEqual(given, []byte("payload before")) + }) + + g.Testing("we can also do it after its publishing", func() { + flowID := guuid.New() + + pub(flowID, []byte("payload after")) + + given := <- queue.WaitFor(topic, flowID, "waiter").Channel + g.TAssertEqual(given, []byte("payload after")) + }) +} diff --git a/tests/functional/waiter/main.go b/tests/functional/waiter/main.go new file mode 120000 index 0000000..f67563d --- /dev/null +++ b/tests/functional/waiter/main.go @@ -0,0 +1 @@ +../../main.go
\ No newline at end of file diff --git a/tests/functional/waiter/q.go b/tests/functional/waiter/q.go new file mode 100644 index 0000000..48d66d3 --- /dev/null +++ b/tests/functional/waiter/q.go @@ -0,0 +1,5 @@ +package q + +func MainTest() { + // FIXME +} |