diff options
Diffstat (limited to 'tests/functional')
18 files changed, 0 insertions, 287 deletions
diff --git a/tests/functional/consume-one-produce-many/fiinha.go b/tests/functional/consume-one-produce-many/fiinha.go deleted file mode 100644 index 6a3ca47..0000000 --- a/tests/functional/consume-one-produce-many/fiinha.go +++ /dev/null @@ -1,5 +0,0 @@ -package fiinha - -func MainTest() { - // FIXME -} diff --git a/tests/functional/consume-one-produce-many/main.go b/tests/functional/consume-one-produce-many/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/consume-one-produce-many/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/consumer-with-deadletter/fiinha.go b/tests/functional/consumer-with-deadletter/fiinha.go deleted file mode 100644 index 292d327..0000000 --- a/tests/functional/consumer-with-deadletter/fiinha.go +++ /dev/null @@ -1,85 +0,0 @@ -package fiinha - -import ( - "errors" - "runtime" - - "uuid" - g "gobang" -) - - - -const ( - topicX = "new-event-x" - topicY = "new-event-y" -) - -var forbidden3Err = errors.New("we don't like 3") - - - -func processNewEventXToY(message Message) (UnsentMessage, error) { - payload := string(message.Payload) - if payload == "event 3" { - return UnsentMessage{}, forbidden3Err - } - - newPayload := []byte("processed " + payload) - unsent := UnsentMessage{ - Topic: topicY, - FlowID: message.FlowID, - Payload: newPayload, - } - return unsent, nil -} - - - -func MainTest() { - g.SetLevel(g.LogLevel_None) - - _, file, _, ok := runtime.Caller(0) - g.TAssertEqualS(ok, true, "can't get filename") - - databasePath := file + ".db" - queue, err := New(databasePath) - g.TErrorIf(err) - defer queue.Close() - - pub := func(payload []byte, flowID uuid.UUID) { - unsent := UnsentMessage{ - Topic: topicX, - FlowID: flowID, - Payload: payload, - } - _, err := queue.Publish(unsent) - g.TErrorIf(err) - } - - - g.Testing("we can WaitFor() a message after a deadletter", func() { - flowID := uuid.New() - - handlerFn := func(message Message) error { - messageY, err := processNewEventXToY(message) - if err != nil { - return err - } - - _, err = queue.Publish(messageY) - return err - } - queue.Subscribe(topicX, "main-worker", handlerFn) - defer queue.Unsubscribe(topicX, "main-worker") - - pub([]byte("event 1"), uuid.New()) - pub([]byte("event 2"), uuid.New()) - pub([]byte("event 3"), uuid.New()) - pub([]byte("event 4"), uuid.New()) - pub([]byte("event 5"), flowID) - - given := <- queue.WaitFor(topicY, flowID, "waiter").Channel - g.TAssertEqual(given, []byte("processed event 5")) - }) -} diff --git a/tests/functional/consumer-with-deadletter/main.go b/tests/functional/consumer-with-deadletter/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/consumer-with-deadletter/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/custom-prefix/fiinha.go b/tests/functional/custom-prefix/fiinha.go deleted file mode 100644 index 6a3ca47..0000000 --- a/tests/functional/custom-prefix/fiinha.go +++ /dev/null @@ -1,5 +0,0 @@ -package fiinha - -func MainTest() { - // FIXME -} diff --git a/tests/functional/custom-prefix/main.go b/tests/functional/custom-prefix/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/custom-prefix/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/distinct-consumers-separate-instances/fiinha.go b/tests/functional/distinct-consumers-separate-instances/fiinha.go deleted file mode 100644 index 6a3ca47..0000000 --- a/tests/functional/distinct-consumers-separate-instances/fiinha.go +++ /dev/null @@ -1,5 +0,0 @@ -package fiinha - -func MainTest() { - // FIXME -} diff --git a/tests/functional/distinct-consumers-separate-instances/main.go b/tests/functional/distinct-consumers-separate-instances/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/distinct-consumers-separate-instances/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/flow-id/fiinha.go b/tests/functional/flow-id/fiinha.go deleted file mode 100644 index 6a3ca47..0000000 --- a/tests/functional/flow-id/fiinha.go +++ /dev/null @@ -1,5 +0,0 @@ -package fiinha - -func MainTest() { - // FIXME -} diff --git a/tests/functional/flow-id/main.go b/tests/functional/flow-id/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/flow-id/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/idempotency/fiinha.go b/tests/functional/idempotency/fiinha.go deleted file mode 100644 index 6a3ca47..0000000 --- a/tests/functional/idempotency/fiinha.go +++ /dev/null @@ -1,5 +0,0 @@ -package fiinha - -func MainTest() { - // FIXME -} diff --git a/tests/functional/idempotency/main.go b/tests/functional/idempotency/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/idempotency/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/new-instance-takeover/fiinha.go b/tests/functional/new-instance-takeover/fiinha.go deleted file mode 100644 index 5e6ad4b..0000000 --- a/tests/functional/new-instance-takeover/fiinha.go +++ /dev/null @@ -1,109 +0,0 @@ -package fiinha - -import ( - "runtime" - "os" - - "uuid" - g "gobang" -) - - - -const topic = "topic" - - - -func pub(queue IQueue, topic string, flowID uuid.UUID) { - unsent := UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: []byte{}, - } - _, err := queue.Publish(unsent) - g.TErrorIf(err) -} - -func handlerFn(publish func(uuid.UUID)) func(Message) error { - return func(message Message) error { - publish(message.FlowID) - return nil - } -} - -func startInstance( - dbpath string, - instanceID int, - name string, -) (IQueue, error) { - iqueue, err := New(dbpath) - g.TErrorIf(err) - queue := iqueue.(queueT) - - notifyFn := makeNotifyFn(queue.subscriptions.read, queue.pinger) - queries, err := initDB(dbpath, defaultPrefix, notifyFn, instanceID) - g.TErrorIf(err) - - err = queue.queries.close() - g.TErrorIf(err) - - queue.queries = queries - - pub_ := func(topic string) func(uuid.UUID) { - return func(flowID uuid.UUID) { - pub(queue, topic, flowID) - } - } - - individual := "individual-" + name - shared := "shared" - - queue.Subscribe(topic, individual, handlerFn(pub_(individual))) - queue.Subscribe(topic, shared, handlerFn(pub_(shared + "-" + name))) - - return queue, nil -} - - - -func MainTest() { - g.Init() - - _, file, _, ok := runtime.Caller(0) - g.TAssertEqualS(ok, true, "can't get filename") - - dbpath := file + ".db" - instanceID1 := os.Getpid() - instanceID2 := instanceID1 + 1 - - flowID1 := uuid.New() - flowID2 := uuid.New() - - g.Testing("new instances take ownership of topic+name combo", func() { - q1, err := startInstance(dbpath, instanceID1, "first") - g.TErrorIf(err) - defer q1.Close() - - pub(q1, topic, uuid.New()) - pub(q1, topic, uuid.New()) - pub(q1, topic, flowID1) - - <- q1.WaitFor("individual-first", flowID1, "w").Channel - <- q1.WaitFor( "shared-first", flowID1, "w").Channel - - q2, err := startInstance(dbpath, instanceID2, "second") - g.TErrorIf(err) - defer q2.Close() - - <- q2.WaitFor("individual-second", flowID1, "w").Channel - - pub(q2, topic, uuid.New()) - pub(q2, topic, uuid.New()) - pub(q2, topic, flowID2) - - // FIXME: notify multiple instances so we can add this: - // <- q2.WaitFor("individual-first", flowID2, "w").Channel - <- q2.WaitFor("individual-second", flowID2, "w").Channel - <- q2.WaitFor( "shared-second", flowID2, "w").Channel - }) -} diff --git a/tests/functional/new-instance-takeover/main.go b/tests/functional/new-instance-takeover/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/new-instance-takeover/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/wait-after-publish/fiinha.go b/tests/functional/wait-after-publish/fiinha.go deleted file mode 100644 index 71b9b56..0000000 --- a/tests/functional/wait-after-publish/fiinha.go +++ /dev/null @@ -1,54 +0,0 @@ -package fiinha - -import ( - "runtime" - - "uuid" - g "gobang" -) - - - -const topic = "topic" - - - -func MainTest() { - _, file, _, ok := runtime.Caller(0) - g.TAssertEqualS(ok, true, "can't get filename") - - databasePath := file + ".db" - queue, err := New(databasePath) - g.TErrorIf(err) - defer queue.Close() - - pub := func(flowID uuid.UUID, payload []byte) { - unsent := UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - _, err := queue.Publish(unsent) - g.TErrorIf(err) - } - - - g.Testing("we can WaitFor() a message before its publishing", func() { - flowID := uuid.New() - waiter := queue.WaitFor(topic, flowID, "waiter").Channel - - pub(flowID, []byte("payload before")) - - given := <- waiter - g.TAssertEqual(given, []byte("payload before")) - }) - - g.Testing("we can also do it after its publishing", func() { - flowID := uuid.New() - - pub(flowID, []byte("payload after")) - - given := <- queue.WaitFor(topic, flowID, "waiter").Channel - g.TAssertEqual(given, []byte("payload after")) - }) -} diff --git a/tests/functional/wait-after-publish/main.go b/tests/functional/wait-after-publish/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/wait-after-publish/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/waiter/fiinha.go b/tests/functional/waiter/fiinha.go deleted file mode 100644 index 6a3ca47..0000000 --- a/tests/functional/waiter/fiinha.go +++ /dev/null @@ -1,5 +0,0 @@ -package fiinha - -func MainTest() { - // FIXME -} diff --git a/tests/functional/waiter/main.go b/tests/functional/waiter/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/waiter/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file |
