summaryrefslogtreecommitdiff
path: root/tests/functional
diff options
context:
space:
mode:
Diffstat (limited to 'tests/functional')
l---------tests/functional/consume-one-produce-many/main.go1
-rw-r--r--tests/functional/consume-one-produce-many/q.go5
l---------tests/functional/consumer-with-deadletter/main.go1
-rw-r--r--tests/functional/consumer-with-deadletter/q.go97
l---------tests/functional/custom-prefix/main.go1
-rw-r--r--tests/functional/custom-prefix/q.go5
l---------tests/functional/distinct-consumers-separate-instances/main.go1
-rw-r--r--tests/functional/distinct-consumers-separate-instances/q.go5
l---------tests/functional/flow-id/main.go1
-rw-r--r--tests/functional/flow-id/q.go5
l---------tests/functional/idempotency/main.go1
-rw-r--r--tests/functional/idempotency/q.go5
l---------tests/functional/new-instance-takeover/main.go1
-rw-r--r--tests/functional/new-instance-takeover/q.go127
l---------tests/functional/wait-after-publish/main.go1
-rw-r--r--tests/functional/wait-after-publish/q.go64
l---------tests/functional/waiter/main.go1
-rw-r--r--tests/functional/waiter/q.go5
18 files changed, 327 insertions, 0 deletions
diff --git a/tests/functional/consume-one-produce-many/main.go b/tests/functional/consume-one-produce-many/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/functional/consume-one-produce-many/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/functional/consume-one-produce-many/q.go b/tests/functional/consume-one-produce-many/q.go
new file mode 100644
index 0000000..48d66d3
--- /dev/null
+++ b/tests/functional/consume-one-produce-many/q.go
@@ -0,0 +1,5 @@
+package q
+
+func MainTest() {
+ // FIXME
+}
diff --git a/tests/functional/consumer-with-deadletter/main.go b/tests/functional/consumer-with-deadletter/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/functional/consumer-with-deadletter/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/functional/consumer-with-deadletter/q.go b/tests/functional/consumer-with-deadletter/q.go
new file mode 100644
index 0000000..e1462d7
--- /dev/null
+++ b/tests/functional/consumer-with-deadletter/q.go
@@ -0,0 +1,97 @@
+package q
+
+import (
+ "database/sql"
+ "errors"
+ "os"
+ "runtime"
+
+ _ "acudego"
+ g "gobang"
+ "guuid"
+)
+
+
+
+const (
+ topicX = "new-event-x"
+ topicY = "new-event-y"
+)
+
+var forbidden3Err = errors.New("we don't like 3")
+
+
+
+func processNewEventXToY(message Message) (UnsentMessage, error) {
+ payload := string(message.Payload)
+ if payload == "event 3" {
+ return UnsentMessage{}, forbidden3Err
+ }
+
+ newPayload := []byte("processed " + payload)
+ unsent := UnsentMessage{
+ Topic: topicY,
+ FlowID: message.FlowID,
+ Payload: newPayload,
+ }
+ return unsent, nil
+}
+
+
+
+func MainTest() {
+ g.SetLevel(g.LevelNone)
+
+ _, file, _, ok := runtime.Caller(0)
+ g.TAssertEqualS(ok, true, "can't get filename")
+
+ databasePath := file + ".db"
+ os.Remove(databasePath)
+ os.Remove(databasePath + "-shm")
+ os.Remove(databasePath + "-wal")
+
+ db, err := sql.Open("acude", databasePath)
+ g.TErrorIf(err)
+ defer db.Close()
+
+ queue, err := New(db)
+ g.TErrorIf(err)
+ defer queue.Close()
+
+
+ pub := func(payload []byte, flowID guuid.UUID) {
+ unsent := UnsentMessage{
+ Topic: topicX,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ _, err := queue.Publish(unsent)
+ g.TErrorIf(err)
+ }
+
+
+ g.Testing("we can WaitFor() a message after a deadletter", func() {
+ flowID := guuid.New()
+
+ handlerFn := func(message Message) error {
+ messageY, err := processNewEventXToY(message)
+ if err != nil {
+ return err
+ }
+
+ _, err = queue.Publish(messageY)
+ return err
+ }
+ queue.Subscribe(topicX, "main-worker", handlerFn)
+ defer queue.Unsubscribe(topicX, "main-worker")
+
+ pub([]byte("event 1"), guuid.New())
+ pub([]byte("event 2"), guuid.New())
+ pub([]byte("event 3"), guuid.New())
+ pub([]byte("event 4"), guuid.New())
+ pub([]byte("event 5"), flowID)
+
+ given := <- queue.WaitFor(topicY, flowID, "waiter").Channel
+ g.TAssertEqual(given, []byte("processed event 5"))
+ })
+}
diff --git a/tests/functional/custom-prefix/main.go b/tests/functional/custom-prefix/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/functional/custom-prefix/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/functional/custom-prefix/q.go b/tests/functional/custom-prefix/q.go
new file mode 100644
index 0000000..48d66d3
--- /dev/null
+++ b/tests/functional/custom-prefix/q.go
@@ -0,0 +1,5 @@
+package q
+
+func MainTest() {
+ // FIXME
+}
diff --git a/tests/functional/distinct-consumers-separate-instances/main.go b/tests/functional/distinct-consumers-separate-instances/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/functional/distinct-consumers-separate-instances/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/functional/distinct-consumers-separate-instances/q.go b/tests/functional/distinct-consumers-separate-instances/q.go
new file mode 100644
index 0000000..48d66d3
--- /dev/null
+++ b/tests/functional/distinct-consumers-separate-instances/q.go
@@ -0,0 +1,5 @@
+package q
+
+func MainTest() {
+ // FIXME
+}
diff --git a/tests/functional/flow-id/main.go b/tests/functional/flow-id/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/functional/flow-id/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/functional/flow-id/q.go b/tests/functional/flow-id/q.go
new file mode 100644
index 0000000..48d66d3
--- /dev/null
+++ b/tests/functional/flow-id/q.go
@@ -0,0 +1,5 @@
+package q
+
+func MainTest() {
+ // FIXME
+}
diff --git a/tests/functional/idempotency/main.go b/tests/functional/idempotency/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/functional/idempotency/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/functional/idempotency/q.go b/tests/functional/idempotency/q.go
new file mode 100644
index 0000000..48d66d3
--- /dev/null
+++ b/tests/functional/idempotency/q.go
@@ -0,0 +1,5 @@
+package q
+
+func MainTest() {
+ // FIXME
+}
diff --git a/tests/functional/new-instance-takeover/main.go b/tests/functional/new-instance-takeover/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/functional/new-instance-takeover/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/functional/new-instance-takeover/q.go b/tests/functional/new-instance-takeover/q.go
new file mode 100644
index 0000000..a678415
--- /dev/null
+++ b/tests/functional/new-instance-takeover/q.go
@@ -0,0 +1,127 @@
+package q
+
+import (
+"fmt"
+ "database/sql"
+ "runtime"
+ "os"
+
+ g "gobang"
+ "guuid"
+)
+
+
+
+const topic = "topic"
+
+
+
+func pub(queue IQueue, topic string, flowID guuid.UUID) {
+ unsent := UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: []byte{},
+ }
+ _, err := queue.Publish(unsent)
+ g.TErrorIf(err)
+}
+
+func handlerFn(publish func(guuid.UUID)) func(Message) error {
+ return func(message Message) error {
+ publish(message.FlowID)
+ return nil
+ }
+}
+
+func startInstance(
+ databasePath string,
+ instanceID int,
+ name string,
+) (*sql.DB, IQueue, error) {
+ db, err := sql.Open("acude", databasePath)
+ g.TErrorIf(err)
+
+ iqueue, err := New(db)
+ g.TErrorIf(err)
+ queue := iqueue.(queueT)
+
+ notifyFn := makeNotifyFn(queue.subscriptions.read, queue.pinger)
+ queries, err := initDB(db, defaultPrefix, notifyFn, instanceID)
+ g.TErrorIf(err)
+
+ err = queue.queries.close()
+ g.TErrorIf(err)
+
+ queue.queries = queries
+
+ pub_ := func(topic string) func(guuid.UUID) {
+ return func(flowID guuid.UUID) {
+ pub(queue, topic, flowID)
+ }
+ }
+
+ individual := "individual-" + name
+ shared := "shared"
+
+ queue.Subscribe(topic, individual, handlerFn(pub_(individual)))
+ queue.Subscribe(topic, shared, handlerFn(pub_(shared + "-" + name)))
+
+ return db, queue, nil
+}
+
+
+
+func MainTest() {
+ // https://sqlite.org/forum/forumpost/2507664507
+ g.Init()
+
+ _, file, _, ok := runtime.Caller(0)
+ g.TAssertEqualS(ok, true, "can't get filename")
+
+ dbpath := file + ".db"
+ dbpath = "/mnt/dois/andreh/t.db"
+ os.Remove(dbpath)
+ os.Remove(dbpath + "-shm")
+ os.Remove(dbpath + "-wal")
+
+ instanceID1 := os.Getpid()
+ instanceID2 := instanceID1 + 1
+
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+
+ g.Testing("new instances take ownership of topic+name combo", func() {
+ if false {
+ fmt.Fprintf(os.Stderr, "(PID %d + 1) ", instanceID1)
+ }
+
+ db, q1, err := startInstance(dbpath, instanceID1, "first")
+ g.TErrorIf(err)
+ defer db.Close()
+ defer q1.Close()
+
+ pub(q1, topic, guuid.New())
+ pub(q1, topic, guuid.New())
+ pub(q1, topic, flowID1)
+
+ <- q1.WaitFor("individual-first", flowID1, "w").Channel
+ <- q1.WaitFor( "shared-first", flowID1, "w").Channel
+ // println("waited 1")
+
+ db, q2, err := startInstance(dbpath, instanceID2, "second")
+ g.TErrorIf(err)
+ defer db.Close()
+ defer q2.Close()
+
+ <- q2.WaitFor("individual-second", flowID1, "w").Channel
+
+ pub(q2, topic, guuid.New())
+ pub(q2, topic, guuid.New())
+ pub(q2, topic, flowID2)
+
+ // FIXME: notify multiple instances so we can add this:
+ // <- q2.WaitFor("individual-first", flowID2, "w").Channel
+ <- q2.WaitFor("individual-second", flowID2, "w").Channel
+ <- q2.WaitFor( "shared-second", flowID2, "w").Channel
+ })
+}
diff --git a/tests/functional/wait-after-publish/main.go b/tests/functional/wait-after-publish/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/functional/wait-after-publish/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/functional/wait-after-publish/q.go b/tests/functional/wait-after-publish/q.go
new file mode 100644
index 0000000..701258a
--- /dev/null
+++ b/tests/functional/wait-after-publish/q.go
@@ -0,0 +1,64 @@
+package q
+
+import (
+ "database/sql"
+ "os"
+ "runtime"
+
+ g "gobang"
+ "guuid"
+)
+
+
+
+const topic = "topic"
+
+
+
+func MainTest() {
+ _, file, _, ok := runtime.Caller(0)
+ g.TAssertEqualS(ok, true, "can't get filename")
+
+ databasePath := file + ".db"
+ os.Remove(databasePath)
+ os.Remove(databasePath + "-shm")
+ os.Remove(databasePath + "-wal")
+
+ db, err := sql.Open("acude", databasePath)
+ g.TErrorIf(err)
+ defer db.Close()
+
+ queue, err := New(db)
+ g.TErrorIf(err)
+ defer queue.Close()
+
+ pub := func(flowID guuid.UUID, payload []byte) {
+ unsent := UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ _, err := queue.Publish(unsent)
+ g.TErrorIf(err)
+ }
+
+
+ g.Testing("we can WaitFor() a message before its publishing", func() {
+ flowID := guuid.New()
+ waiter := queue.WaitFor(topic, flowID, "waiter").Channel
+
+ pub(flowID, []byte("payload before"))
+
+ given := <- waiter
+ g.TAssertEqual(given, []byte("payload before"))
+ })
+
+ g.Testing("we can also do it after its publishing", func() {
+ flowID := guuid.New()
+
+ pub(flowID, []byte("payload after"))
+
+ given := <- queue.WaitFor(topic, flowID, "waiter").Channel
+ g.TAssertEqual(given, []byte("payload after"))
+ })
+}
diff --git a/tests/functional/waiter/main.go b/tests/functional/waiter/main.go
new file mode 120000
index 0000000..f67563d
--- /dev/null
+++ b/tests/functional/waiter/main.go
@@ -0,0 +1 @@
+../../main.go \ No newline at end of file
diff --git a/tests/functional/waiter/q.go b/tests/functional/waiter/q.go
new file mode 100644
index 0000000..48d66d3
--- /dev/null
+++ b/tests/functional/waiter/q.go
@@ -0,0 +1,5 @@
+package q
+
+func MainTest() {
+ // FIXME
+}