diff options
Diffstat (limited to 'tests')
57 files changed, 0 insertions, 7069 deletions
diff --git a/tests/benchmarks/deadletters/fiinha.go b/tests/benchmarks/deadletters/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/deadletters/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/deadletters/main.go b/tests/benchmarks/deadletters/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/deadletters/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/lookup/fiinha.go b/tests/benchmarks/lookup/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/lookup/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/lookup/main.go b/tests/benchmarks/lookup/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/lookup/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/multiple-consumers/fiinha.go b/tests/benchmarks/multiple-consumers/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/multiple-consumers/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/multiple-consumers/main.go b/tests/benchmarks/multiple-consumers/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/multiple-consumers/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/multiple-produces/fiinha.go b/tests/benchmarks/multiple-produces/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/multiple-produces/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/multiple-produces/main.go b/tests/benchmarks/multiple-produces/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/multiple-produces/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/reaper/fiinha.go b/tests/benchmarks/reaper/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/reaper/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/reaper/main.go b/tests/benchmarks/reaper/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/reaper/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/replay/fiinha.go b/tests/benchmarks/replay/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/replay/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/replay/main.go b/tests/benchmarks/replay/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/replay/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/single-consumer/fiinha.go b/tests/benchmarks/single-consumer/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/single-consumer/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/single-consumer/main.go b/tests/benchmarks/single-consumer/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/single-consumer/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/single-producer/fiinha.go b/tests/benchmarks/single-producer/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/single-producer/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/single-producer/main.go b/tests/benchmarks/single-producer/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/single-producer/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/subscribe/fiinha.go b/tests/benchmarks/subscribe/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/subscribe/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/subscribe/main.go b/tests/benchmarks/subscribe/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/subscribe/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/unsubscribe/fiinha.go b/tests/benchmarks/unsubscribe/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/unsubscribe/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/unsubscribe/main.go b/tests/benchmarks/unsubscribe/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/unsubscribe/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/benchmarks/waiter/fiinha.go b/tests/benchmarks/waiter/fiinha.go deleted file mode 100644 index 9c0b641..0000000 --- a/tests/benchmarks/waiter/fiinha.go +++ /dev/null @@ -1,24 +0,0 @@ -package fiinha - -import ( - "flag" - "time" -) - - - -var nFlag = flag.Int( - "n", - 1_000, - "The number of iterations to execute", -) - -func MainTest() { - // FIXME - flag.Parse() - n := *nFlag - - for i := 0; i < n; i++ { - time.Sleep(time.Millisecond * 1) - } -} diff --git a/tests/benchmarks/waiter/main.go b/tests/benchmarks/waiter/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/benchmarks/waiter/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/cli-opts.sh b/tests/cli-opts.sh deleted file mode 100755 index fcb62ca..0000000 --- a/tests/cli-opts.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/sh -set -eu - -exit diff --git a/tests/fiinha.go b/tests/fiinha.go deleted file mode 100644 index 0901190..0000000 --- a/tests/fiinha.go +++ /dev/null @@ -1,5889 +0,0 @@ -package fiinha - -import ( - "bytes" - "database/sql" - "errors" - "fmt" - "io" - "log/slog" - "os" - "reflect" - "sort" - "strings" - "sync" - "time" - - "golite" - "uuid" - g "gobang" -) - - - -var instanceID = os.Getpid() - - - -func test_defaultPrefix() { - g.TestStart("defaultPrefix") - - g.Testing("the defaultPrefix is valid", func() { - g.TErrorIf(g.ValidateSQLTablePrefix(defaultPrefix)) - }) -} - -func test_tryRollback() { - g.TestStart("tryRollback()") - - myErr := errors.New("bottom error") - - db, err := sql.Open(golite.DriverName, golite.InMemory) - g.TErrorIf(err) - defer db.Close() - - - g.Testing("the error is propagated if rollback doesn't fail", func() { - tx, err := db.Begin() - g.TErrorIf(err) - - err = tryRollback(tx, myErr) - g.TAssertEqual(err, myErr) - }) - - g.Testing("a wrapped error when rollback fails", func() { - tx, err := db.Begin() - g.TErrorIf(err) - - err = tx.Commit() - g.TErrorIf(err) - - err = tryRollback(tx, myErr) - g.TAssertEqual(reflect.DeepEqual(err, myErr), false) - g.TAssertEqual(errors.Is(err, myErr), true) - }) -} - -func test_inTx() { - g.TestStart("inTx()") - - db, err := sql.Open(golite.DriverName, golite.InMemory) - g.TErrorIf(err) - defer db.Close() - - - g.Testing("when fn() errors, we propagate it", func() { - myErr := errors.New("to be propagated") - err := inTx(db, func(tx *sql.Tx) error { - return myErr - }) - g.TAssertEqual(err, myErr) - }) - - g.Testing("on nil error we get nil", func() { - err := inTx(db, func(tx *sql.Tx) error { - return nil - }) - g.TErrorIf(err) - }) -} - -func test_serialized() { - // FIXME -} - -func test_execSerialized() { - // FIXME -} - -func test_createTables() { - g.TestStart("createTables()") - - const ( - dbpath = golite.InMemory - prefix = defaultPrefix - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - defer db.Close() - - - g.Testing("tables exist afterwards", func() { - const tmpl_read = ` - SELECT id FROM "%s_messages" LIMIT 1; - ` - qRead := fmt.Sprintf(tmpl_read, prefix) - - _, err := db.Exec(qRead) - g.TErrorNil(err) - - err = createTables(db, prefix) - g.TErrorIf(err) - - _, err = db.Exec(qRead) - g.TErrorIf(err) - }) - - g.Testing("we can do it multiple times", func() { - g.TErrorIf(g.SomeError( - createTables(db, prefix), - createTables(db, prefix), - createTables(db, prefix), - )) - }) -} - -func test_takeStmt() { - g.TestStart("takeStmt()") - - const ( - topic = "take() topic" - consumer = "take() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - g.TErrorIf(takeErr) - defer g.SomeFnError( - takeClose, - db.Close, - ) - - const tmpl = ` - SELECT owner_id from "%s_owners" - WHERE - topic = ? AND - consumer = ?; - ` - sqlOwner := fmt.Sprintf(tmpl, prefix) - - - g.Testing("when there is no owner, we become it", func() { - var ownerID int - err := db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) - g.TAssertEqual(err, sql.ErrNoRows) - - err = take(topic, consumer) - g.TErrorIf(err) - - err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) - g.TErrorIf(err) - g.TAssertEqual(ownerID, instanceID) - }) - - g.Testing("if there is already an owner, we overtake it", func() { - otherCfg := cfg - otherCfg.instanceID = instanceID + 1 - - take, takeClose, takeErr := takeStmt(otherCfg) - g.TErrorIf(takeErr) - defer takeClose() - - var ownerID int - err := db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) - g.TErrorIf(err) - g.TAssertEqual(ownerID, instanceID) - - err = take(topic, consumer) - g.TErrorIf(err) - - err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) - g.TErrorIf(err) - g.TAssertEqual(ownerID, otherCfg.instanceID) - }) - g.Testing("no error if closed more than once", func() { - g.TErrorIf(g.SomeError( - takeClose(), - takeClose(), - takeClose(), - )) - }) -} - -func test_publishStmt() { - g.TestStart("publishStmt()") - - const ( - topic = "publish() topic" - payloadStr = "publish() payload" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - publish, publishClose, publishErr := publishStmt(cfg) - g.TErrorIf(publishErr) - defer g.SomeFnError( - publishClose, - db.Close, - ) - - - g.Testing("we can publish a message", func() { - messageID := uuid.New() - message, err := publish(unsent, messageID) - g.TErrorIf(err) - - g.TAssertEqual(message.id, int64(1)) - g.TAssertEqual(message.uuid, messageID) - g.TAssertEqual(message.topic, topic) - g.TAssertEqual(message.flowID, flowID) - g.TAssertEqual(message.payload, payload) - }) - - g.Testing("we can publish the same message repeatedly", func() { - messageID1 := uuid.New() - messageID2 := uuid.New() - message1, err1 := publish(unsent, messageID1) - message2, err2 := publish(unsent, messageID2) - g.TErrorIf(g.SomeError(err1, err2)) - - g.TAssertEqual(message1.id, message2.id - 1) - g.TAssertEqual(message1.topic, message2.topic) - g.TAssertEqual(message1.flowID, message2.flowID) - g.TAssertEqual(message1.payload, message2.payload) - - g.TAssertEqual(message1.uuid, messageID1) - g.TAssertEqual(message2.uuid, messageID2) - }) - - g.Testing("publishing a message with the same UUID errors", func() { - messageID := uuid.New() - message1, err1 := publish(unsent, messageID) - _, err2 := publish(unsent, messageID) - g.TErrorIf(err1) - - g.TAssertEqual(message1.uuid, messageID) - g.TAssertEqual(message1.topic, topic) - g.TAssertEqual(message1.flowID, flowID) - g.TAssertEqual(message1.payload, payload) - - g.TAssertEqual( - err2.(golite.Error).ExtendedCode, - golite.ErrConstraintUnique, - ) - }) - - g.Testing("no actual closing occurs", func() { - g.TErrorIf(g.SomeError( - publishClose(), - publishClose(), - publishClose(), - )) - }) -} - -func test_findStmt() { - g.TestStart("findStmt()") - - const ( - topic = "find() topic" - payloadStr = "find() payload" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - publish, publishClose, publishErr := publishStmt(cfg) - find, findClose, findErr := findStmt(cfg) - g.TErrorIf(g.SomeError( - publishErr, - findErr, - )) - defer g.SomeFnError( - publishClose, - findClose, - db.Close, - ) - - pub := func(flowID uuid.UUID) uuid.UUID { - unsentWithFlowID := unsent - unsentWithFlowID.FlowID = flowID - messageID := uuid.New() - _, err := publish(unsentWithFlowID, messageID) - g.TErrorIf(err) - return messageID - } - - - g.Testing("we can find a message by topic and flowID", func() { - flowID := uuid.New() - messageID := pub(flowID) - message, err := find(topic, flowID) - g.TErrorIf(err) - - g.TAssertEqual(message.uuid, messageID) - g.TAssertEqual(message.topic, topic) - g.TAssertEqual(message.flowID, flowID) - g.TAssertEqual(message.payload, payload) - }) - - g.Testing("a non-existent message gives us an error", func() { - message, err := find(topic, uuid.New()) - g.TAssertEqual(message, messageT{}) - g.TAssertEqual(err, sql.ErrNoRows) - }) - - g.Testing("findig twice yields the exact same message", func() { - flowID := uuid.New() - messageID := pub(flowID) - message1, err1 := find(topic, flowID) - message2, err2 := find(topic, flowID) - g.TErrorIf(g.SomeError(err1, err2)) - - g.TAssertEqual(message1.uuid, messageID) - g.TAssertEqual(message1, message2) - }) - - g.Testing("returns the latest entry if multiple are available", func() { - flowID := uuid.New() - - _ , err0 := find(topic, flowID) - pub(flowID) - message1, err1 := find(topic, flowID) - pub(flowID) - message2, err2 := find(topic, flowID) - - g.TAssertEqual(err0, sql.ErrNoRows) - g.TErrorIf(g.SomeError(err1, err2)) - g.TAssertEqual(message1.uuid == message2.uuid, false) - g.TAssertEqual(message1.id < message2.id, true) - }) - - g.Testing("no error if closed more than once", func() { - g.TErrorIf(g.SomeError( - findClose(), - findClose(), - findClose(), - )) - }) -} - -func test_nextStmt() { - g.TestStart("nextStmt()") - - const ( - topic = "next() topic" - payloadStr = "next() payload" - consumer = "next() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - next, nextClose, nextErr := nextStmt(cfg) - commit, commitClose, commitErr := commitStmt(cfg) - g.TErrorIf(takeErr) - g.TErrorIf(publishErr) - g.TErrorIf(nextErr) - g.TErrorIf(commitErr) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - nextErr, - commitErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - nextClose, - commitClose, - db.Close, - ) - - pub := func(topic string) messageT { - g.TErrorIf(take(topic, consumer)) - - unsentWithTopic := unsent - unsentWithTopic.Topic = topic - message, err := publish(unsentWithTopic, uuid.New()) - g.TErrorIf(err) - return message - } - - - g.Testing("we get an error on empty topic", func() { - _, err := next(topic, consumer) - g.TAssertEqual(err, sql.ErrNoRows) - }) - - g.Testing("we don't get messages from other topics", func() { - pub("other topic") - _, err := next(topic, consumer) - g.TAssertEqual(err, sql.ErrNoRows) - }) - - g.Testing("we can get the next message", func() { - expectedMessage := pub(topic) - pub(topic) - pub(topic) - message, err := next(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(message, expectedMessage) - }) - - g.Testing("we keep getting the next until we commit", func() { - message1, err1 := next(topic, consumer) - message2, err2 := next(topic, consumer) - g.TErrorIf(commit(consumer, message1.uuid)) - message3, err3 := next(topic, consumer) - g.TErrorIf(g.SomeError(err1, err2, err3)) - - g.TAssertEqual(message1, message2) - g.TAssertEqual(message2.uuid != message3.uuid, true) - }) - - g.Testing("each consumer has its own next message", func() { - g.TErrorIf(take(topic, "other consumer")) - message1, err1 := next(topic, consumer) - message2, err2 := next(topic, "other consumer") - g.TErrorIf(g.SomeError(err1, err2)) - g.TAssertEqual(message1.uuid != message2.uuid, true) - }) - - g.Testing("error when we're not the owner", func() { - otherCfg := cfg - otherCfg.instanceID = instanceID + 1 - - take, takeClose, takeErr := takeStmt(otherCfg) - g.TErrorIf(takeErr) - defer takeClose() - - _, err := next(topic, consumer) - g.TErrorIf(err) - - err = take(topic, consumer) - g.TErrorIf(err) - - _, err = next(topic, consumer) - g.TAssertEqual(err, fmt.Errorf( - notOwnerErrorFmt, - otherCfg.instanceID, - topic, - consumer, - instanceID, - )) - }) - - g.Testing("we can close more than once", func() { - g.TErrorIf(g.SomeError( - nextClose(), - nextClose(), - nextClose(), - )) - }) -} - -func test_messageEach() { - g.TestStart("messageEach()") - - const ( - topic = "messageEach() topic" - payloadStr = "messageEach() payload" - consumer = "messageEach() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - pending, pendingClose, pendingErr := pendingStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - pendingErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - pendingClose, - db.Close, - ) - - pub := func() uuid.UUID { - message, err := publish(unsent, uuid.New()) - g.TErrorIf(err) - return message.uuid - } - g.TErrorIf(take(topic, consumer)) - - - g.Testing("not called on empty set", func() { - rows, err := pending(topic, consumer) - g.TErrorIf(err) - - messageEach(rows, func(messageT) error { - g.Unreachable() - return nil - }) - }) - - g.Testing("the callback is called once for each entry", func() { - messageIDs := []uuid.UUID{ - pub(), - pub(), - pub(), - } - - rows, err := pending(topic, consumer) - g.TErrorIf(err) - - var collectedIDs []uuid.UUID - err = messageEach(rows, func(message messageT) error { - collectedIDs = append(collectedIDs, message.uuid) - return nil - }) - g.TErrorIf(err) - - g.TAssertEqual(collectedIDs, messageIDs) - }) - - g.Testing("we halt if the timestamp is ill-formatted", func() { - messageID := pub() - message_id_bytes := messageID[:] - pub() - pub() - pub() - - const tmplUpdate = ` - UPDATE "%s_messages" - SET timestamp = '01/01/1970' - WHERE uuid = ?; - ` - sqlUpdate := fmt.Sprintf(tmplUpdate, prefix) - _, err := db.Exec(sqlUpdate, message_id_bytes) - g.TErrorIf(err) - - rows, err := pending(topic, consumer) - g.TErrorIf(err) - - n := 0 - err = messageEach(rows, func(messageT) error { - n++ - return nil - }) - - g.TAssertEqual( - err, - &time.ParseError{ - Layout: time.RFC3339Nano, - Value: "01/01/1970", - LayoutElem: "2006", - ValueElem: "01/01/1970", - Message: "", - }, - ) - g.TAssertEqual(n, 3) - - const tmplDelete = ` - DELETE FROM "%s_messages" - WHERE uuid = ?; - ` - sqlDelete := fmt.Sprintf(tmplDelete, prefix) - _, err = db.Exec(sqlDelete, message_id_bytes) - g.TErrorIf(err) - }) - - g.Testing("we halt if the callback returns an error", func() { - myErr := errors.New("callback error early return") - - rows1, err1 := pending(topic, consumer) - g.TErrorIf(err1) - - n1 := 0 - err1 = messageEach(rows1, func(messageT) error { - n1++ - if n1 == 4 { - return myErr - } - return nil - }) - - rows2, err2 := pending(topic, consumer) - g.TErrorIf(err2) - - n2 := 0 - err2 = messageEach(rows2, func(messageT) error { - n2++ - return nil - }) - - g.TAssertEqual(err1, myErr) - g.TErrorIf(err2) - g.TAssertEqual(n1, 4) - g.TAssertEqual(n2, 6) - }) - - g.Testing("noop when given nil for *sql.Rows", func() { - err := messageEach(nil, func(messageT) error { - g.Unreachable() - return nil - }) - g.TErrorIf(err) - }) -} - -func test_pendingStmt() { - g.TestStart("pendingStmt()") - - const ( - topic = "pending() topic" - payloadStr = "pending() payload" - consumer = "pending() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - pending, pendingClose, pendingErr := pendingStmt(cfg) - commit, commitClose, commitErr := commitStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - pendingErr, - commitErr, - toDeadErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - pendingClose, - commitClose, - toDeadClose, - db.Close, - ) - - pub := func(topic string) messageT { - g.TErrorIf(take(topic, consumer)) - unsentWithTopic := unsent - unsentWithTopic.Topic = topic - message, err := publish(unsentWithTopic, uuid.New()) - g.TErrorIf(err) - return message - } - g.TErrorIf(take(topic, consumer)) - - collectPending := func(topic string, consumer string) []messageT { - rows, err := pending(topic, consumer) - g.TErrorIf(err) - - var messages []messageT - err = messageEach(rows, func(message messageT) error { - messages = append(messages, message) - return nil - }) - g.TErrorIf(err) - return messages - } - - - g.Testing("an empty database has 0 pending items", func() { - g.TAssertEqual(len(collectPending(topic, consumer)), 0) - }) - - g.Testing("after publishing we get all messages", func() { - expected := []messageT{ - pub(topic), - pub(topic), - } - - g.TAssertEqualI(collectPending(topic, consumer), expected) - }) - - g.Testing("we get the same messages when calling again", func() { - messages1 := collectPending(topic, consumer) - messages2 := collectPending(topic, consumer) - g.TAssertEqual(len(messages1), 2) - g.TAssertEqualI(messages1, messages2) - }) - - g.Testing("we don't get messages from other topics", func() { - pub("other topic") - - g.TAssertEqual(len(collectPending(topic, consumer)), 2) - g.TAssertEqual(len(collectPending("other topic", consumer)), 1) - }) - - g.Testing("after others commit, pending still returns them", func() { - g.TErrorIf(take(topic, "other consumer")) - messages1 := collectPending(topic, consumer) - g.TAssertEqual(len(messages1), 2) - g.TErrorIf( - commit("other consumer", messages1[0].uuid), - ) - - messages2 := collectPending(topic, consumer) - g.TAssertEqualI(messages1, messages2) - }) - - g.Testing("committing other topic doesn't change current", func() { - messages1 := collectPending(topic, consumer) - g.TAssertEqual(len(messages1), 2) - - message := pub("other topic") - - g.TErrorIf(commit(consumer, message.uuid)) - - messages2 := collectPending(topic, consumer) - g.TAssertEqualI(messages1, messages2) - }) - - g.Testing("after commiting, pending doesn't return them again", func() { - messages1 := collectPending(topic, consumer) - g.TAssertEqual(len(messages1), 2) - - g.TErrorIf(commit(consumer, messages1[0].uuid)) - - messages2 := collectPending(topic, consumer) - g.TAssertEqual(len(messages2), 1) - g.TAssertEqual(messages2[0], messages1[1]) - - g.TErrorIf(commit(consumer, messages1[1].uuid)) - - messages3 := collectPending(topic, consumer) - g.TAssertEqual(len(messages3), 0) - }) - - g.Testing("on deadletter, pending also doesn't return them", func() { - messages0 := collectPending(topic, consumer) - g.TAssertEqual(len(messages0), 0) - - message1 := pub(topic) - message2 := pub(topic) - - messages1 := collectPending(topic, consumer) - g.TAssertEqual(len(messages1), 2) - - err = toDead(consumer, message1.uuid, uuid.New()) - g.TErrorIf(err) - - messages2 := collectPending(topic, consumer) - g.TAssertEqual(len(messages2), 1) - g.TAssertEqual(messages2[0], message2) - - err = toDead(consumer, message2.uuid, uuid.New()) - g.TErrorIf(err) - - messages3 := collectPending(topic, consumer) - g.TAssertEqual(len(messages3), 0) - }) - - g.Testing("if commits are unordered, pending is still sorted", func() { - message1 := pub(topic) - message2 := pub(topic) - message3 := pub(topic) - - g.TAssertEqual(collectPending(topic, consumer), []messageT{ - message1, - message2, - message3, - }) - - g.TErrorIf(commit(consumer, message2.uuid)) - g.TAssertEqual(collectPending(topic, consumer), []messageT{ - message1, - message3, - }) - - g.TErrorIf(commit(consumer, message1.uuid)) - g.TAssertEqual(collectPending(topic, consumer), []messageT{ - message3, - }) - - g.TErrorIf(commit(consumer, message3.uuid)) - g.TAssertEqual(len(collectPending(topic, consumer)), 0) - }) - - g.Testing("when we're not the owners we get nothing", func() { - otherCfg := cfg - otherCfg.instanceID = instanceID + 1 - - take, takeClose, takeErr := takeStmt(otherCfg) - g.TErrorIf(takeErr) - defer takeClose() - - message1 := pub(topic) - message2 := pub(topic) - message3 := pub(topic) - message4 := pub(topic) - message5 := pub(topic) - - expected := []messageT{ - message1, - message2, - message3, - message4, - message5, - } - - g.TAssertEqual(collectPending(topic, consumer), expected) - - err := take(topic, consumer) - g.TErrorIf(err) - - rows, err := pending(topic, consumer) - g.TErrorIf(err) - - err = messageEach(rows, func(messageT) error { - g.Unreachable() - return nil - }) - g.TErrorIf(err) - }) - - g.Testing("no actual closing occurs", func() { - g.TErrorIf(g.SomeError( - pendingClose(), - pendingClose(), - pendingClose(), - )) - }) -} - -func test_commitStmt() { - g.TestStart("commitStmt()") - - const ( - topic = "commit() topic" - payloadStr = "commit() payload" - consumer = "commit() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - commit, commitClose, commitErr := commitStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - commitErr, - toDeadErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - commitClose, - toDeadClose, - db.Close, - ) - - pub := func(topic string) uuid.UUID { - g.TErrorIf(take(topic, consumer)) - - unsentWithTopic := unsent - unsentWithTopic.Topic = topic - message, err := publish(unsentWithTopic, uuid.New()) - g.TErrorIf(err) - return message.uuid - } - - cmt := func(consumer string, messageID uuid.UUID) error { - g.TErrorIf(take(topic, consumer)) - - return commit(consumer, messageID) - } - - - g.Testing("we can't commit twice", func() { - messageID := pub(topic) - - err1 := cmt(consumer, messageID) - err2 := cmt(consumer, messageID) - g.TErrorIf(err1) - g.TAssertEqual( - err2.(golite.Error).ExtendedCode, - golite.ErrConstraintUnique, - ) - }) - - g.Testing("we can't commit non-existent messages", func() { - err := cmt(consumer, uuid.New()) - g.TAssertEqual( - err.(golite.Error).ExtendedCode, - golite.ErrConstraintNotNull, - ) - }) - - g.Testing("multiple consumers may commit a message", func() { - messageID := pub(topic) - - g.TErrorIf(g.SomeError( - cmt(consumer, messageID), - cmt("other consumer", messageID), - cmt("yet another consumer", messageID), - )) - }) - - g.Testing("a consumer can commit to multiple topics", func() { - messageID1 := pub(topic) - messageID2 := pub("other topic") - messageID3 := pub("yet another topic") - - g.TErrorIf(g.SomeError( - cmt(consumer, messageID1), - cmt(consumer, messageID2), - cmt(consumer, messageID3), - )) - }) - - g.Testing("a consumer can consume many messages from a topic", func() { - messageID1 := pub(topic) - messageID2 := pub(topic) - messageID3 := pub(topic) - - g.TErrorIf(g.SomeError( - cmt(consumer, messageID1), - cmt(consumer, messageID2), - cmt(consumer, messageID3), - )) - }) - - g.Testing("we can't commit a dead message", func() { - messageID := pub(topic) - - err1 := toDead(consumer, messageID, uuid.New()) - err2 := cmt(consumer, messageID) - g.TErrorIf(err1) - g.TAssertEqual( - err2.(golite.Error).ExtendedCode, - golite.ErrConstraintUnique, - ) - }) - - g.Testing("error if we don't own the topic/consumer", func() { - otherCfg := cfg - otherCfg.instanceID = instanceID + 1 - - take, takeClose, takeErr := takeStmt(otherCfg) - g.TErrorIf(takeErr) - defer takeClose() - - messageID := pub(topic) - - err := take(topic, consumer) - g.TErrorIf(err) - - err = commit(consumer, messageID) - g.TAssertEqual( - err.(golite.Error).ExtendedCode, - golite.ErrConstraintTrigger, - ) - }) - - g.Testing("no actual closing occurs", func() { - g.TErrorIf(g.SomeError( - commitClose(), - commitClose(), - commitClose(), - )) - }) -} - -func test_toDeadStmt() { - g.TestStart("toDeadStmt()") - - const ( - topic = "toDead() topic" - payloadStr = "toDead() payload" - consumer = "toDead() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - commit, commitClose, commitErr := commitStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - commitErr, - toDeadErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - commitClose, - toDeadClose, - db.Close, - ) - - pub := func(topic string) uuid.UUID { - g.TErrorIf(take(topic, consumer)) - - unsentWithTopic := unsent - unsentWithTopic.Topic = topic - message, err := publish(unsentWithTopic, uuid.New()) - g.TErrorIf(err) - return message.uuid - } - - asDead := func( - consumer string, - messageID uuid.UUID, - deadletterID uuid.UUID, - ) error { - g.TErrorIf(take(topic, consumer)) - return toDead(consumer, messageID, deadletterID) - } - - - g.Testing("we can't mark as dead twice", func() { - messageID := pub(topic) - - err1 := asDead(consumer, messageID, uuid.New()) - err2 := asDead(consumer, messageID, uuid.New()) - g.TErrorIf(err1) - g.TAssertEqual( - err2.(golite.Error).ExtendedCode, - golite.ErrConstraintUnique, - ) - }) - - g.Testing("we can't reuse a deadletter id", func() { - messageID1 := pub(topic) - messageID2 := pub(topic) - deadletterID := uuid.New() - - err1 := asDead(consumer, messageID1, deadletterID) - err2 := asDead(consumer, messageID2, deadletterID) - g.TErrorIf(err1) - g.TAssertEqual( - err2.(golite.Error).ExtendedCode, - golite.ErrConstraintUnique, - ) - - }) - - g.Testing("we can't mark as dead non-existent messages", func() { - err := asDead(consumer, uuid.New(), uuid.New()) - g.TAssertEqual( - err.(golite.Error).ExtendedCode, - golite.ErrConstraintNotNull, - ) - }) - - g.Testing("multiple consumers may mark a message as dead", func() { - messageID := pub(topic) - - g.TErrorIf(g.SomeError( - asDead(consumer, messageID, uuid.New()), - asDead("another consumer", messageID, uuid.New()), - asDead("yet another consumer", messageID, uuid.New()), - )) - }) - - g.Testing("a consumer can mark as dead in multiple topics", func() { - messageID1 := pub(topic) - messageID2 := pub("other topic") - messageID3 := pub("yet other topic") - - g.TErrorIf(g.SomeError( - asDead(consumer, messageID1, uuid.New()), - asDead(consumer, messageID2, uuid.New()), - asDead(consumer, messageID3, uuid.New()), - )) - }) - - g.Testing("a consumer can produce many deadletters in a topic", func() { - messageID1 := pub(topic) - messageID2 := pub(topic) - messageID3 := pub(topic) - - g.TErrorIf(g.SomeError( - asDead(consumer, messageID1, uuid.New()), - asDead(consumer, messageID2, uuid.New()), - asDead(consumer, messageID3, uuid.New()), - )) - }) - - g.Testing("a consumer can intercalate commits and deadletters", func() { - messageID1 := pub(topic) - messageID2 := pub(topic) - messageID3 := pub(topic) - messageID4 := pub(topic) - messageID5 := pub(topic) - - g.TErrorIf(g.SomeError( - asDead(consumer, messageID1, uuid.New()), - commit(consumer, messageID2), - commit(consumer, messageID3), - asDead(consumer, messageID4, uuid.New()), - commit(consumer, messageID5), - )) - }) - - g.Testing("we can't mark a committed message as dead", func() { - messageID := pub(topic) - - err1 := commit(consumer, messageID) - err2 := asDead(consumer, messageID, uuid.New()) - g.TErrorIf(err1) - g.TAssertEqual( - err2.(golite.Error).ExtendedCode, - golite.ErrConstraintUnique, - ) - }) - - g.Testing("error if we don't own the message's consumer/topic", func() { - otherCfg := cfg - otherCfg.instanceID = instanceID + 1 - - messageID1 := pub(topic) - messageID2 := pub(topic) - - take, takeClose, takeErr := takeStmt(otherCfg) - g.TErrorIf(takeErr) - defer takeClose() - - err := toDead(consumer, messageID1, uuid.New()) - g.TErrorIf(err) - - err = take(topic, consumer) - g.TErrorIf(err) - - err = toDead(consumer, messageID2, uuid.New()) - g.TAssertEqual( - err.(golite.Error).ExtendedCode, - golite.ErrConstraintTrigger, - ) - }) - - g.Testing("no actual closing occurs", func() { - g.TErrorIf(g.SomeError( - toDeadClose(), - toDeadClose(), - toDeadClose(), - )) - }) -} - -func test_replayStmt() { - g.TestStart("replayStmt()") - - const ( - topic = "replay() topic" - payloadStr = "replay() payload" - consumer = "replay() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - replay, replayClose, replayErr := replayStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - toDeadErr, - replayErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - toDeadClose, - replayClose, - db.Close, - ) - - pub := func() messageT { - message, err := publish(unsent, uuid.New()) - g.TErrorIf(err) - return message - } - g.TErrorIf(take(topic, consumer)) - - - g.Testing("we can replay a message", func() { - message := pub() - deadletterID := uuid.New() - replayedID := uuid.New() - - err1 := toDead(consumer, message.uuid, deadletterID) - replayed, err2 := replay(deadletterID, replayedID) - g.TErrorIf(g.SomeError(err1, err2)) - - g.TAssertEqual(replayed.uuid, replayedID) - g.TAssertEqual(replayed.id == message.id, false) - g.TAssertEqual(replayed.uuid == message.uuid, false) - }) - - g.Testing("a replayed message keeps its payload", func() { - message := pub() - deadletterID := uuid.New() - err := toDead(consumer, message.uuid, deadletterID) - g.TErrorIf(err) - - replayed, err := replay(deadletterID, uuid.New()) - g.TErrorIf(err) - g.TAssertEqual(message.flowID, replayed.flowID) - g.TAssertEqual(message.payload, replayed.payload) - }) - - g.Testing("we can't replay a dead message twice", func() { - message := pub() - deadletterID := uuid.New() - - err := toDead(consumer, message.uuid, deadletterID) - g.TErrorIf(err) - - _, err1 := replay(deadletterID, uuid.New()) - _, err2 := replay(deadletterID, uuid.New()) - g.TErrorIf(err1) - g.TAssertEqual( - err2.(golite.Error).ExtendedCode, - golite.ErrConstraintUnique, - ) - }) - - g.Testing("we cant replay non-existent messages", func() { - _, err := replay(uuid.New(), uuid.New()) - g.TAssertEqual( - err.(golite.Error).ExtendedCode, - golite.ErrConstraintNotNull, - ) - }) - - g.Testing("messages can die and then be replayed many times", func() { - message := pub() - deadletterID1 := uuid.New() - deadletterID2 := uuid.New() - - err := toDead(consumer, message.uuid, deadletterID1) - g.TErrorIf(err) - - replayed1, err := replay(deadletterID1, uuid.New()) - g.TErrorIf(err) - - err = toDead(consumer, replayed1.uuid, deadletterID2) - g.TErrorIf(err) - - replayed2, err := replay(deadletterID2, uuid.New()) - g.TErrorIf(err) - - g.TAssertEqual(message.flowID, replayed1.flowID) - g.TAssertEqual(replayed1.flowID, replayed2.flowID) - }) - - g.Testing("no actual closing occurs", func() { - g.TErrorIf(g.SomeError( - replayClose(), - replayClose(), - replayClose(), - )) - }) -} - -func test_oneDeadStmt() { - g.TestStart("oneDeadStmt()") - - const ( - topic = "oneDead() topic" - payloadStr = "oneDead() payload" - consumer = "oneDead() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - replay, replayClose, replayErr := replayStmt(cfg) - oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - toDeadErr, - replayErr, - oneDeadErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - toDeadClose, - replayClose, - oneDeadClose, - db.Close, - ) - - pub := func(topic string) uuid.UUID { - g.TErrorIf(take(topic, consumer)) - - unsentWithTopic := unsent - unsentWithTopic.Topic = topic - message, err := publish(unsentWithTopic, uuid.New()) - g.TErrorIf(err) - return message.uuid - } - - - g.Testing("error on missing deadletters", func() { - _, err := oneDead(topic, consumer) - g.TAssertEqual(err, sql.ErrNoRows) - }) - - g.Testing("deadletters on other topics don't show for us", func() { - err := toDead(consumer, pub("other topic"), uuid.New()) - g.TErrorIf(err) - - _, err = oneDead(topic, consumer) - g.TAssertEqual(err, sql.ErrNoRows) - }) - - g.Testing("deadletters for other consumers don't show for use", func() { - g.TErrorIf(take(topic, "other consumer")) - err := toDead("other consumer", pub(topic), uuid.New()) - g.TErrorIf(err) - - _, err = oneDead(topic, consumer) - g.TAssertEqual(err, sql.ErrNoRows) - }) - - g.Testing("after being replayed deadletters aren't returned", func() { - messageID1 := uuid.New() - messageID2 := uuid.New() - messageID3 := uuid.New() - - err1 := toDead(consumer, pub(topic), messageID1) - err2 := toDead(consumer, pub(topic), messageID2) - err3 := toDead(consumer, pub(topic), messageID3) - g.TErrorIf(g.SomeError(err1, err2, err3)) - - deadletter, err := oneDead(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(deadletter.uuid, messageID1) - - _, err = replay(messageID2, uuid.New()) - g.TErrorIf(err) - - deadletter, err = oneDead(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(deadletter.uuid, messageID1) - - _, err = replay(messageID1, uuid.New()) - g.TErrorIf(err) - - deadletter, err = oneDead(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(deadletter.uuid, messageID3) - - _, err = replay(messageID3, uuid.New()) - g.TErrorIf(err) - - _, err = oneDead(topic, consumer) - g.TAssertEqual(err, sql.ErrNoRows) - }) -} - -func test_deadletterEach() { - g.TestStart("deadletterEach") - - const ( - topic = "deadletterEach() topic" - payloadStr = "deadletterEach() payload" - consumer = "deadletterEach() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - allDead, allDeadClose, allDeadErr := allDeadStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - toDeadErr, - allDeadErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - toDeadClose, - allDeadClose, - db.Close, - ) - - pub := func() uuid.UUID { - message, err := publish(unsent, uuid.New()) - g.TErrorIf(err) - return message.uuid - } - - dead := func(messageID uuid.UUID) uuid.UUID { - deadletterID := uuid.New() - err := toDead(consumer, messageID, deadletterID) - g.TErrorIf(err) - - return deadletterID - } - g.TErrorIf(take(topic, consumer)) - - - g.Testing("not called on empty set", func() { - rows, err := allDead(topic, consumer) - g.TErrorIf(err) - - n := 0 - deadletterEach(rows, func(deadletterT, messageT) error { - n++ - return nil - }) - }) - - g.Testing("the callback is called once for each entry", func() { - expected := []uuid.UUID{ - dead(pub()), - dead(pub()), - dead(pub()), - } - - rows, err := allDead(topic, consumer) - g.TErrorIf(err) - - var deadletterIDs []uuid.UUID - deadletterEach(rows, func( - deadletter deadletterT, - _ messageT, - ) error { - deadletterIDs = append(deadletterIDs, deadletter.uuid) - return nil - }) - - g.TAssertEqual(deadletterIDs, expected) - }) - - g.Testing("we halt if the timestamp is ill-formatted", func() { - messageID := pub() - message_id_bytes := messageID[:] - dead(messageID) - dead(pub()) - dead(pub()) - dead(pub()) - dead(pub()) - - const tmplUpdate = ` - UPDATE "%s_offsets" - SET timestamp = '01-01-1970' - WHERE message_id IN ( - SELECT id FROM "%s_messages" WHERE uuid = ? - ); - ` - sqlUpdate := fmt.Sprintf(tmplUpdate, prefix, prefix) - _, err := db.Exec(sqlUpdate, message_id_bytes) - g.TErrorIf(err) - - rows, err := allDead(topic, consumer) - g.TErrorIf(err) - - n := 0 - err = deadletterEach(rows, func(deadletterT, messageT) error { - n++ - return nil - }) - - g.TAssertEqual( - err, - &time.ParseError{ - Layout: time.RFC3339Nano, - Value: "01-01-1970", - LayoutElem: "2006", - ValueElem: "01-01-1970", - Message: "", - }, - ) - g.TAssertEqual(n, 3) - - const tmplDelete = ` - DELETE FROM "%s_offsets" - WHERE message_id IN ( - SELECT id FROM "%s_messages" WHERE uuid = ? - ); - ` - sqlDelete := fmt.Sprintf(tmplDelete, prefix, prefix) - _, err = db.Exec(sqlDelete, message_id_bytes) - g.TErrorIf(err) - }) - - g.Testing("we halt if the callback returns an error", func() { - myErr := errors.New("early return error") - - rows1, err1 := allDead(topic, consumer) - g.TErrorIf(err1) - - n1 := 0 - err1 = deadletterEach(rows1, func(deadletterT, messageT) error { - n1++ - if n1 == 1 { - return myErr - } - return nil - }) - - rows2, err2 := allDead(topic, consumer) - g.TErrorIf(err2) - - n2 := 0 - err = deadletterEach(rows2, func(deadletterT, messageT) error { - n2++ - return nil - }) - - g.TAssertEqual(err1, myErr) - g.TErrorIf(err2) - g.TAssertEqual(n1, 1) - g.TAssertEqual(n2, 7) - }) -} - -func test_allDeadStmt() { - g.TestStart("allDeadStmt()") - - const ( - topic = "allDead() topic" - payloadStr = "allDead() payload" - consumer = "allDead() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - replay, replayClose, replayErr := replayStmt(cfg) - allDead, allDeadClose, allDeadErr := allDeadStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - toDeadErr, - replayErr, - allDeadErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - toDeadClose, - replayClose, - allDeadClose, - db.Close, - ) - - pub := func(topic string) uuid.UUID { - g.TErrorIf(take(topic, consumer)) - - unsentWithTopic := unsent - unsentWithTopic.Topic = topic - message, err := publish(unsentWithTopic, uuid.New()) - g.TErrorIf(err) - return message.uuid - } - - collectAll := func( - topic string, - consumer string, - ) ([]deadletterT, []messageT) { - var ( - deadletters []deadletterT - messages []messageT - ) - eachFn := func( - deadletter deadletterT, - message messageT, - ) error { - deadletters = append(deadletters, deadletter) - messages = append(messages, message) - return nil - } - - rows, err := allDead(topic, consumer) - g.TErrorIf(err) - - err = deadletterEach(rows, eachFn) - g.TErrorIf(err) - - return deadletters, messages - } - - - g.Testing("no entry on empty deadletters", func() { - deadletterIDs, _ := collectAll(topic, consumer) - g.TAssertEqual(len(deadletterIDs), 0) - }) - - g.Testing("deadletters on other topics don't show up", func() { - err := toDead(consumer, pub("other topic"), uuid.New()) - g.TErrorIf(err) - - deadletters, _ := collectAll(topic, consumer) - g.TAssertEqual(len(deadletters), 0) - }) - - g.Testing("deadletters of other consumers don't show up", func() { - g.TErrorIf(take(topic, "other consumer")) - err := toDead("other consumer", pub(topic), uuid.New()) - g.TErrorIf(err) - - deadletterIDs, _ := collectAll(topic, consumer) - g.TAssertEqual(len(deadletterIDs), 0) - }) - - g.Testing("deadletters are given in order", func() { - deadletterIDs := []uuid.UUID{ - uuid.New(), - uuid.New(), - uuid.New(), - } - messageIDs := []uuid.UUID{ - pub(topic), - pub(topic), - pub(topic), - } - - err1 := toDead(consumer, messageIDs[0], deadletterIDs[0]) - err2 := toDead(consumer, messageIDs[1], deadletterIDs[1]) - err3 := toDead(consumer, messageIDs[2], deadletterIDs[2]) - g.TErrorIf(g.SomeError(err1, err2, err3)) - - deadletters, messages := collectAll(topic, consumer) - g.TAssertEqual(deadletters[0].uuid, deadletterIDs[0]) - g.TAssertEqual(deadletters[1].uuid, deadletterIDs[1]) - g.TAssertEqual(deadletters[2].uuid, deadletterIDs[2]) - g.TAssertEqual( messages[0].uuid, messageIDs[0]) - g.TAssertEqual( messages[1].uuid, messageIDs[1]) - g.TAssertEqual( messages[2].uuid, messageIDs[2]) - g.TAssertEqual(len(deadletters), 3) - g.TAssertEqual(len(messages), 3) - }) - - g.Testing("after being replayed, they stop appearing", func() { - deadletters, _ := collectAll(topic, consumer) - g.TAssertEqual(len(deadletters), 3) - - _, err := replay(deadletters[0].uuid, uuid.New()) - g.TErrorIf(err) - collecteds, _ := collectAll(topic, consumer) - g.TAssertEqual(len(collecteds), 2) - - _, err = replay(deadletters[1].uuid, uuid.New()) - g.TErrorIf(err) - collecteds, _ = collectAll(topic, consumer) - g.TAssertEqual(len(collecteds), 1) - - _, err = replay(deadletters[2].uuid, uuid.New()) - g.TErrorIf(err) - collecteds, _ = collectAll(topic, consumer) - g.TAssertEqual(len(collecteds), 0) - }) -} - -func test_sizeStmt() { - g.TestStart("sizeStmt()") - - const ( - topic = "size() topic" - payloadStr = "size() payload" - consumer = "size() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - replay, replayClose, replayErr := replayStmt(cfg) - oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg) - size, sizeClose, sizeErr := sizeStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - toDeadErr, - replayErr, - oneDeadErr, - sizeErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - toDeadClose, - replayClose, - sizeClose, - oneDeadClose, - db.Close, - ) - - pub := func(topic string) uuid.UUID { - g.TErrorIf(take(topic, consumer)) - - unsentWithTopic := unsent - unsentWithTopic.Topic = topic - message, err := publish(unsentWithTopic, uuid.New()) - g.TErrorIf(err) - return message.uuid - } - - - g.Testing("0 on empty topic", func() { - n, err := size(topic) - g.TErrorIf(err) - g.TAssertEqual(n, 0) - }) - - g.Testing("other topics don't fall into our count", func() { - pub("other topic") - - n, err := size(topic) - g.TErrorIf(err) - g.TAssertEqual(n, 0) - }) - - g.Testing("otherwise we just get the sum", func() { - pub(topic) - pub(topic) - pub(topic) - pub(topic) - pub(topic) - - n, err := size(topic) - g.TErrorIf(err) - g.TAssertEqual(n, 5) - }) - - g.Testing("deadletters aren't taken into account", func() { - sixthMessageID := pub(topic) - err := toDead(consumer, sixthMessageID, uuid.New()) - g.TErrorIf(err) - - n, err := size(topic) - g.TErrorIf(err) - g.TAssertEqual(n, 6) - }) - - g.Testing("after replay, deadletters increases the size", func() { - deadletter, err := oneDead(topic, consumer) - g.TErrorIf(err) - - _, err = replay(deadletter.uuid, uuid.New()) - g.TErrorIf(err) - - n, err := size(topic) - g.TErrorIf(err) - g.TAssertEqual(n, 7) - }) -} - -func test_countStmt() { - g.TestStart("countStmt()") - - const ( - topic = "count() topic" - payloadStr = "count() payload" - consumer = "count() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - next, nextClose, nextErr := nextStmt(cfg) - commit, commitClose, commitErr := commitStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - count, countClose, countErr := countStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - nextErr, - commitErr, - toDeadErr, - countErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - nextClose, - commitClose, - toDeadClose, - countClose, - db.Close, - ) - - pub := func(topic string) uuid.UUID { - g.TErrorIf(take(topic, consumer)) - - unsentWithTopic := unsent - unsentWithTopic.Topic = topic - message, err := publish(unsentWithTopic, uuid.New()) - g.TErrorIf(err) - return message.uuid - } - - - g.Testing("0 on empty topic", func() { - n, err := count(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(n, 0) - }) - - g.Testing("other topics don't add to our count", func() { - err := commit(consumer, pub("other topic")) - g.TErrorIf(err) - - n, err := count(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(n, 0) - }) - - g.Testing("other consumers don't influence our count", func() { - g.TErrorIf(take(topic, "other consumer")) - err := commit("other consumer", pub(topic)) - g.TErrorIf(err) - - n, err := count(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(n, 0) - }) - - g.Testing("unconsumed messages don't count", func() { - pub(topic) - pub(topic) - pub(topic) - - n, err := count(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(n, 0) - }) - - g.Testing("consumed messages do count", func() { - message, err := next(topic, consumer) - g.TErrorIf(err) - err = commit(consumer, message.uuid) - g.TErrorIf(err) - - message, err = next(topic, consumer) - g.TErrorIf(err) - err = commit(consumer, message.uuid) - g.TErrorIf(err) - - message, err = next(topic, consumer) - g.TErrorIf(err) - err = commit(consumer, message.uuid) - g.TErrorIf(err) - - n, err := count(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(n, 3) - }) - - g.Testing("deadletters count as consumed", func() { - message, err := next(topic, consumer) - g.TErrorIf(err) - - err = toDead(consumer, message.uuid, uuid.New()) - g.TErrorIf(err) - - n, err := count(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(n, 4) - }) -} - -func test_hasDataStmt() { - g.TestStart("hasDataStmt()") - - const ( - topic = "hasData() topic" - payloadStr = "hasData() payload" - consumer = "hasData() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - db, err := sql.Open(golite.DriverName, dbpath) - g.TErrorIf(err) - g.TErrorIf(createTables(db, prefix)) - - cfg := dbconfigT{ - shared: db, - dbpath: dbpath, - prefix: prefix, - instanceID: instanceID, - } - take, takeClose, takeErr := takeStmt(cfg) - publish, publishClose, publishErr := publishStmt(cfg) - next, nextClose, nextErr := nextStmt(cfg) - commit, commitClose, commitErr := commitStmt(cfg) - toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) - hasData, hasDataClose, hasDataErr := hasDataStmt(cfg) - g.TErrorIf(g.SomeError( - takeErr, - publishErr, - nextErr, - commitErr, - toDeadErr, - hasDataErr, - )) - defer g.SomeFnError( - takeClose, - publishClose, - nextClose, - commitClose, - toDeadClose, - hasDataClose, - db.Close, - ) - - pub := func(topic string) uuid.UUID { - g.TErrorIf(take(topic, consumer)) - - unsentWithTopic := unsent - unsentWithTopic.Topic = topic - message, err := publish(unsentWithTopic, uuid.New()) - g.TErrorIf(err) - return message.uuid - } - - - g.Testing("false on empty topic", func() { - has, err := hasData(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(has, false) - }) - - g.Testing("other topics don't change the response", func() { - pub("other topic") - - has, err := hasData(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(has, false) - }) - - g.Testing("published messages flip the flag", func() { - pub(topic) - has, err := hasData(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(has, true) - }) - - g.Testing("other consumers don't influence us", func() { - g.TErrorIf(take(topic, "other consumer")) - message, err := next(topic, "other consumer") - g.TErrorIf(err) - - err = commit("other consumer", message.uuid) - g.TErrorIf(err) - - has, err := hasData(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(has, true) - }) - - g.Testing("consuming messages unflips the result", func() { - message, err := next(topic, consumer) - g.TErrorIf(err) - - err = commit(consumer, message.uuid) - g.TErrorIf(err) - - has, err := hasData(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(has, false) - }) - - g.Testing("same for deadletters", func() { - has0, err := hasData(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(has0, false) - - messageID1 := pub(topic) - messageID2 := pub(topic) - - has1, err := hasData(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(has1, true) - - err = toDead(consumer, messageID1, uuid.New()) - g.TErrorIf(err) - err = commit(consumer, messageID2) - g.TErrorIf(err) - - has2, err := hasData(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(has2, false) - }) -} - -func test_initDB() { - g.TestStart("initDB()") - - const ( - topic = "initDB() topic" - payloadStr = "initDB() payload" - consumer = "initDB() consumer" - dbpath = golite.InMemory - prefix = defaultPrefix - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - var messages []messageT - notifyFn := func(message messageT) { - messages = append(messages, message) - } - - queries, err := initDB(dbpath, prefix, notifyFn, instanceID) - g.TErrorIf(err) - defer queries.close() - - g.TErrorIf(queries.take(topic, consumer)) - - - g.Testing("we can perform all the wrapped operations", func() { - messageID := uuid.New() - newMessageID := uuid.New() - deadletterID := uuid.New() - - messageV1, err := queries.publish(unsent, messageID) - g.TErrorIf(err) - - messageV2, err := queries.next(topic, consumer) - g.TErrorIf(err) - - var messagesV3 []messageT - pendingFn := func(message messageT) error { - messagesV3 = append(messagesV3, message) - return nil - } - err = queries.pending(topic, consumer, pendingFn) - g.TErrorIf(err) - g.TAssertEqual(len(messagesV3), 1) - messageV3 := messagesV3[0] - - err = queries.toDead(consumer, messageID, deadletterID) - g.TErrorIf(err) - - deadletterV1, err := queries.oneDead(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(deadletterV1.uuid, deadletterID) - - var ( - deadlettersV2 []deadletterT - messagesV4 []messageT - ) - deadletterFn := func( - deadletter deadletterT, - message messageT, - ) error { - deadlettersV2 = append(deadlettersV2, deadletter) - messagesV4 = append(messagesV4, message) - return nil - } - err = queries.allDead(topic, consumer, deadletterFn) - g.TErrorIf(err) - g.TAssertEqual(len(deadlettersV2), 1) - g.TAssertEqual(deadlettersV2[0].uuid, deadletterID) - g.TAssertEqual(len(messagesV4), 1) - messageV4 := messagesV4[0] - - g.TAssertEqual(messageV1, messageV2) - g.TAssertEqual(messageV1, messageV3) - g.TAssertEqual(messageV1, messageV4) - - newMessageV1, err := queries.replay(deadletterID, newMessageID) - g.TErrorIf(err) - - err = queries.commit(consumer, newMessageID) - g.TErrorIf(err) - - newMessageV0 := messageV1 - newMessageV0.id = newMessageV1.id - newMessageV0.uuid = newMessageID - newMessageV0.timestamp = newMessageV1.timestamp - g.TAssertEqual(newMessageV1, newMessageV0) - - size, err := queries.size(topic) - g.TErrorIf(err) - g.TAssertEqual(size, 2) - - count, err := queries.count(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(count, 2) - - hasData, err := queries.hasData(topic, consumer) - g.TErrorIf(err) - g.TAssertEqual(hasData, false) - }) -} - -func test_queriesTclose() { - g.TestStart("queriesT.close()") - - const ( - dbpath = golite.InMemory - prefix = defaultPrefix - ) - - notifyFn := func(messageT) {} - queries, err := initDB(dbpath, prefix, notifyFn, instanceID) - g.TErrorIf(err) - - - g.Testing("closing more than once does not error", func() { - g.TErrorIf(g.SomeError( - queries.close(), - queries.close(), - )) - }) -} - -func test_newPinger() { - g.TestStart("newPinger()") - - g.Testing("onPing() on a closed pinger is a noop", func() { - pinger := newPinger[string]() - pinger.close() - pinger.onPing(func(s string) { - panic(s) - }) - pinger.tryPing("ignored") - }) - - g.Testing("onPing() on a closed pinger with data gets that", func() { - pinger := newPinger[string]() - pinger.tryPing("received") - pinger.tryPing("ignored") - g.TAssertEqual(pinger.closed(), false) - - pinger.close() - g.TAssertEqual(pinger.closed(), true) - - c := make(chan string) - count := 0 - go pinger.onPing(func(s string) { - if count > 0 { - panic(s) - } - count++ - - c <- s - }) - - given := <- c - g.TAssertEqual(given, "received") - }) - - g.Testing("when onPing is late, we loose messages", func() { - pinger := newPinger[string]() - pinger.tryPing("first seen") - pinger.tryPing("second dropped") - pinger.tryPing("third dropped") - - c := make(chan string) - go pinger.onPing(func(s string) { - c <- s - }) - s := <- c - - close(c) - pinger.close() - g.TAssertEqual(s, "first seen") - }) - - g.Testing("if onPing is on time, it may not loose", func() { - pinger := newPinger[string]() - pinger.tryPing("first seen") - - c := make(chan string) - go pinger.onPing(func(s string) { - c <- s - }) - - s1 := <- c - pinger.tryPing("second seen") - s2 := <- c - - close(c) - pinger.close() - g.TAssertEqual(s1, "first seen") - g.TAssertEqual(s2, "second seen") - }) - - g.Testing("if onPing takes too long, it still looses messages", func() { - pinger := newPinger[string]() - pinger.tryPing("first seen") - - c1 := make(chan string) - c2 := make(chan struct{}) - go pinger.onPing(func(s string) { - c1 <- s - c2 <- struct{}{} - }) - - s1 := <- c1 - pinger.tryPing("second seen") - pinger.tryPing("third dropped") - <- c2 - s2 := <- c1 - <- c2 - - close(c2) - close(c1) - pinger.close() - g.TAssertEqual(s1, "first seen") - g.TAssertEqual(s2, "second seen") - }) -} - -func test_makeSubscriptionsFunc() { - g.TestStart("makeSubscriptionsFunc()") - - g.Testing("we can have multiple readers", func() { - subscriptions := makeSubscriptionsFuncs() - - var ( - readStarted sync.WaitGroup - readFinished sync.WaitGroup - ) - c := make(chan struct{}) - readFn := func(subscriptionsSetM) error { - readStarted.Done() - <- c - return nil - } - addReader := func() { - readStarted.Add(1) - readFinished.Add(1) - go func() { - subscriptions.read(readFn) - readFinished.Done() - }() - } - - addReader() - addReader() - addReader() - readStarted.Wait() - c <- struct{}{} - c <- struct{}{} - c <- struct{}{} - readFinished.Wait() - }) - - g.Testing("writers are exclusive", func() { - subscriptions := makeSubscriptionsFuncs() - - var ( - readStarted sync.WaitGroup - readFinished sync.WaitGroup - writeWillStart sync.WaitGroup - writeFinished sync.WaitGroup - ) - c := make(chan string) - readFn := func(subscriptionsSetM) error { - readStarted.Done() - c <- "reader" - return nil - } - addReader := func() { - readStarted.Add(1) - readFinished.Add(1) - go func() { - subscriptions.read(readFn) - readFinished.Done() - }() - } - - writeFn := func(subscriptionsSetM) error { - c <- "writer" - return nil - } - addWriter := func() { - writeWillStart.Add(1) - writeFinished.Add(1) - go func() { - writeWillStart.Done() - subscriptions.write(writeFn) - writeFinished.Done() - }() - } - - addReader() - addReader() - addReader() - readStarted.Wait() - addWriter() - writeWillStart.Wait() - - g.TAssertEqual(<-c, "reader") - g.TAssertEqual(<-c, "reader") - g.TAssertEqual(<-c, "reader") - - readFinished.Wait() - g.TAssertEqual(<-c, "writer") - writeFinished.Wait() - }) -} - -func test_makeNotifyFn() { - g.TestStart("makeNotifyFn()") - - g.Testing("when topic is nil only top pinger gets pinged", func() { - pinger1 := newPinger[struct{}]() - pinger2 := newPinger[[]byte]() - defer pinger1.close() - defer pinger2.close() - - go pinger1.onPing(func(struct{}) { - panic("consumer pinger") - }) - go pinger2.onPing(func(payload []byte) { - panic("waiter pinger") - }) - - flowID := uuid.New() - - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer-1": consumerT{ - pinger: pinger1, - }, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{ - "waiter-1": waiterT{ - pinger: pinger2, - }, - }, - }, - }, - } - subsFn := func(fn func(subscriptionsSetM) error) error { - return fn(set) - } - topPinger := newPinger[struct{}]() - defer topPinger.close() - - var wg sync.WaitGroup - wg.Add(1) - go topPinger.onPing(func(struct{}) { - wg.Done() - }) - - notifyFn := makeNotifyFn(subsFn, topPinger) - - message := messageT{ - uuid: uuid.New(), - topic: "nobody is subscribed to this one", - payload: []byte("nobody with get this payload"), - } - notifyFn(message) - wg.Wait() - }) - - g.Testing("otherwise all interested subscribers get pinged", func() { - const topic = "the topic name" - - pinger1 := newPinger[struct{}]() - pinger2 := newPinger[[]byte]() - defer pinger1.close() - defer pinger2.close() - - var wg sync.WaitGroup - go pinger1.onPing(func(struct{}) { - wg.Done() - }) - go pinger2.onPing(func([]byte) { - wg.Done() - }) - wg.Add(2) - - flowID := uuid.New() - - set := subscriptionsSetM{ - topic: topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer-1": consumerT{ - pinger: pinger1, - }, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{ - "waiter-1": waiterT{ - pinger: pinger2, - }, - }, - }, - }, - } - - subsFn := func(fn func(subscriptionsSetM) error) error { - return fn(set) - } - - topPinger := newPinger[struct{}]() - defer topPinger.close() - go topPinger.onPing(func(struct{}) { - wg.Done() - }) - wg.Add(1) - - notifyFn := makeNotifyFn(subsFn, topPinger) - - message := messageT{ - uuid: uuid.New(), - topic: topic, - flowID: flowID, - payload: []byte("ignored in this test"), - } - notifyFn(message) - wg.Wait() - }) -} - -func test_collectClosedWaiters() { - g.TestStart("collectClosedWaiter()") - - g.Testing("collects all the reports to be closed", func() { - flowID1 := uuid.New() - flowID2 := uuid.New() - flowID3 := uuid.New() - flowID4 := uuid.New() - flowID5 := uuid.New() - - mkwaiter := func(closed bool) waiterT { - fn := func() bool { - return closed - } - return waiterT{ - closed: &fn, - } - } - - set := subscriptionsSetM{ - "topic-1": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{ - "waiter-1": mkwaiter(false), - "waiter-2": mkwaiter(true), - "waiter-3": mkwaiter(true), - }, - flowID2: map[string]waiterT{ - "waiter-4": mkwaiter(true), - "waiter-5": mkwaiter(false), - }, - }, - }, - "topic-2": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID3: map[string]waiterT{ - "waiter-1": mkwaiter(false), - "waiter-2": mkwaiter(false), - }, - flowID4: map[string]waiterT{ - "waiter-3": mkwaiter(true), - "waiter-4": mkwaiter(true), - "waiter-5": mkwaiter(true), - "waiter-6": mkwaiter(true), - }, - }, - }, - "topic-3": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID5: map[string]waiterT{}, - }, - }, - "topic-4": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - expected := map[string]map[uuid.UUID][]string{ - "topic-1": map[uuid.UUID][]string{ - flowID1: []string{ - "waiter-2", - "waiter-3", - }, - flowID2: []string{ - "waiter-4", - }, - }, - "topic-2": map[uuid.UUID][]string{ - flowID3: []string{}, - flowID4: []string{ - "waiter-3", - "waiter-4", - "waiter-5", - "waiter-6", - }, - }, - "topic-3": map[uuid.UUID][]string{ - flowID5: []string{}, - }, - "topic-4": map[uuid.UUID][]string{}, - } - - given := collectClosedWaiters(set) - - // sort names for equality - for _, waitersIndex := range given { - for _, names := range waitersIndex { - sort.Strings(names) - } - } - g.TAssertEqual(given, expected) - }) -} - -func test_trimEmptyLeaves() { - g.TestStart("trimEmptyLeaves()") - - g.Testing("noop on an empty index", func() { - input := map[string]map[uuid.UUID][]string{} - expected := map[string]map[uuid.UUID][]string{} - - trimEmptyLeaves(input) - g.TAssertEqual(input, expected) - }) - - g.Testing("simplifies tree when it can", func() { - flowID1 := uuid.New() - flowID2 := uuid.New() - flowID3 := uuid.New() - flowID4 := uuid.New() - - input := map[string]map[uuid.UUID][]string{ - "topic-1": map[uuid.UUID][]string{ - flowID1: []string{ - "waiter-1", - }, - flowID2: []string{}, - }, - "topic-2": map[uuid.UUID][]string{ - flowID3: []string{}, - flowID4: []string{}, - }, - "topic-3": map[uuid.UUID][]string{}, - } - - expected := map[string]map[uuid.UUID][]string{ - "topic-1": map[uuid.UUID][]string{ - flowID1: []string{ - "waiter-1", - }, - }, - } - - trimEmptyLeaves(input) - g.TAssertEqual(input, expected) - }) - - g.Testing("fully prune tree if possible", func() { - flowID1 := uuid.New() - flowID2 := uuid.New() - flowID3 := uuid.New() - flowID4 := uuid.New() - flowID5 := uuid.New() - - input := map[string]map[uuid.UUID][]string{ - "topic-1": map[uuid.UUID][]string{}, - "topic-2": map[uuid.UUID][]string{}, - "topic-3": map[uuid.UUID][]string{}, - "topic-4": map[uuid.UUID][]string{ - flowID1: []string{}, - }, - "topic-5": map[uuid.UUID][]string{ - flowID2: []string{}, - flowID3: []string{}, - flowID4: []string{}, - flowID5: []string{}, - }, - } - - expected := map[string]map[uuid.UUID][]string{} - - trimEmptyLeaves(input) - g.TAssertEqual(input, expected) - }) -} - -func test_deleteIfEmpty() { - g.TestStart("deleteIfEmpty()") - - g.Testing("noop if there are consumers", func() { - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - }, - } - - expected1 := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - }, - } - expected2 := subscriptionsSetM{} - - deleteIfEmpty(set, "topic") - g.TAssertEqual(set, expected1) - - delete(set["topic"].consumers, "consumer") - deleteIfEmpty(set, "topic") - g.TAssertEqual(set, expected2) - }) - - g.Testing("noop if there are waiters", func() { - flowID := uuid.New() - - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: nil, - }, - }, - } - - expected1 := subscriptionsSetM{ - "topic": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: nil, - }, - }, - } - expected2 := subscriptionsSetM{} - - deleteIfEmpty(set, "topic") - g.TAssertEqual(set, expected1) - - delete(set["topic"].waiters, flowID) - deleteIfEmpty(set, "topic") - g.TAssertEqual(set, expected2) - }) - - g.Testing("otherwise deletes when empty", func() { - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - expected := subscriptionsSetM{} - - deleteIfEmpty(set, "topic") - g.TAssertEqual(set, expected) - }) - - g.Testing("unrelated topics are left untouched", func() { - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - expected := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - deleteIfEmpty(set, "another-topic") - g.TAssertEqual(set, expected) - }) -} - -func test_deleteEmptyTopics() { - g.TestStart("deleteEmptyTopics()") - - g.Testing("cleans up all empty topics from the set", func() { - flowID1 := uuid.New() - flowID2 := uuid.New() - - set := subscriptionsSetM{ - "empty": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - "has-consumers": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - "has-waiters": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: nil, - }, - }, - "has-both": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID2: nil, - }, - }, - "has-neither": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - expected := subscriptionsSetM{ - "has-consumers": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - "has-waiters": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: nil, - }, - }, - "has-both": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID2: nil, - }, - }, - } - - deleteEmptyTopics(set) - g.TAssertEqual(set, expected) - }) -} - -func test_removeClosedWaiter() { - g.TestStart("removeClosedWaiter()") - - g.Testing("removes from set all that we request", func() { - flowID0 := uuid.New() - flowID1 := uuid.New() - flowID2 := uuid.New() - flowID3 := uuid.New() - - set := subscriptionsSetM{ - "topic-1": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{ - "waiter-1": waiterT{}, - "waiter-2": waiterT{}, - }, - flowID2: map[string]waiterT{ - "waiter-3": waiterT{}, - "waiter-4": waiterT{}, - "waiter-5": waiterT{}, - }, - }, - }, - "topic-2": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID3: map[string]waiterT{ - "waiter-6": waiterT{}, - "waiter-7": waiterT{}, - "waiter-8": waiterT{}, - }, - }, - }, - "topic-3": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - input := map[string]map[uuid.UUID][]string{ - "topic-0": map[uuid.UUID][]string{ - flowID0: []string{ - "waiter-0", - }, - }, - "topic-1": map[uuid.UUID][]string{ - flowID1: []string{ - "waiter-2", - }, - flowID2: []string{ - "waiter-3", - "waiter-4", - "waiter-5", - }, - }, - "topic-2": map[uuid.UUID][]string{ - flowID3: []string{ - "waiter-6", - "waiter-7", - "waiter-8", - }, - }, - } - - expected := subscriptionsSetM{ - "topic-1": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{ - "waiter-1": waiterT{}, - }, - }, - }, - } - - removeClosedWaiters(set, input) - g.TAssertEqual(set, expected) - }) - - g.Testing("empty flowIDs from input GET LEAKED", func() { - flowID1 := uuid.New() - flowID2 := uuid.New() - - input := map[string]map[uuid.UUID][]string{ - "topic-2": map[uuid.UUID][]string{ - flowID2: []string{ - "waiter", - }, - }, - } - - set := subscriptionsSetM{ - "topic-1": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{}, - }, - }, - "topic-2": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID2: map[string]waiterT{ - "waiter": waiterT{}, - }, - }, - }, - } - - expected := subscriptionsSetM{ - "topic-1": topicSubscriptionT{ - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{}, - }, - }, - } - - removeClosedWaiters(set, input) - g.TAssertEqual(set, expected) - }) -} - -func test_reapClosedWaiters() { - g.TestStart("reapClosedWaiters()") - - g.Testing("trimEmptyLeaves() prevents removal of empty flows", func() { - var ( - set subscriptionsSetM - readCount = 0 - writeCount = 0 - ) - readFn := func(fn func(subscriptionsSetM) error) error { - readCount++ - return fn(set) - } - writeFn := func(fn func(subscriptionsSetM) error) error { - writeCount++ - return fn(set) - } - - openFn := func() bool { - return false - } - closedFn := func() bool { - return true - } - open := waiterT{ closed: &openFn } - closed := waiterT{ closed: &closedFn } - flowID1 := uuid.New() - flowID2 := uuid.New() - - set = subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{ - "waiter-1": open, - "waiter-2": open, - "waiter-3": open, - }, - flowID2: map[string]waiterT{ - "waiter-4": open, - }, - }, - }, - } - - expected1 := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{ - "waiter-1": open, - "waiter-2": open, - "waiter-3": open, - }, - flowID2: map[string]waiterT{ - "waiter-4": open, - }, - }, - }, - } - - expected2 := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{ - "waiter-2": open, - "waiter-3": open, - }, - flowID2: map[string]waiterT{ - "waiter-4": open, - }, - }, - }, - } - - expected3 := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{ - "waiter-2": open, - "waiter-3": open, - }, - }, - }, - } - - reapClosedWaiters(readFn, writeFn) - g.TAssertEqual(readCount, 1) - g.TAssertEqual(writeCount, 0) - g.TAssertEqual(set, expected1) - - set["topic"].waiters[flowID1]["waiter-1"] = closed - reapClosedWaiters(readFn, writeFn) - g.TAssertEqual(readCount, 2) - g.TAssertEqual(writeCount, 1) - g.TAssertEqual(set, expected2) - - set["topic"].waiters[flowID2]["waiter-4"] = closed - reapClosedWaiters(readFn, writeFn) - g.TAssertEqual(readCount, 3) - g.TAssertEqual(writeCount, 2) - g.TAssertEqual(set, expected3) - }) -} - -func test_everyNthCall() { - g.TestStart("everyNthCall()") - - g.Testing("0 (incorrect) would make fn never be called", func() { - never := everyNthCall[int](0, func(int) { - panic("unreachable") - }) - for i := 0; i < 100; i++ { - never(i) - } - }) - - g.Testing("the first call is delayed to be the nth", func() { - count := 0 - values := []string{} - fn := everyNthCall[string](3, func(s string) { - count++ - values = append(values, s) - }) - - fn("1 ignored") - g.TAssertEqual(count, 0) - fn("2 ignored") - g.TAssertEqual(count, 0) - fn("3 not ignored") - g.TAssertEqual(count, 1) - - fn("4 ignored") - fn("5 ignored") - g.TAssertEqual(count, 1) - - fn("6 not ignored") - fn("7 ignored") - fn("8 ignored") - g.TAssertEqual(count, 2) - - fn("9 not ignored") - g.TAssertEqual(count, 3) - - expected := []string{ - "3 not ignored", - "6 not ignored", - "9 not ignored", - } - g.TAssertEqual(values, expected) - }) -} - -func test_runReaper() { - g.TestStart("runReaper()") - - g.Testing("debounce reapClosedWaiters `reaperSkipCount` times", func() { - set := subscriptionsSetM{} - - var ( - readCount = 0 - writeCount = 0 - ) - readFn := func(fn func(subscriptionsSetM) error) error { - readCount++ - return fn(set) - } - writeFn := func(fn func(subscriptionsSetM) error) error { - writeCount++ - return fn(set) - } - - var iterCount int - onPing := func(fn func(struct{})) { - for i := 0; i < iterCount; i++ { - fn(struct{}{}) - } - } - - iterCount = reaperSkipCount - 1 - runReaper(onPing, readFn, writeFn) - g.TAssertEqual(readCount, 0) - g.TAssertEqual(writeCount, 0) - - iterCount = reaperSkipCount - runReaper(onPing, readFn, writeFn) - g.TAssertEqual(readCount, 1) - g.TAssertEqual(writeCount, 0) - }) -} - -func test_NewWithPrefix() { - g.TestStart("NewWithPrefix()") - - g.Testing("we get an error with a bad prefix", func() { - _, err := NewWithPrefix(golite.InMemory, "a bad prefix") - g.TAssertEqual(err, g.ErrBadSQLTablePrefix) - }) - - g.Testing("otherwise we have a queueT and no errors", func() { - queue, err := NewWithPrefix(golite.InMemory, "good") - g.TErrorIf(err) - queue.Close() - - g.TAssertEqual(queue.(queueT).pinger.closed(), true) - }) -} - -func test_New() { - g.TestStart("New()") - - g.Testing("smoke test that we get a queueT", func() { - queue, err := New(golite.InMemory) - g.TErrorIf(err) - queue.Close() - - g.TAssertEqual(queue.(queueT).pinger.closed(), true) - }) -} - -func test_asPublicMessage() { - g.TestStart("asPublicMessage()") - - g.Testing("it picks the correct fields 🤷", func() { - input := messageT{ - uuid: uuid.New(), - timestamp: time.Now(), - topic: "topic", - flowID: uuid.New(), - payload: []byte("payload"), - } - - expected := Message{ - ID: input.uuid, - Timestamp: input.timestamp, - Topic: input.topic, - FlowID: input.flowID, - Payload: input.payload, - } - - given := asPublicMessage(input) - g.TAssertEqual(given, expected) - }) -} - -func test_queueT_Publish() { - g.TestStart("queueT.Publish()") - - const ( - topic = "queueT.Publish() topic" - payloadStr = "queueT.Publish() payload" - dbpath = golite.InMemory - ) - var ( - flowID = uuid.New() - payload = []byte(payloadStr) - unsent = UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - ) - - queue, err := New(dbpath) - g.TErrorIf(err) - defer queue.Close() - - - g.Testing("it just publishes and returns", func() { - message, err := queue.Publish(unsent) - g.TErrorIf(err) - - g.TAssertEqual(message.Timestamp == time.Time{}, false) - g.TAssertEqual(message.Topic, topic) - g.TAssertEqual(message.FlowID, flowID) - g.TAssertEqual(message.Payload, payload) - }) - - queue.Close() - g.TAssertEqual(queue.(queueT).pinger.closed(), true) -} - -func test_registerConsumerFn() { - g.TestStart("registerConsumerFn()") - - g.Testing("adds a new topicSubscriptionT{} if needed", func() { - consumer := consumerT{ - data: consumerDataT{ - topic: "topic", - name: "consumer", - }, - } - - set := subscriptionsSetM{} - - expected := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumer, - }, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - registerConsumerFn(consumer)(set) - g.TAssertEqual(set, expected) - }) - - g.Testing("otherwise it just uses what exists", func() { - flowID := uuid.New() - - consumer := consumerT{ - data: consumerDataT{ - topic: "topic", - name: "consumer", - }, - } - - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "other-consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{}, - }, - }, - } - - expected := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumer, - "other-consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{}, - }, - }, - } - - registerConsumerFn(consumer)(set) - g.TAssertEqual(set, expected) - }) - - g.Testing("overwrites existing consumer if desired", func() { - close1 := func() {} - consumer1 := consumerT{ - data: consumerDataT{ - topic: "topic", - name: "consumer", - }, - close: &close1, - } - - close2 := func() {} - consumer2 := consumerT{ - data: consumerDataT{ - topic: "topic", - name: "consumer", - }, - close: &close2, - } - - set := subscriptionsSetM{} - - expected1 := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumer1, - }, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - expected2 := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumer2, - }, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - registerConsumerFn(consumer1)(set) - g.TAssertEqual(set, expected1) - g.TAssertEqual(reflect.DeepEqual(set, expected2), false) - - registerConsumerFn(consumer2)(set) - g.TAssertEqual(set, expected2) - g.TAssertEqual(reflect.DeepEqual(set, expected1), false) - }) -} - -func test_registerWaiterFn() { - g.TestStart("registerWaiterFn()") - - g.Testing("adds a new topicSubscriptionT{} if needed", func() { - flowID := uuid.New() - - waiter := waiterT{ - data: waiterDataT{ - topic: "topic", - flowID: flowID, - name: "waiter", - }, - } - - set := subscriptionsSetM{} - - expected := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{ - "waiter": waiter, - }, - }, - }, - } - - registerWaiterFn(waiter)(set) - g.TAssertEqual(set, expected) - }) - - g.Testing("adds a new waiters map if needed", func() { - flowID := uuid.New() - - waiter := waiterT{ - data: waiterDataT{ - topic: "topic", - flowID: flowID, - name: "waiter", - }, - } - - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - expected := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{ - "waiter": waiter, - }, - }, - }, - } - - registerWaiterFn(waiter)(set) - g.TAssertEqual(set, expected) - }) - - g.Testing("otherwise it just uses what exists", func() { - flowID := uuid.New() - - waiter := waiterT{ - data: waiterDataT{ - topic: "topic", - flowID: flowID, - name: "waiter", - }, - } - - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{ - "other-waiter": waiterT{}, - }, - }, - }, - } - - expected := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{ - "waiter": waiter, - "other-waiter": waiterT{}, - }, - }, - }, - } - - registerWaiterFn(waiter)(set) - g.TAssertEqual(set, expected) - }) - - g.Testing("overwrites existing waiter if desired", func() { - flowID := uuid.New() - - close1 := func() {} - waiter1 := waiterT{ - data: waiterDataT{ - topic: "topic", - flowID: flowID, - name: "waiter", - }, - close: &close1, - } - - close2 := func() {} - waiter2 := waiterT{ - data: waiterDataT{ - topic: "topic", - flowID: flowID, - name: "waiter", - }, - close: &close2, - } - - set := subscriptionsSetM{} - - expected1 := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{ - "waiter": waiter1, - }, - }, - }, - } - - expected2 := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{ - "waiter": waiter2, - }, - }, - }, - } - - registerWaiterFn(waiter1)(set) - g.TAssertEqual(set, expected1) - g.TAssertEqual(reflect.DeepEqual(set, expected2), false) - - registerWaiterFn(waiter2)(set) - g.TAssertEqual(set, expected2) - g.TAssertEqual(reflect.DeepEqual(set, expected1), false) - }) -} - -func test_makeConsumeOneFn() { - g.TestStart("makeConsumeOneFn()") - - savedLogger := slog.Default() - - s := new(strings.Builder) - g.SetLoggerOutput(s) - - var ( - successCount int - errorCount int - callbackErr error - successFnErr error - errorFnErr error - messages []Message - successNames []string - successIDs []uuid.UUID - errorNames []string - errorIDs []uuid.UUID - deadletterIDs []uuid.UUID - ) - - data := consumerDataT{ - topic: "topic", - name: "name", - } - - callback := func(message Message) error { - messages = append(messages, message) - return callbackErr - } - - successFn := func(name string, messageID uuid.UUID) error { - successCount++ - successNames = append(successNames, name) - successIDs = append(successIDs, messageID) - return successFnErr - } - - errorFn := func( - name string, - messageID uuid.UUID, - deadletterID uuid.UUID, - ) error { - errorCount++ - errorNames = append(errorNames, name) - errorIDs = append(errorIDs, messageID) - deadletterIDs = append(deadletterIDs, deadletterID) - - return errorFnErr - } - - consumeOneFn := makeConsumeOneFn( - data, - callback, - successFn, - errorFn, - ) - - message1 := messageT{ uuid: uuid.New() } - message2 := messageT{ uuid: uuid.New() } - message3 := messageT{ uuid: uuid.New() } - message4 := messageT{ uuid: uuid.New() } - - - g.Testing("error from successFn() is propagated", func() { - err := consumeOneFn(message1) - g.TErrorIf(err) - g.TAssertEqual(successCount, 1) - g.TAssertEqual(errorCount, 0) - - successFnErr = errors.New("successFn() error") - err = consumeOneFn(message2) - g.TAssertEqual(err, successFnErr) - g.TAssertEqual(successCount, 2) - g.TAssertEqual(errorCount, 0) - - g.TAssertEqual(s.String(), "") - }) - - g.Testing("error from callback() is logged and dropped", func() { - *s = strings.Builder{} - - callbackErr = errors.New("callback() error") - err := consumeOneFn(message3) - g.TErrorIf(err) - g.TAssertEqual(successCount, 2) - g.TAssertEqual(errorCount, 1) - g.TAssertEqual(s.String() == "", false) - }) - - g.Testing("error from errorFn() is propagated", func() { - *s = strings.Builder{} - - errorFnErr = errors.New("errorFn() error") - err := consumeOneFn(message4) - g.TAssertEqual(err, errorFnErr) - g.TAssertEqual(successCount, 2) - g.TAssertEqual(errorCount, 2) - g.TAssertEqual(s.String() == "", false) - }) - - g.Testing("calls happened with the expected arguments", func() { - expectedMessages := []Message{ - asPublicMessage(message1), - asPublicMessage(message2), - asPublicMessage(message3), - asPublicMessage(message4), - } - - g.TAssertEqual(messages, expectedMessages) - g.TAssertEqual(successNames, []string{ "name", "name" }) - g.TAssertEqual(errorNames, []string{ "name", "name" }) - g.TAssertEqual(successIDs, []uuid.UUID{ - message1.uuid, - message2.uuid, - }) - g.TAssertEqual(errorIDs, []uuid.UUID{ - message3.uuid, - message4.uuid, - }) - g.TAssertEqual(deadletterIDs[0] == message3.uuid, false) - g.TAssertEqual(deadletterIDs[1] == message4.uuid, false) - }) - - slog.SetDefault(savedLogger) -} - -func test_makeConsumeAllFn() { - g.TestStart("makeConsumeAllFn()") - - savedLogger := slog.Default() - - s := new(strings.Builder) - g.SetLoggerOutput(s) - - data := consumerDataT{} - - consumeOneFn := func(messageT) error { - return nil - } - - var eachFnErr error - eachFn := func(string, string, func(messageT) error) error { - return eachFnErr - } - - consumeAllFn := makeConsumeAllFn(data, consumeOneFn, eachFn) - - - g.Testing("silent when eachFn() returns no error", func() { - consumeAllFn(struct{}{}) - g.TAssertEqual(s.String(), "") - }) - - g.Testing("logs warning otherwise", func() { - eachFnErr = errors.New("eachFn() error") - consumeAllFn(struct{}{}) - g.TAssertEqual(s.String() == "", false) - }) - - slog.SetDefault(savedLogger) -} - -func test_makeWaitFn() { - g.TestStart("makeWaitFn()") - - g.Testing("all it does is send the data and close things", func() { - callCount := 0 - closeFn := func() { - callCount++ - } - - c := make(chan []byte, 1) - waitFn := makeWaitFn(c, closeFn) - - payload := []byte("payload") - waitFn(payload) - given := <- c - - g.TAssertEqual(given, payload) - g.TAssertEqual(callCount, 1) - }) - - g.Testing("you can call it twice for cases of race condition", func() { - callCount := 0 - closeFn := func() { - callCount++ - } - - c := make(chan []byte, 1) - waitFn := makeWaitFn(c, closeFn) - - payload := []byte("something") - waitFn(payload) - waitFn(payload) - given1 := <- c - given2 := <- c - - g.TAssertEqual(given1, payload) - g.TAssertEqual(given2, []byte(nil)) - }) -} - -func test_runConsumer() { - g.TestStart("runConsumer()") - - g.Testing("calls consumeAllFn() at least one", func() { - onPing := func(fn func(struct{})) {} - - count := 0 - consumeAllFn := func(struct{}) { - count++ - } - - runConsumer(onPing, consumeAllFn) - g.TAssertEqual(count, 1) - }) - - g.Testing("can call consumeAllFn() (onPing + 1) times", func() { - onPing := func(fn func(struct{})) { - fn(struct{}{}) - fn(struct{}{}) - fn(struct{}{}) - } - - count := 0 - consumeAllFn := func(struct{}) { - count++ - } - - runConsumer(onPing, consumeAllFn) - g.TAssertEqual(count, 4) - }) -} - -func test_tryFinding() { - g.TestStart("tryFinding()") - - g.Testing("noop in case of failure", func() { - myErr := errors.New("find() error") - findFn := func(string, uuid.UUID) (messageT, error) { - return messageT{}, myErr - } - - count := 0 - waitFn := func([]byte) { - count++ - } - - - tryFinding(findFn, "topic", uuid.New(), waitFn) - g.TAssertEqual(count, 0) - }) - - g.Testing("calls waitFn in case of success", func() { - payload := []byte("find() payload") - - findFn := func(string, uuid.UUID) (messageT, error) { - return messageT{ payload: payload }, nil - } - - payloads := [][]byte{} - waitFn := func(payload []byte) { - payloads = append(payloads, payload) - } - - - tryFinding(findFn, "topic", uuid.New(), waitFn) - g.TAssertEqual(payloads, [][]byte{ payload }) - }) -} - -func test_queueT_Subscribe() { - g.TestStart("queueT.Subscribe()") - - set := subscriptionsSetM{} - consumed := []uuid.UUID{} - messages := []messageT{ - messageT{ uuid: uuid.New() }, - messageT{ uuid: uuid.New() }, - } - - var takeErr error - queue := queueT{ - queries: queriesT{ - take: func(string, string) error { - return takeErr - }, - pending: func( - topic string, - consumer string, - fn func(messageT) error, - ) error { - for _, message := range messages { - consumed = append( - consumed, - message.uuid, - ) - _ = fn(message) - } - return nil - }, - commit: func( - consumer string, - messageID uuid.UUID, - ) error { - return nil - }, - toDead: func(string, uuid.UUID, uuid.UUID) error { - g.Unreachable() - return nil - }, - }, - subscriptions: subscriptionsT{ - write: func(fn func(subscriptionsSetM) error) error { - return fn(set) - }, - }, - } - - - g.Testing("registers our callback in the set and calls it", func() { - var wg sync.WaitGroup - wg.Add(2) - - queue.Subscribe("topic", "consumer-1", func(Message) error { - wg.Done() - return nil - }) - defer queue.Unsubscribe("topic", "consumer-1") - - wg.Wait() - g.TAssertEqual(consumed, []uuid.UUID{ - messages[0].uuid, - messages[1].uuid, - }) - }) - - g.Testing("our callback also gets called when pinged", func() { - consumed = []uuid.UUID{} - - var wg sync.WaitGroup - wg.Add(4) - - queue.Subscribe("topic", "consumer-2", func(m Message) error { - wg.Done() - return nil - }) - defer queue.Unsubscribe("topic", "consumer-2") - - consumer := set["topic"].consumers["consumer-2"] - consumer.pinger.tryPing(struct{}{}) - - wg.Wait() - - g.TAssertEqual(consumed, []uuid.UUID{ - messages[0].uuid, - messages[1].uuid, - messages[0].uuid, - messages[1].uuid, - }) - }) - - g.Testing("we try to own the topic", func() { - takeErr = errors.New("queueT.Subscribe() 1") - - err := queue.Subscribe("topic", "name", func(Message) error { - g.Unreachable() - return nil - }) - - g.TAssertEqual(err, takeErr) - takeErr = nil - }) - - g.Testing("if we can't own the topic, we don't get registered", func() { - takeErr = errors.New("queueT.Subscribe() 2") - - err := queue.Subscribe("topic", "name", func(Message) error { - g.Unreachable() - return nil - }) - g.TErrorNil(err) - - expected := subscriptionsSetM{} - g.TAssertEqual(set, expected) - - takeErr = nil - }) -} - -func test_queueT_WaitFor() { - g.TestStart("queueT.WaitFor()") - - findErr := errors.New("find() error") - message := messageT{ - payload: []byte("queueT.WaitFor() payload"), - } - - set := subscriptionsSetM{} - - queue := queueT{ - queries: queriesT{ - find: func( - topic string, - flowID uuid.UUID, - ) (messageT, error) { - return message, findErr - }, - }, - subscriptions: subscriptionsT{ - write: func(fn func(subscriptionsSetM) error) error { - return fn(set) - }, - read: func(fn func(subscriptionsSetM) error) error { - return fn(set) - }, - }, - } - - - g.Testing("registers the waiter in the set", func() { - flowID := uuid.New() - - defer queue.WaitFor("topic", flowID, "waiter-1").Close() - - expected := waiterDataT{ - topic: "topic", - flowID: flowID, - name: "waiter-1", - } - - g.TAssertEqual( - set["topic"].waiters[flowID]["waiter-1"].data, - expected, - ) - }) - - g.Testing("the channel gets a message when waiter is pinged", func() { - flowID := uuid.New() - payload := []byte("sent payload") - - w := queue.WaitFor("topic", flowID, "waiter-2") - defer w.Close() - - waiter := set["topic"].waiters[flowID]["waiter-2"] - waiter.pinger.tryPing(payload) - - given := <- w.Channel - - g.TAssertEqual(given, payload) - g.TAssertEqual((*waiter.closed)(), true) - }) - - g.Testing("we can also WaitFor() after publishing the message", func() { - findErr = nil - flowID := uuid.New() - - w := queue.WaitFor("topic", flowID, "waiter-3") - defer w.Close() - - given := <- w.Channel - - waiter := set["topic"].waiters[flowID]["waiter-3"] - waiter.pinger.tryPing([]byte("ignored")) - - g.TAssertEqual(given, message.payload) - g.TAssertEqual((*waiter.closed)(), true) - }) - - g.Testing("if the data already exists we get it immediatelly", func() { - flowID := uuid.New() - - w := queue.WaitFor("topic", flowID, "waiter-4") - defer w.Close() - - waiter := set["topic"].waiters[flowID]["waiter-4"] - g.TAssertEqual((*waiter.closed)(), true) - - given := <- w.Channel - g.TAssertEqual(given, message.payload) - }) -} - -func test_unsubscribeIfExistsFn() { - g.TestStart("unsubscribeIfExistsFn()") - - g.Testing("noop on empty set", func() { - set := subscriptionsSetM{} - expected := subscriptionsSetM{} - unsubscribeIfExistsFn("topic", "consumer")(set) - g.TAssertEqual(set, expected) - }) - - g.Testing("noop on missing topic", func() { - set := subscriptionsSetM{ - "topic": topicSubscriptionT{}, - } - - expected := subscriptionsSetM{ - "topic": topicSubscriptionT{}, - } - - unsubscribeIfExistsFn("other-topic", "consumer")(set) - g.TAssertEqual(set, expected) - }) - - g.Testing("noop on missing consumer", func() { - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - }, - } - - expected := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{}, - }, - }, - } - - unsubscribeIfExistsFn("topic", "other-consumer")(set) - g.TAssertEqual(set, expected) - }) - - g.Testing("closes consumer and removes it from set", func() { - flowID := uuid.New() - - count := 0 - close := func() { - count++ - } - - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{ - close: &close, - }, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{}, - }, - }, - } - - expected := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID: map[string]waiterT{}, - }, - }, - } - - unsubscribeIfExistsFn("topic", "consumer")(set) - g.TAssertEqual(set, expected) - g.TAssertEqual(count, 1) - }) - - g.Testing("empty topics are also removed", func() { - count := 0 - close := func() { - count++ - } - - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{ - close: &close, - }, - }, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - expected := subscriptionsSetM{} - - unsubscribeIfExistsFn("topic", "consumer")(set) - g.TAssertEqual(set, expected) - g.TAssertEqual(count, 1) - }) -} - -func test_queueT_Unsubscribe() { - g.TestStart("queueT.Unsubscribe()") - - g.Testing("calls unsubscribesIfExists() via writeFn()", func() { - closed := false - close := func() { - closed = true - } - - set := subscriptionsSetM{ - "topic": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer": consumerT{ - close: &close, - }, - }, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - expected := subscriptionsSetM{} - - queue := queueT{ - subscriptions: subscriptionsT{ - write: func( - fn func(subscriptionsSetM) error, - ) error { - return fn(set) - }, - }, - } - - queue.Unsubscribe("topic", "consumer") - g.TAssertEqual(set, expected) - g.TAssertEqual(closed, true) - }) -} - -func test_cleanSubscriptions() { - g.TestStart("cleanSubscriptions()") - - g.Testing("all consumers and waiters get close()'d", func() { - flowID1 := uuid.New() - flowID2 := uuid.New() - flowID3 := uuid.New() - - type pairT struct{ - closed func() bool - fn func() - } - - mkclose := func() pairT { - closed := false - return pairT{ - closed: func() bool { - return closed - }, - fn: func() { - closed = true - }, - } - } - - c := mkclose() - g.TAssertEqual(c.closed(), false) - c.fn() - var x bool = c.closed() - g.TAssertEqual(x, true) - - close1 := mkclose() - close2 := mkclose() - close3 := mkclose() - close4 := mkclose() - close5 := mkclose() - close6 := mkclose() - close7 := mkclose() - close8 := mkclose() - - set := subscriptionsSetM{ - "topic-1": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer-1": consumerT{ - close: &close1.fn, - }, - "consumer-2": consumerT{ - close: &close2.fn, - }, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID1: map[string]waiterT{ - "waiter-1": waiterT{ - close: &close3.fn, - }, - }, - flowID2: map[string]waiterT{ - "waiter-2": waiterT{ - close: &close4.fn, - }, - "waiter-3": waiterT{ - close: &close5.fn, - }, - }, - }, - }, - "topic-2": topicSubscriptionT{ - consumers: map[string]consumerT{ - "consumer-3": consumerT{ - close: &close6.fn, - }, - }, - waiters: map[uuid.UUID]map[string]waiterT{ - flowID3: map[string]waiterT{ - "waiter-4": waiterT{ - close: &close7.fn, - }, - }, - }, - }, - "topic-3": topicSubscriptionT{ - consumers: map[string]consumerT{}, - waiters: map[uuid.UUID]map[string]waiterT{}, - }, - } - - cleanSubscriptions(set) - - given := []bool{ - close1.closed(), - close2.closed(), - close3.closed(), - close4.closed(), - close5.closed(), - close6.closed(), - close7.closed(), - close8.closed(), - } - - expected := []bool{ - true, - true, - true, - true, - true, - true, - true, - false, - } - - g.TAssertEqualI(given, expected) - }) -} - -func test_queueT_Close() { - g.TestStart("queueT.Close()") - - g.Testing("clean pinger, subscriptions and queries", func() { - var ( - pingerCount = 0 - subscriptionsCount = 0 - queriesCount = 0 - - subscriptionsErr = errors.New("subscriptionsT{} error") - queriesErr = errors.New("queriesT{} error") - ) - - queue := queueT{ - queries: queriesT{ - close: func() error{ - queriesCount++ - return queriesErr - }, - }, - subscriptions: subscriptionsT{ - write: func( - func(subscriptionsSetM) error, - ) error { - subscriptionsCount++ - return subscriptionsErr - }, - }, - pinger: pingerT[struct{}]{ - close: func() { - pingerCount++ - }, - }, - } - - err := queue.Close() - g.TAssertEqual(err, g.WrapErrors(subscriptionsErr, queriesErr)) - g.TAssertEqual(pingerCount, 1) - g.TAssertEqual(subscriptionsCount, 1) - g.TAssertEqual(queriesCount, 1) - }) -} - - -func test_topicGetopt() { - g.TestStart("topicGetopt()") - - g.Testing("checks for required positional argument", func() { - var w strings.Builder - argsIn := argsT{ - args: []string{}, - } - - argsOut, ok := topicGetopt(argsIn, &w) - g.TAssertEqual(w.String(), "Missing TOPIC.\n") - g.TAssertEqual(ok, false) - g.TAssertEqual(argsOut, argsIn) - }) - - g.Testing("success otherwise", func() { - var w strings.Builder - argsIn := argsT{ - args: []string{"a topic"}, - } - - argsOut, ok := topicGetopt(argsIn, &w) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(ok, true) - argsIn.topic = "a topic" - g.TAssertEqual(argsOut, argsIn) - }) -} - -func test_topicConsumerGetopt() { - g.TestStart("topicConsumerGetopt()") - - g.Testing("checks for TOPIC argument", func() { - var w strings.Builder - argsIn := argsT{ - args: []string{}, - } - - argsOut, ok := topicConsumerGetopt(argsIn, &w) - g.TAssertEqual(w.String(), "Missing TOPIC.\n") - g.TAssertEqual(ok, false) - g.TAssertEqual(argsOut, argsIn) - }) - - g.Testing("we get an error on unsupported flag", func() { - var w strings.Builder - argsIn := argsT{ - args: []string{"-Z"}, - } - - const message = "flag provided but not defined: -Z\n" - argsOut, ok := topicConsumerGetopt(argsIn, &w) - g.TAssertEqual(w.String(), message) - g.TAssertEqual(ok, false) - g.TAssertEqual(argsOut, argsIn) - }) - - g.Testing("we also get an error on incorrect usage of flags", func() { - var w strings.Builder - argsIn := argsT{ - args: []string{"-C"}, - } - - const message = "flag needs an argument: -C\n" - argsOut, ok := topicConsumerGetopt(argsIn, &w) - g.TAssertEqual(w.String(), message) - g.TAssertEqual(ok, false) - g.TAssertEqual(argsOut, argsIn) - }) - - g.Testing("we can customize the CONSUMER", func() { - var w strings.Builder - argsIn := argsT{ - args: []string{"-C", "custom consumer", "this topic"}, - } - - argsOut, ok := topicConsumerGetopt(argsIn, &w) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(ok, true) - argsIn.topic = "this topic" - argsIn.consumer = "custom consumer" - g.TAssertEqual(argsOut, argsIn) - }) - - g.Testing("otherwise we get the default one", func() { - var w strings.Builder - argsIn := argsT{ - args: []string{"T"}, - } - - argsOut, ok := topicConsumerGetopt(argsIn, &w) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(ok, true) - argsIn.topic = "T" - argsIn.consumer = "default-consumer" - g.TAssertEqual(argsOut, argsIn) - }) -} - -func test_inExec() { - g.TestStart("inExec()") - - const ( - topic = "inExec topic" - payloadStr = "inExec payload" - ) - var ( - payload = []byte(payloadStr) - args = argsT{ - topic: topic, - } - ) - - var ( - publishErr error - messages []messageT - id int64 = 0 - ) - queries := queriesT{ - publish: func( - unsent UnsentMessage, - messageID uuid.UUID, - ) (messageT, error) { - if publishErr != nil { - return messageT{}, publishErr - } - - id++ - now := time.Now() - message := messageT{ - id: id, - timestamp: now, - uuid: messageID, - topic: unsent.Topic, - flowID: unsent.FlowID, - payload: unsent.Payload, - } - messages = append(messages, message) - return message, nil - }, - } - - - g.Testing("messageID to output when successful", func() { - r := bytes.NewReader(payload) - var w strings.Builder - rc, err := inExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(messages[0].topic, topic) - g.TAssertEqual(messages[0].payload, payload) - g.TAssertEqual(rc, 0) - g.TAssertEqual(w.String(), messages[0].uuid.String() + "\n") - }) - - g.Testing("if reading fails, we return the error", func() { - var r *os.File - var w strings.Builder - rc, err := inExec(args, queries, r, &w) - g.TAssertEqual( - err.Error(), - "invalid argument", - ) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - }) - - g.Testing("if publishing fails, we return the error", func() { - publishErr = errors.New("publish() error") - r := strings.NewReader("read but not published") - var w strings.Builder - rc, err := inExec(args, queries, r, &w) - g.TAssertEqual(err, publishErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - }) -} - -func test_outExec() { - g.TestStart("outExec()") - - const ( - topic = "outExec topic" - consumer = "outExec consumer" - ) - var ( - r = strings.NewReader("") - args = argsT{ - topic: topic, - consumer: consumer, - } - ) - - var ( - takeErr error - nextErr error - messages []messageT - id int64 = 0 - ) - queries := queriesT{ - take: func(string, string) error { - return takeErr - }, - next: func(string, string) (messageT, error) { - if nextErr != nil { - return messageT{}, nextErr - } - - if len(messages) == 0 { - return messageT{}, sql.ErrNoRows - } - - return messages[0], nil - }, - } - pub := func(payload []byte) { - id++ - now := time.Now() - message := messageT{ - id: id, - timestamp: now, - uuid: uuid.New(), - topic: topic, - flowID: uuid.New(), - payload: payload, - } - messages = append(messages, message) - } - - - g.Testing("exit code 1 when we can't take", func() { - takeErr = errors.New("outExec() take error") - - var w strings.Builder - - rc, err := outExec(args, queries, r, &w) - g.TAssertEqual(err, takeErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - - takeErr = nil - }) - - g.Testing("exit code 3 when no message is available", func() { - var w strings.Builder - - rc, err := outExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 3) - }) - - g.Testing("we get the same message until we commit", func() { - var ( - w1 strings.Builder - w2 strings.Builder - w3 strings.Builder - ) - args := argsT{ - topic: topic, - consumer: consumer, - } - - pub([]byte("first payload")) - pub([]byte("second payload")) - - rc1, err1 := outExec(args, queries, r, &w1) - rc2, err2 := outExec(args, queries, r, &w2) - messages = messages[1:] - rc3, err3 := outExec(args, queries, r, &w3) - - g.TErrorIf(g.SomeError(err1, err2, err3)) - g.TAssertEqual(w1.String(), "first payload\n") - g.TAssertEqual(w2.String(), "first payload\n") - g.TAssertEqual(w3.String(), "second payload\n") - g.TAssertEqual(rc1, 0) - g.TAssertEqual(rc2, 0) - g.TAssertEqual(rc3, 0) - }) - - g.Testing("we propagate the error when the query fails", func() { - nextErr = errors.New("next() error") - var w strings.Builder - rc, err := outExec(args, queries, r, &w) - g.TAssertEqual(err, nextErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - }) -} - -func test_commitExec() { - g.TestStart("commitExec()") - - const ( - topic = "commitExec topic" - consumer = "commitExec consumer" - ) - var ( - r = strings.NewReader("") - args = argsT{ - topic: topic, - consumer: consumer, - } - ) - - var ( - takeErr error - nextErr error - commitErr error - messages []messageT - id int64 = 0 - ) - queries := queriesT{ - take: func(string, string) error { - return takeErr - }, - next: func(string, string) (messageT, error) { - if nextErr != nil { - return messageT{}, nextErr - } - - if len(messages) == 0 { - return messageT{}, sql.ErrNoRows - } - - return messages[0], nil - }, - commit: func(string, uuid.UUID) error { - if commitErr != nil { - return commitErr - } - - messages = messages[1:] - return nil - }, - } - pub := func(payload []byte) { - id++ - now := time.Now() - message := messageT{ - id: id, - timestamp: now, - uuid: uuid.New(), - topic: topic, - flowID: uuid.New(), - payload: payload, - } - messages = append(messages, message) - } - - - g.Testing("error when we can't take", func() { - takeErr = errors.New("commitExec() take error") - - var w strings.Builder - - rc, err := commitExec(args, queries, r, &w) - g.TAssertEqual(err, takeErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - - takeErr = nil - }) - - g.Testing("error when there is nothing to commit", func() { - var w strings.Builder - - rc, err := commitExec(args, queries, r, &w) - g.TAssertEqual(err, sql.ErrNoRows) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - }) - - g.Testing("messages get committed in order", func() { - var w strings.Builder - - pub([]byte("first payload")) - pub([]byte("second payload")) - pub([]byte("third payload")) - - message1 := messages[0] - g.TAssertEqual(message1.payload, []byte("first payload")) - - rc, err := commitExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(rc, 0) - - message2 := messages[0] - g.TAssertEqual(message2.payload, []byte("second payload")) - - rc, err = commitExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(rc, 0) - - message3 := messages[0] - g.TAssertEqual(message3.payload, []byte("third payload")) - - rc, err = commitExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(rc, 0) - - g.TAssertEqual(len(messages), 0) - }) - - g.Testing("when next() query fails, we propagate its result", func() { - nextErr = errors.New("next() error") - var w strings.Builder - rc, err := commitExec(args, queries, r, &w) - g.TAssertEqual(err, nextErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - nextErr = nil - }) - - g.Testing("we also propagate the error on commit() failure", func() { - commitErr = errors.New("commit() error") - pub([]byte{}) - var w strings.Builder - rc, err := commitExec(args, queries, r, &w) - g.TAssertEqual(err, commitErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - }) -} - -func test_deadExec() { - g.TestStart("deadExec()") - - const ( - topic = "deadExec topic" - consumer = "deadExec consumer" - ) - var ( - r = strings.NewReader("") - args = argsT{ - topic: topic, - consumer: consumer, - } - ) - - var ( - takeErr error - nextErr error - toDeadErr error - messages []messageT - id int64 = 0 - ) - queries := queriesT{ - take: func(string, string) error { - return takeErr - }, - next: func(string, string) (messageT, error) { - if nextErr != nil { - return messageT{}, nextErr - } - - if len(messages) == 0 { - return messageT{}, sql.ErrNoRows - } - - return messages[0], nil - }, - toDead: func( - _ string, - _ uuid.UUID, - deadletterID uuid.UUID, - ) error { - if toDeadErr != nil { - return toDeadErr - } - - messages = messages[1:] - return nil - }, - } - pub := func(payload []byte) { - id++ - now := time.Now() - message := messageT{ - id: id, - timestamp: now, - uuid: uuid.New(), - topic: topic, - flowID: uuid.New(), - payload: payload, - } - messages = append(messages, message) - } - - - g.Testing("error when we can't take", func() { - takeErr = errors.New("deadExec() take error") - - var w strings.Builder - - rc, err := deadExec(args, queries, r, &w) - g.TAssertEqual(err, takeErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - - takeErr = nil - }) - - g.Testing("error when there is nothing to mark as dead", func() { - var w strings.Builder - - rc, err := deadExec(args, queries, r, &w) - g.TAssertEqual(err, sql.ErrNoRows) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - }) - - g.Testing("the latest message becomes a deadletter", func() { - var w strings.Builder - - pub([]byte("first payload")) - pub([]byte("second payload")) - - message1 := messages[0] - g.TAssertEqual(message1.payload, []byte("first payload")) - - rc, err := deadExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(rc, 0) - - message2 := messages[0] - g.TAssertEqual(message2.payload, []byte("second payload")) - - rc, err = deadExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(rc, 0) - - g.TAssertEqual(len(messages), 0) - }) - - g.Testing("next() error is propagated", func() { - nextErr = errors.New("next() error") - var w strings.Builder - rc, err := deadExec(args, queries, r, &w) - g.TAssertEqual(err, nextErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - nextErr = nil - }) - - g.Testing("toDead() error is propagated", func() { - toDeadErr = errors.New("toDead() error") - pub([]byte{}) - var w strings.Builder - rc, err := deadExec(args, queries, r, &w) - g.TAssertEqual(err, toDeadErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - }) -} - -func test_listDeadExec() { - g.TestStart("listDeadExec()") - - const ( - topic = "listDeadExec topic" - consumer = "listDeadExec consumer" - ) - var ( - r = strings.NewReader("") - args = argsT{ - topic: topic, - consumer: consumer, - } - ) - - var ( - messages []messageT - deadletters []deadletterT - id int64 = 0 - errorIndex = -1 - allDeadErr = errors.New("allDead() error") - ) - queries := queriesT{ - allDead: func( - _ string, - _ string, - callback func(deadletterT, messageT) error, - ) error { - for i, deadletter := range deadletters { - if i == errorIndex { - return allDeadErr - } - - callback(deadletter, messageT{}) - } - - return nil - }, - } - pub := func() { - payload := []byte("ignored payload for this test") - id++ - now := time.Now() - message := messageT{ - id: id, - timestamp: now, - uuid: uuid.New(), - topic: topic, - flowID: uuid.New(), - payload: payload, - } - messages = append(messages, message) - } - commit := func() { - messages = messages[1:] - } - dead := func() { - message := messages[0] - now := time.Now() - deadletter := deadletterT{ - uuid: uuid.New(), - timestamp: now, - consumer: consumer, - messageID: message.uuid, - } - - messages = messages[1:] - deadletters = append(deadletters, deadletter) - } - replay := func() { - pub() - deadletters = deadletters[1:] - } - - - g.Testing("nothing is shown if topic is empty", func() { - var w strings.Builder - - rc, err := listDeadExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 0) - }) - - g.Testing("deadletters are printed in order", func() { - var w strings.Builder - - pub() - pub() - pub() - - rc, err := listDeadExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 0) - - dead() - commit() - dead() - - expected := fmt.Sprintf( - "%s\t%s\t%s\n%s\t%s\t%s\n", - deadletters[0].uuid.String(), - deadletters[0].timestamp.Format(time.RFC3339), - deadletters[0].consumer, - deadletters[1].uuid.String(), - deadletters[1].timestamp.Format(time.RFC3339), - deadletters[1].consumer, - ) - - rc, err = listDeadExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(w.String(), expected) - g.TAssertEqual(rc, 0) - }) - - g.Testing("deadletters disappear after being replayed", func() { - var ( - w1 strings.Builder - w2 strings.Builder - ) - - replay() - - deadletter := deadletters[0] - expected := fmt.Sprintf( - "%s\t%s\t%s\n", - deadletter.uuid.String(), - deadletter.timestamp.Format(time.RFC3339), - deadletter.consumer, - ) - - rc, err := listDeadExec(args, queries, r, &w1) - g.TErrorIf(err) - g.TAssertEqual(w1.String(), expected) - g.TAssertEqual(rc, 0) - - replay() - - rc, err = listDeadExec(args, queries, r, &w2) - g.TErrorIf(err) - g.TAssertEqual(w2.String(), "") - g.TAssertEqual(rc, 0) - }) - - g.Testing("a database failure interrupts the output", func() { - var w strings.Builder - - pub() - pub() - pub() - dead() - dead() - dead() - - deadletter := deadletters[0] - expected := fmt.Sprintf( - "%s\t%s\t%s\n", - deadletter.uuid.String(), - deadletter.timestamp.Format(time.RFC3339), - deadletter.consumer, - ) - - errorIndex = 1 - rc, err := listDeadExec(args, queries, r, &w) - g.TAssertEqual(err, allDeadErr) - g.TAssertEqual(w.String(), expected) - g.TAssertEqual(rc, 1) - }) -} - -func test_replayExec() { - g.TestStart("replayExec()") - - const ( - topic = "replayExec topic" - consumer = "replayExec consumer" - ) - var ( - w strings.Builder - r = strings.NewReader("") - args = argsT{ - topic: topic, - consumer: consumer, - } - ) - - var ( - oneDeadErr error - replayErr error - messages []messageT - deadletters []deadletterT - deadMessages []messageT - id int64 = 0 - ) - queries := queriesT{ - oneDead: func(string, string) (deadletterT, error) { - if oneDeadErr != nil { - return deadletterT{}, oneDeadErr - } - - if len(deadletters) == 0 { - return deadletterT{}, sql.ErrNoRows - } - - return deadletters[0], nil - }, - replay: func(uuid.UUID, uuid.UUID) (messageT, error) { - if replayErr != nil { - return messageT{}, replayErr - } - - message := deadMessages[0] - messages = append(messages, message) - deadletters = deadletters[1:] - deadMessages = deadMessages[1:] - return message, nil - }, - } - pub := func(payload []byte) { - id++ - now := time.Now() - message := messageT{ - id: id, - timestamp: now, - uuid: uuid.New(), - topic: topic, - flowID: uuid.New(), - payload: payload, - } - messages = append(messages, message) - } - commit := func() { - messages = messages[1:] - } - dead := func() { - message := messages[0] - deadletter := deadletterT{ uuid: uuid.New() } - - messages = messages[1:] - deadletters = append(deadletters, deadletter) - deadMessages = append(deadMessages, message) - } - next := func() string { - return string(messages[0].payload) - } - - - g.Testing("error when there is nothing to replay", func() { - rc, err := replayExec(args, queries, r, &w) - g.TAssertEqual(err, sql.ErrNoRows) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - - pub([]byte("first payload")) - pub([]byte("second payload")) - pub([]byte("third payload")) - pub([]byte("fourth payload")) - - rc, err = replayExec(args, queries, r, &w) - g.TAssertEqual(err, sql.ErrNoRows) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - }) - - g.Testing("deadletters are replayed in order", func() { - dead() - commit() - dead() - commit() - - rc, err := replayExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 0) - g.TAssertEqual(next(), "first payload") - - commit() - - rc, err = replayExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 0) - g.TAssertEqual(next(), "third payload") - }) - - g.Testing("oneDead() error is forwarded", func() { - oneDeadErr = errors.New("oneDead() error") - rc, err := replayExec(args, queries, r, &w) - g.TAssertEqual(err, oneDeadErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - oneDeadErr = nil - }) - - g.Testing("replay() error is also forwarded", func() { - pub([]byte{}) - dead() - - replayErr = errors.New("replay() error") - rc, err := replayExec(args, queries, r, &w) - g.TAssertEqual(err, replayErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - }) -} - -func test_sizeExec() { - g.TestStart("sizeExec()") - - const ( - topic = "sizeExec topic" - consumer = "sizeExec consumer" - ) - var ( - r = strings.NewReader("") - args = argsT{ - topic: topic, - consumer: consumer, - } - ) - - var sizeErr error - queries := queriesT{ - size: func(string) (int, error) { - if sizeErr != nil { - return -1, sizeErr - } - - return 123, nil - }, - } - - - g.Testing("it propagates the error when the query fails", func() { - sizeErr = errors.New("size() error") - var w strings.Builder - rc, err := sizeExec(args, queries, r, &w) - g.TAssertEqual(err, sizeErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - sizeErr = nil - }) - - g.Testing("otherwise it just prints what is was given", func() { - var w strings.Builder - rc, err := sizeExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(w.String(), "123\n") - g.TAssertEqual(rc, 0) - }) -} - -func test_countExec() { - g.TestStart("countExec()") - - const ( - topic = "countExec topic" - consumer = "countExec consumer" - ) - var ( - r = strings.NewReader("") - args = argsT{ - topic: topic, - consumer: consumer, - } - ) - - var countErr error - queries := queriesT{ - count: func(string, string) (int, error) { - if countErr != nil { - return -1, countErr - } - - return 2222, nil - }, - } - - - g.Testing("it propagates the query error", func() { - countErr = errors.New("count() error") - var w strings.Builder - rc, err := countExec(args, queries, r, &w) - g.TAssertEqual(err, countErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - countErr = nil - }) - - g.Testing("otherwise it prints the given count", func() { - var w strings.Builder - rc, err := countExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(w.String(), "2222\n") - g.TAssertEqual(rc, 0) - }) -} - -func test_hasDataExec() { - g.TestStart("hasData()") - - const ( - topic = "hasData topic" - consumer = "hasData consumer" - ) - var ( - w strings.Builder - r = strings.NewReader("") - args = argsT{ - topic: topic, - consumer: consumer, - } - ) - - hasData := true - var hasDataErr error - queries := queriesT{ - hasData: func(string, string) (bool, error) { - if hasDataErr != nil { - return false, hasDataErr - } - - return hasData, nil - }, - } - - - g.Testing("it propagates the query error", func() { - hasDataErr = errors.New("hasData() error") - rc, err := hasDataExec(args, queries, r, &w) - g.TAssertEqual(err, hasDataErr) - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 1) - hasDataErr = nil - }) - - g.Testing("otherwise if just returns (not prints) the flag", func() { - hasData = true - rc, err := hasDataExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(rc, 0) - - hasData = false - rc, err = hasDataExec(args, queries, r, &w) - g.TErrorIf(err) - g.TAssertEqual(rc, 1) - - g.TAssertEqual(w.String(), "") - }) -} - -func test_usage() { - g.TestStart("usage()") - - g.Testing("it just writes to io.Writer", func() { - var w strings.Builder - usage("xxx", &w) - const message = - "Usage: xxx [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n" - g.TAssertEqual(w.String(), message) - }) - - g.Testing("noop on io.Discard for io.Writer", func() { - usage("AN ERROR IF SEEN ANYWHERE!", io.Discard) - }) -} - -func test_getopt() { - g.TestStart("getopt()") - - const warning = "Missing COMMAND.\n" - const usage = "Usage: $0 [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n" - - commandsMap := map[string]commandT{ - "good": commandT{ - name: "good", - getopt: func(args argsT, _ io.Writer) (argsT, bool) { - return args, true - }, - }, - "bad": commandT{ - name: "bad", - getopt: func(args argsT, w io.Writer) (argsT, bool) { - if len(args.args) == 0 { - fmt.Fprintln(w, "no required arg") - return args, false - } - - if args.args[0] != "required" { - fmt.Fprintln(w, "not correct one") - return args, false - } - - args.topic = "a topic" - args.consumer = "a consumer" - return args, true - }, - }, - } - - - g.Testing("we suppress the default error message", func() { - var w strings.Builder - argv := []string{"$0", "-h"} - _, _, rc := getopt(argv, commandsMap, &w) - - g.TAssertEqual(w.String(), usage) - g.TAssertEqual(rc, 2) - }) - - g.Testing("we get an error on unsupported flag", func() { - var w strings.Builder - argv := []string{"$0", "-X"} - _, _, rc := getopt(argv, commandsMap, &w) - - const message = "flag provided but not defined: -X\n" - g.TAssertEqual(w.String(), message + usage) - g.TAssertEqual(rc, 2) - }) - - g.Testing("we also get an error on incorrect usage of flags", func() { - var w strings.Builder - argv := []string{"$0", "-f"} - _, _, rc := getopt(argv, commandsMap, &w) - - const message = "flag needs an argument: -f\n" - g.TAssertEqual(w.String(), message + usage) - g.TAssertEqual(rc, 2) - }) - - g.Testing("error when not given a command", func() { - var w strings.Builder - argv := []string{"$0"} - _, _, rc := getopt(argv, commandsMap, &w) - - g.TAssertEqual(w.String(), warning + usage) - g.TAssertEqual(rc, 2) - }) - - g.Testing("error on unknown command", func() { - var w strings.Builder - argv := []string{"$0", "unknown"} - _, _, rc := getopt(argv, commandsMap, &w) - - const message = `Bad COMMAND: "unknown".` + "\n" - g.TAssertEqual(w.String(), message + usage) - g.TAssertEqual(rc, 2) - }) - - g.Testing("checks the command usage", func() { - var ( - w1 strings.Builder - w2 strings.Builder - w3 strings.Builder - ) - - argv1 := []string{"$0", "bad"} - argv2 := []string{"$0", "bad", "arg"} - argv3 := []string{"$0", "bad", "required"} - _, _, rc1 := getopt(argv1, commandsMap, &w1) - _, _, rc2 := getopt(argv2, commandsMap, &w2) - args, command, rc3 := getopt(argv3, commandsMap, &w3) - expectedArgs := argsT{ - databasePath: "fiinha.db", - prefix: "fiinha", - command: "bad", - allArgs: argv3, - args: argv3[2:], - topic: "a topic", - consumer: "a consumer", - } - - g.TAssertEqual(w1.String(), "no required arg\n" + usage) - g.TAssertEqual(w2.String(), "not correct one\n" + usage) - g.TAssertEqual(w3.String(), "") - g.TAssertEqual(rc1, 2) - g.TAssertEqual(rc2, 2) - g.TAssertEqual(rc3, 0) - g.TAssertEqual(args, expectedArgs) - g.TAssertEqual(command.name, "bad") - }) - - g.Testing("when given a command we the default values", func() { - var w strings.Builder - args, command, rc := getopt( - []string{"$0", "good"}, - commandsMap, - &w, - ) - expectedArgs := argsT{ - databasePath: "fiinha.db", - prefix: "fiinha", - command: "good", - allArgs: []string{"$0", "good"}, - args: []string{}, - } - - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 0) - g.TAssertEqual(args, expectedArgs) - g.TAssertEqual(command.name, "good") - }) - - g.Testing("we can customize both values", func() { - var w strings.Builder - argv := []string{"$0", "-f", "F", "-p", "P", "good"} - args, command, rc := getopt(argv, commandsMap, &w) - expectedArgs := argsT{ - databasePath: "F", - prefix: "P", - command: "good", - allArgs: argv, - args: []string{}, - } - - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 0) - g.TAssertEqual(args, expectedArgs) - g.TAssertEqual(command.name, "good") - }) - - g.Testing("a command can have its own commands and options", func() { - var w strings.Builder - argv := []string{"$0", "-f", "F", "good", "-f", "-f", "SUB"} - args, command, rc := getopt(argv, commandsMap, &w) - expectedArgs := argsT{ - databasePath: "F", - prefix: "fiinha", - command: "good", - allArgs: argv, - args: []string{"-f", "-f", "SUB"}, - } - - g.TAssertEqual(w.String(), "") - g.TAssertEqual(rc, 0) - g.TAssertEqual(args, expectedArgs) - g.TAssertEqual(command.name, "good") - }) -} - -func test_runCommand() { - g.TestStart("runCommand()") - - g.Testing("returns an error on bad prefix", func() { - stdin := strings.NewReader("") - var ( - stdout strings.Builder - stderr strings.Builder - ) - args := argsT{ - prefix: "a bad prefix", - command: "in", - allArgs: []string{"$0"}, - args: []string{"some topic name"}, - } - rc := runCommand(args, commands["in"], stdin, &stdout, &stderr) - - g.TAssertEqual(rc, 1) - g.TAssertEqual(stdout.String(), "") - g.TAssertEqual(stderr.String(), "Invalid table prefix\n") - }) - - g.Testing("otherwise it build a queueT and calls command", func() { - stdin := strings.NewReader("") - var ( - stdout1 strings.Builder - stdout2 strings.Builder - stderr1 strings.Builder - stderr2 strings.Builder - ) - args1 := argsT{ - prefix: defaultPrefix, - command: "good", - } - args2 := argsT{ - prefix: defaultPrefix, - command: "bad", - } - myErr := errors.New("an error") - good := commandT{ - exec: func( - _ argsT, - _ queriesT, - _ io.Reader, - w io.Writer, - ) (int, error) { - fmt.Fprintf(w, "good text\n") - return 0, nil - }, - } - bad := commandT{ - exec: func( - _ argsT, - _ queriesT, - _ io.Reader, - w io.Writer, - ) (int, error) { - fmt.Fprintf(w, "bad text\n") - return 123, myErr - }, - } - rc1 := runCommand(args1, good, stdin, &stdout1, &stderr1) - rc2 := runCommand(args2, bad, stdin, &stdout2, &stderr2) - - g.TAssertEqual(stdout1.String(), "good text\n") - g.TAssertEqual(stdout2.String(), "bad text\n") - g.TAssertEqual(stderr1.String(), "") - g.TAssertEqual(stderr2.String(), "an error\n") - g.TAssertEqual(rc1, 0) - g.TAssertEqual(rc2, 123) - }) -} - -func test_commands() { - g.TestStart("commands") - - g.Testing("ensure map key and name are in sync", func() { - for key, command := range commands { - g.TAssertEqual(command.name, key) - } - }) -} - - -func dumpQueries() { - queries := []struct{name string; fn func(string) queryT}{ - { "createTables", createTablesSQL }, - { "take", takeSQL }, - { "publish", publishSQL }, - { "find", findSQL }, - { "next", nextSQL }, - { "pending", pendingSQL }, - { "commit", commitSQL }, - { "toDead", toDeadSQL }, - { "replay", replaySQL }, - { "oneDead", oneDeadSQL }, - { "allDead", allDeadSQL }, - { "size", sizeSQL }, - { "count", countSQL }, - { "hasData", hasDataSQL }, - } - for _, query := range queries { - q := query.fn(defaultPrefix) - fmt.Printf("\n-- %s.sql:", query.name) - fmt.Printf("\n-- write:%s\n", q.write) - fmt.Printf("\n-- read:%s\n", q.read) - fmt.Printf("\n-- owner:%s\n", q.owner) - } -} - - - -func MainTest() { - if os.Getenv("TESTING_DUMP_SQL_QUERIES") != "" { - dumpQueries() - return - } - - g.Init() - test_defaultPrefix() - test_serialized() - test_execSerialized() - test_tryRollback() - test_inTx() - test_createTables() - test_takeStmt() - test_publishStmt() - test_findStmt() - test_nextStmt() - test_messageEach() - test_pendingStmt() - test_commitStmt() - test_toDeadStmt() - test_replayStmt() - test_oneDeadStmt() - test_deadletterEach() - test_allDeadStmt() - test_sizeStmt() - test_countStmt() - test_hasDataStmt() - test_initDB() - test_queriesTclose() - test_newPinger() - test_makeSubscriptionsFunc() - test_makeNotifyFn() - test_collectClosedWaiters() - test_trimEmptyLeaves() - test_deleteIfEmpty() - test_deleteEmptyTopics() - test_removeClosedWaiter() - test_reapClosedWaiters() - test_everyNthCall() - test_runReaper() - test_NewWithPrefix() - test_New() - test_asPublicMessage() - test_queueT_Publish() - test_registerConsumerFn() - test_registerWaiterFn() - test_makeConsumeOneFn() - test_makeConsumeAllFn() - test_makeWaitFn() - test_runConsumer() - test_tryFinding() - test_queueT_Subscribe() - test_queueT_WaitFor() - test_unsubscribeIfExistsFn() - test_queueT_Unsubscribe() - test_cleanSubscriptions() - test_queueT_Close() - test_topicGetopt() - test_topicConsumerGetopt() - test_inExec() - test_outExec() - test_commitExec() - test_deadExec() - test_listDeadExec() - test_replayExec() - test_sizeExec() - test_countExec() - test_hasDataExec() - test_usage() - test_getopt() - test_runCommand() - test_commands() -} diff --git a/tests/functional/consume-one-produce-many/fiinha.go b/tests/functional/consume-one-produce-many/fiinha.go deleted file mode 100644 index 6a3ca47..0000000 --- a/tests/functional/consume-one-produce-many/fiinha.go +++ /dev/null @@ -1,5 +0,0 @@ -package fiinha - -func MainTest() { - // FIXME -} diff --git a/tests/functional/consume-one-produce-many/main.go b/tests/functional/consume-one-produce-many/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/consume-one-produce-many/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/consumer-with-deadletter/fiinha.go b/tests/functional/consumer-with-deadletter/fiinha.go deleted file mode 100644 index 292d327..0000000 --- a/tests/functional/consumer-with-deadletter/fiinha.go +++ /dev/null @@ -1,85 +0,0 @@ -package fiinha - -import ( - "errors" - "runtime" - - "uuid" - g "gobang" -) - - - -const ( - topicX = "new-event-x" - topicY = "new-event-y" -) - -var forbidden3Err = errors.New("we don't like 3") - - - -func processNewEventXToY(message Message) (UnsentMessage, error) { - payload := string(message.Payload) - if payload == "event 3" { - return UnsentMessage{}, forbidden3Err - } - - newPayload := []byte("processed " + payload) - unsent := UnsentMessage{ - Topic: topicY, - FlowID: message.FlowID, - Payload: newPayload, - } - return unsent, nil -} - - - -func MainTest() { - g.SetLevel(g.LogLevel_None) - - _, file, _, ok := runtime.Caller(0) - g.TAssertEqualS(ok, true, "can't get filename") - - databasePath := file + ".db" - queue, err := New(databasePath) - g.TErrorIf(err) - defer queue.Close() - - pub := func(payload []byte, flowID uuid.UUID) { - unsent := UnsentMessage{ - Topic: topicX, - FlowID: flowID, - Payload: payload, - } - _, err := queue.Publish(unsent) - g.TErrorIf(err) - } - - - g.Testing("we can WaitFor() a message after a deadletter", func() { - flowID := uuid.New() - - handlerFn := func(message Message) error { - messageY, err := processNewEventXToY(message) - if err != nil { - return err - } - - _, err = queue.Publish(messageY) - return err - } - queue.Subscribe(topicX, "main-worker", handlerFn) - defer queue.Unsubscribe(topicX, "main-worker") - - pub([]byte("event 1"), uuid.New()) - pub([]byte("event 2"), uuid.New()) - pub([]byte("event 3"), uuid.New()) - pub([]byte("event 4"), uuid.New()) - pub([]byte("event 5"), flowID) - - given := <- queue.WaitFor(topicY, flowID, "waiter").Channel - g.TAssertEqual(given, []byte("processed event 5")) - }) -} diff --git a/tests/functional/consumer-with-deadletter/main.go b/tests/functional/consumer-with-deadletter/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/consumer-with-deadletter/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/custom-prefix/fiinha.go b/tests/functional/custom-prefix/fiinha.go deleted file mode 100644 index 6a3ca47..0000000 --- a/tests/functional/custom-prefix/fiinha.go +++ /dev/null @@ -1,5 +0,0 @@ -package fiinha - -func MainTest() { - // FIXME -} diff --git a/tests/functional/custom-prefix/main.go b/tests/functional/custom-prefix/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/custom-prefix/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/distinct-consumers-separate-instances/fiinha.go b/tests/functional/distinct-consumers-separate-instances/fiinha.go deleted file mode 100644 index 6a3ca47..0000000 --- a/tests/functional/distinct-consumers-separate-instances/fiinha.go +++ /dev/null @@ -1,5 +0,0 @@ -package fiinha - -func MainTest() { - // FIXME -} diff --git a/tests/functional/distinct-consumers-separate-instances/main.go b/tests/functional/distinct-consumers-separate-instances/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/distinct-consumers-separate-instances/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/flow-id/fiinha.go b/tests/functional/flow-id/fiinha.go deleted file mode 100644 index 6a3ca47..0000000 --- a/tests/functional/flow-id/fiinha.go +++ /dev/null @@ -1,5 +0,0 @@ -package fiinha - -func MainTest() { - // FIXME -} diff --git a/tests/functional/flow-id/main.go b/tests/functional/flow-id/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/flow-id/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/idempotency/fiinha.go b/tests/functional/idempotency/fiinha.go deleted file mode 100644 index 6a3ca47..0000000 --- a/tests/functional/idempotency/fiinha.go +++ /dev/null @@ -1,5 +0,0 @@ -package fiinha - -func MainTest() { - // FIXME -} diff --git a/tests/functional/idempotency/main.go b/tests/functional/idempotency/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/idempotency/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/new-instance-takeover/fiinha.go b/tests/functional/new-instance-takeover/fiinha.go deleted file mode 100644 index 5e6ad4b..0000000 --- a/tests/functional/new-instance-takeover/fiinha.go +++ /dev/null @@ -1,109 +0,0 @@ -package fiinha - -import ( - "runtime" - "os" - - "uuid" - g "gobang" -) - - - -const topic = "topic" - - - -func pub(queue IQueue, topic string, flowID uuid.UUID) { - unsent := UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: []byte{}, - } - _, err := queue.Publish(unsent) - g.TErrorIf(err) -} - -func handlerFn(publish func(uuid.UUID)) func(Message) error { - return func(message Message) error { - publish(message.FlowID) - return nil - } -} - -func startInstance( - dbpath string, - instanceID int, - name string, -) (IQueue, error) { - iqueue, err := New(dbpath) - g.TErrorIf(err) - queue := iqueue.(queueT) - - notifyFn := makeNotifyFn(queue.subscriptions.read, queue.pinger) - queries, err := initDB(dbpath, defaultPrefix, notifyFn, instanceID) - g.TErrorIf(err) - - err = queue.queries.close() - g.TErrorIf(err) - - queue.queries = queries - - pub_ := func(topic string) func(uuid.UUID) { - return func(flowID uuid.UUID) { - pub(queue, topic, flowID) - } - } - - individual := "individual-" + name - shared := "shared" - - queue.Subscribe(topic, individual, handlerFn(pub_(individual))) - queue.Subscribe(topic, shared, handlerFn(pub_(shared + "-" + name))) - - return queue, nil -} - - - -func MainTest() { - g.Init() - - _, file, _, ok := runtime.Caller(0) - g.TAssertEqualS(ok, true, "can't get filename") - - dbpath := file + ".db" - instanceID1 := os.Getpid() - instanceID2 := instanceID1 + 1 - - flowID1 := uuid.New() - flowID2 := uuid.New() - - g.Testing("new instances take ownership of topic+name combo", func() { - q1, err := startInstance(dbpath, instanceID1, "first") - g.TErrorIf(err) - defer q1.Close() - - pub(q1, topic, uuid.New()) - pub(q1, topic, uuid.New()) - pub(q1, topic, flowID1) - - <- q1.WaitFor("individual-first", flowID1, "w").Channel - <- q1.WaitFor( "shared-first", flowID1, "w").Channel - - q2, err := startInstance(dbpath, instanceID2, "second") - g.TErrorIf(err) - defer q2.Close() - - <- q2.WaitFor("individual-second", flowID1, "w").Channel - - pub(q2, topic, uuid.New()) - pub(q2, topic, uuid.New()) - pub(q2, topic, flowID2) - - // FIXME: notify multiple instances so we can add this: - // <- q2.WaitFor("individual-first", flowID2, "w").Channel - <- q2.WaitFor("individual-second", flowID2, "w").Channel - <- q2.WaitFor( "shared-second", flowID2, "w").Channel - }) -} diff --git a/tests/functional/new-instance-takeover/main.go b/tests/functional/new-instance-takeover/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/new-instance-takeover/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/wait-after-publish/fiinha.go b/tests/functional/wait-after-publish/fiinha.go deleted file mode 100644 index 71b9b56..0000000 --- a/tests/functional/wait-after-publish/fiinha.go +++ /dev/null @@ -1,54 +0,0 @@ -package fiinha - -import ( - "runtime" - - "uuid" - g "gobang" -) - - - -const topic = "topic" - - - -func MainTest() { - _, file, _, ok := runtime.Caller(0) - g.TAssertEqualS(ok, true, "can't get filename") - - databasePath := file + ".db" - queue, err := New(databasePath) - g.TErrorIf(err) - defer queue.Close() - - pub := func(flowID uuid.UUID, payload []byte) { - unsent := UnsentMessage{ - Topic: topic, - FlowID: flowID, - Payload: payload, - } - _, err := queue.Publish(unsent) - g.TErrorIf(err) - } - - - g.Testing("we can WaitFor() a message before its publishing", func() { - flowID := uuid.New() - waiter := queue.WaitFor(topic, flowID, "waiter").Channel - - pub(flowID, []byte("payload before")) - - given := <- waiter - g.TAssertEqual(given, []byte("payload before")) - }) - - g.Testing("we can also do it after its publishing", func() { - flowID := uuid.New() - - pub(flowID, []byte("payload after")) - - given := <- queue.WaitFor(topic, flowID, "waiter").Channel - g.TAssertEqual(given, []byte("payload after")) - }) -} diff --git a/tests/functional/wait-after-publish/main.go b/tests/functional/wait-after-publish/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/wait-after-publish/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/functional/waiter/fiinha.go b/tests/functional/waiter/fiinha.go deleted file mode 100644 index 6a3ca47..0000000 --- a/tests/functional/waiter/fiinha.go +++ /dev/null @@ -1,5 +0,0 @@ -package fiinha - -func MainTest() { - // FIXME -} diff --git a/tests/functional/waiter/main.go b/tests/functional/waiter/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/functional/waiter/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/fuzz/api-check/fiinha.go b/tests/fuzz/api-check/fiinha.go deleted file mode 100644 index 86801de..0000000 --- a/tests/fuzz/api-check/fiinha.go +++ /dev/null @@ -1,35 +0,0 @@ -package fiinha - -import ( - "os" - "testing" - "testing/internal/testdeps" -) - - - -func api(f *testing.F) { - f.Fuzz(func(t *testing.T, n int) { - // FIXME - if n > 1 { - if n < 2 { - t.Errorf("Failed n: %v\n", n) - } - } - }) -} - - - -func MainTest() { - fuzzTargets := []testing.InternalFuzzTarget{ - { "api", api }, - } - - deps := testdeps.TestDeps{} - tests := []testing.InternalTest {} - benchmarks := []testing.InternalBenchmark{} - examples := []testing.InternalExample {} - m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples) - os.Exit(m.Run()) -} diff --git a/tests/fuzz/api-check/main.go b/tests/fuzz/api-check/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/fuzz/api-check/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/fuzz/cli-check/fiinha.go b/tests/fuzz/cli-check/fiinha.go deleted file mode 100644 index 1cb6f37..0000000 --- a/tests/fuzz/cli-check/fiinha.go +++ /dev/null @@ -1,35 +0,0 @@ -package fiinha - -import ( - "os" - "testing" - "testing/internal/testdeps" -) - - - -func queries(f *testing.F) { - f.Fuzz(func(t *testing.T, n int) { - if n > 154 { - if n < 155 { - t.Errorf("Failed n: %v\n", n) - } - } - }) -} - - - -func MainTest() { - // FIXME - fuzzTargets := []testing.InternalFuzzTarget{ - { "queries", queries }, - } - - deps := testdeps.TestDeps{} - tests := []testing.InternalTest {} - benchmarks := []testing.InternalBenchmark{} - examples := []testing.InternalExample {} - m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples) - os.Exit(m.Run()) -} diff --git a/tests/fuzz/cli-check/main.go b/tests/fuzz/cli-check/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/fuzz/cli-check/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/fuzz/equal-produced-consumed-order-check/fiinha.go b/tests/fuzz/equal-produced-consumed-order-check/fiinha.go deleted file mode 100644 index ef2e72a..0000000 --- a/tests/fuzz/equal-produced-consumed-order-check/fiinha.go +++ /dev/null @@ -1,35 +0,0 @@ -package fiinha - -import ( - "os" - "testing" - "testing/internal/testdeps" -) - - - -func queries(f *testing.F) { - f.Fuzz(func(t *testing.T, n int) { - if n > 154 { - if n < 155 { - t.Errorf("Failed n: %v\n", n) - } - } - }) -} - - - -func MainTest() { - // FIXME: produced order is identical to consumed order - fuzzTargets := []testing.InternalFuzzTarget{ - { "queries", queries }, - } - - deps := testdeps.TestDeps{} - tests := []testing.InternalTest {} - benchmarks := []testing.InternalBenchmark{} - examples := []testing.InternalExample {} - m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples) - os.Exit(m.Run()) -} diff --git a/tests/fuzz/equal-produced-consumed-order-check/main.go b/tests/fuzz/equal-produced-consumed-order-check/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/fuzz/equal-produced-consumed-order-check/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/fuzz/exactly-once-check/fiinha.go b/tests/fuzz/exactly-once-check/fiinha.go deleted file mode 100644 index 6ac1fb1..0000000 --- a/tests/fuzz/exactly-once-check/fiinha.go +++ /dev/null @@ -1,35 +0,0 @@ -package fiinha - -import ( - "os" - "testing" - "testing/internal/testdeps" -) - - - -func queries(f *testing.F) { - f.Fuzz(func(t *testing.T, n int) { - if n > 154 { - if n < 155 { - t.Errorf("Failed n: %v\n", n) - } - } - }) -} - - - -func MainTest() { - // FIXME: a message is consumed exactly once - fuzzTargets := []testing.InternalFuzzTarget{ - { "queries", queries }, - } - - deps := testdeps.TestDeps{} - tests := []testing.InternalTest {} - benchmarks := []testing.InternalBenchmark{} - examples := []testing.InternalExample {} - m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples) - os.Exit(m.Run()) -} diff --git a/tests/fuzz/exactly-once-check/main.go b/tests/fuzz/exactly-once-check/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/fuzz/exactly-once-check/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/fuzz/queries-check/fiinha.go b/tests/fuzz/queries-check/fiinha.go deleted file mode 100644 index 1cb6f37..0000000 --- a/tests/fuzz/queries-check/fiinha.go +++ /dev/null @@ -1,35 +0,0 @@ -package fiinha - -import ( - "os" - "testing" - "testing/internal/testdeps" -) - - - -func queries(f *testing.F) { - f.Fuzz(func(t *testing.T, n int) { - if n > 154 { - if n < 155 { - t.Errorf("Failed n: %v\n", n) - } - } - }) -} - - - -func MainTest() { - // FIXME - fuzzTargets := []testing.InternalFuzzTarget{ - { "queries", queries }, - } - - deps := testdeps.TestDeps{} - tests := []testing.InternalTest {} - benchmarks := []testing.InternalBenchmark{} - examples := []testing.InternalExample {} - m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples) - os.Exit(m.Run()) -} diff --git a/tests/fuzz/queries-check/main.go b/tests/fuzz/queries-check/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/fuzz/queries-check/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/fuzz/total-order-check/fiinha.go b/tests/fuzz/total-order-check/fiinha.go deleted file mode 100644 index cb5aa61..0000000 --- a/tests/fuzz/total-order-check/fiinha.go +++ /dev/null @@ -1,35 +0,0 @@ -package fiinha - -import ( - "os" - "testing" - "testing/internal/testdeps" -) - - - -func queries(f *testing.F) { - f.Fuzz(func(t *testing.T, n int) { - if n > 154 { - if n < 155 { - t.Errorf("Failed n: %v\n", n) - } - } - }) -} - - - -func MainTest() { - // FIXME: a consumer gets the messages in total order - fuzzTargets := []testing.InternalFuzzTarget{ - { "queries", queries }, - } - - deps := testdeps.TestDeps{} - tests := []testing.InternalTest {} - benchmarks := []testing.InternalBenchmark{} - examples := []testing.InternalExample {} - m := testing.MainStart(deps, tests, benchmarks, fuzzTargets, examples) - os.Exit(m.Run()) -} diff --git a/tests/fuzz/total-order-check/main.go b/tests/fuzz/total-order-check/main.go deleted file mode 120000 index f67563d..0000000 --- a/tests/fuzz/total-order-check/main.go +++ /dev/null @@ -1 +0,0 @@ -../../main.go
\ No newline at end of file diff --git a/tests/integration.sh b/tests/integration.sh deleted file mode 100755 index fcb62ca..0000000 --- a/tests/integration.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/sh -set -eu - -exit diff --git a/tests/main.go b/tests/main.go deleted file mode 100644 index 789b267..0000000 --- a/tests/main.go +++ /dev/null @@ -1,7 +0,0 @@ -package main - -import "fiinha" - -func main() { - fiinha.MainTest() -} diff --git a/tests/queries.sql b/tests/queries.sql deleted file mode 100644 index 241f419..0000000 --- a/tests/queries.sql +++ /dev/null @@ -1,387 +0,0 @@ - --- createTables.sql: --- write: - CREATE TABLE IF NOT EXISTS "fiinha_payloads" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')), - topic TEXT NOT NULL, - payload BLOB NOT NULL - ) STRICT; - CREATE INDEX IF NOT EXISTS "fiinha_payloads_topic" - ON "fiinha_payloads"(topic); - - CREATE TABLE IF NOT EXISTS "fiinha_messages" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')), - uuid BLOB NOT NULL UNIQUE, - flow_id BLOB NOT NULL, - payload_id INTEGER NOT NULL - REFERENCES "fiinha_payloads"(id) - ) STRICT; - CREATE INDEX IF NOT EXISTS "fiinha_messages_flow_id" - ON "fiinha_messages"(flow_id); - - CREATE TABLE IF NOT EXISTS "fiinha_offsets" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')), - consumer TEXT NOT NULL, - message_id INTEGER NOT NULL - REFERENCES "fiinha_messages"(id), - instance_id INTEGER NOT NULL, - UNIQUE (consumer, message_id) - ) STRICT; - CREATE INDEX IF NOT EXISTS "fiinha_offsets_consumer" - ON "fiinha_offsets"(consumer); - - CREATE TABLE IF NOT EXISTS "fiinha_deadletters" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - uuid BLOB NOT NULL UNIQUE, - consumer TEXT NOT NULL, - message_id INTEGER NOT NULL - REFERENCES "fiinha_messages"(id), - instance_id INTEGER NOT NULL, - UNIQUE (consumer, message_id) - ) STRICT; - CREATE INDEX IF NOT EXISTS "fiinha_deadletters_consumer" - ON "fiinha_deadletters"(consumer); - - CREATE TABLE IF NOT EXISTS "fiinha_replays" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - deadletter_id INTEGER NOT NULL UNIQUE - REFERENCES "fiinha_deadletters"(id) , - message_id INTEGER NOT NULL UNIQUE - REFERENCES "fiinha_messages"(id) - ) STRICT; - - CREATE TABLE IF NOT EXISTS "fiinha_owners" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - topic TEXT NOT NULL, - consumer TEXT NOT NULL, - owner_id INTEGER NOT NULL, - UNIQUE (topic, consumer) - ) STRICT; - - CREATE TRIGGER IF NOT EXISTS "fiinha_check_instance_owns_topic" - BEFORE INSERT ON "fiinha_offsets" - WHEN NEW.instance_id != ( - SELECT owner_id FROM "fiinha_owners" - WHERE topic = ( - SELECT "fiinha_payloads".topic - FROM "fiinha_payloads" - JOIN "fiinha_messages" ON "fiinha_payloads".id = - "fiinha_messages".payload_id - WHERE "fiinha_messages".id = NEW.message_id - ) AND consumer = NEW.consumer - ) - BEGIN - SELECT RAISE( - ABORT, - 'instance does not own topic/consumer combo' - ); - END; - - CREATE TRIGGER IF NOT EXISTS "fiinha_check_can_publish_deadletter" - BEFORE INSERT ON "fiinha_deadletters" - WHEN NEW.instance_id != ( - SELECT owner_id FROM "fiinha_owners" - WHERE topic = ( - SELECT "fiinha_payloads".topic - FROM "fiinha_payloads" - JOIN "fiinha_messages" ON "fiinha_payloads".id = - "fiinha_messages".payload_id - WHERE "fiinha_messages".id = NEW.message_id - ) AND consumer = NEW.consumer - ) - BEGIN - SELECT RAISE( - ABORT, - 'Instance does not own topic/consumer combo' - ); - END; - - --- read: - --- owner: - --- take.sql: --- write: - INSERT INTO "fiinha_owners" (topic, consumer, owner_id) - VALUES (?, ?, ?) - ON CONFLICT (topic, consumer) DO - UPDATE SET owner_id=excluded.owner_id; - - --- read: - --- owner: - --- publish.sql: --- write: - INSERT INTO "fiinha_payloads" (topic, payload) - VALUES (?, ?); - - INSERT INTO "fiinha_messages" (uuid, flow_id, payload_id) - VALUES (?, ?, last_insert_rowid()); - - --- read: - SELECT id, timestamp FROM "fiinha_messages" - WHERE uuid = ?; - - --- owner: - --- find.sql: --- write: - --- read: - SELECT - "fiinha_messages".id, - "fiinha_messages".timestamp, - "fiinha_messages".uuid, - "fiinha_payloads".payload - FROM "fiinha_messages" - JOIN "fiinha_payloads" ON - "fiinha_payloads".id = "fiinha_messages".payload_id - WHERE - "fiinha_payloads".topic = ? AND - "fiinha_messages".flow_id = ? - ORDER BY "fiinha_messages".id DESC - LIMIT 1; - - --- owner: - --- next.sql: --- write: - --- read: - SELECT - ( - SELECT owner_id FROM "fiinha_owners" - WHERE - topic = ? AND - consumer = ? - LIMIT 1 - ) AS owner_id, - "fiinha_messages".id, - "fiinha_messages".timestamp, - "fiinha_messages".uuid, - "fiinha_messages".flow_id, - "fiinha_payloads".payload - FROM "fiinha_messages" - JOIN "fiinha_payloads" ON - "fiinha_payloads".id = "fiinha_messages".payload_id - WHERE - "fiinha_payloads".topic = ? AND - "fiinha_messages".id NOT IN ( - SELECT message_id FROM "fiinha_offsets" - WHERE consumer = ? - ) - ORDER BY "fiinha_messages".id ASC - LIMIT 1; - - --- owner: - --- pending.sql: --- write: - --- read: - SELECT - "fiinha_messages".id, - "fiinha_messages".timestamp, - "fiinha_messages".uuid, - "fiinha_messages".flow_id, - "fiinha_payloads".topic, - "fiinha_payloads".payload - FROM "fiinha_messages" - JOIN "fiinha_payloads" ON - "fiinha_payloads".id = "fiinha_messages".payload_id - WHERE - "fiinha_payloads".topic = ? AND - "fiinha_messages".id NOT IN ( - SELECT message_id FROM "fiinha_offsets" - WHERE consumer = ? - ) - ORDER BY "fiinha_messages".id ASC; - - --- owner: - SELECT owner_id FROM "fiinha_owners" - WHERE - topic = ? AND - consumer = ?; - - --- commit.sql: --- write: - INSERT INTO "fiinha_offsets" (consumer, message_id, instance_id) - VALUES (?, (SELECT id FROM "fiinha_messages" WHERE uuid = ?), ?); - - --- read: - --- owner: - --- toDead.sql: --- write: - INSERT INTO "fiinha_offsets" - ( consumer, message_id, instance_id) - VALUES ( ?, (SELECT id FROM "fiinha_messages" WHERE uuid = ?), ?); - - INSERT INTO "fiinha_deadletters" - (uuid, consumer, message_id, instance_id) - VALUES (?, ?, (SELECT id FROM "fiinha_messages" WHERE uuid = ?), ?); - - --- read: - --- owner: - --- replay.sql: --- write: - INSERT INTO "fiinha_messages" (uuid, flow_id, payload_id) - SELECT - ?, - "fiinha_messages".flow_id, - "fiinha_messages".payload_id - FROM "fiinha_messages" - JOIN "fiinha_deadletters" ON - "fiinha_messages".id = "fiinha_deadletters".message_id - WHERE "fiinha_deadletters".uuid = ?; - - INSERT INTO "fiinha_replays" (deadletter_id, message_id) - VALUES ( - (SELECT id FROM "fiinha_deadletters" WHERE uuid = ?), - last_insert_rowid() - ); - - --- read: - SELECT - "fiinha_messages".id, - "fiinha_messages".timestamp, - "fiinha_messages".flow_id, - "fiinha_payloads".topic, - "fiinha_payloads".payload - FROM "fiinha_messages" - JOIN "fiinha_payloads" ON - "fiinha_payloads".id = "fiinha_messages".payload_id - WHERE "fiinha_messages".uuid = ?; - - --- owner: - --- oneDead.sql: --- write: - --- read: - SELECT - "fiinha_deadletters".uuid, - "fiinha_offsets".timestamp, - "fiinha_messages".uuid - FROM "fiinha_deadletters" - JOIN "fiinha_offsets" ON - "fiinha_deadletters".message_id = "fiinha_offsets".message_id - JOIN "fiinha_messages" ON - "fiinha_deadletters".message_id = "fiinha_messages".id - JOIN "fiinha_payloads" ON - "fiinha_messages".payload_id = "fiinha_payloads".id - WHERE - "fiinha_payloads".topic = ? AND - "fiinha_deadletters".consumer = ? AND - "fiinha_offsets".consumer = ? AND - "fiinha_deadletters".id NOT IN ( - SELECT deadletter_id FROM "fiinha_replays" - ) - ORDER BY "fiinha_deadletters".id ASC - LIMIT 1; - - --- owner: - --- allDead.sql: --- write: - --- read: - SELECT - "fiinha_deadletters".uuid, - "fiinha_deadletters".message_id, - "fiinha_offsets".timestamp, - "fiinha_offsets".consumer, - "fiinha_messages".timestamp, - "fiinha_messages".uuid, - "fiinha_messages".flow_id, - "fiinha_payloads".topic, - "fiinha_payloads".payload - FROM "fiinha_deadletters" - JOIN "fiinha_offsets" ON - "fiinha_deadletters".message_id = "fiinha_offsets".message_id - JOIN "fiinha_messages" ON - "fiinha_deadletters".message_id = "fiinha_messages".id - JOIN "fiinha_payloads" ON - "fiinha_messages".payload_id = "fiinha_payloads".id - WHERE - "fiinha_payloads".topic = ? AND - "fiinha_deadletters".consumer = ? AND - "fiinha_offsets".consumer = ? AND - "fiinha_deadletters".id NOT IN ( - SELECT deadletter_id FROM "fiinha_replays" - ) - ORDER BY "fiinha_deadletters".id ASC; - - --- owner: - --- size.sql: --- write: - --- read: - SELECT - COUNT(1) as size - FROM "fiinha_messages" - JOIN "fiinha_payloads" ON - "fiinha_messages".payload_id = "fiinha_payloads".id - WHERE "fiinha_payloads".topic = ?; - - --- owner: - --- count.sql: --- write: - --- read: - SELECT - COUNT(1) as count - FROM "fiinha_messages" - JOIN "fiinha_offsets" ON - "fiinha_messages".id = "fiinha_offsets".message_id - JOIN "fiinha_payloads" ON - "fiinha_messages".payload_id = "fiinha_payloads".id - WHERE - "fiinha_payloads".topic = ? AND - "fiinha_offsets".consumer = ?; - - --- owner: - --- hasData.sql: --- write: - --- read: - SELECT 1 as data - FROM "fiinha_messages" - JOIN "fiinha_payloads" ON - "fiinha_payloads".id = "fiinha_messages".payload_id - WHERE - "fiinha_payloads".topic = ? AND - "fiinha_messages".id NOT IN ( - SELECT message_id FROM "fiinha_offsets" - WHERE consumer = ? - ) - LIMIT 1; - - --- owner: |
