aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/benchmarks/deadletters/fiinha.go24
l---------tests/benchmarks/deadletters/main.go1
-rw-r--r--tests/benchmarks/lookup/fiinha.go24
l---------tests/benchmarks/lookup/main.go1
-rw-r--r--tests/benchmarks/multiple-consumers/fiinha.go24
l---------tests/benchmarks/multiple-consumers/main.go1
-rw-r--r--tests/benchmarks/multiple-produces/fiinha.go24
l---------tests/benchmarks/multiple-produces/main.go1
-rw-r--r--tests/benchmarks/reaper/fiinha.go24
l---------tests/benchmarks/reaper/main.go1
-rw-r--r--tests/benchmarks/replay/fiinha.go24
l---------tests/benchmarks/replay/main.go1
-rw-r--r--tests/benchmarks/single-consumer/fiinha.go24
l---------tests/benchmarks/single-consumer/main.go1
-rw-r--r--tests/benchmarks/single-producer/fiinha.go24
l---------tests/benchmarks/single-producer/main.go1
-rw-r--r--tests/benchmarks/subscribe/fiinha.go24
l---------tests/benchmarks/subscribe/main.go1
-rw-r--r--tests/benchmarks/unsubscribe/fiinha.go24
l---------tests/benchmarks/unsubscribe/main.go1
-rw-r--r--tests/benchmarks/waiter/fiinha.go24
l---------tests/benchmarks/waiter/main.go1
-rwxr-xr-xtests/cli-opts.sh4
-rw-r--r--tests/fiinha.go5889
-rw-r--r--tests/functional/consume-one-produce-many/fiinha.go5
l---------tests/functional/consume-one-produce-many/main.go1
-rw-r--r--tests/functional/consumer-with-deadletter/fiinha.go85
l---------tests/functional/consumer-with-deadletter/main.go1
-rw-r--r--tests/functional/custom-prefix/fiinha.go5
l---------tests/functional/custom-prefix/main.go1
-rw-r--r--tests/functional/distinct-consumers-separate-instances/fiinha.go5
l---------tests/functional/distinct-consumers-separate-instances/main.go1
-rw-r--r--tests/functional/flow-id/fiinha.go5
l---------tests/functional/flow-id/main.go1
-rw-r--r--tests/functional/idempotency/fiinha.go5
l---------tests/functional/idempotency/main.go1
-rw-r--r--tests/functional/new-instance-takeover/fiinha.go109
l---------tests/functional/new-instance-takeover/main.go1
-rw-r--r--tests/functional/wait-after-publish/fiinha.go54
l---------tests/functional/wait-after-publish/main.go1
-rw-r--r--tests/functional/waiter/fiinha.go5
l---------tests/functional/waiter/main.go1
-rw-r--r--tests/fuzz/api-check/fiinha.go35
l---------tests/fuzz/api-check/main.go1
-rw-r--r--tests/fuzz/cli-check/fiinha.go35
l---------tests/fuzz/cli-check/main.go1
-rw-r--r--tests/fuzz/equal-produced-consumed-order-check/fiinha.go35
l---------tests/fuzz/equal-produced-consumed-order-check/main.go1
-rw-r--r--tests/fuzz/exactly-once-check/fiinha.go35
l---------tests/fuzz/exactly-once-check/main.go1
-rw-r--r--tests/fuzz/queries-check/fiinha.go35
l---------tests/fuzz/queries-check/main.go1
-rw-r--r--tests/fuzz/total-order-check/fiinha.go35
l---------tests/fuzz/total-order-check/main.go1
-rwxr-xr-xtests/integration.sh4
-rw-r--r--tests/main.go7
-rw-r--r--tests/queries.sql387
57 files changed, 0 insertions, 7069 deletions
diff --git a/tests/benchmarks/deadletters/fiinha.go b/tests/benchmarks/deadletters/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/deadletters/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/deadletters/main.go b/tests/benchmarks/deadletters/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/deadletters/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/lookup/fiinha.go b/tests/benchmarks/lookup/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/lookup/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/lookup/main.go b/tests/benchmarks/lookup/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/lookup/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/multiple-consumers/fiinha.go b/tests/benchmarks/multiple-consumers/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/multiple-consumers/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/multiple-consumers/main.go b/tests/benchmarks/multiple-consumers/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/multiple-consumers/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/multiple-produces/fiinha.go b/tests/benchmarks/multiple-produces/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/multiple-produces/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/multiple-produces/main.go b/tests/benchmarks/multiple-produces/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/multiple-produces/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/reaper/fiinha.go b/tests/benchmarks/reaper/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/reaper/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/reaper/main.go b/tests/benchmarks/reaper/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/reaper/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/replay/fiinha.go b/tests/benchmarks/replay/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/replay/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/replay/main.go b/tests/benchmarks/replay/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/replay/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/single-consumer/fiinha.go b/tests/benchmarks/single-consumer/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/single-consumer/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/single-consumer/main.go b/tests/benchmarks/single-consumer/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/single-consumer/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/single-producer/fiinha.go b/tests/benchmarks/single-producer/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/single-producer/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/single-producer/main.go b/tests/benchmarks/single-producer/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/single-producer/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/subscribe/fiinha.go b/tests/benchmarks/subscribe/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/subscribe/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/subscribe/main.go b/tests/benchmarks/subscribe/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/subscribe/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/unsubscribe/fiinha.go b/tests/benchmarks/unsubscribe/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/unsubscribe/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/unsubscribe/main.go b/tests/benchmarks/unsubscribe/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/unsubscribe/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/benchmarks/waiter/fiinha.go b/tests/benchmarks/waiter/fiinha.go
deleted file mode 100644
index 9c0b641..0000000
--- a/tests/benchmarks/waiter/fiinha.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package fiinha
-
-import (
- "flag"
- "time"
-)
-
-
-
-var nFlag = flag.Int(
- "n",
- 1_000,
- "The number of iterations to execute",
-)
-
-func MainTest() {
- // FIXME
- flag.Parse()
- n := *nFlag
-
- for i := 0; i < n; i++ {
- time.Sleep(time.Millisecond * 1)
- }
-}
diff --git a/tests/benchmarks/waiter/main.go b/tests/benchmarks/waiter/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/benchmarks/waiter/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/cli-opts.sh b/tests/cli-opts.sh
deleted file mode 100755
index fcb62ca..0000000
--- a/tests/cli-opts.sh
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/bin/sh
-set -eu
-
-exit
diff --git a/tests/fiinha.go b/tests/fiinha.go
deleted file mode 100644
index 0901190..0000000
--- a/tests/fiinha.go
+++ /dev/null
@@ -1,5889 +0,0 @@
-package fiinha
-
-import (
- "bytes"
- "database/sql"
- "errors"
- "fmt"
- "io"
- "log/slog"
- "os"
- "reflect"
- "sort"
- "strings"
- "sync"
- "time"
-
- "golite"
- "uuid"
- g "gobang"
-)
-
-
-
-var instanceID = os.Getpid()
-
-
-
-func test_defaultPrefix() {
- g.TestStart("defaultPrefix")
-
- g.Testing("the defaultPrefix is valid", func() {
- g.TErrorIf(g.ValidateSQLTablePrefix(defaultPrefix))
- })
-}
-
-func test_tryRollback() {
- g.TestStart("tryRollback()")
-
- myErr := errors.New("bottom error")
-
- db, err := sql.Open(golite.DriverName, golite.InMemory)
- g.TErrorIf(err)
- defer db.Close()
-
-
- g.Testing("the error is propagated if rollback doesn't fail", func() {
- tx, err := db.Begin()
- g.TErrorIf(err)
-
- err = tryRollback(tx, myErr)
- g.TAssertEqual(err, myErr)
- })
-
- g.Testing("a wrapped error when rollback fails", func() {
- tx, err := db.Begin()
- g.TErrorIf(err)
-
- err = tx.Commit()
- g.TErrorIf(err)
-
- err = tryRollback(tx, myErr)
- g.TAssertEqual(reflect.DeepEqual(err, myErr), false)
- g.TAssertEqual(errors.Is(err, myErr), true)
- })
-}
-
-func test_inTx() {
- g.TestStart("inTx()")
-
- db, err := sql.Open(golite.DriverName, golite.InMemory)
- g.TErrorIf(err)
- defer db.Close()
-
-
- g.Testing("when fn() errors, we propagate it", func() {
- myErr := errors.New("to be propagated")
- err := inTx(db, func(tx *sql.Tx) error {
- return myErr
- })
- g.TAssertEqual(err, myErr)
- })
-
- g.Testing("on nil error we get nil", func() {
- err := inTx(db, func(tx *sql.Tx) error {
- return nil
- })
- g.TErrorIf(err)
- })
-}
-
-func test_serialized() {
- // FIXME
-}
-
-func test_execSerialized() {
- // FIXME
-}
-
-func test_createTables() {
- g.TestStart("createTables()")
-
- const (
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- defer db.Close()
-
-
- g.Testing("tables exist afterwards", func() {
- const tmpl_read = `
- SELECT id FROM "%s_messages" LIMIT 1;
- `
- qRead := fmt.Sprintf(tmpl_read, prefix)
-
- _, err := db.Exec(qRead)
- g.TErrorNil(err)
-
- err = createTables(db, prefix)
- g.TErrorIf(err)
-
- _, err = db.Exec(qRead)
- g.TErrorIf(err)
- })
-
- g.Testing("we can do it multiple times", func() {
- g.TErrorIf(g.SomeError(
- createTables(db, prefix),
- createTables(db, prefix),
- createTables(db, prefix),
- ))
- })
-}
-
-func test_takeStmt() {
- g.TestStart("takeStmt()")
-
- const (
- topic = "take() topic"
- consumer = "take() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- g.TErrorIf(takeErr)
- defer g.SomeFnError(
- takeClose,
- db.Close,
- )
-
- const tmpl = `
- SELECT owner_id from "%s_owners"
- WHERE
- topic = ? AND
- consumer = ?;
- `
- sqlOwner := fmt.Sprintf(tmpl, prefix)
-
-
- g.Testing("when there is no owner, we become it", func() {
- var ownerID int
- err := db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
- g.TAssertEqual(err, sql.ErrNoRows)
-
- err = take(topic, consumer)
- g.TErrorIf(err)
-
- err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
- g.TErrorIf(err)
- g.TAssertEqual(ownerID, instanceID)
- })
-
- g.Testing("if there is already an owner, we overtake it", func() {
- otherCfg := cfg
- otherCfg.instanceID = instanceID + 1
-
- take, takeClose, takeErr := takeStmt(otherCfg)
- g.TErrorIf(takeErr)
- defer takeClose()
-
- var ownerID int
- err := db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
- g.TErrorIf(err)
- g.TAssertEqual(ownerID, instanceID)
-
- err = take(topic, consumer)
- g.TErrorIf(err)
-
- err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
- g.TErrorIf(err)
- g.TAssertEqual(ownerID, otherCfg.instanceID)
- })
- g.Testing("no error if closed more than once", func() {
- g.TErrorIf(g.SomeError(
- takeClose(),
- takeClose(),
- takeClose(),
- ))
- })
-}
-
-func test_publishStmt() {
- g.TestStart("publishStmt()")
-
- const (
- topic = "publish() topic"
- payloadStr = "publish() payload"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- publish, publishClose, publishErr := publishStmt(cfg)
- g.TErrorIf(publishErr)
- defer g.SomeFnError(
- publishClose,
- db.Close,
- )
-
-
- g.Testing("we can publish a message", func() {
- messageID := uuid.New()
- message, err := publish(unsent, messageID)
- g.TErrorIf(err)
-
- g.TAssertEqual(message.id, int64(1))
- g.TAssertEqual(message.uuid, messageID)
- g.TAssertEqual(message.topic, topic)
- g.TAssertEqual(message.flowID, flowID)
- g.TAssertEqual(message.payload, payload)
- })
-
- g.Testing("we can publish the same message repeatedly", func() {
- messageID1 := uuid.New()
- messageID2 := uuid.New()
- message1, err1 := publish(unsent, messageID1)
- message2, err2 := publish(unsent, messageID2)
- g.TErrorIf(g.SomeError(err1, err2))
-
- g.TAssertEqual(message1.id, message2.id - 1)
- g.TAssertEqual(message1.topic, message2.topic)
- g.TAssertEqual(message1.flowID, message2.flowID)
- g.TAssertEqual(message1.payload, message2.payload)
-
- g.TAssertEqual(message1.uuid, messageID1)
- g.TAssertEqual(message2.uuid, messageID2)
- })
-
- g.Testing("publishing a message with the same UUID errors", func() {
- messageID := uuid.New()
- message1, err1 := publish(unsent, messageID)
- _, err2 := publish(unsent, messageID)
- g.TErrorIf(err1)
-
- g.TAssertEqual(message1.uuid, messageID)
- g.TAssertEqual(message1.topic, topic)
- g.TAssertEqual(message1.flowID, flowID)
- g.TAssertEqual(message1.payload, payload)
-
- g.TAssertEqual(
- err2.(golite.Error).ExtendedCode,
- golite.ErrConstraintUnique,
- )
- })
-
- g.Testing("no actual closing occurs", func() {
- g.TErrorIf(g.SomeError(
- publishClose(),
- publishClose(),
- publishClose(),
- ))
- })
-}
-
-func test_findStmt() {
- g.TestStart("findStmt()")
-
- const (
- topic = "find() topic"
- payloadStr = "find() payload"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- publish, publishClose, publishErr := publishStmt(cfg)
- find, findClose, findErr := findStmt(cfg)
- g.TErrorIf(g.SomeError(
- publishErr,
- findErr,
- ))
- defer g.SomeFnError(
- publishClose,
- findClose,
- db.Close,
- )
-
- pub := func(flowID uuid.UUID) uuid.UUID {
- unsentWithFlowID := unsent
- unsentWithFlowID.FlowID = flowID
- messageID := uuid.New()
- _, err := publish(unsentWithFlowID, messageID)
- g.TErrorIf(err)
- return messageID
- }
-
-
- g.Testing("we can find a message by topic and flowID", func() {
- flowID := uuid.New()
- messageID := pub(flowID)
- message, err := find(topic, flowID)
- g.TErrorIf(err)
-
- g.TAssertEqual(message.uuid, messageID)
- g.TAssertEqual(message.topic, topic)
- g.TAssertEqual(message.flowID, flowID)
- g.TAssertEqual(message.payload, payload)
- })
-
- g.Testing("a non-existent message gives us an error", func() {
- message, err := find(topic, uuid.New())
- g.TAssertEqual(message, messageT{})
- g.TAssertEqual(err, sql.ErrNoRows)
- })
-
- g.Testing("findig twice yields the exact same message", func() {
- flowID := uuid.New()
- messageID := pub(flowID)
- message1, err1 := find(topic, flowID)
- message2, err2 := find(topic, flowID)
- g.TErrorIf(g.SomeError(err1, err2))
-
- g.TAssertEqual(message1.uuid, messageID)
- g.TAssertEqual(message1, message2)
- })
-
- g.Testing("returns the latest entry if multiple are available", func() {
- flowID := uuid.New()
-
- _ , err0 := find(topic, flowID)
- pub(flowID)
- message1, err1 := find(topic, flowID)
- pub(flowID)
- message2, err2 := find(topic, flowID)
-
- g.TAssertEqual(err0, sql.ErrNoRows)
- g.TErrorIf(g.SomeError(err1, err2))
- g.TAssertEqual(message1.uuid == message2.uuid, false)
- g.TAssertEqual(message1.id < message2.id, true)
- })
-
- g.Testing("no error if closed more than once", func() {
- g.TErrorIf(g.SomeError(
- findClose(),
- findClose(),
- findClose(),
- ))
- })
-}
-
-func test_nextStmt() {
- g.TestStart("nextStmt()")
-
- const (
- topic = "next() topic"
- payloadStr = "next() payload"
- consumer = "next() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- next, nextClose, nextErr := nextStmt(cfg)
- commit, commitClose, commitErr := commitStmt(cfg)
- g.TErrorIf(takeErr)
- g.TErrorIf(publishErr)
- g.TErrorIf(nextErr)
- g.TErrorIf(commitErr)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- nextErr,
- commitErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- nextClose,
- commitClose,
- db.Close,
- )
-
- pub := func(topic string) messageT {
- g.TErrorIf(take(topic, consumer))
-
- unsentWithTopic := unsent
- unsentWithTopic.Topic = topic
- message, err := publish(unsentWithTopic, uuid.New())
- g.TErrorIf(err)
- return message
- }
-
-
- g.Testing("we get an error on empty topic", func() {
- _, err := next(topic, consumer)
- g.TAssertEqual(err, sql.ErrNoRows)
- })
-
- g.Testing("we don't get messages from other topics", func() {
- pub("other topic")
- _, err := next(topic, consumer)
- g.TAssertEqual(err, sql.ErrNoRows)
- })
-
- g.Testing("we can get the next message", func() {
- expectedMessage := pub(topic)
- pub(topic)
- pub(topic)
- message, err := next(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(message, expectedMessage)
- })
-
- g.Testing("we keep getting the next until we commit", func() {
- message1, err1 := next(topic, consumer)
- message2, err2 := next(topic, consumer)
- g.TErrorIf(commit(consumer, message1.uuid))
- message3, err3 := next(topic, consumer)
- g.TErrorIf(g.SomeError(err1, err2, err3))
-
- g.TAssertEqual(message1, message2)
- g.TAssertEqual(message2.uuid != message3.uuid, true)
- })
-
- g.Testing("each consumer has its own next message", func() {
- g.TErrorIf(take(topic, "other consumer"))
- message1, err1 := next(topic, consumer)
- message2, err2 := next(topic, "other consumer")
- g.TErrorIf(g.SomeError(err1, err2))
- g.TAssertEqual(message1.uuid != message2.uuid, true)
- })
-
- g.Testing("error when we're not the owner", func() {
- otherCfg := cfg
- otherCfg.instanceID = instanceID + 1
-
- take, takeClose, takeErr := takeStmt(otherCfg)
- g.TErrorIf(takeErr)
- defer takeClose()
-
- _, err := next(topic, consumer)
- g.TErrorIf(err)
-
- err = take(topic, consumer)
- g.TErrorIf(err)
-
- _, err = next(topic, consumer)
- g.TAssertEqual(err, fmt.Errorf(
- notOwnerErrorFmt,
- otherCfg.instanceID,
- topic,
- consumer,
- instanceID,
- ))
- })
-
- g.Testing("we can close more than once", func() {
- g.TErrorIf(g.SomeError(
- nextClose(),
- nextClose(),
- nextClose(),
- ))
- })
-}
-
-func test_messageEach() {
- g.TestStart("messageEach()")
-
- const (
- topic = "messageEach() topic"
- payloadStr = "messageEach() payload"
- consumer = "messageEach() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- pending, pendingClose, pendingErr := pendingStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- pendingErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- pendingClose,
- db.Close,
- )
-
- pub := func() uuid.UUID {
- message, err := publish(unsent, uuid.New())
- g.TErrorIf(err)
- return message.uuid
- }
- g.TErrorIf(take(topic, consumer))
-
-
- g.Testing("not called on empty set", func() {
- rows, err := pending(topic, consumer)
- g.TErrorIf(err)
-
- messageEach(rows, func(messageT) error {
- g.Unreachable()
- return nil
- })
- })
-
- g.Testing("the callback is called once for each entry", func() {
- messageIDs := []uuid.UUID{
- pub(),
- pub(),
- pub(),
- }
-
- rows, err := pending(topic, consumer)
- g.TErrorIf(err)
-
- var collectedIDs []uuid.UUID
- err = messageEach(rows, func(message messageT) error {
- collectedIDs = append(collectedIDs, message.uuid)
- return nil
- })
- g.TErrorIf(err)
-
- g.TAssertEqual(collectedIDs, messageIDs)
- })
-
- g.Testing("we halt if the timestamp is ill-formatted", func() {
- messageID := pub()
- message_id_bytes := messageID[:]
- pub()
- pub()
- pub()
-
- const tmplUpdate = `
- UPDATE "%s_messages"
- SET timestamp = '01/01/1970'
- WHERE uuid = ?;
- `
- sqlUpdate := fmt.Sprintf(tmplUpdate, prefix)
- _, err := db.Exec(sqlUpdate, message_id_bytes)
- g.TErrorIf(err)
-
- rows, err := pending(topic, consumer)
- g.TErrorIf(err)
-
- n := 0
- err = messageEach(rows, func(messageT) error {
- n++
- return nil
- })
-
- g.TAssertEqual(
- err,
- &time.ParseError{
- Layout: time.RFC3339Nano,
- Value: "01/01/1970",
- LayoutElem: "2006",
- ValueElem: "01/01/1970",
- Message: "",
- },
- )
- g.TAssertEqual(n, 3)
-
- const tmplDelete = `
- DELETE FROM "%s_messages"
- WHERE uuid = ?;
- `
- sqlDelete := fmt.Sprintf(tmplDelete, prefix)
- _, err = db.Exec(sqlDelete, message_id_bytes)
- g.TErrorIf(err)
- })
-
- g.Testing("we halt if the callback returns an error", func() {
- myErr := errors.New("callback error early return")
-
- rows1, err1 := pending(topic, consumer)
- g.TErrorIf(err1)
-
- n1 := 0
- err1 = messageEach(rows1, func(messageT) error {
- n1++
- if n1 == 4 {
- return myErr
- }
- return nil
- })
-
- rows2, err2 := pending(topic, consumer)
- g.TErrorIf(err2)
-
- n2 := 0
- err2 = messageEach(rows2, func(messageT) error {
- n2++
- return nil
- })
-
- g.TAssertEqual(err1, myErr)
- g.TErrorIf(err2)
- g.TAssertEqual(n1, 4)
- g.TAssertEqual(n2, 6)
- })
-
- g.Testing("noop when given nil for *sql.Rows", func() {
- err := messageEach(nil, func(messageT) error {
- g.Unreachable()
- return nil
- })
- g.TErrorIf(err)
- })
-}
-
-func test_pendingStmt() {
- g.TestStart("pendingStmt()")
-
- const (
- topic = "pending() topic"
- payloadStr = "pending() payload"
- consumer = "pending() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- pending, pendingClose, pendingErr := pendingStmt(cfg)
- commit, commitClose, commitErr := commitStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- pendingErr,
- commitErr,
- toDeadErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- pendingClose,
- commitClose,
- toDeadClose,
- db.Close,
- )
-
- pub := func(topic string) messageT {
- g.TErrorIf(take(topic, consumer))
- unsentWithTopic := unsent
- unsentWithTopic.Topic = topic
- message, err := publish(unsentWithTopic, uuid.New())
- g.TErrorIf(err)
- return message
- }
- g.TErrorIf(take(topic, consumer))
-
- collectPending := func(topic string, consumer string) []messageT {
- rows, err := pending(topic, consumer)
- g.TErrorIf(err)
-
- var messages []messageT
- err = messageEach(rows, func(message messageT) error {
- messages = append(messages, message)
- return nil
- })
- g.TErrorIf(err)
- return messages
- }
-
-
- g.Testing("an empty database has 0 pending items", func() {
- g.TAssertEqual(len(collectPending(topic, consumer)), 0)
- })
-
- g.Testing("after publishing we get all messages", func() {
- expected := []messageT{
- pub(topic),
- pub(topic),
- }
-
- g.TAssertEqualI(collectPending(topic, consumer), expected)
- })
-
- g.Testing("we get the same messages when calling again", func() {
- messages1 := collectPending(topic, consumer)
- messages2 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages1), 2)
- g.TAssertEqualI(messages1, messages2)
- })
-
- g.Testing("we don't get messages from other topics", func() {
- pub("other topic")
-
- g.TAssertEqual(len(collectPending(topic, consumer)), 2)
- g.TAssertEqual(len(collectPending("other topic", consumer)), 1)
- })
-
- g.Testing("after others commit, pending still returns them", func() {
- g.TErrorIf(take(topic, "other consumer"))
- messages1 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages1), 2)
- g.TErrorIf(
- commit("other consumer", messages1[0].uuid),
- )
-
- messages2 := collectPending(topic, consumer)
- g.TAssertEqualI(messages1, messages2)
- })
-
- g.Testing("committing other topic doesn't change current", func() {
- messages1 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages1), 2)
-
- message := pub("other topic")
-
- g.TErrorIf(commit(consumer, message.uuid))
-
- messages2 := collectPending(topic, consumer)
- g.TAssertEqualI(messages1, messages2)
- })
-
- g.Testing("after commiting, pending doesn't return them again", func() {
- messages1 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages1), 2)
-
- g.TErrorIf(commit(consumer, messages1[0].uuid))
-
- messages2 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages2), 1)
- g.TAssertEqual(messages2[0], messages1[1])
-
- g.TErrorIf(commit(consumer, messages1[1].uuid))
-
- messages3 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages3), 0)
- })
-
- g.Testing("on deadletter, pending also doesn't return them", func() {
- messages0 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages0), 0)
-
- message1 := pub(topic)
- message2 := pub(topic)
-
- messages1 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages1), 2)
-
- err = toDead(consumer, message1.uuid, uuid.New())
- g.TErrorIf(err)
-
- messages2 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages2), 1)
- g.TAssertEqual(messages2[0], message2)
-
- err = toDead(consumer, message2.uuid, uuid.New())
- g.TErrorIf(err)
-
- messages3 := collectPending(topic, consumer)
- g.TAssertEqual(len(messages3), 0)
- })
-
- g.Testing("if commits are unordered, pending is still sorted", func() {
- message1 := pub(topic)
- message2 := pub(topic)
- message3 := pub(topic)
-
- g.TAssertEqual(collectPending(topic, consumer), []messageT{
- message1,
- message2,
- message3,
- })
-
- g.TErrorIf(commit(consumer, message2.uuid))
- g.TAssertEqual(collectPending(topic, consumer), []messageT{
- message1,
- message3,
- })
-
- g.TErrorIf(commit(consumer, message1.uuid))
- g.TAssertEqual(collectPending(topic, consumer), []messageT{
- message3,
- })
-
- g.TErrorIf(commit(consumer, message3.uuid))
- g.TAssertEqual(len(collectPending(topic, consumer)), 0)
- })
-
- g.Testing("when we're not the owners we get nothing", func() {
- otherCfg := cfg
- otherCfg.instanceID = instanceID + 1
-
- take, takeClose, takeErr := takeStmt(otherCfg)
- g.TErrorIf(takeErr)
- defer takeClose()
-
- message1 := pub(topic)
- message2 := pub(topic)
- message3 := pub(topic)
- message4 := pub(topic)
- message5 := pub(topic)
-
- expected := []messageT{
- message1,
- message2,
- message3,
- message4,
- message5,
- }
-
- g.TAssertEqual(collectPending(topic, consumer), expected)
-
- err := take(topic, consumer)
- g.TErrorIf(err)
-
- rows, err := pending(topic, consumer)
- g.TErrorIf(err)
-
- err = messageEach(rows, func(messageT) error {
- g.Unreachable()
- return nil
- })
- g.TErrorIf(err)
- })
-
- g.Testing("no actual closing occurs", func() {
- g.TErrorIf(g.SomeError(
- pendingClose(),
- pendingClose(),
- pendingClose(),
- ))
- })
-}
-
-func test_commitStmt() {
- g.TestStart("commitStmt()")
-
- const (
- topic = "commit() topic"
- payloadStr = "commit() payload"
- consumer = "commit() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- commit, commitClose, commitErr := commitStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- commitErr,
- toDeadErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- commitClose,
- toDeadClose,
- db.Close,
- )
-
- pub := func(topic string) uuid.UUID {
- g.TErrorIf(take(topic, consumer))
-
- unsentWithTopic := unsent
- unsentWithTopic.Topic = topic
- message, err := publish(unsentWithTopic, uuid.New())
- g.TErrorIf(err)
- return message.uuid
- }
-
- cmt := func(consumer string, messageID uuid.UUID) error {
- g.TErrorIf(take(topic, consumer))
-
- return commit(consumer, messageID)
- }
-
-
- g.Testing("we can't commit twice", func() {
- messageID := pub(topic)
-
- err1 := cmt(consumer, messageID)
- err2 := cmt(consumer, messageID)
- g.TErrorIf(err1)
- g.TAssertEqual(
- err2.(golite.Error).ExtendedCode,
- golite.ErrConstraintUnique,
- )
- })
-
- g.Testing("we can't commit non-existent messages", func() {
- err := cmt(consumer, uuid.New())
- g.TAssertEqual(
- err.(golite.Error).ExtendedCode,
- golite.ErrConstraintNotNull,
- )
- })
-
- g.Testing("multiple consumers may commit a message", func() {
- messageID := pub(topic)
-
- g.TErrorIf(g.SomeError(
- cmt(consumer, messageID),
- cmt("other consumer", messageID),
- cmt("yet another consumer", messageID),
- ))
- })
-
- g.Testing("a consumer can commit to multiple topics", func() {
- messageID1 := pub(topic)
- messageID2 := pub("other topic")
- messageID3 := pub("yet another topic")
-
- g.TErrorIf(g.SomeError(
- cmt(consumer, messageID1),
- cmt(consumer, messageID2),
- cmt(consumer, messageID3),
- ))
- })
-
- g.Testing("a consumer can consume many messages from a topic", func() {
- messageID1 := pub(topic)
- messageID2 := pub(topic)
- messageID3 := pub(topic)
-
- g.TErrorIf(g.SomeError(
- cmt(consumer, messageID1),
- cmt(consumer, messageID2),
- cmt(consumer, messageID3),
- ))
- })
-
- g.Testing("we can't commit a dead message", func() {
- messageID := pub(topic)
-
- err1 := toDead(consumer, messageID, uuid.New())
- err2 := cmt(consumer, messageID)
- g.TErrorIf(err1)
- g.TAssertEqual(
- err2.(golite.Error).ExtendedCode,
- golite.ErrConstraintUnique,
- )
- })
-
- g.Testing("error if we don't own the topic/consumer", func() {
- otherCfg := cfg
- otherCfg.instanceID = instanceID + 1
-
- take, takeClose, takeErr := takeStmt(otherCfg)
- g.TErrorIf(takeErr)
- defer takeClose()
-
- messageID := pub(topic)
-
- err := take(topic, consumer)
- g.TErrorIf(err)
-
- err = commit(consumer, messageID)
- g.TAssertEqual(
- err.(golite.Error).ExtendedCode,
- golite.ErrConstraintTrigger,
- )
- })
-
- g.Testing("no actual closing occurs", func() {
- g.TErrorIf(g.SomeError(
- commitClose(),
- commitClose(),
- commitClose(),
- ))
- })
-}
-
-func test_toDeadStmt() {
- g.TestStart("toDeadStmt()")
-
- const (
- topic = "toDead() topic"
- payloadStr = "toDead() payload"
- consumer = "toDead() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- commit, commitClose, commitErr := commitStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- commitErr,
- toDeadErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- commitClose,
- toDeadClose,
- db.Close,
- )
-
- pub := func(topic string) uuid.UUID {
- g.TErrorIf(take(topic, consumer))
-
- unsentWithTopic := unsent
- unsentWithTopic.Topic = topic
- message, err := publish(unsentWithTopic, uuid.New())
- g.TErrorIf(err)
- return message.uuid
- }
-
- asDead := func(
- consumer string,
- messageID uuid.UUID,
- deadletterID uuid.UUID,
- ) error {
- g.TErrorIf(take(topic, consumer))
- return toDead(consumer, messageID, deadletterID)
- }
-
-
- g.Testing("we can't mark as dead twice", func() {
- messageID := pub(topic)
-
- err1 := asDead(consumer, messageID, uuid.New())
- err2 := asDead(consumer, messageID, uuid.New())
- g.TErrorIf(err1)
- g.TAssertEqual(
- err2.(golite.Error).ExtendedCode,
- golite.ErrConstraintUnique,
- )
- })
-
- g.Testing("we can't reuse a deadletter id", func() {
- messageID1 := pub(topic)
- messageID2 := pub(topic)
- deadletterID := uuid.New()
-
- err1 := asDead(consumer, messageID1, deadletterID)
- err2 := asDead(consumer, messageID2, deadletterID)
- g.TErrorIf(err1)
- g.TAssertEqual(
- err2.(golite.Error).ExtendedCode,
- golite.ErrConstraintUnique,
- )
-
- })
-
- g.Testing("we can't mark as dead non-existent messages", func() {
- err := asDead(consumer, uuid.New(), uuid.New())
- g.TAssertEqual(
- err.(golite.Error).ExtendedCode,
- golite.ErrConstraintNotNull,
- )
- })
-
- g.Testing("multiple consumers may mark a message as dead", func() {
- messageID := pub(topic)
-
- g.TErrorIf(g.SomeError(
- asDead(consumer, messageID, uuid.New()),
- asDead("another consumer", messageID, uuid.New()),
- asDead("yet another consumer", messageID, uuid.New()),
- ))
- })
-
- g.Testing("a consumer can mark as dead in multiple topics", func() {
- messageID1 := pub(topic)
- messageID2 := pub("other topic")
- messageID3 := pub("yet other topic")
-
- g.TErrorIf(g.SomeError(
- asDead(consumer, messageID1, uuid.New()),
- asDead(consumer, messageID2, uuid.New()),
- asDead(consumer, messageID3, uuid.New()),
- ))
- })
-
- g.Testing("a consumer can produce many deadletters in a topic", func() {
- messageID1 := pub(topic)
- messageID2 := pub(topic)
- messageID3 := pub(topic)
-
- g.TErrorIf(g.SomeError(
- asDead(consumer, messageID1, uuid.New()),
- asDead(consumer, messageID2, uuid.New()),
- asDead(consumer, messageID3, uuid.New()),
- ))
- })
-
- g.Testing("a consumer can intercalate commits and deadletters", func() {
- messageID1 := pub(topic)
- messageID2 := pub(topic)
- messageID3 := pub(topic)
- messageID4 := pub(topic)
- messageID5 := pub(topic)
-
- g.TErrorIf(g.SomeError(
- asDead(consumer, messageID1, uuid.New()),
- commit(consumer, messageID2),
- commit(consumer, messageID3),
- asDead(consumer, messageID4, uuid.New()),
- commit(consumer, messageID5),
- ))
- })
-
- g.Testing("we can't mark a committed message as dead", func() {
- messageID := pub(topic)
-
- err1 := commit(consumer, messageID)
- err2 := asDead(consumer, messageID, uuid.New())
- g.TErrorIf(err1)
- g.TAssertEqual(
- err2.(golite.Error).ExtendedCode,
- golite.ErrConstraintUnique,
- )
- })
-
- g.Testing("error if we don't own the message's consumer/topic", func() {
- otherCfg := cfg
- otherCfg.instanceID = instanceID + 1
-
- messageID1 := pub(topic)
- messageID2 := pub(topic)
-
- take, takeClose, takeErr := takeStmt(otherCfg)
- g.TErrorIf(takeErr)
- defer takeClose()
-
- err := toDead(consumer, messageID1, uuid.New())
- g.TErrorIf(err)
-
- err = take(topic, consumer)
- g.TErrorIf(err)
-
- err = toDead(consumer, messageID2, uuid.New())
- g.TAssertEqual(
- err.(golite.Error).ExtendedCode,
- golite.ErrConstraintTrigger,
- )
- })
-
- g.Testing("no actual closing occurs", func() {
- g.TErrorIf(g.SomeError(
- toDeadClose(),
- toDeadClose(),
- toDeadClose(),
- ))
- })
-}
-
-func test_replayStmt() {
- g.TestStart("replayStmt()")
-
- const (
- topic = "replay() topic"
- payloadStr = "replay() payload"
- consumer = "replay() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- replay, replayClose, replayErr := replayStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- toDeadErr,
- replayErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- toDeadClose,
- replayClose,
- db.Close,
- )
-
- pub := func() messageT {
- message, err := publish(unsent, uuid.New())
- g.TErrorIf(err)
- return message
- }
- g.TErrorIf(take(topic, consumer))
-
-
- g.Testing("we can replay a message", func() {
- message := pub()
- deadletterID := uuid.New()
- replayedID := uuid.New()
-
- err1 := toDead(consumer, message.uuid, deadletterID)
- replayed, err2 := replay(deadletterID, replayedID)
- g.TErrorIf(g.SomeError(err1, err2))
-
- g.TAssertEqual(replayed.uuid, replayedID)
- g.TAssertEqual(replayed.id == message.id, false)
- g.TAssertEqual(replayed.uuid == message.uuid, false)
- })
-
- g.Testing("a replayed message keeps its payload", func() {
- message := pub()
- deadletterID := uuid.New()
- err := toDead(consumer, message.uuid, deadletterID)
- g.TErrorIf(err)
-
- replayed, err := replay(deadletterID, uuid.New())
- g.TErrorIf(err)
- g.TAssertEqual(message.flowID, replayed.flowID)
- g.TAssertEqual(message.payload, replayed.payload)
- })
-
- g.Testing("we can't replay a dead message twice", func() {
- message := pub()
- deadletterID := uuid.New()
-
- err := toDead(consumer, message.uuid, deadletterID)
- g.TErrorIf(err)
-
- _, err1 := replay(deadletterID, uuid.New())
- _, err2 := replay(deadletterID, uuid.New())
- g.TErrorIf(err1)
- g.TAssertEqual(
- err2.(golite.Error).ExtendedCode,
- golite.ErrConstraintUnique,
- )
- })
-
- g.Testing("we cant replay non-existent messages", func() {
- _, err := replay(uuid.New(), uuid.New())
- g.TAssertEqual(
- err.(golite.Error).ExtendedCode,
- golite.ErrConstraintNotNull,
- )
- })
-
- g.Testing("messages can die and then be replayed many times", func() {
- message := pub()
- deadletterID1 := uuid.New()
- deadletterID2 := uuid.New()
-
- err := toDead(consumer, message.uuid, deadletterID1)
- g.TErrorIf(err)
-
- replayed1, err := replay(deadletterID1, uuid.New())
- g.TErrorIf(err)
-
- err = toDead(consumer, replayed1.uuid, deadletterID2)
- g.TErrorIf(err)
-
- replayed2, err := replay(deadletterID2, uuid.New())
- g.TErrorIf(err)
-
- g.TAssertEqual(message.flowID, replayed1.flowID)
- g.TAssertEqual(replayed1.flowID, replayed2.flowID)
- })
-
- g.Testing("no actual closing occurs", func() {
- g.TErrorIf(g.SomeError(
- replayClose(),
- replayClose(),
- replayClose(),
- ))
- })
-}
-
-func test_oneDeadStmt() {
- g.TestStart("oneDeadStmt()")
-
- const (
- topic = "oneDead() topic"
- payloadStr = "oneDead() payload"
- consumer = "oneDead() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- replay, replayClose, replayErr := replayStmt(cfg)
- oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- toDeadErr,
- replayErr,
- oneDeadErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- toDeadClose,
- replayClose,
- oneDeadClose,
- db.Close,
- )
-
- pub := func(topic string) uuid.UUID {
- g.TErrorIf(take(topic, consumer))
-
- unsentWithTopic := unsent
- unsentWithTopic.Topic = topic
- message, err := publish(unsentWithTopic, uuid.New())
- g.TErrorIf(err)
- return message.uuid
- }
-
-
- g.Testing("error on missing deadletters", func() {
- _, err := oneDead(topic, consumer)
- g.TAssertEqual(err, sql.ErrNoRows)
- })
-
- g.Testing("deadletters on other topics don't show for us", func() {
- err := toDead(consumer, pub("other topic"), uuid.New())
- g.TErrorIf(err)
-
- _, err = oneDead(topic, consumer)
- g.TAssertEqual(err, sql.ErrNoRows)
- })
-
- g.Testing("deadletters for other consumers don't show for use", func() {
- g.TErrorIf(take(topic, "other consumer"))
- err := toDead("other consumer", pub(topic), uuid.New())
- g.TErrorIf(err)
-
- _, err = oneDead(topic, consumer)
- g.TAssertEqual(err, sql.ErrNoRows)
- })
-
- g.Testing("after being replayed deadletters aren't returned", func() {
- messageID1 := uuid.New()
- messageID2 := uuid.New()
- messageID3 := uuid.New()
-
- err1 := toDead(consumer, pub(topic), messageID1)
- err2 := toDead(consumer, pub(topic), messageID2)
- err3 := toDead(consumer, pub(topic), messageID3)
- g.TErrorIf(g.SomeError(err1, err2, err3))
-
- deadletter, err := oneDead(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(deadletter.uuid, messageID1)
-
- _, err = replay(messageID2, uuid.New())
- g.TErrorIf(err)
-
- deadletter, err = oneDead(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(deadletter.uuid, messageID1)
-
- _, err = replay(messageID1, uuid.New())
- g.TErrorIf(err)
-
- deadletter, err = oneDead(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(deadletter.uuid, messageID3)
-
- _, err = replay(messageID3, uuid.New())
- g.TErrorIf(err)
-
- _, err = oneDead(topic, consumer)
- g.TAssertEqual(err, sql.ErrNoRows)
- })
-}
-
-func test_deadletterEach() {
- g.TestStart("deadletterEach")
-
- const (
- topic = "deadletterEach() topic"
- payloadStr = "deadletterEach() payload"
- consumer = "deadletterEach() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- allDead, allDeadClose, allDeadErr := allDeadStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- toDeadErr,
- allDeadErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- toDeadClose,
- allDeadClose,
- db.Close,
- )
-
- pub := func() uuid.UUID {
- message, err := publish(unsent, uuid.New())
- g.TErrorIf(err)
- return message.uuid
- }
-
- dead := func(messageID uuid.UUID) uuid.UUID {
- deadletterID := uuid.New()
- err := toDead(consumer, messageID, deadletterID)
- g.TErrorIf(err)
-
- return deadletterID
- }
- g.TErrorIf(take(topic, consumer))
-
-
- g.Testing("not called on empty set", func() {
- rows, err := allDead(topic, consumer)
- g.TErrorIf(err)
-
- n := 0
- deadletterEach(rows, func(deadletterT, messageT) error {
- n++
- return nil
- })
- })
-
- g.Testing("the callback is called once for each entry", func() {
- expected := []uuid.UUID{
- dead(pub()),
- dead(pub()),
- dead(pub()),
- }
-
- rows, err := allDead(topic, consumer)
- g.TErrorIf(err)
-
- var deadletterIDs []uuid.UUID
- deadletterEach(rows, func(
- deadletter deadletterT,
- _ messageT,
- ) error {
- deadletterIDs = append(deadletterIDs, deadletter.uuid)
- return nil
- })
-
- g.TAssertEqual(deadletterIDs, expected)
- })
-
- g.Testing("we halt if the timestamp is ill-formatted", func() {
- messageID := pub()
- message_id_bytes := messageID[:]
- dead(messageID)
- dead(pub())
- dead(pub())
- dead(pub())
- dead(pub())
-
- const tmplUpdate = `
- UPDATE "%s_offsets"
- SET timestamp = '01-01-1970'
- WHERE message_id IN (
- SELECT id FROM "%s_messages" WHERE uuid = ?
- );
- `
- sqlUpdate := fmt.Sprintf(tmplUpdate, prefix, prefix)
- _, err := db.Exec(sqlUpdate, message_id_bytes)
- g.TErrorIf(err)
-
- rows, err := allDead(topic, consumer)
- g.TErrorIf(err)
-
- n := 0
- err = deadletterEach(rows, func(deadletterT, messageT) error {
- n++
- return nil
- })
-
- g.TAssertEqual(
- err,
- &time.ParseError{
- Layout: time.RFC3339Nano,
- Value: "01-01-1970",
- LayoutElem: "2006",
- ValueElem: "01-01-1970",
- Message: "",
- },
- )
- g.TAssertEqual(n, 3)
-
- const tmplDelete = `
- DELETE FROM "%s_offsets"
- WHERE message_id IN (
- SELECT id FROM "%s_messages" WHERE uuid = ?
- );
- `
- sqlDelete := fmt.Sprintf(tmplDelete, prefix, prefix)
- _, err = db.Exec(sqlDelete, message_id_bytes)
- g.TErrorIf(err)
- })
-
- g.Testing("we halt if the callback returns an error", func() {
- myErr := errors.New("early return error")
-
- rows1, err1 := allDead(topic, consumer)
- g.TErrorIf(err1)
-
- n1 := 0
- err1 = deadletterEach(rows1, func(deadletterT, messageT) error {
- n1++
- if n1 == 1 {
- return myErr
- }
- return nil
- })
-
- rows2, err2 := allDead(topic, consumer)
- g.TErrorIf(err2)
-
- n2 := 0
- err = deadletterEach(rows2, func(deadletterT, messageT) error {
- n2++
- return nil
- })
-
- g.TAssertEqual(err1, myErr)
- g.TErrorIf(err2)
- g.TAssertEqual(n1, 1)
- g.TAssertEqual(n2, 7)
- })
-}
-
-func test_allDeadStmt() {
- g.TestStart("allDeadStmt()")
-
- const (
- topic = "allDead() topic"
- payloadStr = "allDead() payload"
- consumer = "allDead() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- replay, replayClose, replayErr := replayStmt(cfg)
- allDead, allDeadClose, allDeadErr := allDeadStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- toDeadErr,
- replayErr,
- allDeadErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- toDeadClose,
- replayClose,
- allDeadClose,
- db.Close,
- )
-
- pub := func(topic string) uuid.UUID {
- g.TErrorIf(take(topic, consumer))
-
- unsentWithTopic := unsent
- unsentWithTopic.Topic = topic
- message, err := publish(unsentWithTopic, uuid.New())
- g.TErrorIf(err)
- return message.uuid
- }
-
- collectAll := func(
- topic string,
- consumer string,
- ) ([]deadletterT, []messageT) {
- var (
- deadletters []deadletterT
- messages []messageT
- )
- eachFn := func(
- deadletter deadletterT,
- message messageT,
- ) error {
- deadletters = append(deadletters, deadletter)
- messages = append(messages, message)
- return nil
- }
-
- rows, err := allDead(topic, consumer)
- g.TErrorIf(err)
-
- err = deadletterEach(rows, eachFn)
- g.TErrorIf(err)
-
- return deadletters, messages
- }
-
-
- g.Testing("no entry on empty deadletters", func() {
- deadletterIDs, _ := collectAll(topic, consumer)
- g.TAssertEqual(len(deadletterIDs), 0)
- })
-
- g.Testing("deadletters on other topics don't show up", func() {
- err := toDead(consumer, pub("other topic"), uuid.New())
- g.TErrorIf(err)
-
- deadletters, _ := collectAll(topic, consumer)
- g.TAssertEqual(len(deadletters), 0)
- })
-
- g.Testing("deadletters of other consumers don't show up", func() {
- g.TErrorIf(take(topic, "other consumer"))
- err := toDead("other consumer", pub(topic), uuid.New())
- g.TErrorIf(err)
-
- deadletterIDs, _ := collectAll(topic, consumer)
- g.TAssertEqual(len(deadletterIDs), 0)
- })
-
- g.Testing("deadletters are given in order", func() {
- deadletterIDs := []uuid.UUID{
- uuid.New(),
- uuid.New(),
- uuid.New(),
- }
- messageIDs := []uuid.UUID{
- pub(topic),
- pub(topic),
- pub(topic),
- }
-
- err1 := toDead(consumer, messageIDs[0], deadletterIDs[0])
- err2 := toDead(consumer, messageIDs[1], deadletterIDs[1])
- err3 := toDead(consumer, messageIDs[2], deadletterIDs[2])
- g.TErrorIf(g.SomeError(err1, err2, err3))
-
- deadletters, messages := collectAll(topic, consumer)
- g.TAssertEqual(deadletters[0].uuid, deadletterIDs[0])
- g.TAssertEqual(deadletters[1].uuid, deadletterIDs[1])
- g.TAssertEqual(deadletters[2].uuid, deadletterIDs[2])
- g.TAssertEqual( messages[0].uuid, messageIDs[0])
- g.TAssertEqual( messages[1].uuid, messageIDs[1])
- g.TAssertEqual( messages[2].uuid, messageIDs[2])
- g.TAssertEqual(len(deadletters), 3)
- g.TAssertEqual(len(messages), 3)
- })
-
- g.Testing("after being replayed, they stop appearing", func() {
- deadletters, _ := collectAll(topic, consumer)
- g.TAssertEqual(len(deadletters), 3)
-
- _, err := replay(deadletters[0].uuid, uuid.New())
- g.TErrorIf(err)
- collecteds, _ := collectAll(topic, consumer)
- g.TAssertEqual(len(collecteds), 2)
-
- _, err = replay(deadletters[1].uuid, uuid.New())
- g.TErrorIf(err)
- collecteds, _ = collectAll(topic, consumer)
- g.TAssertEqual(len(collecteds), 1)
-
- _, err = replay(deadletters[2].uuid, uuid.New())
- g.TErrorIf(err)
- collecteds, _ = collectAll(topic, consumer)
- g.TAssertEqual(len(collecteds), 0)
- })
-}
-
-func test_sizeStmt() {
- g.TestStart("sizeStmt()")
-
- const (
- topic = "size() topic"
- payloadStr = "size() payload"
- consumer = "size() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- replay, replayClose, replayErr := replayStmt(cfg)
- oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg)
- size, sizeClose, sizeErr := sizeStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- toDeadErr,
- replayErr,
- oneDeadErr,
- sizeErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- toDeadClose,
- replayClose,
- sizeClose,
- oneDeadClose,
- db.Close,
- )
-
- pub := func(topic string) uuid.UUID {
- g.TErrorIf(take(topic, consumer))
-
- unsentWithTopic := unsent
- unsentWithTopic.Topic = topic
- message, err := publish(unsentWithTopic, uuid.New())
- g.TErrorIf(err)
- return message.uuid
- }
-
-
- g.Testing("0 on empty topic", func() {
- n, err := size(topic)
- g.TErrorIf(err)
- g.TAssertEqual(n, 0)
- })
-
- g.Testing("other topics don't fall into our count", func() {
- pub("other topic")
-
- n, err := size(topic)
- g.TErrorIf(err)
- g.TAssertEqual(n, 0)
- })
-
- g.Testing("otherwise we just get the sum", func() {
- pub(topic)
- pub(topic)
- pub(topic)
- pub(topic)
- pub(topic)
-
- n, err := size(topic)
- g.TErrorIf(err)
- g.TAssertEqual(n, 5)
- })
-
- g.Testing("deadletters aren't taken into account", func() {
- sixthMessageID := pub(topic)
- err := toDead(consumer, sixthMessageID, uuid.New())
- g.TErrorIf(err)
-
- n, err := size(topic)
- g.TErrorIf(err)
- g.TAssertEqual(n, 6)
- })
-
- g.Testing("after replay, deadletters increases the size", func() {
- deadletter, err := oneDead(topic, consumer)
- g.TErrorIf(err)
-
- _, err = replay(deadletter.uuid, uuid.New())
- g.TErrorIf(err)
-
- n, err := size(topic)
- g.TErrorIf(err)
- g.TAssertEqual(n, 7)
- })
-}
-
-func test_countStmt() {
- g.TestStart("countStmt()")
-
- const (
- topic = "count() topic"
- payloadStr = "count() payload"
- consumer = "count() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- next, nextClose, nextErr := nextStmt(cfg)
- commit, commitClose, commitErr := commitStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- count, countClose, countErr := countStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- nextErr,
- commitErr,
- toDeadErr,
- countErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- nextClose,
- commitClose,
- toDeadClose,
- countClose,
- db.Close,
- )
-
- pub := func(topic string) uuid.UUID {
- g.TErrorIf(take(topic, consumer))
-
- unsentWithTopic := unsent
- unsentWithTopic.Topic = topic
- message, err := publish(unsentWithTopic, uuid.New())
- g.TErrorIf(err)
- return message.uuid
- }
-
-
- g.Testing("0 on empty topic", func() {
- n, err := count(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(n, 0)
- })
-
- g.Testing("other topics don't add to our count", func() {
- err := commit(consumer, pub("other topic"))
- g.TErrorIf(err)
-
- n, err := count(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(n, 0)
- })
-
- g.Testing("other consumers don't influence our count", func() {
- g.TErrorIf(take(topic, "other consumer"))
- err := commit("other consumer", pub(topic))
- g.TErrorIf(err)
-
- n, err := count(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(n, 0)
- })
-
- g.Testing("unconsumed messages don't count", func() {
- pub(topic)
- pub(topic)
- pub(topic)
-
- n, err := count(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(n, 0)
- })
-
- g.Testing("consumed messages do count", func() {
- message, err := next(topic, consumer)
- g.TErrorIf(err)
- err = commit(consumer, message.uuid)
- g.TErrorIf(err)
-
- message, err = next(topic, consumer)
- g.TErrorIf(err)
- err = commit(consumer, message.uuid)
- g.TErrorIf(err)
-
- message, err = next(topic, consumer)
- g.TErrorIf(err)
- err = commit(consumer, message.uuid)
- g.TErrorIf(err)
-
- n, err := count(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(n, 3)
- })
-
- g.Testing("deadletters count as consumed", func() {
- message, err := next(topic, consumer)
- g.TErrorIf(err)
-
- err = toDead(consumer, message.uuid, uuid.New())
- g.TErrorIf(err)
-
- n, err := count(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(n, 4)
- })
-}
-
-func test_hasDataStmt() {
- g.TestStart("hasDataStmt()")
-
- const (
- topic = "hasData() topic"
- payloadStr = "hasData() payload"
- consumer = "hasData() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- db, err := sql.Open(golite.DriverName, dbpath)
- g.TErrorIf(err)
- g.TErrorIf(createTables(db, prefix))
-
- cfg := dbconfigT{
- shared: db,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- next, nextClose, nextErr := nextStmt(cfg)
- commit, commitClose, commitErr := commitStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- hasData, hasDataClose, hasDataErr := hasDataStmt(cfg)
- g.TErrorIf(g.SomeError(
- takeErr,
- publishErr,
- nextErr,
- commitErr,
- toDeadErr,
- hasDataErr,
- ))
- defer g.SomeFnError(
- takeClose,
- publishClose,
- nextClose,
- commitClose,
- toDeadClose,
- hasDataClose,
- db.Close,
- )
-
- pub := func(topic string) uuid.UUID {
- g.TErrorIf(take(topic, consumer))
-
- unsentWithTopic := unsent
- unsentWithTopic.Topic = topic
- message, err := publish(unsentWithTopic, uuid.New())
- g.TErrorIf(err)
- return message.uuid
- }
-
-
- g.Testing("false on empty topic", func() {
- has, err := hasData(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(has, false)
- })
-
- g.Testing("other topics don't change the response", func() {
- pub("other topic")
-
- has, err := hasData(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(has, false)
- })
-
- g.Testing("published messages flip the flag", func() {
- pub(topic)
- has, err := hasData(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(has, true)
- })
-
- g.Testing("other consumers don't influence us", func() {
- g.TErrorIf(take(topic, "other consumer"))
- message, err := next(topic, "other consumer")
- g.TErrorIf(err)
-
- err = commit("other consumer", message.uuid)
- g.TErrorIf(err)
-
- has, err := hasData(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(has, true)
- })
-
- g.Testing("consuming messages unflips the result", func() {
- message, err := next(topic, consumer)
- g.TErrorIf(err)
-
- err = commit(consumer, message.uuid)
- g.TErrorIf(err)
-
- has, err := hasData(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(has, false)
- })
-
- g.Testing("same for deadletters", func() {
- has0, err := hasData(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(has0, false)
-
- messageID1 := pub(topic)
- messageID2 := pub(topic)
-
- has1, err := hasData(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(has1, true)
-
- err = toDead(consumer, messageID1, uuid.New())
- g.TErrorIf(err)
- err = commit(consumer, messageID2)
- g.TErrorIf(err)
-
- has2, err := hasData(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(has2, false)
- })
-}
-
-func test_initDB() {
- g.TestStart("initDB()")
-
- const (
- topic = "initDB() topic"
- payloadStr = "initDB() payload"
- consumer = "initDB() consumer"
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- var messages []messageT
- notifyFn := func(message messageT) {
- messages = append(messages, message)
- }
-
- queries, err := initDB(dbpath, prefix, notifyFn, instanceID)
- g.TErrorIf(err)
- defer queries.close()
-
- g.TErrorIf(queries.take(topic, consumer))
-
-
- g.Testing("we can perform all the wrapped operations", func() {
- messageID := uuid.New()
- newMessageID := uuid.New()
- deadletterID := uuid.New()
-
- messageV1, err := queries.publish(unsent, messageID)
- g.TErrorIf(err)
-
- messageV2, err := queries.next(topic, consumer)
- g.TErrorIf(err)
-
- var messagesV3 []messageT
- pendingFn := func(message messageT) error {
- messagesV3 = append(messagesV3, message)
- return nil
- }
- err = queries.pending(topic, consumer, pendingFn)
- g.TErrorIf(err)
- g.TAssertEqual(len(messagesV3), 1)
- messageV3 := messagesV3[0]
-
- err = queries.toDead(consumer, messageID, deadletterID)
- g.TErrorIf(err)
-
- deadletterV1, err := queries.oneDead(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(deadletterV1.uuid, deadletterID)
-
- var (
- deadlettersV2 []deadletterT
- messagesV4 []messageT
- )
- deadletterFn := func(
- deadletter deadletterT,
- message messageT,
- ) error {
- deadlettersV2 = append(deadlettersV2, deadletter)
- messagesV4 = append(messagesV4, message)
- return nil
- }
- err = queries.allDead(topic, consumer, deadletterFn)
- g.TErrorIf(err)
- g.TAssertEqual(len(deadlettersV2), 1)
- g.TAssertEqual(deadlettersV2[0].uuid, deadletterID)
- g.TAssertEqual(len(messagesV4), 1)
- messageV4 := messagesV4[0]
-
- g.TAssertEqual(messageV1, messageV2)
- g.TAssertEqual(messageV1, messageV3)
- g.TAssertEqual(messageV1, messageV4)
-
- newMessageV1, err := queries.replay(deadletterID, newMessageID)
- g.TErrorIf(err)
-
- err = queries.commit(consumer, newMessageID)
- g.TErrorIf(err)
-
- newMessageV0 := messageV1
- newMessageV0.id = newMessageV1.id
- newMessageV0.uuid = newMessageID
- newMessageV0.timestamp = newMessageV1.timestamp
- g.TAssertEqual(newMessageV1, newMessageV0)
-
- size, err := queries.size(topic)
- g.TErrorIf(err)
- g.TAssertEqual(size, 2)
-
- count, err := queries.count(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(count, 2)
-
- hasData, err := queries.hasData(topic, consumer)
- g.TErrorIf(err)
- g.TAssertEqual(hasData, false)
- })
-}
-
-func test_queriesTclose() {
- g.TestStart("queriesT.close()")
-
- const (
- dbpath = golite.InMemory
- prefix = defaultPrefix
- )
-
- notifyFn := func(messageT) {}
- queries, err := initDB(dbpath, prefix, notifyFn, instanceID)
- g.TErrorIf(err)
-
-
- g.Testing("closing more than once does not error", func() {
- g.TErrorIf(g.SomeError(
- queries.close(),
- queries.close(),
- ))
- })
-}
-
-func test_newPinger() {
- g.TestStart("newPinger()")
-
- g.Testing("onPing() on a closed pinger is a noop", func() {
- pinger := newPinger[string]()
- pinger.close()
- pinger.onPing(func(s string) {
- panic(s)
- })
- pinger.tryPing("ignored")
- })
-
- g.Testing("onPing() on a closed pinger with data gets that", func() {
- pinger := newPinger[string]()
- pinger.tryPing("received")
- pinger.tryPing("ignored")
- g.TAssertEqual(pinger.closed(), false)
-
- pinger.close()
- g.TAssertEqual(pinger.closed(), true)
-
- c := make(chan string)
- count := 0
- go pinger.onPing(func(s string) {
- if count > 0 {
- panic(s)
- }
- count++
-
- c <- s
- })
-
- given := <- c
- g.TAssertEqual(given, "received")
- })
-
- g.Testing("when onPing is late, we loose messages", func() {
- pinger := newPinger[string]()
- pinger.tryPing("first seen")
- pinger.tryPing("second dropped")
- pinger.tryPing("third dropped")
-
- c := make(chan string)
- go pinger.onPing(func(s string) {
- c <- s
- })
- s := <- c
-
- close(c)
- pinger.close()
- g.TAssertEqual(s, "first seen")
- })
-
- g.Testing("if onPing is on time, it may not loose", func() {
- pinger := newPinger[string]()
- pinger.tryPing("first seen")
-
- c := make(chan string)
- go pinger.onPing(func(s string) {
- c <- s
- })
-
- s1 := <- c
- pinger.tryPing("second seen")
- s2 := <- c
-
- close(c)
- pinger.close()
- g.TAssertEqual(s1, "first seen")
- g.TAssertEqual(s2, "second seen")
- })
-
- g.Testing("if onPing takes too long, it still looses messages", func() {
- pinger := newPinger[string]()
- pinger.tryPing("first seen")
-
- c1 := make(chan string)
- c2 := make(chan struct{})
- go pinger.onPing(func(s string) {
- c1 <- s
- c2 <- struct{}{}
- })
-
- s1 := <- c1
- pinger.tryPing("second seen")
- pinger.tryPing("third dropped")
- <- c2
- s2 := <- c1
- <- c2
-
- close(c2)
- close(c1)
- pinger.close()
- g.TAssertEqual(s1, "first seen")
- g.TAssertEqual(s2, "second seen")
- })
-}
-
-func test_makeSubscriptionsFunc() {
- g.TestStart("makeSubscriptionsFunc()")
-
- g.Testing("we can have multiple readers", func() {
- subscriptions := makeSubscriptionsFuncs()
-
- var (
- readStarted sync.WaitGroup
- readFinished sync.WaitGroup
- )
- c := make(chan struct{})
- readFn := func(subscriptionsSetM) error {
- readStarted.Done()
- <- c
- return nil
- }
- addReader := func() {
- readStarted.Add(1)
- readFinished.Add(1)
- go func() {
- subscriptions.read(readFn)
- readFinished.Done()
- }()
- }
-
- addReader()
- addReader()
- addReader()
- readStarted.Wait()
- c <- struct{}{}
- c <- struct{}{}
- c <- struct{}{}
- readFinished.Wait()
- })
-
- g.Testing("writers are exclusive", func() {
- subscriptions := makeSubscriptionsFuncs()
-
- var (
- readStarted sync.WaitGroup
- readFinished sync.WaitGroup
- writeWillStart sync.WaitGroup
- writeFinished sync.WaitGroup
- )
- c := make(chan string)
- readFn := func(subscriptionsSetM) error {
- readStarted.Done()
- c <- "reader"
- return nil
- }
- addReader := func() {
- readStarted.Add(1)
- readFinished.Add(1)
- go func() {
- subscriptions.read(readFn)
- readFinished.Done()
- }()
- }
-
- writeFn := func(subscriptionsSetM) error {
- c <- "writer"
- return nil
- }
- addWriter := func() {
- writeWillStart.Add(1)
- writeFinished.Add(1)
- go func() {
- writeWillStart.Done()
- subscriptions.write(writeFn)
- writeFinished.Done()
- }()
- }
-
- addReader()
- addReader()
- addReader()
- readStarted.Wait()
- addWriter()
- writeWillStart.Wait()
-
- g.TAssertEqual(<-c, "reader")
- g.TAssertEqual(<-c, "reader")
- g.TAssertEqual(<-c, "reader")
-
- readFinished.Wait()
- g.TAssertEqual(<-c, "writer")
- writeFinished.Wait()
- })
-}
-
-func test_makeNotifyFn() {
- g.TestStart("makeNotifyFn()")
-
- g.Testing("when topic is nil only top pinger gets pinged", func() {
- pinger1 := newPinger[struct{}]()
- pinger2 := newPinger[[]byte]()
- defer pinger1.close()
- defer pinger2.close()
-
- go pinger1.onPing(func(struct{}) {
- panic("consumer pinger")
- })
- go pinger2.onPing(func(payload []byte) {
- panic("waiter pinger")
- })
-
- flowID := uuid.New()
-
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer-1": consumerT{
- pinger: pinger1,
- },
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{
- "waiter-1": waiterT{
- pinger: pinger2,
- },
- },
- },
- },
- }
- subsFn := func(fn func(subscriptionsSetM) error) error {
- return fn(set)
- }
- topPinger := newPinger[struct{}]()
- defer topPinger.close()
-
- var wg sync.WaitGroup
- wg.Add(1)
- go topPinger.onPing(func(struct{}) {
- wg.Done()
- })
-
- notifyFn := makeNotifyFn(subsFn, topPinger)
-
- message := messageT{
- uuid: uuid.New(),
- topic: "nobody is subscribed to this one",
- payload: []byte("nobody with get this payload"),
- }
- notifyFn(message)
- wg.Wait()
- })
-
- g.Testing("otherwise all interested subscribers get pinged", func() {
- const topic = "the topic name"
-
- pinger1 := newPinger[struct{}]()
- pinger2 := newPinger[[]byte]()
- defer pinger1.close()
- defer pinger2.close()
-
- var wg sync.WaitGroup
- go pinger1.onPing(func(struct{}) {
- wg.Done()
- })
- go pinger2.onPing(func([]byte) {
- wg.Done()
- })
- wg.Add(2)
-
- flowID := uuid.New()
-
- set := subscriptionsSetM{
- topic: topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer-1": consumerT{
- pinger: pinger1,
- },
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{
- "waiter-1": waiterT{
- pinger: pinger2,
- },
- },
- },
- },
- }
-
- subsFn := func(fn func(subscriptionsSetM) error) error {
- return fn(set)
- }
-
- topPinger := newPinger[struct{}]()
- defer topPinger.close()
- go topPinger.onPing(func(struct{}) {
- wg.Done()
- })
- wg.Add(1)
-
- notifyFn := makeNotifyFn(subsFn, topPinger)
-
- message := messageT{
- uuid: uuid.New(),
- topic: topic,
- flowID: flowID,
- payload: []byte("ignored in this test"),
- }
- notifyFn(message)
- wg.Wait()
- })
-}
-
-func test_collectClosedWaiters() {
- g.TestStart("collectClosedWaiter()")
-
- g.Testing("collects all the reports to be closed", func() {
- flowID1 := uuid.New()
- flowID2 := uuid.New()
- flowID3 := uuid.New()
- flowID4 := uuid.New()
- flowID5 := uuid.New()
-
- mkwaiter := func(closed bool) waiterT {
- fn := func() bool {
- return closed
- }
- return waiterT{
- closed: &fn,
- }
- }
-
- set := subscriptionsSetM{
- "topic-1": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{
- "waiter-1": mkwaiter(false),
- "waiter-2": mkwaiter(true),
- "waiter-3": mkwaiter(true),
- },
- flowID2: map[string]waiterT{
- "waiter-4": mkwaiter(true),
- "waiter-5": mkwaiter(false),
- },
- },
- },
- "topic-2": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID3: map[string]waiterT{
- "waiter-1": mkwaiter(false),
- "waiter-2": mkwaiter(false),
- },
- flowID4: map[string]waiterT{
- "waiter-3": mkwaiter(true),
- "waiter-4": mkwaiter(true),
- "waiter-5": mkwaiter(true),
- "waiter-6": mkwaiter(true),
- },
- },
- },
- "topic-3": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID5: map[string]waiterT{},
- },
- },
- "topic-4": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- expected := map[string]map[uuid.UUID][]string{
- "topic-1": map[uuid.UUID][]string{
- flowID1: []string{
- "waiter-2",
- "waiter-3",
- },
- flowID2: []string{
- "waiter-4",
- },
- },
- "topic-2": map[uuid.UUID][]string{
- flowID3: []string{},
- flowID4: []string{
- "waiter-3",
- "waiter-4",
- "waiter-5",
- "waiter-6",
- },
- },
- "topic-3": map[uuid.UUID][]string{
- flowID5: []string{},
- },
- "topic-4": map[uuid.UUID][]string{},
- }
-
- given := collectClosedWaiters(set)
-
- // sort names for equality
- for _, waitersIndex := range given {
- for _, names := range waitersIndex {
- sort.Strings(names)
- }
- }
- g.TAssertEqual(given, expected)
- })
-}
-
-func test_trimEmptyLeaves() {
- g.TestStart("trimEmptyLeaves()")
-
- g.Testing("noop on an empty index", func() {
- input := map[string]map[uuid.UUID][]string{}
- expected := map[string]map[uuid.UUID][]string{}
-
- trimEmptyLeaves(input)
- g.TAssertEqual(input, expected)
- })
-
- g.Testing("simplifies tree when it can", func() {
- flowID1 := uuid.New()
- flowID2 := uuid.New()
- flowID3 := uuid.New()
- flowID4 := uuid.New()
-
- input := map[string]map[uuid.UUID][]string{
- "topic-1": map[uuid.UUID][]string{
- flowID1: []string{
- "waiter-1",
- },
- flowID2: []string{},
- },
- "topic-2": map[uuid.UUID][]string{
- flowID3: []string{},
- flowID4: []string{},
- },
- "topic-3": map[uuid.UUID][]string{},
- }
-
- expected := map[string]map[uuid.UUID][]string{
- "topic-1": map[uuid.UUID][]string{
- flowID1: []string{
- "waiter-1",
- },
- },
- }
-
- trimEmptyLeaves(input)
- g.TAssertEqual(input, expected)
- })
-
- g.Testing("fully prune tree if possible", func() {
- flowID1 := uuid.New()
- flowID2 := uuid.New()
- flowID3 := uuid.New()
- flowID4 := uuid.New()
- flowID5 := uuid.New()
-
- input := map[string]map[uuid.UUID][]string{
- "topic-1": map[uuid.UUID][]string{},
- "topic-2": map[uuid.UUID][]string{},
- "topic-3": map[uuid.UUID][]string{},
- "topic-4": map[uuid.UUID][]string{
- flowID1: []string{},
- },
- "topic-5": map[uuid.UUID][]string{
- flowID2: []string{},
- flowID3: []string{},
- flowID4: []string{},
- flowID5: []string{},
- },
- }
-
- expected := map[string]map[uuid.UUID][]string{}
-
- trimEmptyLeaves(input)
- g.TAssertEqual(input, expected)
- })
-}
-
-func test_deleteIfEmpty() {
- g.TestStart("deleteIfEmpty()")
-
- g.Testing("noop if there are consumers", func() {
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- },
- }
-
- expected1 := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- },
- }
- expected2 := subscriptionsSetM{}
-
- deleteIfEmpty(set, "topic")
- g.TAssertEqual(set, expected1)
-
- delete(set["topic"].consumers, "consumer")
- deleteIfEmpty(set, "topic")
- g.TAssertEqual(set, expected2)
- })
-
- g.Testing("noop if there are waiters", func() {
- flowID := uuid.New()
-
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: nil,
- },
- },
- }
-
- expected1 := subscriptionsSetM{
- "topic": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: nil,
- },
- },
- }
- expected2 := subscriptionsSetM{}
-
- deleteIfEmpty(set, "topic")
- g.TAssertEqual(set, expected1)
-
- delete(set["topic"].waiters, flowID)
- deleteIfEmpty(set, "topic")
- g.TAssertEqual(set, expected2)
- })
-
- g.Testing("otherwise deletes when empty", func() {
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- expected := subscriptionsSetM{}
-
- deleteIfEmpty(set, "topic")
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("unrelated topics are left untouched", func() {
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- expected := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- deleteIfEmpty(set, "another-topic")
- g.TAssertEqual(set, expected)
- })
-}
-
-func test_deleteEmptyTopics() {
- g.TestStart("deleteEmptyTopics()")
-
- g.Testing("cleans up all empty topics from the set", func() {
- flowID1 := uuid.New()
- flowID2 := uuid.New()
-
- set := subscriptionsSetM{
- "empty": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- "has-consumers": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- "has-waiters": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: nil,
- },
- },
- "has-both": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID2: nil,
- },
- },
- "has-neither": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- expected := subscriptionsSetM{
- "has-consumers": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- "has-waiters": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: nil,
- },
- },
- "has-both": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID2: nil,
- },
- },
- }
-
- deleteEmptyTopics(set)
- g.TAssertEqual(set, expected)
- })
-}
-
-func test_removeClosedWaiter() {
- g.TestStart("removeClosedWaiter()")
-
- g.Testing("removes from set all that we request", func() {
- flowID0 := uuid.New()
- flowID1 := uuid.New()
- flowID2 := uuid.New()
- flowID3 := uuid.New()
-
- set := subscriptionsSetM{
- "topic-1": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{
- "waiter-1": waiterT{},
- "waiter-2": waiterT{},
- },
- flowID2: map[string]waiterT{
- "waiter-3": waiterT{},
- "waiter-4": waiterT{},
- "waiter-5": waiterT{},
- },
- },
- },
- "topic-2": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID3: map[string]waiterT{
- "waiter-6": waiterT{},
- "waiter-7": waiterT{},
- "waiter-8": waiterT{},
- },
- },
- },
- "topic-3": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- input := map[string]map[uuid.UUID][]string{
- "topic-0": map[uuid.UUID][]string{
- flowID0: []string{
- "waiter-0",
- },
- },
- "topic-1": map[uuid.UUID][]string{
- flowID1: []string{
- "waiter-2",
- },
- flowID2: []string{
- "waiter-3",
- "waiter-4",
- "waiter-5",
- },
- },
- "topic-2": map[uuid.UUID][]string{
- flowID3: []string{
- "waiter-6",
- "waiter-7",
- "waiter-8",
- },
- },
- }
-
- expected := subscriptionsSetM{
- "topic-1": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{
- "waiter-1": waiterT{},
- },
- },
- },
- }
-
- removeClosedWaiters(set, input)
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("empty flowIDs from input GET LEAKED", func() {
- flowID1 := uuid.New()
- flowID2 := uuid.New()
-
- input := map[string]map[uuid.UUID][]string{
- "topic-2": map[uuid.UUID][]string{
- flowID2: []string{
- "waiter",
- },
- },
- }
-
- set := subscriptionsSetM{
- "topic-1": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{},
- },
- },
- "topic-2": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID2: map[string]waiterT{
- "waiter": waiterT{},
- },
- },
- },
- }
-
- expected := subscriptionsSetM{
- "topic-1": topicSubscriptionT{
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{},
- },
- },
- }
-
- removeClosedWaiters(set, input)
- g.TAssertEqual(set, expected)
- })
-}
-
-func test_reapClosedWaiters() {
- g.TestStart("reapClosedWaiters()")
-
- g.Testing("trimEmptyLeaves() prevents removal of empty flows", func() {
- var (
- set subscriptionsSetM
- readCount = 0
- writeCount = 0
- )
- readFn := func(fn func(subscriptionsSetM) error) error {
- readCount++
- return fn(set)
- }
- writeFn := func(fn func(subscriptionsSetM) error) error {
- writeCount++
- return fn(set)
- }
-
- openFn := func() bool {
- return false
- }
- closedFn := func() bool {
- return true
- }
- open := waiterT{ closed: &openFn }
- closed := waiterT{ closed: &closedFn }
- flowID1 := uuid.New()
- flowID2 := uuid.New()
-
- set = subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{
- "waiter-1": open,
- "waiter-2": open,
- "waiter-3": open,
- },
- flowID2: map[string]waiterT{
- "waiter-4": open,
- },
- },
- },
- }
-
- expected1 := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{
- "waiter-1": open,
- "waiter-2": open,
- "waiter-3": open,
- },
- flowID2: map[string]waiterT{
- "waiter-4": open,
- },
- },
- },
- }
-
- expected2 := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{
- "waiter-2": open,
- "waiter-3": open,
- },
- flowID2: map[string]waiterT{
- "waiter-4": open,
- },
- },
- },
- }
-
- expected3 := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{
- "waiter-2": open,
- "waiter-3": open,
- },
- },
- },
- }
-
- reapClosedWaiters(readFn, writeFn)
- g.TAssertEqual(readCount, 1)
- g.TAssertEqual(writeCount, 0)
- g.TAssertEqual(set, expected1)
-
- set["topic"].waiters[flowID1]["waiter-1"] = closed
- reapClosedWaiters(readFn, writeFn)
- g.TAssertEqual(readCount, 2)
- g.TAssertEqual(writeCount, 1)
- g.TAssertEqual(set, expected2)
-
- set["topic"].waiters[flowID2]["waiter-4"] = closed
- reapClosedWaiters(readFn, writeFn)
- g.TAssertEqual(readCount, 3)
- g.TAssertEqual(writeCount, 2)
- g.TAssertEqual(set, expected3)
- })
-}
-
-func test_everyNthCall() {
- g.TestStart("everyNthCall()")
-
- g.Testing("0 (incorrect) would make fn never be called", func() {
- never := everyNthCall[int](0, func(int) {
- panic("unreachable")
- })
- for i := 0; i < 100; i++ {
- never(i)
- }
- })
-
- g.Testing("the first call is delayed to be the nth", func() {
- count := 0
- values := []string{}
- fn := everyNthCall[string](3, func(s string) {
- count++
- values = append(values, s)
- })
-
- fn("1 ignored")
- g.TAssertEqual(count, 0)
- fn("2 ignored")
- g.TAssertEqual(count, 0)
- fn("3 not ignored")
- g.TAssertEqual(count, 1)
-
- fn("4 ignored")
- fn("5 ignored")
- g.TAssertEqual(count, 1)
-
- fn("6 not ignored")
- fn("7 ignored")
- fn("8 ignored")
- g.TAssertEqual(count, 2)
-
- fn("9 not ignored")
- g.TAssertEqual(count, 3)
-
- expected := []string{
- "3 not ignored",
- "6 not ignored",
- "9 not ignored",
- }
- g.TAssertEqual(values, expected)
- })
-}
-
-func test_runReaper() {
- g.TestStart("runReaper()")
-
- g.Testing("debounce reapClosedWaiters `reaperSkipCount` times", func() {
- set := subscriptionsSetM{}
-
- var (
- readCount = 0
- writeCount = 0
- )
- readFn := func(fn func(subscriptionsSetM) error) error {
- readCount++
- return fn(set)
- }
- writeFn := func(fn func(subscriptionsSetM) error) error {
- writeCount++
- return fn(set)
- }
-
- var iterCount int
- onPing := func(fn func(struct{})) {
- for i := 0; i < iterCount; i++ {
- fn(struct{}{})
- }
- }
-
- iterCount = reaperSkipCount - 1
- runReaper(onPing, readFn, writeFn)
- g.TAssertEqual(readCount, 0)
- g.TAssertEqual(writeCount, 0)
-
- iterCount = reaperSkipCount
- runReaper(onPing, readFn, writeFn)
- g.TAssertEqual(readCount, 1)
- g.TAssertEqual(writeCount, 0)
- })
-}
-
-func test_NewWithPrefix() {
- g.TestStart("NewWithPrefix()")
-
- g.Testing("we get an error with a bad prefix", func() {
- _, err := NewWithPrefix(golite.InMemory, "a bad prefix")
- g.TAssertEqual(err, g.ErrBadSQLTablePrefix)
- })
-
- g.Testing("otherwise we have a queueT and no errors", func() {
- queue, err := NewWithPrefix(golite.InMemory, "good")
- g.TErrorIf(err)
- queue.Close()
-
- g.TAssertEqual(queue.(queueT).pinger.closed(), true)
- })
-}
-
-func test_New() {
- g.TestStart("New()")
-
- g.Testing("smoke test that we get a queueT", func() {
- queue, err := New(golite.InMemory)
- g.TErrorIf(err)
- queue.Close()
-
- g.TAssertEqual(queue.(queueT).pinger.closed(), true)
- })
-}
-
-func test_asPublicMessage() {
- g.TestStart("asPublicMessage()")
-
- g.Testing("it picks the correct fields 🤷", func() {
- input := messageT{
- uuid: uuid.New(),
- timestamp: time.Now(),
- topic: "topic",
- flowID: uuid.New(),
- payload: []byte("payload"),
- }
-
- expected := Message{
- ID: input.uuid,
- Timestamp: input.timestamp,
- Topic: input.topic,
- FlowID: input.flowID,
- Payload: input.payload,
- }
-
- given := asPublicMessage(input)
- g.TAssertEqual(given, expected)
- })
-}
-
-func test_queueT_Publish() {
- g.TestStart("queueT.Publish()")
-
- const (
- topic = "queueT.Publish() topic"
- payloadStr = "queueT.Publish() payload"
- dbpath = golite.InMemory
- )
- var (
- flowID = uuid.New()
- payload = []byte(payloadStr)
- unsent = UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- )
-
- queue, err := New(dbpath)
- g.TErrorIf(err)
- defer queue.Close()
-
-
- g.Testing("it just publishes and returns", func() {
- message, err := queue.Publish(unsent)
- g.TErrorIf(err)
-
- g.TAssertEqual(message.Timestamp == time.Time{}, false)
- g.TAssertEqual(message.Topic, topic)
- g.TAssertEqual(message.FlowID, flowID)
- g.TAssertEqual(message.Payload, payload)
- })
-
- queue.Close()
- g.TAssertEqual(queue.(queueT).pinger.closed(), true)
-}
-
-func test_registerConsumerFn() {
- g.TestStart("registerConsumerFn()")
-
- g.Testing("adds a new topicSubscriptionT{} if needed", func() {
- consumer := consumerT{
- data: consumerDataT{
- topic: "topic",
- name: "consumer",
- },
- }
-
- set := subscriptionsSetM{}
-
- expected := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumer,
- },
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- registerConsumerFn(consumer)(set)
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("otherwise it just uses what exists", func() {
- flowID := uuid.New()
-
- consumer := consumerT{
- data: consumerDataT{
- topic: "topic",
- name: "consumer",
- },
- }
-
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "other-consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{},
- },
- },
- }
-
- expected := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumer,
- "other-consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{},
- },
- },
- }
-
- registerConsumerFn(consumer)(set)
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("overwrites existing consumer if desired", func() {
- close1 := func() {}
- consumer1 := consumerT{
- data: consumerDataT{
- topic: "topic",
- name: "consumer",
- },
- close: &close1,
- }
-
- close2 := func() {}
- consumer2 := consumerT{
- data: consumerDataT{
- topic: "topic",
- name: "consumer",
- },
- close: &close2,
- }
-
- set := subscriptionsSetM{}
-
- expected1 := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumer1,
- },
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- expected2 := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumer2,
- },
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- registerConsumerFn(consumer1)(set)
- g.TAssertEqual(set, expected1)
- g.TAssertEqual(reflect.DeepEqual(set, expected2), false)
-
- registerConsumerFn(consumer2)(set)
- g.TAssertEqual(set, expected2)
- g.TAssertEqual(reflect.DeepEqual(set, expected1), false)
- })
-}
-
-func test_registerWaiterFn() {
- g.TestStart("registerWaiterFn()")
-
- g.Testing("adds a new topicSubscriptionT{} if needed", func() {
- flowID := uuid.New()
-
- waiter := waiterT{
- data: waiterDataT{
- topic: "topic",
- flowID: flowID,
- name: "waiter",
- },
- }
-
- set := subscriptionsSetM{}
-
- expected := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{
- "waiter": waiter,
- },
- },
- },
- }
-
- registerWaiterFn(waiter)(set)
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("adds a new waiters map if needed", func() {
- flowID := uuid.New()
-
- waiter := waiterT{
- data: waiterDataT{
- topic: "topic",
- flowID: flowID,
- name: "waiter",
- },
- }
-
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- expected := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{
- "waiter": waiter,
- },
- },
- },
- }
-
- registerWaiterFn(waiter)(set)
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("otherwise it just uses what exists", func() {
- flowID := uuid.New()
-
- waiter := waiterT{
- data: waiterDataT{
- topic: "topic",
- flowID: flowID,
- name: "waiter",
- },
- }
-
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{
- "other-waiter": waiterT{},
- },
- },
- },
- }
-
- expected := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{
- "waiter": waiter,
- "other-waiter": waiterT{},
- },
- },
- },
- }
-
- registerWaiterFn(waiter)(set)
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("overwrites existing waiter if desired", func() {
- flowID := uuid.New()
-
- close1 := func() {}
- waiter1 := waiterT{
- data: waiterDataT{
- topic: "topic",
- flowID: flowID,
- name: "waiter",
- },
- close: &close1,
- }
-
- close2 := func() {}
- waiter2 := waiterT{
- data: waiterDataT{
- topic: "topic",
- flowID: flowID,
- name: "waiter",
- },
- close: &close2,
- }
-
- set := subscriptionsSetM{}
-
- expected1 := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{
- "waiter": waiter1,
- },
- },
- },
- }
-
- expected2 := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{
- "waiter": waiter2,
- },
- },
- },
- }
-
- registerWaiterFn(waiter1)(set)
- g.TAssertEqual(set, expected1)
- g.TAssertEqual(reflect.DeepEqual(set, expected2), false)
-
- registerWaiterFn(waiter2)(set)
- g.TAssertEqual(set, expected2)
- g.TAssertEqual(reflect.DeepEqual(set, expected1), false)
- })
-}
-
-func test_makeConsumeOneFn() {
- g.TestStart("makeConsumeOneFn()")
-
- savedLogger := slog.Default()
-
- s := new(strings.Builder)
- g.SetLoggerOutput(s)
-
- var (
- successCount int
- errorCount int
- callbackErr error
- successFnErr error
- errorFnErr error
- messages []Message
- successNames []string
- successIDs []uuid.UUID
- errorNames []string
- errorIDs []uuid.UUID
- deadletterIDs []uuid.UUID
- )
-
- data := consumerDataT{
- topic: "topic",
- name: "name",
- }
-
- callback := func(message Message) error {
- messages = append(messages, message)
- return callbackErr
- }
-
- successFn := func(name string, messageID uuid.UUID) error {
- successCount++
- successNames = append(successNames, name)
- successIDs = append(successIDs, messageID)
- return successFnErr
- }
-
- errorFn := func(
- name string,
- messageID uuid.UUID,
- deadletterID uuid.UUID,
- ) error {
- errorCount++
- errorNames = append(errorNames, name)
- errorIDs = append(errorIDs, messageID)
- deadletterIDs = append(deadletterIDs, deadletterID)
-
- return errorFnErr
- }
-
- consumeOneFn := makeConsumeOneFn(
- data,
- callback,
- successFn,
- errorFn,
- )
-
- message1 := messageT{ uuid: uuid.New() }
- message2 := messageT{ uuid: uuid.New() }
- message3 := messageT{ uuid: uuid.New() }
- message4 := messageT{ uuid: uuid.New() }
-
-
- g.Testing("error from successFn() is propagated", func() {
- err := consumeOneFn(message1)
- g.TErrorIf(err)
- g.TAssertEqual(successCount, 1)
- g.TAssertEqual(errorCount, 0)
-
- successFnErr = errors.New("successFn() error")
- err = consumeOneFn(message2)
- g.TAssertEqual(err, successFnErr)
- g.TAssertEqual(successCount, 2)
- g.TAssertEqual(errorCount, 0)
-
- g.TAssertEqual(s.String(), "")
- })
-
- g.Testing("error from callback() is logged and dropped", func() {
- *s = strings.Builder{}
-
- callbackErr = errors.New("callback() error")
- err := consumeOneFn(message3)
- g.TErrorIf(err)
- g.TAssertEqual(successCount, 2)
- g.TAssertEqual(errorCount, 1)
- g.TAssertEqual(s.String() == "", false)
- })
-
- g.Testing("error from errorFn() is propagated", func() {
- *s = strings.Builder{}
-
- errorFnErr = errors.New("errorFn() error")
- err := consumeOneFn(message4)
- g.TAssertEqual(err, errorFnErr)
- g.TAssertEqual(successCount, 2)
- g.TAssertEqual(errorCount, 2)
- g.TAssertEqual(s.String() == "", false)
- })
-
- g.Testing("calls happened with the expected arguments", func() {
- expectedMessages := []Message{
- asPublicMessage(message1),
- asPublicMessage(message2),
- asPublicMessage(message3),
- asPublicMessage(message4),
- }
-
- g.TAssertEqual(messages, expectedMessages)
- g.TAssertEqual(successNames, []string{ "name", "name" })
- g.TAssertEqual(errorNames, []string{ "name", "name" })
- g.TAssertEqual(successIDs, []uuid.UUID{
- message1.uuid,
- message2.uuid,
- })
- g.TAssertEqual(errorIDs, []uuid.UUID{
- message3.uuid,
- message4.uuid,
- })
- g.TAssertEqual(deadletterIDs[0] == message3.uuid, false)
- g.TAssertEqual(deadletterIDs[1] == message4.uuid, false)
- })
-
- slog.SetDefault(savedLogger)
-}
-
-func test_makeConsumeAllFn() {
- g.TestStart("makeConsumeAllFn()")
-
- savedLogger := slog.Default()
-
- s := new(strings.Builder)
- g.SetLoggerOutput(s)
-
- data := consumerDataT{}
-
- consumeOneFn := func(messageT) error {
- return nil
- }
-
- var eachFnErr error
- eachFn := func(string, string, func(messageT) error) error {
- return eachFnErr
- }
-
- consumeAllFn := makeConsumeAllFn(data, consumeOneFn, eachFn)
-
-
- g.Testing("silent when eachFn() returns no error", func() {
- consumeAllFn(struct{}{})
- g.TAssertEqual(s.String(), "")
- })
-
- g.Testing("logs warning otherwise", func() {
- eachFnErr = errors.New("eachFn() error")
- consumeAllFn(struct{}{})
- g.TAssertEqual(s.String() == "", false)
- })
-
- slog.SetDefault(savedLogger)
-}
-
-func test_makeWaitFn() {
- g.TestStart("makeWaitFn()")
-
- g.Testing("all it does is send the data and close things", func() {
- callCount := 0
- closeFn := func() {
- callCount++
- }
-
- c := make(chan []byte, 1)
- waitFn := makeWaitFn(c, closeFn)
-
- payload := []byte("payload")
- waitFn(payload)
- given := <- c
-
- g.TAssertEqual(given, payload)
- g.TAssertEqual(callCount, 1)
- })
-
- g.Testing("you can call it twice for cases of race condition", func() {
- callCount := 0
- closeFn := func() {
- callCount++
- }
-
- c := make(chan []byte, 1)
- waitFn := makeWaitFn(c, closeFn)
-
- payload := []byte("something")
- waitFn(payload)
- waitFn(payload)
- given1 := <- c
- given2 := <- c
-
- g.TAssertEqual(given1, payload)
- g.TAssertEqual(given2, []byte(nil))
- })
-}
-
-func test_runConsumer() {
- g.TestStart("runConsumer()")
-
- g.Testing("calls consumeAllFn() at least one", func() {
- onPing := func(fn func(struct{})) {}
-
- count := 0
- consumeAllFn := func(struct{}) {
- count++
- }
-
- runConsumer(onPing, consumeAllFn)
- g.TAssertEqual(count, 1)
- })
-
- g.Testing("can call consumeAllFn() (onPing + 1) times", func() {
- onPing := func(fn func(struct{})) {
- fn(struct{}{})
- fn(struct{}{})
- fn(struct{}{})
- }
-
- count := 0
- consumeAllFn := func(struct{}) {
- count++
- }
-
- runConsumer(onPing, consumeAllFn)
- g.TAssertEqual(count, 4)
- })
-}
-
-func test_tryFinding() {
- g.TestStart("tryFinding()")
-
- g.Testing("noop in case of failure", func() {
- myErr := errors.New("find() error")
- findFn := func(string, uuid.UUID) (messageT, error) {
- return messageT{}, myErr
- }
-
- count := 0
- waitFn := func([]byte) {
- count++
- }
-
-
- tryFinding(findFn, "topic", uuid.New(), waitFn)
- g.TAssertEqual(count, 0)
- })
-
- g.Testing("calls waitFn in case of success", func() {
- payload := []byte("find() payload")
-
- findFn := func(string, uuid.UUID) (messageT, error) {
- return messageT{ payload: payload }, nil
- }
-
- payloads := [][]byte{}
- waitFn := func(payload []byte) {
- payloads = append(payloads, payload)
- }
-
-
- tryFinding(findFn, "topic", uuid.New(), waitFn)
- g.TAssertEqual(payloads, [][]byte{ payload })
- })
-}
-
-func test_queueT_Subscribe() {
- g.TestStart("queueT.Subscribe()")
-
- set := subscriptionsSetM{}
- consumed := []uuid.UUID{}
- messages := []messageT{
- messageT{ uuid: uuid.New() },
- messageT{ uuid: uuid.New() },
- }
-
- var takeErr error
- queue := queueT{
- queries: queriesT{
- take: func(string, string) error {
- return takeErr
- },
- pending: func(
- topic string,
- consumer string,
- fn func(messageT) error,
- ) error {
- for _, message := range messages {
- consumed = append(
- consumed,
- message.uuid,
- )
- _ = fn(message)
- }
- return nil
- },
- commit: func(
- consumer string,
- messageID uuid.UUID,
- ) error {
- return nil
- },
- toDead: func(string, uuid.UUID, uuid.UUID) error {
- g.Unreachable()
- return nil
- },
- },
- subscriptions: subscriptionsT{
- write: func(fn func(subscriptionsSetM) error) error {
- return fn(set)
- },
- },
- }
-
-
- g.Testing("registers our callback in the set and calls it", func() {
- var wg sync.WaitGroup
- wg.Add(2)
-
- queue.Subscribe("topic", "consumer-1", func(Message) error {
- wg.Done()
- return nil
- })
- defer queue.Unsubscribe("topic", "consumer-1")
-
- wg.Wait()
- g.TAssertEqual(consumed, []uuid.UUID{
- messages[0].uuid,
- messages[1].uuid,
- })
- })
-
- g.Testing("our callback also gets called when pinged", func() {
- consumed = []uuid.UUID{}
-
- var wg sync.WaitGroup
- wg.Add(4)
-
- queue.Subscribe("topic", "consumer-2", func(m Message) error {
- wg.Done()
- return nil
- })
- defer queue.Unsubscribe("topic", "consumer-2")
-
- consumer := set["topic"].consumers["consumer-2"]
- consumer.pinger.tryPing(struct{}{})
-
- wg.Wait()
-
- g.TAssertEqual(consumed, []uuid.UUID{
- messages[0].uuid,
- messages[1].uuid,
- messages[0].uuid,
- messages[1].uuid,
- })
- })
-
- g.Testing("we try to own the topic", func() {
- takeErr = errors.New("queueT.Subscribe() 1")
-
- err := queue.Subscribe("topic", "name", func(Message) error {
- g.Unreachable()
- return nil
- })
-
- g.TAssertEqual(err, takeErr)
- takeErr = nil
- })
-
- g.Testing("if we can't own the topic, we don't get registered", func() {
- takeErr = errors.New("queueT.Subscribe() 2")
-
- err := queue.Subscribe("topic", "name", func(Message) error {
- g.Unreachable()
- return nil
- })
- g.TErrorNil(err)
-
- expected := subscriptionsSetM{}
- g.TAssertEqual(set, expected)
-
- takeErr = nil
- })
-}
-
-func test_queueT_WaitFor() {
- g.TestStart("queueT.WaitFor()")
-
- findErr := errors.New("find() error")
- message := messageT{
- payload: []byte("queueT.WaitFor() payload"),
- }
-
- set := subscriptionsSetM{}
-
- queue := queueT{
- queries: queriesT{
- find: func(
- topic string,
- flowID uuid.UUID,
- ) (messageT, error) {
- return message, findErr
- },
- },
- subscriptions: subscriptionsT{
- write: func(fn func(subscriptionsSetM) error) error {
- return fn(set)
- },
- read: func(fn func(subscriptionsSetM) error) error {
- return fn(set)
- },
- },
- }
-
-
- g.Testing("registers the waiter in the set", func() {
- flowID := uuid.New()
-
- defer queue.WaitFor("topic", flowID, "waiter-1").Close()
-
- expected := waiterDataT{
- topic: "topic",
- flowID: flowID,
- name: "waiter-1",
- }
-
- g.TAssertEqual(
- set["topic"].waiters[flowID]["waiter-1"].data,
- expected,
- )
- })
-
- g.Testing("the channel gets a message when waiter is pinged", func() {
- flowID := uuid.New()
- payload := []byte("sent payload")
-
- w := queue.WaitFor("topic", flowID, "waiter-2")
- defer w.Close()
-
- waiter := set["topic"].waiters[flowID]["waiter-2"]
- waiter.pinger.tryPing(payload)
-
- given := <- w.Channel
-
- g.TAssertEqual(given, payload)
- g.TAssertEqual((*waiter.closed)(), true)
- })
-
- g.Testing("we can also WaitFor() after publishing the message", func() {
- findErr = nil
- flowID := uuid.New()
-
- w := queue.WaitFor("topic", flowID, "waiter-3")
- defer w.Close()
-
- given := <- w.Channel
-
- waiter := set["topic"].waiters[flowID]["waiter-3"]
- waiter.pinger.tryPing([]byte("ignored"))
-
- g.TAssertEqual(given, message.payload)
- g.TAssertEqual((*waiter.closed)(), true)
- })
-
- g.Testing("if the data already exists we get it immediatelly", func() {
- flowID := uuid.New()
-
- w := queue.WaitFor("topic", flowID, "waiter-4")
- defer w.Close()
-
- waiter := set["topic"].waiters[flowID]["waiter-4"]
- g.TAssertEqual((*waiter.closed)(), true)
-
- given := <- w.Channel
- g.TAssertEqual(given, message.payload)
- })
-}
-
-func test_unsubscribeIfExistsFn() {
- g.TestStart("unsubscribeIfExistsFn()")
-
- g.Testing("noop on empty set", func() {
- set := subscriptionsSetM{}
- expected := subscriptionsSetM{}
- unsubscribeIfExistsFn("topic", "consumer")(set)
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("noop on missing topic", func() {
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{},
- }
-
- expected := subscriptionsSetM{
- "topic": topicSubscriptionT{},
- }
-
- unsubscribeIfExistsFn("other-topic", "consumer")(set)
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("noop on missing consumer", func() {
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- },
- }
-
- expected := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{},
- },
- },
- }
-
- unsubscribeIfExistsFn("topic", "other-consumer")(set)
- g.TAssertEqual(set, expected)
- })
-
- g.Testing("closes consumer and removes it from set", func() {
- flowID := uuid.New()
-
- count := 0
- close := func() {
- count++
- }
-
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{
- close: &close,
- },
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{},
- },
- },
- }
-
- expected := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID: map[string]waiterT{},
- },
- },
- }
-
- unsubscribeIfExistsFn("topic", "consumer")(set)
- g.TAssertEqual(set, expected)
- g.TAssertEqual(count, 1)
- })
-
- g.Testing("empty topics are also removed", func() {
- count := 0
- close := func() {
- count++
- }
-
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{
- close: &close,
- },
- },
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- expected := subscriptionsSetM{}
-
- unsubscribeIfExistsFn("topic", "consumer")(set)
- g.TAssertEqual(set, expected)
- g.TAssertEqual(count, 1)
- })
-}
-
-func test_queueT_Unsubscribe() {
- g.TestStart("queueT.Unsubscribe()")
-
- g.Testing("calls unsubscribesIfExists() via writeFn()", func() {
- closed := false
- close := func() {
- closed = true
- }
-
- set := subscriptionsSetM{
- "topic": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer": consumerT{
- close: &close,
- },
- },
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- expected := subscriptionsSetM{}
-
- queue := queueT{
- subscriptions: subscriptionsT{
- write: func(
- fn func(subscriptionsSetM) error,
- ) error {
- return fn(set)
- },
- },
- }
-
- queue.Unsubscribe("topic", "consumer")
- g.TAssertEqual(set, expected)
- g.TAssertEqual(closed, true)
- })
-}
-
-func test_cleanSubscriptions() {
- g.TestStart("cleanSubscriptions()")
-
- g.Testing("all consumers and waiters get close()'d", func() {
- flowID1 := uuid.New()
- flowID2 := uuid.New()
- flowID3 := uuid.New()
-
- type pairT struct{
- closed func() bool
- fn func()
- }
-
- mkclose := func() pairT {
- closed := false
- return pairT{
- closed: func() bool {
- return closed
- },
- fn: func() {
- closed = true
- },
- }
- }
-
- c := mkclose()
- g.TAssertEqual(c.closed(), false)
- c.fn()
- var x bool = c.closed()
- g.TAssertEqual(x, true)
-
- close1 := mkclose()
- close2 := mkclose()
- close3 := mkclose()
- close4 := mkclose()
- close5 := mkclose()
- close6 := mkclose()
- close7 := mkclose()
- close8 := mkclose()
-
- set := subscriptionsSetM{
- "topic-1": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer-1": consumerT{
- close: &close1.fn,
- },
- "consumer-2": consumerT{
- close: &close2.fn,
- },
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID1: map[string]waiterT{
- "waiter-1": waiterT{
- close: &close3.fn,
- },
- },
- flowID2: map[string]waiterT{
- "waiter-2": waiterT{
- close: &close4.fn,
- },
- "waiter-3": waiterT{
- close: &close5.fn,
- },
- },
- },
- },
- "topic-2": topicSubscriptionT{
- consumers: map[string]consumerT{
- "consumer-3": consumerT{
- close: &close6.fn,
- },
- },
- waiters: map[uuid.UUID]map[string]waiterT{
- flowID3: map[string]waiterT{
- "waiter-4": waiterT{
- close: &close7.fn,
- },
- },
- },
- },
- "topic-3": topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{},
- },
- }
-
- cleanSubscriptions(set)
-
- given := []bool{
- close1.closed(),
- close2.closed(),
- close3.closed(),
- close4.closed(),
- close5.closed(),
- close6.closed(),
- close7.closed(),
- close8.closed(),
- }
-
- expected := []bool{
- true,
- true,
- true,
- true,
- true,
- true,
- true,
- false,
- }
-
- g.TAssertEqualI(given, expected)
- })
-}
-
-func test_queueT_Close() {
- g.TestStart("queueT.Close()")
-
- g.Testing("clean pinger, subscriptions and queries", func() {
- var (
- pingerCount = 0
- subscriptionsCount = 0
- queriesCount = 0
-
- subscriptionsErr = errors.New("subscriptionsT{} error")
- queriesErr = errors.New("queriesT{} error")
- )
-
- queue := queueT{
- queries: queriesT{
- close: func() error{
- queriesCount++
- return queriesErr
- },
- },
- subscriptions: subscriptionsT{
- write: func(
- func(subscriptionsSetM) error,
- ) error {
- subscriptionsCount++
- return subscriptionsErr
- },
- },
- pinger: pingerT[struct{}]{
- close: func() {
- pingerCount++
- },
- },
- }
-
- err := queue.Close()
- g.TAssertEqual(err, g.WrapErrors(subscriptionsErr, queriesErr))
- g.TAssertEqual(pingerCount, 1)
- g.TAssertEqual(subscriptionsCount, 1)
- g.TAssertEqual(queriesCount, 1)
- })
-}
-
-
-func test_topicGetopt() {
- g.TestStart("topicGetopt()")
-
- g.Testing("checks for required positional argument", func() {
- var w strings.Builder
- argsIn := argsT{
- args: []string{},
- }
-
- argsOut, ok := topicGetopt(argsIn, &w)
- g.TAssertEqual(w.String(), "Missing TOPIC.\n")
- g.TAssertEqual(ok, false)
- g.TAssertEqual(argsOut, argsIn)
- })
-
- g.Testing("success otherwise", func() {
- var w strings.Builder
- argsIn := argsT{
- args: []string{"a topic"},
- }
-
- argsOut, ok := topicGetopt(argsIn, &w)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(ok, true)
- argsIn.topic = "a topic"
- g.TAssertEqual(argsOut, argsIn)
- })
-}
-
-func test_topicConsumerGetopt() {
- g.TestStart("topicConsumerGetopt()")
-
- g.Testing("checks for TOPIC argument", func() {
- var w strings.Builder
- argsIn := argsT{
- args: []string{},
- }
-
- argsOut, ok := topicConsumerGetopt(argsIn, &w)
- g.TAssertEqual(w.String(), "Missing TOPIC.\n")
- g.TAssertEqual(ok, false)
- g.TAssertEqual(argsOut, argsIn)
- })
-
- g.Testing("we get an error on unsupported flag", func() {
- var w strings.Builder
- argsIn := argsT{
- args: []string{"-Z"},
- }
-
- const message = "flag provided but not defined: -Z\n"
- argsOut, ok := topicConsumerGetopt(argsIn, &w)
- g.TAssertEqual(w.String(), message)
- g.TAssertEqual(ok, false)
- g.TAssertEqual(argsOut, argsIn)
- })
-
- g.Testing("we also get an error on incorrect usage of flags", func() {
- var w strings.Builder
- argsIn := argsT{
- args: []string{"-C"},
- }
-
- const message = "flag needs an argument: -C\n"
- argsOut, ok := topicConsumerGetopt(argsIn, &w)
- g.TAssertEqual(w.String(), message)
- g.TAssertEqual(ok, false)
- g.TAssertEqual(argsOut, argsIn)
- })
-
- g.Testing("we can customize the CONSUMER", func() {
- var w strings.Builder
- argsIn := argsT{
- args: []string{"-C", "custom consumer", "this topic"},
- }
-
- argsOut, ok := topicConsumerGetopt(argsIn, &w)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(ok, true)
- argsIn.topic = "this topic"
- argsIn.consumer = "custom consumer"
- g.TAssertEqual(argsOut, argsIn)
- })
-
- g.Testing("otherwise we get the default one", func() {
- var w strings.Builder
- argsIn := argsT{
- args: []string{"T"},
- }
-
- argsOut, ok := topicConsumerGetopt(argsIn, &w)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(ok, true)
- argsIn.topic = "T"
- argsIn.consumer = "default-consumer"
- g.TAssertEqual(argsOut, argsIn)
- })
-}
-
-func test_inExec() {
- g.TestStart("inExec()")
-
- const (
- topic = "inExec topic"
- payloadStr = "inExec payload"
- )
- var (
- payload = []byte(payloadStr)
- args = argsT{
- topic: topic,
- }
- )
-
- var (
- publishErr error
- messages []messageT
- id int64 = 0
- )
- queries := queriesT{
- publish: func(
- unsent UnsentMessage,
- messageID uuid.UUID,
- ) (messageT, error) {
- if publishErr != nil {
- return messageT{}, publishErr
- }
-
- id++
- now := time.Now()
- message := messageT{
- id: id,
- timestamp: now,
- uuid: messageID,
- topic: unsent.Topic,
- flowID: unsent.FlowID,
- payload: unsent.Payload,
- }
- messages = append(messages, message)
- return message, nil
- },
- }
-
-
- g.Testing("messageID to output when successful", func() {
- r := bytes.NewReader(payload)
- var w strings.Builder
- rc, err := inExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(messages[0].topic, topic)
- g.TAssertEqual(messages[0].payload, payload)
- g.TAssertEqual(rc, 0)
- g.TAssertEqual(w.String(), messages[0].uuid.String() + "\n")
- })
-
- g.Testing("if reading fails, we return the error", func() {
- var r *os.File
- var w strings.Builder
- rc, err := inExec(args, queries, r, &w)
- g.TAssertEqual(
- err.Error(),
- "invalid argument",
- )
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- })
-
- g.Testing("if publishing fails, we return the error", func() {
- publishErr = errors.New("publish() error")
- r := strings.NewReader("read but not published")
- var w strings.Builder
- rc, err := inExec(args, queries, r, &w)
- g.TAssertEqual(err, publishErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- })
-}
-
-func test_outExec() {
- g.TestStart("outExec()")
-
- const (
- topic = "outExec topic"
- consumer = "outExec consumer"
- )
- var (
- r = strings.NewReader("")
- args = argsT{
- topic: topic,
- consumer: consumer,
- }
- )
-
- var (
- takeErr error
- nextErr error
- messages []messageT
- id int64 = 0
- )
- queries := queriesT{
- take: func(string, string) error {
- return takeErr
- },
- next: func(string, string) (messageT, error) {
- if nextErr != nil {
- return messageT{}, nextErr
- }
-
- if len(messages) == 0 {
- return messageT{}, sql.ErrNoRows
- }
-
- return messages[0], nil
- },
- }
- pub := func(payload []byte) {
- id++
- now := time.Now()
- message := messageT{
- id: id,
- timestamp: now,
- uuid: uuid.New(),
- topic: topic,
- flowID: uuid.New(),
- payload: payload,
- }
- messages = append(messages, message)
- }
-
-
- g.Testing("exit code 1 when we can't take", func() {
- takeErr = errors.New("outExec() take error")
-
- var w strings.Builder
-
- rc, err := outExec(args, queries, r, &w)
- g.TAssertEqual(err, takeErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
-
- takeErr = nil
- })
-
- g.Testing("exit code 3 when no message is available", func() {
- var w strings.Builder
-
- rc, err := outExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 3)
- })
-
- g.Testing("we get the same message until we commit", func() {
- var (
- w1 strings.Builder
- w2 strings.Builder
- w3 strings.Builder
- )
- args := argsT{
- topic: topic,
- consumer: consumer,
- }
-
- pub([]byte("first payload"))
- pub([]byte("second payload"))
-
- rc1, err1 := outExec(args, queries, r, &w1)
- rc2, err2 := outExec(args, queries, r, &w2)
- messages = messages[1:]
- rc3, err3 := outExec(args, queries, r, &w3)
-
- g.TErrorIf(g.SomeError(err1, err2, err3))
- g.TAssertEqual(w1.String(), "first payload\n")
- g.TAssertEqual(w2.String(), "first payload\n")
- g.TAssertEqual(w3.String(), "second payload\n")
- g.TAssertEqual(rc1, 0)
- g.TAssertEqual(rc2, 0)
- g.TAssertEqual(rc3, 0)
- })
-
- g.Testing("we propagate the error when the query fails", func() {
- nextErr = errors.New("next() error")
- var w strings.Builder
- rc, err := outExec(args, queries, r, &w)
- g.TAssertEqual(err, nextErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- })
-}
-
-func test_commitExec() {
- g.TestStart("commitExec()")
-
- const (
- topic = "commitExec topic"
- consumer = "commitExec consumer"
- )
- var (
- r = strings.NewReader("")
- args = argsT{
- topic: topic,
- consumer: consumer,
- }
- )
-
- var (
- takeErr error
- nextErr error
- commitErr error
- messages []messageT
- id int64 = 0
- )
- queries := queriesT{
- take: func(string, string) error {
- return takeErr
- },
- next: func(string, string) (messageT, error) {
- if nextErr != nil {
- return messageT{}, nextErr
- }
-
- if len(messages) == 0 {
- return messageT{}, sql.ErrNoRows
- }
-
- return messages[0], nil
- },
- commit: func(string, uuid.UUID) error {
- if commitErr != nil {
- return commitErr
- }
-
- messages = messages[1:]
- return nil
- },
- }
- pub := func(payload []byte) {
- id++
- now := time.Now()
- message := messageT{
- id: id,
- timestamp: now,
- uuid: uuid.New(),
- topic: topic,
- flowID: uuid.New(),
- payload: payload,
- }
- messages = append(messages, message)
- }
-
-
- g.Testing("error when we can't take", func() {
- takeErr = errors.New("commitExec() take error")
-
- var w strings.Builder
-
- rc, err := commitExec(args, queries, r, &w)
- g.TAssertEqual(err, takeErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
-
- takeErr = nil
- })
-
- g.Testing("error when there is nothing to commit", func() {
- var w strings.Builder
-
- rc, err := commitExec(args, queries, r, &w)
- g.TAssertEqual(err, sql.ErrNoRows)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- })
-
- g.Testing("messages get committed in order", func() {
- var w strings.Builder
-
- pub([]byte("first payload"))
- pub([]byte("second payload"))
- pub([]byte("third payload"))
-
- message1 := messages[0]
- g.TAssertEqual(message1.payload, []byte("first payload"))
-
- rc, err := commitExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(rc, 0)
-
- message2 := messages[0]
- g.TAssertEqual(message2.payload, []byte("second payload"))
-
- rc, err = commitExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(rc, 0)
-
- message3 := messages[0]
- g.TAssertEqual(message3.payload, []byte("third payload"))
-
- rc, err = commitExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(rc, 0)
-
- g.TAssertEqual(len(messages), 0)
- })
-
- g.Testing("when next() query fails, we propagate its result", func() {
- nextErr = errors.New("next() error")
- var w strings.Builder
- rc, err := commitExec(args, queries, r, &w)
- g.TAssertEqual(err, nextErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- nextErr = nil
- })
-
- g.Testing("we also propagate the error on commit() failure", func() {
- commitErr = errors.New("commit() error")
- pub([]byte{})
- var w strings.Builder
- rc, err := commitExec(args, queries, r, &w)
- g.TAssertEqual(err, commitErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- })
-}
-
-func test_deadExec() {
- g.TestStart("deadExec()")
-
- const (
- topic = "deadExec topic"
- consumer = "deadExec consumer"
- )
- var (
- r = strings.NewReader("")
- args = argsT{
- topic: topic,
- consumer: consumer,
- }
- )
-
- var (
- takeErr error
- nextErr error
- toDeadErr error
- messages []messageT
- id int64 = 0
- )
- queries := queriesT{
- take: func(string, string) error {
- return takeErr
- },
- next: func(string, string) (messageT, error) {
- if nextErr != nil {
- return messageT{}, nextErr
- }
-
- if len(messages) == 0 {
- return messageT{}, sql.ErrNoRows
- }
-
- return messages[0], nil
- },
- toDead: func(
- _ string,
- _ uuid.UUID,
- deadletterID uuid.UUID,
- ) error {
- if toDeadErr != nil {
- return toDeadErr
- }
-
- messages = messages[1:]
- return nil
- },
- }
- pub := func(payload []byte) {
- id++
- now := time.Now()
- message := messageT{
- id: id,
- timestamp: now,
- uuid: uuid.New(),
- topic: topic,
- flowID: uuid.New(),
- payload: payload,
- }
- messages = append(messages, message)
- }
-
-
- g.Testing("error when we can't take", func() {
- takeErr = errors.New("deadExec() take error")
-
- var w strings.Builder
-
- rc, err := deadExec(args, queries, r, &w)
- g.TAssertEqual(err, takeErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
-
- takeErr = nil
- })
-
- g.Testing("error when there is nothing to mark as dead", func() {
- var w strings.Builder
-
- rc, err := deadExec(args, queries, r, &w)
- g.TAssertEqual(err, sql.ErrNoRows)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- })
-
- g.Testing("the latest message becomes a deadletter", func() {
- var w strings.Builder
-
- pub([]byte("first payload"))
- pub([]byte("second payload"))
-
- message1 := messages[0]
- g.TAssertEqual(message1.payload, []byte("first payload"))
-
- rc, err := deadExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(rc, 0)
-
- message2 := messages[0]
- g.TAssertEqual(message2.payload, []byte("second payload"))
-
- rc, err = deadExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(rc, 0)
-
- g.TAssertEqual(len(messages), 0)
- })
-
- g.Testing("next() error is propagated", func() {
- nextErr = errors.New("next() error")
- var w strings.Builder
- rc, err := deadExec(args, queries, r, &w)
- g.TAssertEqual(err, nextErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- nextErr = nil
- })
-
- g.Testing("toDead() error is propagated", func() {
- toDeadErr = errors.New("toDead() error")
- pub([]byte{})
- var w strings.Builder
- rc, err := deadExec(args, queries, r, &w)
- g.TAssertEqual(err, toDeadErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- })
-}
-
-func test_listDeadExec() {
- g.TestStart("listDeadExec()")
-
- const (
- topic = "listDeadExec topic"
- consumer = "listDeadExec consumer"
- )
- var (
- r = strings.NewReader("")
- args = argsT{
- topic: topic,
- consumer: consumer,
- }
- )
-
- var (
- messages []messageT
- deadletters []deadletterT
- id int64 = 0
- errorIndex = -1
- allDeadErr = errors.New("allDead() error")
- )
- queries := queriesT{
- allDead: func(
- _ string,
- _ string,
- callback func(deadletterT, messageT) error,
- ) error {
- for i, deadletter := range deadletters {
- if i == errorIndex {
- return allDeadErr
- }
-
- callback(deadletter, messageT{})
- }
-
- return nil
- },
- }
- pub := func() {
- payload := []byte("ignored payload for this test")
- id++
- now := time.Now()
- message := messageT{
- id: id,
- timestamp: now,
- uuid: uuid.New(),
- topic: topic,
- flowID: uuid.New(),
- payload: payload,
- }
- messages = append(messages, message)
- }
- commit := func() {
- messages = messages[1:]
- }
- dead := func() {
- message := messages[0]
- now := time.Now()
- deadletter := deadletterT{
- uuid: uuid.New(),
- timestamp: now,
- consumer: consumer,
- messageID: message.uuid,
- }
-
- messages = messages[1:]
- deadletters = append(deadletters, deadletter)
- }
- replay := func() {
- pub()
- deadletters = deadletters[1:]
- }
-
-
- g.Testing("nothing is shown if topic is empty", func() {
- var w strings.Builder
-
- rc, err := listDeadExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 0)
- })
-
- g.Testing("deadletters are printed in order", func() {
- var w strings.Builder
-
- pub()
- pub()
- pub()
-
- rc, err := listDeadExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 0)
-
- dead()
- commit()
- dead()
-
- expected := fmt.Sprintf(
- "%s\t%s\t%s\n%s\t%s\t%s\n",
- deadletters[0].uuid.String(),
- deadletters[0].timestamp.Format(time.RFC3339),
- deadletters[0].consumer,
- deadletters[1].uuid.String(),
- deadletters[1].timestamp.Format(time.RFC3339),
- deadletters[1].consumer,
- )
-
- rc, err = listDeadExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(w.String(), expected)
- g.TAssertEqual(rc, 0)
- })
-
- g.Testing("deadletters disappear after being replayed", func() {
- var (
- w1 strings.Builder
- w2 strings.Builder
- )
-
- replay()
-
- deadletter := deadletters[0]
- expected := fmt.Sprintf(
- "%s\t%s\t%s\n",
- deadletter.uuid.String(),
- deadletter.timestamp.Format(time.RFC3339),
- deadletter.consumer,
- )
-
- rc, err := listDeadExec(args, queries, r, &w1)
- g.TErrorIf(err)
- g.TAssertEqual(w1.String(), expected)
- g.TAssertEqual(rc, 0)
-
- replay()
-
- rc, err = listDeadExec(args, queries, r, &w2)
- g.TErrorIf(err)
- g.TAssertEqual(w2.String(), "")
- g.TAssertEqual(rc, 0)
- })
-
- g.Testing("a database failure interrupts the output", func() {
- var w strings.Builder
-
- pub()
- pub()
- pub()
- dead()
- dead()
- dead()
-
- deadletter := deadletters[0]
- expected := fmt.Sprintf(
- "%s\t%s\t%s\n",
- deadletter.uuid.String(),
- deadletter.timestamp.Format(time.RFC3339),
- deadletter.consumer,
- )
-
- errorIndex = 1
- rc, err := listDeadExec(args, queries, r, &w)
- g.TAssertEqual(err, allDeadErr)
- g.TAssertEqual(w.String(), expected)
- g.TAssertEqual(rc, 1)
- })
-}
-
-func test_replayExec() {
- g.TestStart("replayExec()")
-
- const (
- topic = "replayExec topic"
- consumer = "replayExec consumer"
- )
- var (
- w strings.Builder
- r = strings.NewReader("")
- args = argsT{
- topic: topic,
- consumer: consumer,
- }
- )
-
- var (
- oneDeadErr error
- replayErr error
- messages []messageT
- deadletters []deadletterT
- deadMessages []messageT
- id int64 = 0
- )
- queries := queriesT{
- oneDead: func(string, string) (deadletterT, error) {
- if oneDeadErr != nil {
- return deadletterT{}, oneDeadErr
- }
-
- if len(deadletters) == 0 {
- return deadletterT{}, sql.ErrNoRows
- }
-
- return deadletters[0], nil
- },
- replay: func(uuid.UUID, uuid.UUID) (messageT, error) {
- if replayErr != nil {
- return messageT{}, replayErr
- }
-
- message := deadMessages[0]
- messages = append(messages, message)
- deadletters = deadletters[1:]
- deadMessages = deadMessages[1:]
- return message, nil
- },
- }
- pub := func(payload []byte) {
- id++
- now := time.Now()
- message := messageT{
- id: id,
- timestamp: now,
- uuid: uuid.New(),
- topic: topic,
- flowID: uuid.New(),
- payload: payload,
- }
- messages = append(messages, message)
- }
- commit := func() {
- messages = messages[1:]
- }
- dead := func() {
- message := messages[0]
- deadletter := deadletterT{ uuid: uuid.New() }
-
- messages = messages[1:]
- deadletters = append(deadletters, deadletter)
- deadMessages = append(deadMessages, message)
- }
- next := func() string {
- return string(messages[0].payload)
- }
-
-
- g.Testing("error when there is nothing to replay", func() {
- rc, err := replayExec(args, queries, r, &w)
- g.TAssertEqual(err, sql.ErrNoRows)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
-
- pub([]byte("first payload"))
- pub([]byte("second payload"))
- pub([]byte("third payload"))
- pub([]byte("fourth payload"))
-
- rc, err = replayExec(args, queries, r, &w)
- g.TAssertEqual(err, sql.ErrNoRows)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- })
-
- g.Testing("deadletters are replayed in order", func() {
- dead()
- commit()
- dead()
- commit()
-
- rc, err := replayExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 0)
- g.TAssertEqual(next(), "first payload")
-
- commit()
-
- rc, err = replayExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 0)
- g.TAssertEqual(next(), "third payload")
- })
-
- g.Testing("oneDead() error is forwarded", func() {
- oneDeadErr = errors.New("oneDead() error")
- rc, err := replayExec(args, queries, r, &w)
- g.TAssertEqual(err, oneDeadErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- oneDeadErr = nil
- })
-
- g.Testing("replay() error is also forwarded", func() {
- pub([]byte{})
- dead()
-
- replayErr = errors.New("replay() error")
- rc, err := replayExec(args, queries, r, &w)
- g.TAssertEqual(err, replayErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- })
-}
-
-func test_sizeExec() {
- g.TestStart("sizeExec()")
-
- const (
- topic = "sizeExec topic"
- consumer = "sizeExec consumer"
- )
- var (
- r = strings.NewReader("")
- args = argsT{
- topic: topic,
- consumer: consumer,
- }
- )
-
- var sizeErr error
- queries := queriesT{
- size: func(string) (int, error) {
- if sizeErr != nil {
- return -1, sizeErr
- }
-
- return 123, nil
- },
- }
-
-
- g.Testing("it propagates the error when the query fails", func() {
- sizeErr = errors.New("size() error")
- var w strings.Builder
- rc, err := sizeExec(args, queries, r, &w)
- g.TAssertEqual(err, sizeErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- sizeErr = nil
- })
-
- g.Testing("otherwise it just prints what is was given", func() {
- var w strings.Builder
- rc, err := sizeExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(w.String(), "123\n")
- g.TAssertEqual(rc, 0)
- })
-}
-
-func test_countExec() {
- g.TestStart("countExec()")
-
- const (
- topic = "countExec topic"
- consumer = "countExec consumer"
- )
- var (
- r = strings.NewReader("")
- args = argsT{
- topic: topic,
- consumer: consumer,
- }
- )
-
- var countErr error
- queries := queriesT{
- count: func(string, string) (int, error) {
- if countErr != nil {
- return -1, countErr
- }
-
- return 2222, nil
- },
- }
-
-
- g.Testing("it propagates the query error", func() {
- countErr = errors.New("count() error")
- var w strings.Builder
- rc, err := countExec(args, queries, r, &w)
- g.TAssertEqual(err, countErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- countErr = nil
- })
-
- g.Testing("otherwise it prints the given count", func() {
- var w strings.Builder
- rc, err := countExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(w.String(), "2222\n")
- g.TAssertEqual(rc, 0)
- })
-}
-
-func test_hasDataExec() {
- g.TestStart("hasData()")
-
- const (
- topic = "hasData topic"
- consumer = "hasData consumer"
- )
- var (
- w strings.Builder
- r = strings.NewReader("")
- args = argsT{
- topic: topic,
- consumer: consumer,
- }
- )
-
- hasData := true
- var hasDataErr error
- queries := queriesT{
- hasData: func(string, string) (bool, error) {
- if hasDataErr != nil {
- return false, hasDataErr
- }
-
- return hasData, nil
- },
- }
-
-
- g.Testing("it propagates the query error", func() {
- hasDataErr = errors.New("hasData() error")
- rc, err := hasDataExec(args, queries, r, &w)
- g.TAssertEqual(err, hasDataErr)
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 1)
- hasDataErr = nil
- })
-
- g.Testing("otherwise if just returns (not prints) the flag", func() {
- hasData = true
- rc, err := hasDataExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(rc, 0)
-
- hasData = false
- rc, err = hasDataExec(args, queries, r, &w)
- g.TErrorIf(err)
- g.TAssertEqual(rc, 1)
-
- g.TAssertEqual(w.String(), "")
- })
-}
-
-func test_usage() {
- g.TestStart("usage()")
-
- g.Testing("it just writes to io.Writer", func() {
- var w strings.Builder
- usage("xxx", &w)
- const message =
- "Usage: xxx [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n"
- g.TAssertEqual(w.String(), message)
- })
-
- g.Testing("noop on io.Discard for io.Writer", func() {
- usage("AN ERROR IF SEEN ANYWHERE!", io.Discard)
- })
-}
-
-func test_getopt() {
- g.TestStart("getopt()")
-
- const warning = "Missing COMMAND.\n"
- const usage = "Usage: $0 [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n"
-
- commandsMap := map[string]commandT{
- "good": commandT{
- name: "good",
- getopt: func(args argsT, _ io.Writer) (argsT, bool) {
- return args, true
- },
- },
- "bad": commandT{
- name: "bad",
- getopt: func(args argsT, w io.Writer) (argsT, bool) {
- if len(args.args) == 0 {
- fmt.Fprintln(w, "no required arg")
- return args, false
- }
-
- if args.args[0] != "required" {
- fmt.Fprintln(w, "not correct one")
- return args, false
- }
-
- args.topic = "a topic"
- args.consumer = "a consumer"
- return args, true
- },
- },
- }
-
-
- g.Testing("we suppress the default error message", func() {
- var w strings.Builder
- argv := []string{"$0", "-h"}
- _, _, rc := getopt(argv, commandsMap, &w)
-
- g.TAssertEqual(w.String(), usage)
- g.TAssertEqual(rc, 2)
- })
-
- g.Testing("we get an error on unsupported flag", func() {
- var w strings.Builder
- argv := []string{"$0", "-X"}
- _, _, rc := getopt(argv, commandsMap, &w)
-
- const message = "flag provided but not defined: -X\n"
- g.TAssertEqual(w.String(), message + usage)
- g.TAssertEqual(rc, 2)
- })
-
- g.Testing("we also get an error on incorrect usage of flags", func() {
- var w strings.Builder
- argv := []string{"$0", "-f"}
- _, _, rc := getopt(argv, commandsMap, &w)
-
- const message = "flag needs an argument: -f\n"
- g.TAssertEqual(w.String(), message + usage)
- g.TAssertEqual(rc, 2)
- })
-
- g.Testing("error when not given a command", func() {
- var w strings.Builder
- argv := []string{"$0"}
- _, _, rc := getopt(argv, commandsMap, &w)
-
- g.TAssertEqual(w.String(), warning + usage)
- g.TAssertEqual(rc, 2)
- })
-
- g.Testing("error on unknown command", func() {
- var w strings.Builder
- argv := []string{"$0", "unknown"}
- _, _, rc := getopt(argv, commandsMap, &w)
-
- const message = `Bad COMMAND: "unknown".` + "\n"
- g.TAssertEqual(w.String(), message + usage)
- g.TAssertEqual(rc, 2)
- })
-
- g.Testing("checks the command usage", func() {
- var (
- w1 strings.Builder
- w2 strings.Builder
- w3 strings.Builder
- )
-
- argv1 := []string{"$0", "bad"}
- argv2 := []string{"$0", "bad", "arg"}
- argv3 := []string{"$0", "bad", "required"}
- _, _, rc1 := getopt(argv1, commandsMap, &w1)
- _, _, rc2 := getopt(argv2, commandsMap, &w2)
- args, command, rc3 := getopt(argv3, commandsMap, &w3)
- expectedArgs := argsT{
- databasePath: "fiinha.db",
- prefix: "fiinha",
- command: "bad",
- allArgs: argv3,
- args: argv3[2:],
- topic: "a topic",
- consumer: "a consumer",
- }
-
- g.TAssertEqual(w1.String(), "no required arg\n" + usage)
- g.TAssertEqual(w2.String(), "not correct one\n" + usage)
- g.TAssertEqual(w3.String(), "")
- g.TAssertEqual(rc1, 2)
- g.TAssertEqual(rc2, 2)
- g.TAssertEqual(rc3, 0)
- g.TAssertEqual(args, expectedArgs)
- g.TAssertEqual(command.name, "bad")
- })
-
- g.Testing("when given a command we the default values", func() {
- var w strings.Builder
- args, command, rc := getopt(
- []string{"$0", "good"},
- commandsMap,
- &w,
- )
- expectedArgs := argsT{
- databasePath: "fiinha.db",
- prefix: "fiinha",
- command: "good",
- allArgs: []string{"$0", "good"},
- args: []string{},
- }
-
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 0)
- g.TAssertEqual(args, expectedArgs)
- g.TAssertEqual(command.name, "good")
- })
-
- g.Testing("we can customize both values", func() {
- var w strings.Builder
- argv := []string{"$0", "-f", "F", "-p", "P", "good"}
- args, command, rc := getopt(argv, commandsMap, &w)
- expectedArgs := argsT{
- databasePath: "F",
- prefix: "P",
- command: "good",
- allArgs: argv,
- args: []string{},
- }
-
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 0)
- g.TAssertEqual(args, expectedArgs)
- g.TAssertEqual(command.name, "good")
- })
-
- g.Testing("a command can have its own commands and options", func() {
- var w strings.Builder
- argv := []string{"$0", "-f", "F", "good", "-f", "-f", "SUB"}
- args, command, rc := getopt(argv, commandsMap, &w)
- expectedArgs := argsT{
- databasePath: "F",
- prefix: "fiinha",
- command: "good",
- allArgs: argv,
- args: []string{"-f", "-f", "SUB"},
- }
-
- g.TAssertEqual(w.String(), "")
- g.TAssertEqual(rc, 0)
- g.TAssertEqual(args, expectedArgs)
- g.TAssertEqual(command.name, "good")
- })
-}
-
-func test_runCommand() {
- g.TestStart("runCommand()")
-
- g.Testing("returns an error on bad prefix", func() {
- stdin := strings.NewReader("")
- var (
- stdout strings.Builder
- stderr strings.Builder
- )
- args := argsT{
- prefix: "a bad prefix",
- command: "in",
- allArgs: []string{"$0"},
- args: []string{"some topic name"},
- }
- rc := runCommand(args, commands["in"], stdin, &stdout, &stderr)
-
- g.TAssertEqual(rc, 1)
- g.TAssertEqual(stdout.String(), "")
- g.TAssertEqual(stderr.String(), "Invalid table prefix\n")
- })
-
- g.Testing("otherwise it build a queueT and calls command", func() {
- stdin := strings.NewReader("")
- var (
- stdout1 strings.Builder
- stdout2 strings.Builder
- stderr1 strings.Builder
- stderr2 strings.Builder
- )
- args1 := argsT{
- prefix: defaultPrefix,
- command: "good",
- }
- args2 := argsT{
- prefix: defaultPrefix,
- command: "bad",
- }
- myErr := errors.New("an error")
- good := commandT{
- exec: func(
- _ argsT,
- _ queriesT,
- _ io.Reader,
- w io.Writer,
- ) (int, error) {
- fmt.Fprintf(w, "good text\n")
- return 0, nil
- },
- }
- bad := commandT{
- exec: func(
- _ argsT,
- _ queriesT,
- _ io.Reader,
- w io.Writer,
- ) (int, error) {
- fmt.Fprintf(w, "bad text\n")
- return 123, myErr
- },
- }
- rc1 := runCommand(args1, good, stdin, &stdout1, &stderr1)
- rc2 := runCommand(args2, bad, stdin, &stdout2, &stderr2)
-
- g.TAssertEqual(stdout1.String(), "good text\n")
- g.TAssertEqual(stdout2.String(), "bad text\n")
- g.TAssertEqual(stderr1.String(), "")
- g.TAssertEqual(stderr2.String(), "an error\n")
- g.TAssertEqual(rc1, 0)
- g.TAssertEqual(rc2, 123)
- })
-}
-
-func test_commands() {
- g.TestStart("commands")
-
- g.Testing("ensure map key and name are in sync", func() {
- for key, command := range commands {
- g.TAssertEqual(command.name, key)
- }
- })
-}
-
-
-func dumpQueries() {
- queries := []struct{name string; fn func(string) queryT}{
- { "createTables", createTablesSQL },
- { "take", takeSQL },
- { "publish", publishSQL },
- { "find", findSQL },
- { "next", nextSQL },
- { "pending", pendingSQL },
- { "commit", commitSQL },
- { "toDead", toDeadSQL },
- { "replay", replaySQL },
- { "oneDead", oneDeadSQL },
- { "allDead", allDeadSQL },
- { "size", sizeSQL },
- { "count", countSQL },
- { "hasData", hasDataSQL },
- }
- for _, query := range queries {
- q := query.fn(defaultPrefix)
- fmt.Printf("\n-- %s.sql:", query.name)
- fmt.Printf("\n-- write:%s\n", q.write)
- fmt.Printf("\n-- read:%s\n", q.read)
- fmt.Printf("\n-- owner:%s\n", q.owner)
- }
-}
-
-
-
-func MainTest() {
- if os.Getenv("TESTING_DUMP_SQL_QUERIES") != "" {
- dumpQueries()
- return
- }
-
- g.Init()
- test_defaultPrefix()
- test_serialized()
- test_execSerialized()
- test_tryRollback()
- test_inTx()
- test_createTables()
- test_takeStmt()
- test_publishStmt()
- test_findStmt()
- test_nextStmt()
- test_messageEach()
- test_pendingStmt()
- test_commitStmt()
- test_toDeadStmt()
- test_replayStmt()
- test_oneDeadStmt()
- test_deadletterEach()
- test_allDeadStmt()
- test_sizeStmt()
- test_countStmt()
- test_hasDataStmt()
- test_initDB()
- test_queriesTclose()
- test_newPinger()
- test_makeSubscriptionsFunc()
- test_makeNotifyFn()
- test_collectClosedWaiters()
- test_trimEmptyLeaves()
- test_deleteIfEmpty()
- test_deleteEmptyTopics()
- test_removeClosedWaiter()
- test_reapClosedWaiters()
- test_everyNthCall()
- test_runReaper()
- test_NewWithPrefix()
- test_New()
- test_asPublicMessage()
- test_queueT_Publish()
- test_registerConsumerFn()
- test_registerWaiterFn()
- test_makeConsumeOneFn()
- test_makeConsumeAllFn()
- test_makeWaitFn()
- test_runConsumer()
- test_tryFinding()
- test_queueT_Subscribe()
- test_queueT_WaitFor()
- test_unsubscribeIfExistsFn()
- test_queueT_Unsubscribe()
- test_cleanSubscriptions()
- test_queueT_Close()
- test_topicGetopt()
- test_topicConsumerGetopt()
- test_inExec()
- test_outExec()
- test_commitExec()
- test_deadExec()
- test_listDeadExec()
- test_replayExec()
- test_sizeExec()
- test_countExec()
- test_hasDataExec()
- test_usage()
- test_getopt()
- test_runCommand()
- test_commands()
-}
diff --git a/tests/functional/consume-one-produce-many/fiinha.go b/tests/functional/consume-one-produce-many/fiinha.go
deleted file mode 100644
index 6a3ca47..0000000
--- a/tests/functional/consume-one-produce-many/fiinha.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package fiinha
-
-func MainTest() {
- // FIXME
-}
diff --git a/tests/functional/consume-one-produce-many/main.go b/tests/functional/consume-one-produce-many/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/functional/consume-one-produce-many/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/functional/consumer-with-deadletter/fiinha.go b/tests/functional/consumer-with-deadletter/fiinha.go
deleted file mode 100644
index 292d327..0000000
--- a/tests/functional/consumer-with-deadletter/fiinha.go
+++ /dev/null
@@ -1,85 +0,0 @@
-package fiinha
-
-import (
- "errors"
- "runtime"
-
- "uuid"
- g "gobang"
-)
-
-
-
-const (
- topicX = "new-event-x"
- topicY = "new-event-y"
-)
-
-var forbidden3Err = errors.New("we don't like 3")
-
-
-
-func processNewEventXToY(message Message) (UnsentMessage, error) {
- payload := string(message.Payload)
- if payload == "event 3" {
- return UnsentMessage{}, forbidden3Err
- }
-
- newPayload := []byte("processed " + payload)
- unsent := UnsentMessage{
- Topic: topicY,
- FlowID: message.FlowID,
- Payload: newPayload,
- }
- return unsent, nil
-}
-
-
-
-func MainTest() {
- g.SetLevel(g.LogLevel_None)
-
- _, file, _, ok := runtime.Caller(0)
- g.TAssertEqualS(ok, true, "can't get filename")
-
- databasePath := file + ".db"
- queue, err := New(databasePath)
- g.TErrorIf(err)
- defer queue.Close()
-
- pub := func(payload []byte, flowID uuid.UUID) {
- unsent := UnsentMessage{
- Topic: topicX,
- FlowID: flowID,
- Payload: payload,
- }
- _, err := queue.Publish(unsent)
- g.TErrorIf(err)
- }
-
-
- g.Testing("we can WaitFor() a message after a deadletter", func() {
- flowID := uuid.New()
-
- handlerFn := func(message Message) error {
- messageY, err := processNewEventXToY(message)
- if err != nil {
- return err
- }
-
- _, err = queue.Publish(messageY)
- return err
- }
- queue.Subscribe(topicX, "main-worker", handlerFn)
- defer queue.Unsubscribe(topicX, "main-worker")
-
- pub([]byte("event 1"), uuid.New())
- pub([]byte("event 2"), uuid.New())
- pub([]byte("event 3"), uuid.New())
- pub([]byte("event 4"), uuid.New())
- pub([]byte("event 5"), flowID)
-
- given := <- queue.WaitFor(topicY, flowID, "waiter").Channel
- g.TAssertEqual(given, []byte("processed event 5"))
- })
-}
diff --git a/tests/functional/consumer-with-deadletter/main.go b/tests/functional/consumer-with-deadletter/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/functional/consumer-with-deadletter/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/functional/custom-prefix/fiinha.go b/tests/functional/custom-prefix/fiinha.go
deleted file mode 100644
index 6a3ca47..0000000
--- a/tests/functional/custom-prefix/fiinha.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package fiinha
-
-func MainTest() {
- // FIXME
-}
diff --git a/tests/functional/custom-prefix/main.go b/tests/functional/custom-prefix/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/functional/custom-prefix/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/functional/distinct-consumers-separate-instances/fiinha.go b/tests/functional/distinct-consumers-separate-instances/fiinha.go
deleted file mode 100644
index 6a3ca47..0000000
--- a/tests/functional/distinct-consumers-separate-instances/fiinha.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package fiinha
-
-func MainTest() {
- // FIXME
-}
diff --git a/tests/functional/distinct-consumers-separate-instances/main.go b/tests/functional/distinct-consumers-separate-instances/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/functional/distinct-consumers-separate-instances/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/functional/flow-id/fiinha.go b/tests/functional/flow-id/fiinha.go
deleted file mode 100644
index 6a3ca47..0000000
--- a/tests/functional/flow-id/fiinha.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package fiinha
-
-func MainTest() {
- // FIXME
-}
diff --git a/tests/functional/flow-id/main.go b/tests/functional/flow-id/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/functional/flow-id/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/functional/idempotency/fiinha.go b/tests/functional/idempotency/fiinha.go
deleted file mode 100644
index 6a3ca47..0000000
--- a/tests/functional/idempotency/fiinha.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package fiinha
-
-func MainTest() {
- // FIXME
-}
diff --git a/tests/functional/idempotency/main.go b/tests/functional/idempotency/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/functional/idempotency/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/functional/new-instance-takeover/fiinha.go b/tests/functional/new-instance-takeover/fiinha.go
deleted file mode 100644
index 5e6ad4b..0000000
--- a/tests/functional/new-instance-takeover/fiinha.go
+++ /dev/null
@@ -1,109 +0,0 @@
-package fiinha
-
-import (
- "runtime"
- "os"
-
- "uuid"
- g "gobang"
-)
-
-
-
-const topic = "topic"
-
-
-
-func pub(queue IQueue, topic string, flowID uuid.UUID) {
- unsent := UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: []byte{},
- }
- _, err := queue.Publish(unsent)
- g.TErrorIf(err)
-}
-
-func handlerFn(publish func(uuid.UUID)) func(Message) error {
- return func(message Message) error {
- publish(message.FlowID)
- return nil
- }
-}
-
-func startInstance(
- dbpath string,
- instanceID int,
- name string,
-) (IQueue, error) {
- iqueue, err := New(dbpath)
- g.TErrorIf(err)
- queue := iqueue.(queueT)
-
- notifyFn := makeNotifyFn(queue.subscriptions.read, queue.pinger)
- queries, err := initDB(dbpath, defaultPrefix, notifyFn, instanceID)
- g.TErrorIf(err)
-
- err = queue.queries.close()
- g.TErrorIf(err)
-
- queue.queries = queries
-
- pub_ := func(topic string) func(uuid.UUID) {
- return func(flowID uuid.UUID) {
- pub(queue, topic, flowID)
- }
- }
-
- individual := "individual-" + name
- shared := "shared"
-
- queue.Subscribe(topic, individual, handlerFn(pub_(individual)))
- queue.Subscribe(topic, shared, handlerFn(pub_(shared + "-" + name)))
-
- return queue, nil
-}
-
-
-
-func MainTest() {
- g.Init()
-
- _, file, _, ok := runtime.Caller(0)
- g.TAssertEqualS(ok, true, "can't get filename")
-
- dbpath := file + ".db"
- instanceID1 := os.Getpid()
- instanceID2 := instanceID1 + 1
-
- flowID1 := uuid.New()
- flowID2 := uuid.New()
-
- g.Testing("new instances take ownership of topic+name combo", func() {
- q1, err := startInstance(dbpath, instanceID1, "first")
- g.TErrorIf(err)
- defer q1.Close()
-
- pub(q1, topic, uuid.New())
- pub(q1, topic, uuid.New())
- pub(q1, topic, flowID1)
-
- <- q1.WaitFor("individual-first", flowID1, "w").Channel
- <- q1.WaitFor( "shared-first", flowID1, "w").Channel
-
- q2, err := startInstance(dbpath, instanceID2, "second")
- g.TErrorIf(err)
- defer q2.Close()
-
- <- q2.WaitFor("individual-second", flowID1, "w").Channel
-
- pub(q2, topic, uuid.New())
- pub(q2, topic, uuid.New())
- pub(q2, topic, flowID2)
-
- // FIXME: notify multiple instances so we can add this:
- // <- q2.WaitFor("individual-first", flowID2, "w").Channel
- <- q2.WaitFor("individual-second", flowID2, "w").Channel
- <- q2.WaitFor( "shared-second", flowID2, "w").Channel
- })
-}
diff --git a/tests/functional/new-instance-takeover/main.go b/tests/functional/new-instance-takeover/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/functional/new-instance-takeover/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/functional/wait-after-publish/fiinha.go b/tests/functional/wait-after-publish/fiinha.go
deleted file mode 100644
index 71b9b56..0000000
--- a/tests/functional/wait-after-publish/fiinha.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package fiinha
-
-import (
- "runtime"
-
- "uuid"
- g "gobang"
-)
-
-
-
-const topic = "topic"
-
-
-
-func MainTest() {
- _, file, _, ok := runtime.Caller(0)
- g.TAssertEqualS(ok, true, "can't get filename")
-
- databasePath := file + ".db"
- queue, err := New(databasePath)
- g.TErrorIf(err)
- defer queue.Close()
-
- pub := func(flowID uuid.UUID, payload []byte) {
- unsent := UnsentMessage{
- Topic: topic,
- FlowID: flowID,
- Payload: payload,
- }
- _, err := queue.Publish(unsent)
- g.TErrorIf(err)
- }
-
-
- g.Testing("we can WaitFor() a message before its publishing", func() {
- flowID := uuid.New()
- waiter := queue.WaitFor(topic, flowID, "waiter").Channel
-
- pub(flowID, []byte("payload before"))
-
- given := <- waiter
- g.TAssertEqual(given, []byte("payload before"))
- })
-
- g.Testing("we can also do it after its publishing", func() {
- flowID := uuid.New()
-
- pub(flowID, []byte("payload after"))
-
- given := <- queue.WaitFor(topic, flowID, "waiter").Channel
- g.TAssertEqual(given, []byte("payload after"))
- })
-}
diff --git a/tests/functional/wait-after-publish/main.go b/tests/functional/wait-after-publish/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/functional/wait-after-publish/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/functional/waiter/fiinha.go b/tests/functional/waiter/fiinha.go
deleted file mode 100644
index 6a3ca47..0000000
--- a/tests/functional/waiter/fiinha.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package fiinha
-
-func MainTest() {
- // FIXME
-}
diff --git a/tests/functional/waiter/main.go b/tests/functional/waiter/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/functional/waiter/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/fuzz/api-check/fiinha.go b/tests/fuzz/api-check/fiinha.go
deleted file mode 100644
index 86801de..0000000
--- a/tests/fuzz/api-check/fiinha.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package fiinha
-
-import (
- "os"
- "testing"
- "testing/internal/testdeps"
-)
-
-
-
-func api(f *testing.F) {
- f.Fuzz(func(t *testing.T, n int) {
- // FIXME
- if n > 1 {
- if n < 2 {
- t.Errorf("Failed n: %v\n", n)
- }
- }
- })
-}
-
-
-
-func MainTest() {
- fuzzTargets := []testing.InternalFuzzTarget{
- { "api", api },
- }
-
- deps := testdeps.TestDeps{}
- tests := []testing.InternalTest {}
- benchmarks := []testing.InternalBenchmark{}
- examples := []testing.InternalExample {}
- m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples)
- os.Exit(m.Run())
-}
diff --git a/tests/fuzz/api-check/main.go b/tests/fuzz/api-check/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/fuzz/api-check/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/fuzz/cli-check/fiinha.go b/tests/fuzz/cli-check/fiinha.go
deleted file mode 100644
index 1cb6f37..0000000
--- a/tests/fuzz/cli-check/fiinha.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package fiinha
-
-import (
- "os"
- "testing"
- "testing/internal/testdeps"
-)
-
-
-
-func queries(f *testing.F) {
- f.Fuzz(func(t *testing.T, n int) {
- if n > 154 {
- if n < 155 {
- t.Errorf("Failed n: %v\n", n)
- }
- }
- })
-}
-
-
-
-func MainTest() {
- // FIXME
- fuzzTargets := []testing.InternalFuzzTarget{
- { "queries", queries },
- }
-
- deps := testdeps.TestDeps{}
- tests := []testing.InternalTest {}
- benchmarks := []testing.InternalBenchmark{}
- examples := []testing.InternalExample {}
- m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples)
- os.Exit(m.Run())
-}
diff --git a/tests/fuzz/cli-check/main.go b/tests/fuzz/cli-check/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/fuzz/cli-check/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/fuzz/equal-produced-consumed-order-check/fiinha.go b/tests/fuzz/equal-produced-consumed-order-check/fiinha.go
deleted file mode 100644
index ef2e72a..0000000
--- a/tests/fuzz/equal-produced-consumed-order-check/fiinha.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package fiinha
-
-import (
- "os"
- "testing"
- "testing/internal/testdeps"
-)
-
-
-
-func queries(f *testing.F) {
- f.Fuzz(func(t *testing.T, n int) {
- if n > 154 {
- if n < 155 {
- t.Errorf("Failed n: %v\n", n)
- }
- }
- })
-}
-
-
-
-func MainTest() {
- // FIXME: produced order is identical to consumed order
- fuzzTargets := []testing.InternalFuzzTarget{
- { "queries", queries },
- }
-
- deps := testdeps.TestDeps{}
- tests := []testing.InternalTest {}
- benchmarks := []testing.InternalBenchmark{}
- examples := []testing.InternalExample {}
- m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples)
- os.Exit(m.Run())
-}
diff --git a/tests/fuzz/equal-produced-consumed-order-check/main.go b/tests/fuzz/equal-produced-consumed-order-check/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/fuzz/equal-produced-consumed-order-check/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/fuzz/exactly-once-check/fiinha.go b/tests/fuzz/exactly-once-check/fiinha.go
deleted file mode 100644
index 6ac1fb1..0000000
--- a/tests/fuzz/exactly-once-check/fiinha.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package fiinha
-
-import (
- "os"
- "testing"
- "testing/internal/testdeps"
-)
-
-
-
-func queries(f *testing.F) {
- f.Fuzz(func(t *testing.T, n int) {
- if n > 154 {
- if n < 155 {
- t.Errorf("Failed n: %v\n", n)
- }
- }
- })
-}
-
-
-
-func MainTest() {
- // FIXME: a message is consumed exactly once
- fuzzTargets := []testing.InternalFuzzTarget{
- { "queries", queries },
- }
-
- deps := testdeps.TestDeps{}
- tests := []testing.InternalTest {}
- benchmarks := []testing.InternalBenchmark{}
- examples := []testing.InternalExample {}
- m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples)
- os.Exit(m.Run())
-}
diff --git a/tests/fuzz/exactly-once-check/main.go b/tests/fuzz/exactly-once-check/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/fuzz/exactly-once-check/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/fuzz/queries-check/fiinha.go b/tests/fuzz/queries-check/fiinha.go
deleted file mode 100644
index 1cb6f37..0000000
--- a/tests/fuzz/queries-check/fiinha.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package fiinha
-
-import (
- "os"
- "testing"
- "testing/internal/testdeps"
-)
-
-
-
-func queries(f *testing.F) {
- f.Fuzz(func(t *testing.T, n int) {
- if n > 154 {
- if n < 155 {
- t.Errorf("Failed n: %v\n", n)
- }
- }
- })
-}
-
-
-
-func MainTest() {
- // FIXME
- fuzzTargets := []testing.InternalFuzzTarget{
- { "queries", queries },
- }
-
- deps := testdeps.TestDeps{}
- tests := []testing.InternalTest {}
- benchmarks := []testing.InternalBenchmark{}
- examples := []testing.InternalExample {}
- m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples)
- os.Exit(m.Run())
-}
diff --git a/tests/fuzz/queries-check/main.go b/tests/fuzz/queries-check/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/fuzz/queries-check/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/fuzz/total-order-check/fiinha.go b/tests/fuzz/total-order-check/fiinha.go
deleted file mode 100644
index cb5aa61..0000000
--- a/tests/fuzz/total-order-check/fiinha.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package fiinha
-
-import (
- "os"
- "testing"
- "testing/internal/testdeps"
-)
-
-
-
-func queries(f *testing.F) {
- f.Fuzz(func(t *testing.T, n int) {
- if n > 154 {
- if n < 155 {
- t.Errorf("Failed n: %v\n", n)
- }
- }
- })
-}
-
-
-
-func MainTest() {
- // FIXME: a consumer gets the messages in total order
- fuzzTargets := []testing.InternalFuzzTarget{
- { "queries", queries },
- }
-
- deps := testdeps.TestDeps{}
- tests := []testing.InternalTest {}
- benchmarks := []testing.InternalBenchmark{}
- examples := []testing.InternalExample {}
- m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples)
- os.Exit(m.Run())
-}
diff --git a/tests/fuzz/total-order-check/main.go b/tests/fuzz/total-order-check/main.go
deleted file mode 120000
index f67563d..0000000
--- a/tests/fuzz/total-order-check/main.go
+++ /dev/null
@@ -1 +0,0 @@
-../../main.go \ No newline at end of file
diff --git a/tests/integration.sh b/tests/integration.sh
deleted file mode 100755
index fcb62ca..0000000
--- a/tests/integration.sh
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/bin/sh
-set -eu
-
-exit
diff --git a/tests/main.go b/tests/main.go
deleted file mode 100644
index 789b267..0000000
--- a/tests/main.go
+++ /dev/null
@@ -1,7 +0,0 @@
-package main
-
-import "fiinha"
-
-func main() {
- fiinha.MainTest()
-}
diff --git a/tests/queries.sql b/tests/queries.sql
deleted file mode 100644
index 241f419..0000000
--- a/tests/queries.sql
+++ /dev/null
@@ -1,387 +0,0 @@
-
--- createTables.sql:
--- write:
- CREATE TABLE IF NOT EXISTS "fiinha_payloads" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')),
- topic TEXT NOT NULL,
- payload BLOB NOT NULL
- ) STRICT;
- CREATE INDEX IF NOT EXISTS "fiinha_payloads_topic"
- ON "fiinha_payloads"(topic);
-
- CREATE TABLE IF NOT EXISTS "fiinha_messages" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')),
- uuid BLOB NOT NULL UNIQUE,
- flow_id BLOB NOT NULL,
- payload_id INTEGER NOT NULL
- REFERENCES "fiinha_payloads"(id)
- ) STRICT;
- CREATE INDEX IF NOT EXISTS "fiinha_messages_flow_id"
- ON "fiinha_messages"(flow_id);
-
- CREATE TABLE IF NOT EXISTS "fiinha_offsets" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')),
- consumer TEXT NOT NULL,
- message_id INTEGER NOT NULL
- REFERENCES "fiinha_messages"(id),
- instance_id INTEGER NOT NULL,
- UNIQUE (consumer, message_id)
- ) STRICT;
- CREATE INDEX IF NOT EXISTS "fiinha_offsets_consumer"
- ON "fiinha_offsets"(consumer);
-
- CREATE TABLE IF NOT EXISTS "fiinha_deadletters" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- uuid BLOB NOT NULL UNIQUE,
- consumer TEXT NOT NULL,
- message_id INTEGER NOT NULL
- REFERENCES "fiinha_messages"(id),
- instance_id INTEGER NOT NULL,
- UNIQUE (consumer, message_id)
- ) STRICT;
- CREATE INDEX IF NOT EXISTS "fiinha_deadletters_consumer"
- ON "fiinha_deadletters"(consumer);
-
- CREATE TABLE IF NOT EXISTS "fiinha_replays" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- deadletter_id INTEGER NOT NULL UNIQUE
- REFERENCES "fiinha_deadletters"(id) ,
- message_id INTEGER NOT NULL UNIQUE
- REFERENCES "fiinha_messages"(id)
- ) STRICT;
-
- CREATE TABLE IF NOT EXISTS "fiinha_owners" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- topic TEXT NOT NULL,
- consumer TEXT NOT NULL,
- owner_id INTEGER NOT NULL,
- UNIQUE (topic, consumer)
- ) STRICT;
-
- CREATE TRIGGER IF NOT EXISTS "fiinha_check_instance_owns_topic"
- BEFORE INSERT ON "fiinha_offsets"
- WHEN NEW.instance_id != (
- SELECT owner_id FROM "fiinha_owners"
- WHERE topic = (
- SELECT "fiinha_payloads".topic
- FROM "fiinha_payloads"
- JOIN "fiinha_messages" ON "fiinha_payloads".id =
- "fiinha_messages".payload_id
- WHERE "fiinha_messages".id = NEW.message_id
- ) AND consumer = NEW.consumer
- )
- BEGIN
- SELECT RAISE(
- ABORT,
- 'instance does not own topic/consumer combo'
- );
- END;
-
- CREATE TRIGGER IF NOT EXISTS "fiinha_check_can_publish_deadletter"
- BEFORE INSERT ON "fiinha_deadletters"
- WHEN NEW.instance_id != (
- SELECT owner_id FROM "fiinha_owners"
- WHERE topic = (
- SELECT "fiinha_payloads".topic
- FROM "fiinha_payloads"
- JOIN "fiinha_messages" ON "fiinha_payloads".id =
- "fiinha_messages".payload_id
- WHERE "fiinha_messages".id = NEW.message_id
- ) AND consumer = NEW.consumer
- )
- BEGIN
- SELECT RAISE(
- ABORT,
- 'Instance does not own topic/consumer combo'
- );
- END;
-
-
--- read:
-
--- owner:
-
--- take.sql:
--- write:
- INSERT INTO "fiinha_owners" (topic, consumer, owner_id)
- VALUES (?, ?, ?)
- ON CONFLICT (topic, consumer) DO
- UPDATE SET owner_id=excluded.owner_id;
-
-
--- read:
-
--- owner:
-
--- publish.sql:
--- write:
- INSERT INTO "fiinha_payloads" (topic, payload)
- VALUES (?, ?);
-
- INSERT INTO "fiinha_messages" (uuid, flow_id, payload_id)
- VALUES (?, ?, last_insert_rowid());
-
-
--- read:
- SELECT id, timestamp FROM "fiinha_messages"
- WHERE uuid = ?;
-
-
--- owner:
-
--- find.sql:
--- write:
-
--- read:
- SELECT
- "fiinha_messages".id,
- "fiinha_messages".timestamp,
- "fiinha_messages".uuid,
- "fiinha_payloads".payload
- FROM "fiinha_messages"
- JOIN "fiinha_payloads" ON
- "fiinha_payloads".id = "fiinha_messages".payload_id
- WHERE
- "fiinha_payloads".topic = ? AND
- "fiinha_messages".flow_id = ?
- ORDER BY "fiinha_messages".id DESC
- LIMIT 1;
-
-
--- owner:
-
--- next.sql:
--- write:
-
--- read:
- SELECT
- (
- SELECT owner_id FROM "fiinha_owners"
- WHERE
- topic = ? AND
- consumer = ?
- LIMIT 1
- ) AS owner_id,
- "fiinha_messages".id,
- "fiinha_messages".timestamp,
- "fiinha_messages".uuid,
- "fiinha_messages".flow_id,
- "fiinha_payloads".payload
- FROM "fiinha_messages"
- JOIN "fiinha_payloads" ON
- "fiinha_payloads".id = "fiinha_messages".payload_id
- WHERE
- "fiinha_payloads".topic = ? AND
- "fiinha_messages".id NOT IN (
- SELECT message_id FROM "fiinha_offsets"
- WHERE consumer = ?
- )
- ORDER BY "fiinha_messages".id ASC
- LIMIT 1;
-
-
--- owner:
-
--- pending.sql:
--- write:
-
--- read:
- SELECT
- "fiinha_messages".id,
- "fiinha_messages".timestamp,
- "fiinha_messages".uuid,
- "fiinha_messages".flow_id,
- "fiinha_payloads".topic,
- "fiinha_payloads".payload
- FROM "fiinha_messages"
- JOIN "fiinha_payloads" ON
- "fiinha_payloads".id = "fiinha_messages".payload_id
- WHERE
- "fiinha_payloads".topic = ? AND
- "fiinha_messages".id NOT IN (
- SELECT message_id FROM "fiinha_offsets"
- WHERE consumer = ?
- )
- ORDER BY "fiinha_messages".id ASC;
-
-
--- owner:
- SELECT owner_id FROM "fiinha_owners"
- WHERE
- topic = ? AND
- consumer = ?;
-
-
--- commit.sql:
--- write:
- INSERT INTO "fiinha_offsets" (consumer, message_id, instance_id)
- VALUES (?, (SELECT id FROM "fiinha_messages" WHERE uuid = ?), ?);
-
-
--- read:
-
--- owner:
-
--- toDead.sql:
--- write:
- INSERT INTO "fiinha_offsets"
- ( consumer, message_id, instance_id)
- VALUES ( ?, (SELECT id FROM "fiinha_messages" WHERE uuid = ?), ?);
-
- INSERT INTO "fiinha_deadletters"
- (uuid, consumer, message_id, instance_id)
- VALUES (?, ?, (SELECT id FROM "fiinha_messages" WHERE uuid = ?), ?);
-
-
--- read:
-
--- owner:
-
--- replay.sql:
--- write:
- INSERT INTO "fiinha_messages" (uuid, flow_id, payload_id)
- SELECT
- ?,
- "fiinha_messages".flow_id,
- "fiinha_messages".payload_id
- FROM "fiinha_messages"
- JOIN "fiinha_deadletters" ON
- "fiinha_messages".id = "fiinha_deadletters".message_id
- WHERE "fiinha_deadletters".uuid = ?;
-
- INSERT INTO "fiinha_replays" (deadletter_id, message_id)
- VALUES (
- (SELECT id FROM "fiinha_deadletters" WHERE uuid = ?),
- last_insert_rowid()
- );
-
-
--- read:
- SELECT
- "fiinha_messages".id,
- "fiinha_messages".timestamp,
- "fiinha_messages".flow_id,
- "fiinha_payloads".topic,
- "fiinha_payloads".payload
- FROM "fiinha_messages"
- JOIN "fiinha_payloads" ON
- "fiinha_payloads".id = "fiinha_messages".payload_id
- WHERE "fiinha_messages".uuid = ?;
-
-
--- owner:
-
--- oneDead.sql:
--- write:
-
--- read:
- SELECT
- "fiinha_deadletters".uuid,
- "fiinha_offsets".timestamp,
- "fiinha_messages".uuid
- FROM "fiinha_deadletters"
- JOIN "fiinha_offsets" ON
- "fiinha_deadletters".message_id = "fiinha_offsets".message_id
- JOIN "fiinha_messages" ON
- "fiinha_deadletters".message_id = "fiinha_messages".id
- JOIN "fiinha_payloads" ON
- "fiinha_messages".payload_id = "fiinha_payloads".id
- WHERE
- "fiinha_payloads".topic = ? AND
- "fiinha_deadletters".consumer = ? AND
- "fiinha_offsets".consumer = ? AND
- "fiinha_deadletters".id NOT IN (
- SELECT deadletter_id FROM "fiinha_replays"
- )
- ORDER BY "fiinha_deadletters".id ASC
- LIMIT 1;
-
-
--- owner:
-
--- allDead.sql:
--- write:
-
--- read:
- SELECT
- "fiinha_deadletters".uuid,
- "fiinha_deadletters".message_id,
- "fiinha_offsets".timestamp,
- "fiinha_offsets".consumer,
- "fiinha_messages".timestamp,
- "fiinha_messages".uuid,
- "fiinha_messages".flow_id,
- "fiinha_payloads".topic,
- "fiinha_payloads".payload
- FROM "fiinha_deadletters"
- JOIN "fiinha_offsets" ON
- "fiinha_deadletters".message_id = "fiinha_offsets".message_id
- JOIN "fiinha_messages" ON
- "fiinha_deadletters".message_id = "fiinha_messages".id
- JOIN "fiinha_payloads" ON
- "fiinha_messages".payload_id = "fiinha_payloads".id
- WHERE
- "fiinha_payloads".topic = ? AND
- "fiinha_deadletters".consumer = ? AND
- "fiinha_offsets".consumer = ? AND
- "fiinha_deadletters".id NOT IN (
- SELECT deadletter_id FROM "fiinha_replays"
- )
- ORDER BY "fiinha_deadletters".id ASC;
-
-
--- owner:
-
--- size.sql:
--- write:
-
--- read:
- SELECT
- COUNT(1) as size
- FROM "fiinha_messages"
- JOIN "fiinha_payloads" ON
- "fiinha_messages".payload_id = "fiinha_payloads".id
- WHERE "fiinha_payloads".topic = ?;
-
-
--- owner:
-
--- count.sql:
--- write:
-
--- read:
- SELECT
- COUNT(1) as count
- FROM "fiinha_messages"
- JOIN "fiinha_offsets" ON
- "fiinha_messages".id = "fiinha_offsets".message_id
- JOIN "fiinha_payloads" ON
- "fiinha_messages".payload_id = "fiinha_payloads".id
- WHERE
- "fiinha_payloads".topic = ? AND
- "fiinha_offsets".consumer = ?;
-
-
--- owner:
-
--- hasData.sql:
--- write:
-
--- read:
- SELECT 1 as data
- FROM "fiinha_messages"
- JOIN "fiinha_payloads" ON
- "fiinha_payloads".id = "fiinha_messages".payload_id
- WHERE
- "fiinha_payloads".topic = ? AND
- "fiinha_messages".id NOT IN (
- SELECT message_id FROM "fiinha_offsets"
- WHERE consumer = ?
- )
- LIMIT 1;
-
-
--- owner: