summaryrefslogtreecommitdiff
path: root/tests/q.go
diff options
context:
space:
mode:
Diffstat (limited to 'tests/q.go')
-rw-r--r--tests/q.go5776
1 files changed, 5776 insertions, 0 deletions
diff --git a/tests/q.go b/tests/q.go
new file mode 100644
index 0000000..6b9e422
--- /dev/null
+++ b/tests/q.go
@@ -0,0 +1,5776 @@
+package q
+
+import (
+ "bytes"
+ "database/sql"
+ "errors"
+ "fmt"
+ "io"
+ "log/slog"
+ "os"
+ "reflect"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+
+ "acudego"
+ "guuid"
+ g "gobang"
+)
+
+
+
+func test_defaultPrefix() {
+ g.TestStart("defaultPrefix")
+
+ g.Testing("the defaultPrefix is valid", func() {
+ g.TErrorIf(g.ValidateSQLTablePrefix(defaultPrefix))
+ })
+}
+
+func test_inTx() {
+ /*
+ // FIXME
+ g.TestStart("inTx()")
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ defer db.Close()
+
+
+ g.Testing("when fn() errors, we propagate it", func() {
+ myError := errors.New("to be propagated")
+ err := inTx(db, func(tx *sql.Tx) error {
+ return myError
+ })
+ g.TAssertEqual(err, myError)
+ })
+
+ g.Testing("on nil error we get nil", func() {
+ err := inTx(db, func(tx *sql.Tx) error {
+ return nil
+ })
+ g.TErrorIf(err)
+ })
+ */
+}
+
+func test_createTables() {
+ g.TestStart("createTables()")
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ defer db.Close()
+
+
+ g.Testing("tables exist afterwards", func() {
+ tmpl := `
+ SELECT id FROM "%s_messages" LIMIT 1;
+ `
+ q := fmt.Sprintf(tmpl, defaultPrefix)
+
+ _, err := db.Exec(q)
+ g.TErrorNil(err)
+
+ err = createTables(db, defaultPrefix)
+ g.TErrorIf(err)
+
+ _, err = db.Exec(q)
+ g.TErrorIf(err)
+ })
+
+ g.Testing("we can do it multiple times", func() {
+ g.TErrorIf(g.SomeError(
+ createTables(db, defaultPrefix),
+ createTables(db, defaultPrefix),
+ createTables(db, defaultPrefix),
+ ))
+ })
+}
+
+func test_takeStmt() {
+ g.TestStart("takeStmt()")
+
+ const (
+ topic = "take() topic"
+ consumer = "take() consumer"
+ prefix = defaultPrefix
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ g.TErrorIf(takeErr)
+ defer g.SomeFnError(
+ takeClose,
+ db.Close,
+ )
+
+ const tmpl = `
+ SELECT owner_id from "%s_owners"
+ WHERE
+ topic = ? AND
+ consumer = ?;
+ `
+ sqlOwner := fmt.Sprintf(tmpl, prefix)
+
+
+ g.Testing("when there is no owner, we become it", func() {
+ var ownerID int
+ err := db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
+ g.TAssertEqual(err, sql.ErrNoRows)
+
+ err = take(topic, consumer)
+ g.TErrorIf(err)
+
+ err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
+ g.TErrorIf(err)
+ g.TAssertEqual(ownerID, instanceID)
+ })
+
+ g.Testing("if there is already an owner, we overtake it", func() {
+ otherID := instanceID + 1
+
+ take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ g.TErrorIf(takeErr)
+ defer takeClose()
+
+ var ownerID int
+ err := db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
+ g.TErrorIf(err)
+ g.TAssertEqual(ownerID, instanceID)
+
+ err = take(topic, consumer)
+ g.TErrorIf(err)
+
+ err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
+ g.TErrorIf(err)
+ g.TAssertEqual(ownerID, otherID)
+ })
+ g.Testing("no error if closed more than once", func() {
+ g.TErrorIf(g.SomeError(
+ takeClose(),
+ takeClose(),
+ takeClose(),
+ ))
+ })
+}
+
+func test_publishStmt() {
+ g.TestStart("publishStmt()")
+
+ const (
+ topic = "publish() topic"
+ payloadStr = "publish() payload"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ publishErr,
+ ))
+ defer g.SomeFnError(
+ publishClose,
+ db.Close,
+ )
+
+
+ g.Testing("we can publish a message", func() {
+ messageID := guuid.New()
+ message, err := publish(unsent, messageID)
+ g.TErrorIf(err)
+
+ g.TAssertEqual(message.id, int64(1))
+ g.TAssertEqual(message.uuid, messageID)
+ g.TAssertEqual(message.topic, topic)
+ g.TAssertEqual(message.flowID, flowID)
+ g.TAssertEqual(message.payload, payload)
+ })
+
+ g.Testing("we can publish the same message repeatedly", func() {
+ messageID1 := guuid.New()
+ messageID2 := guuid.New()
+ message1, err1 := publish(unsent, messageID1)
+ message2, err2 := publish(unsent, messageID2)
+ g.TErrorIf(g.SomeError(err1, err2))
+
+ g.TAssertEqual(message1.id, message2.id - 1)
+ g.TAssertEqual(message1.topic, message2.topic)
+ g.TAssertEqual(message1.flowID, message2.flowID)
+ g.TAssertEqual(message1.payload, message2.payload)
+
+ g.TAssertEqual(message1.uuid, messageID1)
+ g.TAssertEqual(message2.uuid, messageID2)
+ })
+
+ g.Testing("publishing a message with the same UUID errors", func() {
+ messageID := guuid.New()
+ message1, err1 := publish(unsent, messageID)
+ _, err2 := publish(unsent, messageID)
+ g.TErrorIf(err1)
+
+ g.TAssertEqual(message1.uuid, messageID)
+ g.TAssertEqual(message1.topic, topic)
+ g.TAssertEqual(message1.flowID, flowID)
+ g.TAssertEqual(message1.payload, payload)
+
+ g.TAssertEqual(
+ err2.(acudego.Error).ExtendedCode,
+ acudego.ErrConstraintUnique,
+ )
+ })
+
+ g.Testing("no actual closing occurs", func() {
+ g.TErrorIf(g.SomeError(
+ publishClose(),
+ publishClose(),
+ publishClose(),
+ ))
+ })
+}
+
+func test_findStmt() {
+ g.TestStart("findStmt()")
+
+ const (
+ topic = "find() topic"
+ payloadStr = "find() payload"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ find, findClose, findErr := findStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ publishErr,
+ findErr,
+ ))
+ defer g.SomeFnError(
+ publishClose,
+ findClose,
+ db.Close,
+ )
+
+ pub := func(flowID guuid.UUID) guuid.UUID {
+ unsentWithFlowID := unsent
+ unsentWithFlowID.FlowID = flowID
+ messageID := guuid.New()
+ _, err := publish(unsentWithFlowID, messageID)
+ g.TErrorIf(err)
+ return messageID
+ }
+
+
+ g.Testing("we can find a message by topic and flowID", func() {
+ flowID := guuid.New()
+ messageID := pub(flowID)
+ message, err := find(topic, flowID)
+ g.TErrorIf(err)
+
+ g.TAssertEqual(message.uuid, messageID)
+ g.TAssertEqual(message.topic, topic)
+ g.TAssertEqual(message.flowID, flowID)
+ g.TAssertEqual(message.payload, payload)
+ })
+
+ g.Testing("a non-existent message gives us an error", func() {
+ message, err := find(topic, guuid.New())
+ g.TAssertEqual(message, messageT{})
+ g.TAssertEqual(err, sql.ErrNoRows)
+ })
+
+ g.Testing("findig twice yields the exact same message", func() {
+ flowID := guuid.New()
+ messageID := pub(flowID)
+ message1, err1 := find(topic, flowID)
+ message2, err2 := find(topic, flowID)
+ g.TErrorIf(g.SomeError(err1, err2))
+
+ g.TAssertEqual(message1.uuid, messageID)
+ g.TAssertEqual(message1, message2)
+ })
+
+ g.Testing("returns the latest entry if multiple are available", func() {
+ flowID := guuid.New()
+
+ _ , err0 := find(topic, flowID)
+ pub(flowID)
+ message1, err1 := find(topic, flowID)
+ pub(flowID)
+ message2, err2 := find(topic, flowID)
+
+ g.TAssertEqual(err0, sql.ErrNoRows)
+ g.TErrorIf(g.SomeError(err1, err2))
+ g.TAssertEqual(message1.uuid == message2.uuid, false)
+ g.TAssertEqual(message1.id < message2.id, true)
+ })
+
+ g.Testing("no error if closed more than once", func() {
+ g.TErrorIf(g.SomeError(
+ findClose(),
+ findClose(),
+ findClose(),
+ ))
+ })
+}
+
+func test_nextStmt() {
+ g.TestStart("nextStmt()")
+
+ const (
+ topic = "next() topic"
+ payloadStr = "next() payload"
+ consumer = "next() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ next, nextClose, nextErr := nextStmt(db, prefix, instanceID)
+ commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ nextErr,
+ commitErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ nextClose,
+ commitClose,
+ db.Close,
+ )
+
+ pub := func(topic string) messageT {
+ g.TErrorIf(take(topic, consumer))
+
+ unsentWithTopic := unsent
+ unsentWithTopic.Topic = topic
+ message, err := publish(unsentWithTopic, guuid.New())
+ g.TErrorIf(err)
+ return message
+ }
+
+
+ g.Testing("we get an error on empty topic", func() {
+ _, err := next(topic, consumer)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ })
+
+ g.Testing("we don't get messages from other topics", func() {
+ pub("other topic")
+ _, err := next(topic, consumer)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ })
+
+ g.Testing("we can get the next message", func() {
+ expectedMessage := pub(topic)
+ pub(topic)
+ pub(topic)
+ message, err := next(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(message, expectedMessage)
+ })
+
+ g.Testing("we keep getting the next until we commit", func() {
+ message1, err1 := next(topic, consumer)
+ message2, err2 := next(topic, consumer)
+ g.TErrorIf(commit(consumer, message1.uuid))
+ message3, err3 := next(topic, consumer)
+ g.TErrorIf(g.SomeError(err1, err2, err3))
+
+ g.TAssertEqual(message1, message2)
+ g.TAssertEqual(message2.uuid != message3.uuid, true)
+ })
+
+ g.Testing("each consumer has its own next message", func() {
+ g.TErrorIf(take(topic, "other consumer"))
+ message1, err1 := next(topic, consumer)
+ message2, err2 := next(topic, "other consumer")
+ g.TErrorIf(g.SomeError(err1, err2))
+ g.TAssertEqual(message1.uuid != message2.uuid, true)
+ })
+
+ g.Testing("error when we're not the owner", func() {
+ otherID := instanceID + 1
+
+ take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ g.TErrorIf(takeErr)
+ defer takeClose()
+
+ _, err := next(topic, consumer)
+ g.TErrorIf(err)
+
+ err = take(topic, consumer)
+ g.TErrorIf(err)
+
+ _, err = next(topic, consumer)
+ g.TAssertEqual(err, fmt.Errorf(
+ notOwnerErrorFmt,
+ otherID,
+ topic,
+ consumer,
+ instanceID,
+ ))
+ })
+
+ g.Testing("we can close more than once", func() {
+ g.TErrorIf(g.SomeError(
+ nextClose(),
+ nextClose(),
+ nextClose(),
+ ))
+ })
+}
+
+func test_messageEach() {
+ g.TestStart("messageEach()")
+
+ const (
+ topic = "messageEach() topic"
+ payloadStr = "messageEach() payload"
+ consumer = "messageEach() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ pendingErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ pendingClose,
+ db.Close,
+ )
+
+ pub := func() guuid.UUID {
+ message, err := publish(unsent, guuid.New())
+ g.TErrorIf(err)
+ return message.uuid
+ }
+ g.TErrorIf(take(topic, consumer))
+
+
+ g.Testing("not called on empty set", func() {
+ rows, err := pending(topic, consumer)
+ g.TErrorIf(err)
+
+ messageEach(rows, func(messageT) error {
+ g.Unreachable()
+ return nil
+ })
+ })
+
+ g.Testing("the callback is called once for each entry", func() {
+ messageIDs := []guuid.UUID{
+ pub(),
+ pub(),
+ pub(),
+ }
+
+ rows, err := pending(topic, consumer)
+ g.TErrorIf(err)
+
+ var collectedIDs []guuid.UUID
+ err = messageEach(rows, func(message messageT) error {
+ collectedIDs = append(collectedIDs, message.uuid)
+ return nil
+ })
+ g.TErrorIf(err)
+
+ g.TAssertEqual(collectedIDs, messageIDs)
+ })
+
+ g.Testing("we halt if the timestamp is ill-formatted", func() {
+ messageID := pub()
+ message_id_bytes := messageID[:]
+ pub()
+ pub()
+ pub()
+
+ const tmplUpdate = `
+ UPDATE "%s_messages"
+ SET timestamp = '01/01/1970'
+ WHERE uuid = ?;
+ `
+ sqlUpdate := fmt.Sprintf(tmplUpdate, prefix)
+ _, err := db.Exec(sqlUpdate, message_id_bytes)
+ g.TErrorIf(err)
+
+ rows, err := pending(topic, consumer)
+ g.TErrorIf(err)
+
+ n := 0
+ err = messageEach(rows, func(messageT) error {
+ n++
+ return nil
+ })
+
+ g.TAssertEqual(
+ err,
+ &time.ParseError{
+ Layout: time.RFC3339Nano,
+ Value: "01/01/1970",
+ LayoutElem: "2006",
+ ValueElem: "01/01/1970",
+ Message: "",
+ },
+ )
+ g.TAssertEqual(n, 3)
+
+ const tmplDelete = `
+ DELETE FROM "%s_messages"
+ WHERE uuid = ?;
+ `
+ sqlDelete := fmt.Sprintf(tmplDelete, prefix)
+ _, err = db.Exec(sqlDelete, message_id_bytes)
+ g.TErrorIf(err)
+ })
+
+ g.Testing("we halt if the callback returns an error", func() {
+ myErr := errors.New("callback error early return")
+
+ rows1, err1 := pending(topic, consumer)
+ g.TErrorIf(err1)
+
+ n1 := 0
+ err1 = messageEach(rows1, func(messageT) error {
+ n1++
+ if n1 == 4 {
+ return myErr
+ }
+ return nil
+ })
+
+ rows2, err2 := pending(topic, consumer)
+ g.TErrorIf(err2)
+
+ n2 := 0
+ err2 = messageEach(rows2, func(messageT) error {
+ n2++
+ return nil
+ })
+
+ g.TAssertEqual(err1, myErr)
+ g.TErrorIf(err2)
+ g.TAssertEqual(n1, 4)
+ g.TAssertEqual(n2, 6)
+ })
+
+ g.Testing("noop when given nil for *sql.Rows", func() {
+ err := messageEach(nil, func(messageT) error {
+ g.Unreachable()
+ return nil
+ })
+ g.TErrorIf(err)
+ })
+}
+
+func test_pendingStmt() {
+ g.TestStart("pendingStmt()")
+
+ const (
+ topic = "pending() topic"
+ payloadStr = "pending() payload"
+ consumer = "pending() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID)
+ commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ pendingErr,
+ commitErr,
+ toDeadErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ pendingClose,
+ commitClose,
+ toDeadClose,
+ db.Close,
+ )
+
+ pub := func(topic string) messageT {
+ g.TErrorIf(take(topic, consumer))
+ unsentWithTopic := unsent
+ unsentWithTopic.Topic = topic
+ message, err := publish(unsentWithTopic, guuid.New())
+ g.TErrorIf(err)
+ return message
+ }
+ g.TErrorIf(take(topic, consumer))
+
+ collectPending := func(topic string, consumer string) []messageT {
+ rows, err := pending(topic, consumer)
+ g.TErrorIf(err)
+
+ var messages []messageT
+ err = messageEach(rows, func(message messageT) error {
+ messages = append(messages, message)
+ return nil
+ })
+ g.TErrorIf(err)
+ return messages
+ }
+
+
+ g.Testing("an empty database has 0 pending items", func() {
+ g.TAssertEqual(len(collectPending(topic, consumer)), 0)
+ })
+
+ g.Testing("after publishing we get all messages", func() {
+ expected := []messageT{
+ pub(topic),
+ pub(topic),
+ }
+
+ g.TAssertEqualI(collectPending(topic, consumer), expected)
+ })
+
+ g.Testing("we get the same messages when calling again", func() {
+ messages1 := collectPending(topic, consumer)
+ messages2 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages1), 2)
+ g.TAssertEqualI(messages1, messages2)
+ })
+
+ g.Testing("we don't get messages from other topics", func() {
+ pub("other topic")
+
+ g.TAssertEqual(len(collectPending(topic, consumer)), 2)
+ g.TAssertEqual(len(collectPending("other topic", consumer)), 1)
+ })
+
+ g.Testing("after others commit, pending still returns them", func() {
+ g.TErrorIf(take(topic, "other consumer"))
+ messages1 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages1), 2)
+ g.TErrorIf(
+ commit("other consumer", messages1[0].uuid),
+ )
+
+ messages2 := collectPending(topic, consumer)
+ g.TAssertEqualI(messages1, messages2)
+ })
+
+ g.Testing("committing other topic doesn't change current", func() {
+ messages1 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages1), 2)
+
+ message := pub("other topic")
+
+ g.TErrorIf(commit(consumer, message.uuid))
+
+ messages2 := collectPending(topic, consumer)
+ g.TAssertEqualI(messages1, messages2)
+ })
+
+ g.Testing("after commiting, pending doesn't return them again", func() {
+ messages1 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages1), 2)
+
+ g.TErrorIf(commit(consumer, messages1[0].uuid))
+
+ messages2 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages2), 1)
+ g.TAssertEqual(messages2[0], messages1[1])
+
+ g.TErrorIf(commit(consumer, messages1[1].uuid))
+
+ messages3 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages3), 0)
+ })
+
+ g.Testing("on deadletter, pending also doesn't return them", func() {
+ messages0 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages0), 0)
+
+ message1 := pub(topic)
+ message2 := pub(topic)
+
+ messages1 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages1), 2)
+
+ err = toDead(consumer, message1.uuid, guuid.New())
+ g.TErrorIf(err)
+
+ messages2 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages2), 1)
+ g.TAssertEqual(messages2[0], message2)
+
+ err = toDead(consumer, message2.uuid, guuid.New())
+ g.TErrorIf(err)
+
+ messages3 := collectPending(topic, consumer)
+ g.TAssertEqual(len(messages3), 0)
+ })
+
+ g.Testing("if commits are unordered, pending is still sorted", func() {
+ message1 := pub(topic)
+ message2 := pub(topic)
+ message3 := pub(topic)
+
+ g.TAssertEqual(collectPending(topic, consumer), []messageT{
+ message1,
+ message2,
+ message3,
+ })
+
+ g.TErrorIf(commit(consumer, message2.uuid))
+ g.TAssertEqual(collectPending(topic, consumer), []messageT{
+ message1,
+ message3,
+ })
+
+ g.TErrorIf(commit(consumer, message1.uuid))
+ g.TAssertEqual(collectPending(topic, consumer), []messageT{
+ message3,
+ })
+
+ g.TErrorIf(commit(consumer, message3.uuid))
+ g.TAssertEqual(len(collectPending(topic, consumer)), 0)
+ })
+
+ g.Testing("when we're not the owners we get nothing", func() {
+ otherID := instanceID + 1
+
+ take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ g.TErrorIf(takeErr)
+ defer takeClose()
+
+ message1 := pub(topic)
+ message2 := pub(topic)
+ message3 := pub(topic)
+ message4 := pub(topic)
+ message5 := pub(topic)
+
+ expected := []messageT{
+ message1,
+ message2,
+ message3,
+ message4,
+ message5,
+ }
+
+ g.TAssertEqual(collectPending(topic, consumer), expected)
+
+ err := take(topic, consumer)
+ g.TErrorIf(err)
+
+ rows, err := pending(topic, consumer)
+ g.TErrorIf(err)
+
+ err = messageEach(rows, func(messageT) error {
+ g.Unreachable()
+ return nil
+ })
+ g.TErrorIf(err)
+ })
+
+ g.Testing("no actual closing occurs", func() {
+ g.TErrorIf(g.SomeError(
+ pendingClose(),
+ pendingClose(),
+ pendingClose(),
+ ))
+ })
+}
+
+func test_commitStmt() {
+ g.TestStart("commitStmt()")
+
+ const (
+ topic = "commit() topic"
+ payloadStr = "commit() payload"
+ consumer = "commit() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ commitErr,
+ toDeadErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ commitClose,
+ toDeadClose,
+ db.Close,
+ )
+
+ pub := func(topic string) guuid.UUID {
+ g.TErrorIf(take(topic, consumer))
+
+ unsentWithTopic := unsent
+ unsentWithTopic.Topic = topic
+ message, err := publish(unsentWithTopic, guuid.New())
+ g.TErrorIf(err)
+ return message.uuid
+ }
+
+ cmt := func(consumer string, messageID guuid.UUID) error {
+ g.TErrorIf(take(topic, consumer))
+
+ return commit(consumer, messageID)
+ }
+
+
+ g.Testing("we can't commit twice", func() {
+ messageID := pub(topic)
+
+ err1 := cmt(consumer, messageID)
+ err2 := cmt(consumer, messageID)
+ g.TErrorIf(err1)
+ g.TAssertEqual(
+ err2.(acudego.Error).ExtendedCode,
+ acudego.ErrConstraintUnique,
+ )
+ })
+
+ g.Testing("we can't commit non-existent messages", func() {
+ err := cmt(consumer, guuid.New())
+ g.TAssertEqual(err, sql.ErrNoRows)
+ })
+
+ g.Testing("multiple consumers may commit a message", func() {
+ messageID := pub(topic)
+
+ g.TErrorIf(g.SomeError(
+ cmt(consumer, messageID),
+ cmt("other consumer", messageID),
+ cmt("yet another consumer", messageID),
+ ))
+ })
+
+ g.Testing("a consumer can commit to multiple topics", func() {
+ messageID1 := pub(topic)
+ messageID2 := pub("other topic")
+ messageID3 := pub("yet another topic")
+
+ g.TErrorIf(g.SomeError(
+ cmt(consumer, messageID1),
+ cmt(consumer, messageID2),
+ cmt(consumer, messageID3),
+ ))
+ })
+
+ g.Testing("a consumer can consume many messages from a topic", func() {
+ messageID1 := pub(topic)
+ messageID2 := pub(topic)
+ messageID3 := pub(topic)
+
+ g.TErrorIf(g.SomeError(
+ cmt(consumer, messageID1),
+ cmt(consumer, messageID2),
+ cmt(consumer, messageID3),
+ ))
+ })
+
+ g.Testing("we can't commit a dead message", func() {
+ messageID := pub(topic)
+
+ err1 := toDead(consumer, messageID, guuid.New())
+ err2 := cmt(consumer, messageID)
+ g.TErrorIf(err1)
+ g.TAssertEqual(
+ err2.(acudego.Error).ExtendedCode,
+ acudego.ErrConstraintUnique,
+ )
+ })
+
+ g.Testing("error if we don't own the topic/consumer", func() {
+ otherID := instanceID + 1
+ take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ g.TErrorIf(takeErr)
+ defer takeClose()
+
+ messageID := pub(topic)
+
+ err := take(topic, consumer)
+ g.TErrorIf(err)
+
+ err = commit(consumer, messageID)
+ g.TAssertEqual(err, fmt.Errorf(
+ noLongerOwnerErrorFmt,
+ instanceID,
+ topic,
+ consumer,
+ otherID,
+ ))
+ })
+
+ g.Testing("no actual closing occurs", func() {
+ g.TErrorIf(g.SomeError(
+ commitClose(),
+ commitClose(),
+ commitClose(),
+ ))
+ })
+}
+
+func test_toDeadStmt() {
+ g.TestStart("toDeadStmt()")
+
+ const (
+ topic = "toDead() topic"
+ payloadStr = "toDead() payload"
+ consumer = "toDead() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ commitErr,
+ toDeadErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ commitClose,
+ toDeadClose,
+ db.Close,
+ )
+
+ pub := func(topic string) guuid.UUID {
+ g.TErrorIf(take(topic, consumer))
+
+ unsentWithTopic := unsent
+ unsentWithTopic.Topic = topic
+ message, err := publish(unsentWithTopic, guuid.New())
+ g.TErrorIf(err)
+ return message.uuid
+ }
+
+ asDead := func(
+ consumer string,
+ messageID guuid.UUID,
+ deadletterID guuid.UUID,
+ ) error {
+ g.TErrorIf(take(topic, consumer))
+ return toDead(consumer, messageID, deadletterID)
+ }
+
+
+ g.Testing("we can't mark as dead twice", func() {
+ messageID := pub(topic)
+
+ err1 := asDead(consumer, messageID, guuid.New())
+ err2 := asDead(consumer, messageID, guuid.New())
+ g.TErrorIf(err1)
+ g.TAssertEqual(
+ err2.(acudego.Error).ExtendedCode,
+ acudego.ErrConstraintUnique,
+ )
+ })
+
+ g.Testing("we can't reuse a deadletter id", func() {
+ messageID1 := pub(topic)
+ messageID2 := pub(topic)
+ deadletterID := guuid.New()
+
+ err1 := asDead(consumer, messageID1, deadletterID)
+ err2 := asDead(consumer, messageID2, deadletterID)
+ g.TErrorIf(err1)
+ g.TAssertEqual(
+ err2.(acudego.Error).ExtendedCode,
+ acudego.ErrConstraintUnique,
+ )
+
+ })
+
+ g.Testing("we can't mark as dead non-existent messages", func() {
+ err := asDead(consumer, guuid.New(), guuid.New())
+ g.TAssertEqual(err, sql.ErrNoRows)
+ })
+
+ g.Testing("multiple consumers may mark a message as dead", func() {
+ messageID := pub(topic)
+
+ g.TErrorIf(g.SomeError(
+ asDead(consumer, messageID, guuid.New()),
+ asDead("another consumer", messageID, guuid.New()),
+ asDead("yet another consumer", messageID, guuid.New()),
+ ))
+ })
+
+ g.Testing("a consumer can mark as dead in multiple topics", func() {
+ messageID1 := pub(topic)
+ messageID2 := pub("other topic")
+ messageID3 := pub("yet other topic")
+
+ g.TErrorIf(g.SomeError(
+ asDead(consumer, messageID1, guuid.New()),
+ asDead(consumer, messageID2, guuid.New()),
+ asDead(consumer, messageID3, guuid.New()),
+ ))
+ })
+
+ g.Testing("a consumer can produce many deadletters in a topic", func() {
+ messageID1 := pub(topic)
+ messageID2 := pub(topic)
+ messageID3 := pub(topic)
+
+ g.TErrorIf(g.SomeError(
+ asDead(consumer, messageID1, guuid.New()),
+ asDead(consumer, messageID2, guuid.New()),
+ asDead(consumer, messageID3, guuid.New()),
+ ))
+ })
+
+ g.Testing("a consumer can intercalate commits and deadletters", func() {
+ messageID1 := pub(topic)
+ messageID2 := pub(topic)
+ messageID3 := pub(topic)
+ messageID4 := pub(topic)
+ messageID5 := pub(topic)
+
+ g.TErrorIf(g.SomeError(
+ asDead(consumer, messageID1, guuid.New()),
+ commit(consumer, messageID2),
+ commit(consumer, messageID3),
+ asDead(consumer, messageID4, guuid.New()),
+ commit(consumer, messageID5),
+ ))
+ })
+
+ g.Testing("we can't mark a committed message as dead", func() {
+ messageID := pub(topic)
+
+ err1 := commit(consumer, messageID)
+ err2 := asDead(consumer, messageID, guuid.New())
+ g.TErrorIf(err1)
+ g.TAssertEqual(
+ err2.(acudego.Error).ExtendedCode,
+ acudego.ErrConstraintUnique,
+ )
+ })
+
+ g.Testing("error if we don't own the message's consumer/topic", func() {
+ otherID := instanceID + 1
+ messageID1 := pub(topic)
+ messageID2 := pub(topic)
+
+ take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ g.TErrorIf(takeErr)
+ defer takeClose()
+
+ err := toDead(consumer, messageID1, guuid.New())
+ g.TErrorIf(err)
+
+ err = take(topic, consumer)
+ g.TErrorIf(err)
+
+ err = toDead(consumer, messageID2, guuid.New())
+ g.TAssertEqual(err, fmt.Errorf(
+ noLongerOwnerErrorFmt,
+ instanceID,
+ topic,
+ consumer,
+ otherID,
+ ))
+ })
+
+ g.Testing("no actual closing occurs", func() {
+ g.TErrorIf(g.SomeError(
+ toDeadClose(),
+ toDeadClose(),
+ toDeadClose(),
+ ))
+ })
+}
+
+func test_replayStmt() {
+ g.TestStart("replayStmt()")
+
+ const (
+ topic = "replay() topic"
+ payloadStr = "replay() payload"
+ consumer = "replay() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ toDeadErr,
+ replayErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ toDeadClose,
+ replayClose,
+ db.Close,
+ )
+
+ pub := func() messageT {
+ message, err := publish(unsent, guuid.New())
+ g.TErrorIf(err)
+ return message
+ }
+ g.TErrorIf(take(topic, consumer))
+
+
+ g.Testing("we can replay a message", func() {
+ message := pub()
+ deadletterID := guuid.New()
+ replayedID := guuid.New()
+
+ err1 := toDead(consumer, message.uuid, deadletterID)
+ replayed, err2 := replay(deadletterID, replayedID)
+ g.TErrorIf(g.SomeError(err1, err2))
+
+ g.TAssertEqual(replayed.uuid, replayedID)
+ g.TAssertEqual(replayed.id == message.id, false)
+ g.TAssertEqual(replayed.uuid == message.uuid, false)
+ })
+
+ g.Testing("a replayed message keeps its payload", func() {
+ message := pub()
+ deadletterID := guuid.New()
+ err := toDead(consumer, message.uuid, deadletterID)
+ g.TErrorIf(err)
+
+ replayed, err := replay(deadletterID, guuid.New())
+ g.TErrorIf(err)
+ g.TAssertEqual(message.flowID, replayed.flowID)
+ g.TAssertEqual(message.payload, replayed.payload)
+ })
+
+ g.Testing("we can't replay a dead message twice", func() {
+ message := pub()
+ deadletterID := guuid.New()
+
+ err := toDead(consumer, message.uuid, deadletterID)
+ g.TErrorIf(err)
+
+ _, err1 := replay(deadletterID, guuid.New())
+ _, err2 := replay(deadletterID, guuid.New())
+ g.TErrorIf(err1)
+ g.TAssertEqual(
+ err2.(acudego.Error).ExtendedCode,
+ acudego.ErrConstraintUnique,
+ )
+ })
+
+ g.Testing("we cant replay non-existent messages", func() {
+ _, err := replay(guuid.New(), guuid.New())
+ g.TAssertEqual(
+ err.(acudego.Error).ExtendedCode,
+ acudego.ErrConstraintNotNull,
+ )
+ })
+
+ g.Testing("messages can die and then be replayed many times", func() {
+ message := pub()
+ deadletterID1 := guuid.New()
+ deadletterID2 := guuid.New()
+
+ err := toDead(consumer, message.uuid, deadletterID1)
+ g.TErrorIf(err)
+
+ replayed1, err := replay(deadletterID1, guuid.New())
+ g.TErrorIf(err)
+
+ err = toDead(consumer, replayed1.uuid, deadletterID2)
+ g.TErrorIf(err)
+
+ replayed2, err := replay(deadletterID2, guuid.New())
+ g.TErrorIf(err)
+
+ g.TAssertEqual(message.flowID, replayed1.flowID)
+ g.TAssertEqual(replayed1.flowID, replayed2.flowID)
+ })
+
+ g.Testing("no actual closing occurs", func() {
+ g.TErrorIf(g.SomeError(
+ replayClose(),
+ replayClose(),
+ replayClose(),
+ ))
+ })
+}
+
+func test_oneDeadStmt() {
+ g.TestStart("oneDeadStmt()")
+
+ const (
+ topic = "oneDead() topic"
+ payloadStr = "oneDead() payload"
+ consumer = "oneDead() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
+ oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ toDeadErr,
+ replayErr,
+ oneDeadErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ toDeadClose,
+ replayClose,
+ oneDeadClose,
+ db.Close,
+ )
+
+ pub := func(topic string) guuid.UUID {
+ g.TErrorIf(take(topic, consumer))
+
+ unsentWithTopic := unsent
+ unsentWithTopic.Topic = topic
+ message, err := publish(unsentWithTopic, guuid.New())
+ g.TErrorIf(err)
+ return message.uuid
+ }
+
+
+ g.Testing("error on missing deadletters", func() {
+ _, err := oneDead(topic, consumer)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ })
+
+ g.Testing("deadletters on other topics don't show for us", func() {
+ err := toDead(consumer, pub("other topic"), guuid.New())
+ g.TErrorIf(err)
+
+ _, err = oneDead(topic, consumer)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ })
+
+ g.Testing("deadletters for other consumers don't show for use", func() {
+ g.TErrorIf(take(topic, "other consumer"))
+ err := toDead("other consumer", pub(topic), guuid.New())
+ g.TErrorIf(err)
+
+ _, err = oneDead(topic, consumer)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ })
+
+ g.Testing("after being replayed deadletters aren't returned", func() {
+ messageID1 := guuid.New()
+ messageID2 := guuid.New()
+ messageID3 := guuid.New()
+
+ err1 := toDead(consumer, pub(topic), messageID1)
+ err2 := toDead(consumer, pub(topic), messageID2)
+ err3 := toDead(consumer, pub(topic), messageID3)
+ g.TErrorIf(g.SomeError(err1, err2, err3))
+
+ deadletter, err := oneDead(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(deadletter.uuid, messageID1)
+
+ _, err = replay(messageID2, guuid.New())
+ g.TErrorIf(err)
+
+ deadletter, err = oneDead(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(deadletter.uuid, messageID1)
+
+ _, err = replay(messageID1, guuid.New())
+ g.TErrorIf(err)
+
+ deadletter, err = oneDead(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(deadletter.uuid, messageID3)
+
+ _, err = replay(messageID3, guuid.New())
+ g.TErrorIf(err)
+
+ _, err = oneDead(topic, consumer)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ })
+}
+
+func test_deadletterEach() {
+ g.TestStart("deadletterEach")
+
+ const (
+ topic = "deadletterEach() topic"
+ payloadStr = "deadletterEach() payload"
+ consumer = "deadletterEach() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ toDeadErr,
+ allDeadErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ toDeadClose,
+ allDeadClose,
+ db.Close,
+ )
+
+ pub := func() guuid.UUID {
+ message, err := publish(unsent, guuid.New())
+ g.TErrorIf(err)
+ return message.uuid
+ }
+
+ dead := func(messageID guuid.UUID) guuid.UUID {
+ deadletterID := guuid.New()
+ err := toDead(consumer, messageID, deadletterID)
+ g.TErrorIf(err)
+
+ return deadletterID
+ }
+ g.TErrorIf(take(topic, consumer))
+
+
+ g.Testing("not called on empty set", func() {
+ rows, err := allDead(topic, consumer)
+ g.TErrorIf(err)
+
+ n := 0
+ deadletterEach(rows, func(deadletterT, messageT) error {
+ n++
+ return nil
+ })
+ })
+
+ g.Testing("the callback is called once for each entry", func() {
+ expected := []guuid.UUID{
+ dead(pub()),
+ dead(pub()),
+ dead(pub()),
+ }
+
+ rows, err := allDead(topic, consumer)
+ g.TErrorIf(err)
+
+ var deadletterIDs []guuid.UUID
+ deadletterEach(rows, func(
+ deadletter deadletterT,
+ _ messageT,
+ ) error {
+ deadletterIDs = append(deadletterIDs, deadletter.uuid)
+ return nil
+ })
+
+ g.TAssertEqual(deadletterIDs, expected)
+ })
+
+ g.Testing("we halt if the timestamp is ill-formatted", func() {
+ messageID := pub()
+ message_id_bytes := messageID[:]
+ dead(messageID)
+ dead(pub())
+ dead(pub())
+ dead(pub())
+ dead(pub())
+
+ const tmplUpdate = `
+ UPDATE "%s_offsets"
+ SET timestamp = '01-01-1970'
+ WHERE message_id IN (
+ SELECT id FROM "%s_messages" WHERE uuid = ?
+ );
+ `
+ sqlUpdate := fmt.Sprintf(tmplUpdate, prefix, prefix)
+ _, err := db.Exec(sqlUpdate, message_id_bytes)
+ g.TErrorIf(err)
+
+ rows, err := allDead(topic, consumer)
+ g.TErrorIf(err)
+
+ n := 0
+ err = deadletterEach(rows, func(deadletterT, messageT) error {
+ n++
+ return nil
+ })
+
+ g.TAssertEqual(
+ err,
+ &time.ParseError{
+ Layout: time.RFC3339Nano,
+ Value: "01-01-1970",
+ LayoutElem: "2006",
+ ValueElem: "01-01-1970",
+ Message: "",
+ },
+ )
+ g.TAssertEqual(n, 3)
+
+ const tmplDelete = `
+ DELETE FROM "%s_offsets"
+ WHERE message_id IN (
+ SELECT id FROM "%s_messages" WHERE uuid = ?
+ );
+ `
+ sqlDelete := fmt.Sprintf(tmplDelete, prefix, prefix)
+ _, err = db.Exec(sqlDelete, message_id_bytes)
+ g.TErrorIf(err)
+ })
+
+ g.Testing("we halt if the callback returns an error", func() {
+ myErr := errors.New("early return error")
+
+ rows1, err1 := allDead(topic, consumer)
+ g.TErrorIf(err1)
+
+ n1 := 0
+ err1 = deadletterEach(rows1, func(deadletterT, messageT) error {
+ n1++
+ if n1 == 1 {
+ return myErr
+ }
+ return nil
+ })
+
+ rows2, err2 := allDead(topic, consumer)
+ g.TErrorIf(err2)
+
+ n2 := 0
+ err = deadletterEach(rows2, func(deadletterT, messageT) error {
+ n2++
+ return nil
+ })
+
+ g.TAssertEqual(err1, myErr)
+ g.TErrorIf(err2)
+ g.TAssertEqual(n1, 1)
+ g.TAssertEqual(n2, 7)
+ })
+}
+
+func test_allDeadStmt() {
+ g.TestStart("allDeadStmt()")
+
+ const (
+ topic = "allDead() topic"
+ payloadStr = "allDead() payload"
+ consumer = "allDead() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
+ allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ toDeadErr,
+ replayErr,
+ allDeadErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ toDeadClose,
+ replayClose,
+ allDeadClose,
+ db.Close,
+ )
+
+ pub := func(topic string) guuid.UUID {
+ g.TErrorIf(take(topic, consumer))
+
+ unsentWithTopic := unsent
+ unsentWithTopic.Topic = topic
+ message, err := publish(unsentWithTopic, guuid.New())
+ g.TErrorIf(err)
+ return message.uuid
+ }
+
+ collectAll := func(
+ topic string,
+ consumer string,
+ ) ([]deadletterT, []messageT) {
+ var (
+ deadletters []deadletterT
+ messages []messageT
+ )
+ eachFn := func(
+ deadletter deadletterT,
+ message messageT,
+ ) error {
+ deadletters = append(deadletters, deadletter)
+ messages = append(messages, message)
+ return nil
+ }
+
+ rows, err := allDead(topic, consumer)
+ g.TErrorIf(err)
+
+ err = deadletterEach(rows, eachFn)
+ g.TErrorIf(err)
+
+ return deadletters, messages
+ }
+
+
+ g.Testing("no entry on empty deadletters", func() {
+ deadletterIDs, _ := collectAll(topic, consumer)
+ g.TAssertEqual(len(deadletterIDs), 0)
+ })
+
+ g.Testing("deadletters on other topics don't show up", func() {
+ err := toDead(consumer, pub("other topic"), guuid.New())
+ g.TErrorIf(err)
+
+ deadletters, _ := collectAll(topic, consumer)
+ g.TAssertEqual(len(deadletters), 0)
+ })
+
+ g.Testing("deadletters of other consumers don't show up", func() {
+ g.TErrorIf(take(topic, "other consumer"))
+ err := toDead("other consumer", pub(topic), guuid.New())
+ g.TErrorIf(err)
+
+ deadletterIDs, _ := collectAll(topic, consumer)
+ g.TAssertEqual(len(deadletterIDs), 0)
+ })
+
+ g.Testing("deadletters are given in order", func() {
+ deadletterIDs := []guuid.UUID{
+ guuid.New(),
+ guuid.New(),
+ guuid.New(),
+ }
+ messageIDs := []guuid.UUID{
+ pub(topic),
+ pub(topic),
+ pub(topic),
+ }
+
+ err1 := toDead(consumer, messageIDs[0], deadletterIDs[0])
+ err2 := toDead(consumer, messageIDs[1], deadletterIDs[1])
+ err3 := toDead(consumer, messageIDs[2], deadletterIDs[2])
+ g.TErrorIf(g.SomeError(err1, err2, err3))
+
+ deadletters, messages := collectAll(topic, consumer)
+ g.TAssertEqual(deadletters[0].uuid, deadletterIDs[0])
+ g.TAssertEqual(deadletters[1].uuid, deadletterIDs[1])
+ g.TAssertEqual(deadletters[2].uuid, deadletterIDs[2])
+ g.TAssertEqual( messages[0].uuid, messageIDs[0])
+ g.TAssertEqual( messages[1].uuid, messageIDs[1])
+ g.TAssertEqual( messages[2].uuid, messageIDs[2])
+ g.TAssertEqual(len(deadletters), 3)
+ g.TAssertEqual(len(messages), 3)
+ })
+
+ g.Testing("after being replayed, they stop appearing", func() {
+ deadletters, _ := collectAll(topic, consumer)
+ g.TAssertEqual(len(deadletters), 3)
+
+ _, err := replay(deadletters[0].uuid, guuid.New())
+ g.TErrorIf(err)
+ collecteds, _ := collectAll(topic, consumer)
+ g.TAssertEqual(len(collecteds), 2)
+
+ _, err = replay(deadletters[1].uuid, guuid.New())
+ g.TErrorIf(err)
+ collecteds, _ = collectAll(topic, consumer)
+ g.TAssertEqual(len(collecteds), 1)
+
+ _, err = replay(deadletters[2].uuid, guuid.New())
+ g.TErrorIf(err)
+ collecteds, _ = collectAll(topic, consumer)
+ g.TAssertEqual(len(collecteds), 0)
+ })
+}
+
+func test_sizeStmt() {
+ g.TestStart("sizeStmt()")
+
+ const (
+ topic = "size() topic"
+ payloadStr = "size() payload"
+ consumer = "size() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
+ oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID)
+ size, sizeClose, sizeErr := sizeStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ toDeadErr,
+ replayErr,
+ oneDeadErr,
+ sizeErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ toDeadClose,
+ replayClose,
+ sizeClose,
+ oneDeadClose,
+ db.Close,
+ )
+
+ pub := func(topic string) guuid.UUID {
+ g.TErrorIf(take(topic, consumer))
+
+ unsentWithTopic := unsent
+ unsentWithTopic.Topic = topic
+ message, err := publish(unsentWithTopic, guuid.New())
+ g.TErrorIf(err)
+ return message.uuid
+ }
+
+
+ g.Testing("0 on empty topic", func() {
+ n, err := size(topic)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 0)
+ })
+
+ g.Testing("other topics don't fall into our count", func() {
+ pub("other topic")
+
+ n, err := size(topic)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 0)
+ })
+
+ g.Testing("otherwise we just get the sum", func() {
+ pub(topic)
+ pub(topic)
+ pub(topic)
+ pub(topic)
+ pub(topic)
+
+ n, err := size(topic)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 5)
+ })
+
+ g.Testing("deadletters aren't taken into account", func() {
+ sixthMessageID := pub(topic)
+ err := toDead(consumer, sixthMessageID, guuid.New())
+ g.TErrorIf(err)
+
+ n, err := size(topic)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 6)
+ })
+
+ g.Testing("after replay, deadletters increases the size", func() {
+ deadletter, err := oneDead(topic, consumer)
+ g.TErrorIf(err)
+
+ _, err = replay(deadletter.uuid, guuid.New())
+ g.TErrorIf(err)
+
+ n, err := size(topic)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 7)
+ })
+}
+
+func test_countStmt() {
+ g.TestStart("countStmt()")
+
+ const (
+ topic = "count() topic"
+ payloadStr = "count() payload"
+ consumer = "count() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ next, nextClose, nextErr := nextStmt(db, prefix, instanceID)
+ commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ count, countClose, countErr := countStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ nextErr,
+ commitErr,
+ toDeadErr,
+ countErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ nextClose,
+ commitClose,
+ toDeadClose,
+ countClose,
+ db.Close,
+ )
+
+ pub := func(topic string) guuid.UUID {
+ g.TErrorIf(take(topic, consumer))
+
+ unsentWithTopic := unsent
+ unsentWithTopic.Topic = topic
+ message, err := publish(unsentWithTopic, guuid.New())
+ g.TErrorIf(err)
+ return message.uuid
+ }
+
+
+ g.Testing("0 on empty topic", func() {
+ n, err := count(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 0)
+ })
+
+ g.Testing("other topics don't add to our count", func() {
+ err := commit(consumer, pub("other topic"))
+ g.TErrorIf(err)
+
+ n, err := count(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 0)
+ })
+
+ g.Testing("other consumers don't influence our count", func() {
+ g.TErrorIf(take(topic, "other consumer"))
+ err := commit("other consumer", pub(topic))
+ g.TErrorIf(err)
+
+ n, err := count(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 0)
+ })
+
+ g.Testing("unconsumed messages don't count", func() {
+ pub(topic)
+ pub(topic)
+ pub(topic)
+
+ n, err := count(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 0)
+ })
+
+ g.Testing("consumed messages do count", func() {
+ message, err := next(topic, consumer)
+ g.TErrorIf(err)
+ err = commit(consumer, message.uuid)
+ g.TErrorIf(err)
+
+ message, err = next(topic, consumer)
+ g.TErrorIf(err)
+ err = commit(consumer, message.uuid)
+ g.TErrorIf(err)
+
+ message, err = next(topic, consumer)
+ g.TErrorIf(err)
+ err = commit(consumer, message.uuid)
+ g.TErrorIf(err)
+
+ n, err := count(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 3)
+ })
+
+ g.Testing("deadletters count as consumed", func() {
+ message, err := next(topic, consumer)
+ g.TErrorIf(err)
+
+ err = toDead(consumer, message.uuid, guuid.New())
+ g.TErrorIf(err)
+
+ n, err := count(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(n, 4)
+ })
+}
+
+func test_hasDataStmt() {
+ g.TestStart("hasDataStmt()")
+
+ const (
+ topic = "hasData() topic"
+ payloadStr = "hasData() payload"
+ consumer = "hasData() consumer"
+ prefix = defaultPrefix
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ g.TErrorIf(createTables(db, prefix))
+
+ instanceID := os.Getpid()
+ take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ next, nextClose, nextErr := nextStmt(db, prefix, instanceID)
+ commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ hasData, hasDataClose, hasDataErr := hasDataStmt(db, prefix, instanceID)
+ g.TErrorIf(g.SomeError(
+ takeErr,
+ publishErr,
+ nextErr,
+ commitErr,
+ toDeadErr,
+ hasDataErr,
+ ))
+ defer g.SomeFnError(
+ takeClose,
+ publishClose,
+ nextClose,
+ commitClose,
+ toDeadClose,
+ hasDataClose,
+ db.Close,
+ )
+
+ pub := func(topic string) guuid.UUID {
+ g.TErrorIf(take(topic, consumer))
+
+ unsentWithTopic := unsent
+ unsentWithTopic.Topic = topic
+ message, err := publish(unsentWithTopic, guuid.New())
+ g.TErrorIf(err)
+ return message.uuid
+ }
+
+
+ g.Testing("false on empty topic", func() {
+ has, err := hasData(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(has, false)
+ })
+
+ g.Testing("other topics don't change the response", func() {
+ pub("other topic")
+
+ has, err := hasData(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(has, false)
+ })
+
+ g.Testing("published messages flip the flag", func() {
+ pub(topic)
+ has, err := hasData(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(has, true)
+ })
+
+ g.Testing("other consumers don't influence us", func() {
+ g.TErrorIf(take(topic, "other consumer"))
+ message, err := next(topic, "other consumer")
+ g.TErrorIf(err)
+
+ err = commit("other consumer", message.uuid)
+ g.TErrorIf(err)
+
+ has, err := hasData(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(has, true)
+ })
+
+ g.Testing("consuming messages unflips the result", func() {
+ message, err := next(topic, consumer)
+ g.TErrorIf(err)
+
+ err = commit(consumer, message.uuid)
+ g.TErrorIf(err)
+
+ has, err := hasData(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(has, false)
+ })
+
+ g.Testing("same for deadletters", func() {
+ has0, err := hasData(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(has0, false)
+
+ messageID1 := pub(topic)
+ messageID2 := pub(topic)
+
+ has1, err := hasData(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(has1, true)
+
+ err = toDead(consumer, messageID1, guuid.New())
+ g.TErrorIf(err)
+ err = commit(consumer, messageID2)
+ g.TErrorIf(err)
+
+ has2, err := hasData(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(has2, false)
+ })
+}
+
+func test_initDB() {
+ g.TestStart("initDB()")
+
+ const (
+ topic = "initDB() topic"
+ payloadStr = "initDB() payload"
+ consumer = "initDB() consumer"
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ defer db.Close()
+
+ var messages []messageT
+ notifyFn := func(message messageT) {
+ messages = append(messages, message)
+ }
+
+ instanceID := os.Getpid()
+ queries, err := initDB(db, defaultPrefix, notifyFn, instanceID)
+ g.TErrorIf(err)
+ defer queries.close()
+
+ g.TErrorIf(queries.take(topic, consumer))
+
+
+ g.Testing("we can perform all the wrapped operations", func() {
+ messageID := guuid.New()
+ newMessageID := guuid.New()
+ deadletterID := guuid.New()
+
+ messageV1, err := queries.publish(unsent, messageID)
+ g.TErrorIf(err)
+
+ messageV2, err := queries.next(topic, consumer)
+ g.TErrorIf(err)
+
+ var messagesV3 []messageT
+ pendingFn := func(message messageT) error {
+ messagesV3 = append(messagesV3, message)
+ return nil
+ }
+ err = queries.pending(topic, consumer, pendingFn)
+ g.TErrorIf(err)
+ g.TAssertEqual(len(messagesV3), 1)
+ messageV3 := messagesV3[0]
+
+ err = queries.toDead(consumer, messageID, deadletterID)
+ g.TErrorIf(err)
+
+ deadletterV1, err := queries.oneDead(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(deadletterV1.uuid, deadletterID)
+
+ var (
+ deadlettersV2 []deadletterT
+ messagesV4 []messageT
+ )
+ deadletterFn := func(
+ deadletter deadletterT,
+ message messageT,
+ ) error {
+ deadlettersV2 = append(deadlettersV2, deadletter)
+ messagesV4 = append(messagesV4, message)
+ return nil
+ }
+ err = queries.allDead(topic, consumer, deadletterFn)
+ g.TErrorIf(err)
+ g.TAssertEqual(len(deadlettersV2), 1)
+ g.TAssertEqual(deadlettersV2[0].uuid, deadletterID)
+ g.TAssertEqual(len(messagesV4), 1)
+ messageV4 := messagesV4[0]
+
+ g.TAssertEqual(messageV1, messageV2)
+ g.TAssertEqual(messageV1, messageV3)
+ g.TAssertEqual(messageV1, messageV4)
+
+ newMessageV1, err := queries.replay(deadletterID, newMessageID)
+ g.TErrorIf(err)
+
+ err = queries.commit(consumer, newMessageID)
+ g.TErrorIf(err)
+
+ newMessageV0 := messageV1
+ newMessageV0.id = newMessageV1.id
+ newMessageV0.uuid = newMessageID
+ newMessageV0.timestamp = newMessageV1.timestamp
+ g.TAssertEqual(newMessageV1, newMessageV0)
+
+ size, err := queries.size(topic)
+ g.TErrorIf(err)
+ g.TAssertEqual(size, 2)
+
+ count, err := queries.count(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(count, 2)
+
+ hasData, err := queries.hasData(topic, consumer)
+ g.TErrorIf(err)
+ g.TAssertEqual(hasData, false)
+ })
+}
+
+func test_queriesTclose() {
+ g.TestStart("queriesT.close()")
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ defer db.Close()
+
+ instanceID := os.Getpid()
+ queries, err := initDB(db, defaultPrefix, func(messageT) {}, instanceID)
+ g.TErrorIf(err)
+
+
+ g.Testing("after closing, we can't run queries", func() {
+ unsent := UnsentMessage{ Payload: []byte{}, }
+ _, err := queries.publish(unsent, guuid.New())
+ g.TErrorIf(err)
+ g.TErrorIf(db.Close())
+
+ err = queries.close()
+ g.TErrorIf(err)
+
+ _, err = queries.publish(unsent, guuid.New())
+ g.TAssertEqual(
+ err.Error(),
+ "sql: database is closed",
+ )
+ })
+
+ g.Testing("closing mre than once does not error", func() {
+ g.TErrorIf(g.SomeError(
+ queries.close(),
+ queries.close(),
+ ))
+ })
+}
+
+
+func test_newPinger() {
+ g.TestStart("newPinger()")
+
+ g.Testing("onPing() on a closed pinger is a noop", func() {
+ pinger := newPinger[string]()
+ pinger.close()
+ pinger.onPing(func(s string) {
+ panic(s)
+ })
+ pinger.tryPing("ignored")
+ })
+
+ g.Testing("onPing() on a closed pinger with data gets that", func() {
+ pinger := newPinger[string]()
+ pinger.tryPing("received")
+ pinger.tryPing("ignored")
+ g.TAssertEqual(pinger.closed(), false)
+
+ pinger.close()
+ g.TAssertEqual(pinger.closed(), true)
+
+ c := make(chan string)
+ count := 0
+ go pinger.onPing(func(s string) {
+ if count > 0 {
+ panic(s)
+ }
+ count++
+
+ c <- s
+ })
+
+ given := <- c
+ g.TAssertEqual(given, "received")
+ })
+
+ g.Testing("when onPing is late, we loose messages", func() {
+ pinger := newPinger[string]()
+ pinger.tryPing("first seen")
+ pinger.tryPing("second dropped")
+ pinger.tryPing("third dropped")
+
+ c := make(chan string)
+ go pinger.onPing(func(s string) {
+ c <- s
+ })
+ s := <- c
+
+ close(c)
+ pinger.close()
+ g.TAssertEqual(s, "first seen")
+ })
+
+ g.Testing("if onPing is on time, it may not loose", func() {
+ pinger := newPinger[string]()
+ pinger.tryPing("first seen")
+
+ c := make(chan string)
+ go pinger.onPing(func(s string) {
+ c <- s
+ })
+
+ s1 := <- c
+ pinger.tryPing("second seen")
+ s2 := <- c
+
+ close(c)
+ pinger.close()
+ g.TAssertEqual(s1, "first seen")
+ g.TAssertEqual(s2, "second seen")
+ })
+
+ g.Testing("if onPing takes too long, it still looses messages", func() {
+ pinger := newPinger[string]()
+ pinger.tryPing("first seen")
+
+ c1 := make(chan string)
+ c2 := make(chan struct{})
+ go pinger.onPing(func(s string) {
+ c1 <- s
+ c2 <- struct{}{}
+ })
+
+ s1 := <- c1
+ pinger.tryPing("second seen")
+ pinger.tryPing("third dropped")
+ <- c2
+ s2 := <- c1
+ <- c2
+
+ close(c2)
+ close(c1)
+ pinger.close()
+ g.TAssertEqual(s1, "first seen")
+ g.TAssertEqual(s2, "second seen")
+ })
+}
+
+func test_makeSubscriptionsFunc() {
+ g.TestStart("makeSubscriptionsFunc()")
+
+ g.Testing("we can have multiple readers", func() {
+ subscriptions := makeSubscriptionsFuncs()
+
+ var (
+ readStarted sync.WaitGroup
+ readFinished sync.WaitGroup
+ )
+ c := make(chan struct{})
+ readFn := func(subscriptionsSetM) error {
+ readStarted.Done()
+ <- c
+ return nil
+ }
+ addReader := func() {
+ readStarted.Add(1)
+ readFinished.Add(1)
+ go func() {
+ subscriptions.read(readFn)
+ readFinished.Done()
+ }()
+ }
+
+ addReader()
+ addReader()
+ addReader()
+ readStarted.Wait()
+ c <- struct{}{}
+ c <- struct{}{}
+ c <- struct{}{}
+ readFinished.Wait()
+ })
+
+ g.Testing("writers are exclusive", func() {
+ subscriptions := makeSubscriptionsFuncs()
+
+ var (
+ readStarted sync.WaitGroup
+ readFinished sync.WaitGroup
+ writeWillStart sync.WaitGroup
+ writeFinished sync.WaitGroup
+ )
+ c := make(chan string)
+ readFn := func(subscriptionsSetM) error {
+ readStarted.Done()
+ c <- "reader"
+ return nil
+ }
+ addReader := func() {
+ readStarted.Add(1)
+ readFinished.Add(1)
+ go func() {
+ subscriptions.read(readFn)
+ readFinished.Done()
+ }()
+ }
+
+ writeFn := func(subscriptionsSetM) error {
+ c <- "writer"
+ return nil
+ }
+ addWriter := func() {
+ writeWillStart.Add(1)
+ writeFinished.Add(1)
+ go func() {
+ writeWillStart.Done()
+ subscriptions.write(writeFn)
+ writeFinished.Done()
+ }()
+ }
+
+ addReader()
+ addReader()
+ addReader()
+ readStarted.Wait()
+ addWriter()
+ writeWillStart.Wait()
+
+ g.TAssertEqual(<-c, "reader")
+ g.TAssertEqual(<-c, "reader")
+ g.TAssertEqual(<-c, "reader")
+
+ readFinished.Wait()
+ g.TAssertEqual(<-c, "writer")
+ writeFinished.Wait()
+ })
+}
+
+func test_makeNotifyFn() {
+ g.TestStart("makeNotifyFn()")
+
+ g.Testing("when topic is nil only top pinger gets pinged", func() {
+ pinger1 := newPinger[struct{}]()
+ pinger2 := newPinger[[]byte]()
+ defer pinger1.close()
+ defer pinger2.close()
+
+ go pinger1.onPing(func(struct{}) {
+ panic("consumer pinger")
+ })
+ go pinger2.onPing(func(payload []byte) {
+ panic("waiter pinger")
+ })
+
+ flowID := guuid.New()
+
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer-1": consumerT{
+ pinger: pinger1,
+ },
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{
+ "waiter-1": waiterT{
+ pinger: pinger2,
+ },
+ },
+ },
+ },
+ }
+ subsFn := func(fn func(subscriptionsSetM) error) error {
+ return fn(set)
+ }
+ topPinger := newPinger[struct{}]()
+ defer topPinger.close()
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go topPinger.onPing(func(struct{}) {
+ wg.Done()
+ })
+
+ notifyFn := makeNotifyFn(subsFn, topPinger)
+
+ message := messageT{
+ uuid: guuid.New(),
+ topic: "nobody is subscribed to this one",
+ payload: []byte("nobody with get this payload"),
+ }
+ notifyFn(message)
+ wg.Wait()
+ })
+
+ g.Testing("otherwise all interested subscribers get pinged", func() {
+ const topic = "the topic name"
+
+ pinger1 := newPinger[struct{}]()
+ pinger2 := newPinger[[]byte]()
+ defer pinger1.close()
+ defer pinger2.close()
+
+ var wg sync.WaitGroup
+ go pinger1.onPing(func(struct{}) {
+ wg.Done()
+ })
+ go pinger2.onPing(func([]byte) {
+ wg.Done()
+ })
+ wg.Add(2)
+
+ flowID := guuid.New()
+
+ set := subscriptionsSetM{
+ topic: topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer-1": consumerT{
+ pinger: pinger1,
+ },
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{
+ "waiter-1": waiterT{
+ pinger: pinger2,
+ },
+ },
+ },
+ },
+ }
+
+ subsFn := func(fn func(subscriptionsSetM) error) error {
+ return fn(set)
+ }
+
+ topPinger := newPinger[struct{}]()
+ defer topPinger.close()
+ go topPinger.onPing(func(struct{}) {
+ wg.Done()
+ })
+ wg.Add(1)
+
+ notifyFn := makeNotifyFn(subsFn, topPinger)
+
+ message := messageT{
+ uuid: guuid.New(),
+ topic: topic,
+ flowID: flowID,
+ payload: []byte("ignored in this test"),
+ }
+ notifyFn(message)
+ wg.Wait()
+ })
+}
+
+func test_collectClosedWaiters() {
+ g.TestStart("collectClosedWaiter()")
+
+ g.Testing("collects all the reports to be closed", func() {
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+ flowID3 := guuid.New()
+ flowID4 := guuid.New()
+ flowID5 := guuid.New()
+
+ mkwaiter := func(closed bool) waiterT {
+ fn := func() bool {
+ return closed
+ }
+ return waiterT{
+ closed: &fn,
+ }
+ }
+
+ set := subscriptionsSetM{
+ "topic-1": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{
+ "waiter-1": mkwaiter(false),
+ "waiter-2": mkwaiter(true),
+ "waiter-3": mkwaiter(true),
+ },
+ flowID2: map[string]waiterT{
+ "waiter-4": mkwaiter(true),
+ "waiter-5": mkwaiter(false),
+ },
+ },
+ },
+ "topic-2": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID3: map[string]waiterT{
+ "waiter-1": mkwaiter(false),
+ "waiter-2": mkwaiter(false),
+ },
+ flowID4: map[string]waiterT{
+ "waiter-3": mkwaiter(true),
+ "waiter-4": mkwaiter(true),
+ "waiter-5": mkwaiter(true),
+ "waiter-6": mkwaiter(true),
+ },
+ },
+ },
+ "topic-3": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID5: map[string]waiterT{},
+ },
+ },
+ "topic-4": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ expected := map[string]map[guuid.UUID][]string{
+ "topic-1": map[guuid.UUID][]string{
+ flowID1: []string{
+ "waiter-2",
+ "waiter-3",
+ },
+ flowID2: []string{
+ "waiter-4",
+ },
+ },
+ "topic-2": map[guuid.UUID][]string{
+ flowID3: []string{},
+ flowID4: []string{
+ "waiter-3",
+ "waiter-4",
+ "waiter-5",
+ "waiter-6",
+ },
+ },
+ "topic-3": map[guuid.UUID][]string{
+ flowID5: []string{},
+ },
+ "topic-4": map[guuid.UUID][]string{},
+ }
+
+ given := collectClosedWaiters(set)
+
+ // sort names for equality
+ for _, waitersIndex := range given {
+ for _, names := range waitersIndex {
+ sort.Strings(names)
+ }
+ }
+ g.TAssertEqual(given, expected)
+ })
+}
+
+func test_trimEmptyLeaves() {
+ g.TestStart("trimEmptyLeaves()")
+
+ g.Testing("noop on an empty index", func() {
+ input := map[string]map[guuid.UUID][]string{}
+ expected := map[string]map[guuid.UUID][]string{}
+
+ trimEmptyLeaves(input)
+ g.TAssertEqual(input, expected)
+ })
+
+ g.Testing("simplifies tree when it can", func() {
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+ flowID3 := guuid.New()
+ flowID4 := guuid.New()
+
+ input := map[string]map[guuid.UUID][]string{
+ "topic-1": map[guuid.UUID][]string{
+ flowID1: []string{
+ "waiter-1",
+ },
+ flowID2: []string{},
+ },
+ "topic-2": map[guuid.UUID][]string{
+ flowID3: []string{},
+ flowID4: []string{},
+ },
+ "topic-3": map[guuid.UUID][]string{},
+ }
+
+ expected := map[string]map[guuid.UUID][]string{
+ "topic-1": map[guuid.UUID][]string{
+ flowID1: []string{
+ "waiter-1",
+ },
+ },
+ }
+
+ trimEmptyLeaves(input)
+ g.TAssertEqual(input, expected)
+ })
+
+ g.Testing("fully prune tree if possible", func() {
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+ flowID3 := guuid.New()
+ flowID4 := guuid.New()
+ flowID5 := guuid.New()
+
+ input := map[string]map[guuid.UUID][]string{
+ "topic-1": map[guuid.UUID][]string{},
+ "topic-2": map[guuid.UUID][]string{},
+ "topic-3": map[guuid.UUID][]string{},
+ "topic-4": map[guuid.UUID][]string{
+ flowID1: []string{},
+ },
+ "topic-5": map[guuid.UUID][]string{
+ flowID2: []string{},
+ flowID3: []string{},
+ flowID4: []string{},
+ flowID5: []string{},
+ },
+ }
+
+ expected := map[string]map[guuid.UUID][]string{}
+
+ trimEmptyLeaves(input)
+ g.TAssertEqual(input, expected)
+ })
+}
+
+func test_deleteIfEmpty() {
+ g.TestStart("deleteIfEmpty()")
+
+ g.Testing("noop if there are consumers", func() {
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ },
+ }
+
+ expected1 := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ },
+ }
+ expected2 := subscriptionsSetM{}
+
+ deleteIfEmpty(set, "topic")
+ g.TAssertEqual(set, expected1)
+
+ delete(set["topic"].consumers, "consumer")
+ deleteIfEmpty(set, "topic")
+ g.TAssertEqual(set, expected2)
+ })
+
+ g.Testing("noop if there are waiters", func() {
+ flowID := guuid.New()
+
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: nil,
+ },
+ },
+ }
+
+ expected1 := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: nil,
+ },
+ },
+ }
+ expected2 := subscriptionsSetM{}
+
+ deleteIfEmpty(set, "topic")
+ g.TAssertEqual(set, expected1)
+
+ delete(set["topic"].waiters, flowID)
+ deleteIfEmpty(set, "topic")
+ g.TAssertEqual(set, expected2)
+ })
+
+ g.Testing("otherwise deletes when empty", func() {
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ expected := subscriptionsSetM{}
+
+ deleteIfEmpty(set, "topic")
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("unrelated topics are left untouched", func() {
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ expected := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ deleteIfEmpty(set, "another-topic")
+ g.TAssertEqual(set, expected)
+ })
+}
+
+func test_deleteEmptyTopics() {
+ g.TestStart("deleteEmptyTopics()")
+
+ g.Testing("cleans up all empty topics from the set", func() {
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+
+ set := subscriptionsSetM{
+ "empty": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ "has-consumers": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ "has-waiters": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: nil,
+ },
+ },
+ "has-both": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID2: nil,
+ },
+ },
+ "has-neither": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ expected := subscriptionsSetM{
+ "has-consumers": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ "has-waiters": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: nil,
+ },
+ },
+ "has-both": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID2: nil,
+ },
+ },
+ }
+
+ deleteEmptyTopics(set)
+ g.TAssertEqual(set, expected)
+ })
+}
+
+func test_removeClosedWaiter() {
+ g.TestStart("removeClosedWaiter()")
+
+ g.Testing("removes from set all that we request", func() {
+ flowID0 := guuid.New()
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+ flowID3 := guuid.New()
+
+ set := subscriptionsSetM{
+ "topic-1": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{
+ "waiter-1": waiterT{},
+ "waiter-2": waiterT{},
+ },
+ flowID2: map[string]waiterT{
+ "waiter-3": waiterT{},
+ "waiter-4": waiterT{},
+ "waiter-5": waiterT{},
+ },
+ },
+ },
+ "topic-2": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID3: map[string]waiterT{
+ "waiter-6": waiterT{},
+ "waiter-7": waiterT{},
+ "waiter-8": waiterT{},
+ },
+ },
+ },
+ "topic-3": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ input := map[string]map[guuid.UUID][]string{
+ "topic-0": map[guuid.UUID][]string{
+ flowID0: []string{
+ "waiter-0",
+ },
+ },
+ "topic-1": map[guuid.UUID][]string{
+ flowID1: []string{
+ "waiter-2",
+ },
+ flowID2: []string{
+ "waiter-3",
+ "waiter-4",
+ "waiter-5",
+ },
+ },
+ "topic-2": map[guuid.UUID][]string{
+ flowID3: []string{
+ "waiter-6",
+ "waiter-7",
+ "waiter-8",
+ },
+ },
+ }
+
+ expected := subscriptionsSetM{
+ "topic-1": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{
+ "waiter-1": waiterT{},
+ },
+ },
+ },
+ }
+
+ removeClosedWaiters(set, input)
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("empty flowIDs from input GET LEAKED", func() {
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+
+ input := map[string]map[guuid.UUID][]string{
+ "topic-2": map[guuid.UUID][]string{
+ flowID2: []string{
+ "waiter",
+ },
+ },
+ }
+
+ set := subscriptionsSetM{
+ "topic-1": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{},
+ },
+ },
+ "topic-2": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID2: map[string]waiterT{
+ "waiter": waiterT{},
+ },
+ },
+ },
+ }
+
+ expected := subscriptionsSetM{
+ "topic-1": topicSubscriptionT{
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{},
+ },
+ },
+ }
+
+ removeClosedWaiters(set, input)
+ g.TAssertEqual(set, expected)
+ })
+}
+
+func test_reapClosedWaiters() {
+ g.TestStart("reapClosedWaiters()")
+
+ g.Testing("trimEmptyLeaves() prevents removal of empty flows", func() {
+ var (
+ set subscriptionsSetM
+ readCount = 0
+ writeCount = 0
+ )
+ readFn := func(fn func(subscriptionsSetM) error) error {
+ readCount++
+ return fn(set)
+ }
+ writeFn := func(fn func(subscriptionsSetM) error) error {
+ writeCount++
+ return fn(set)
+ }
+
+ openFn := func() bool {
+ return false
+ }
+ closedFn := func() bool {
+ return true
+ }
+ open := waiterT{ closed: &openFn }
+ closed := waiterT{ closed: &closedFn }
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+
+ set = subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{
+ "waiter-1": open,
+ "waiter-2": open,
+ "waiter-3": open,
+ },
+ flowID2: map[string]waiterT{
+ "waiter-4": open,
+ },
+ },
+ },
+ }
+
+ expected1 := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{
+ "waiter-1": open,
+ "waiter-2": open,
+ "waiter-3": open,
+ },
+ flowID2: map[string]waiterT{
+ "waiter-4": open,
+ },
+ },
+ },
+ }
+
+ expected2 := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{
+ "waiter-2": open,
+ "waiter-3": open,
+ },
+ flowID2: map[string]waiterT{
+ "waiter-4": open,
+ },
+ },
+ },
+ }
+
+ expected3 := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{
+ "waiter-2": open,
+ "waiter-3": open,
+ },
+ },
+ },
+ }
+
+ reapClosedWaiters(readFn, writeFn)
+ g.TAssertEqual(readCount, 1)
+ g.TAssertEqual(writeCount, 0)
+ g.TAssertEqual(set, expected1)
+
+ set["topic"].waiters[flowID1]["waiter-1"] = closed
+ reapClosedWaiters(readFn, writeFn)
+ g.TAssertEqual(readCount, 2)
+ g.TAssertEqual(writeCount, 1)
+ g.TAssertEqual(set, expected2)
+
+ set["topic"].waiters[flowID2]["waiter-4"] = closed
+ reapClosedWaiters(readFn, writeFn)
+ g.TAssertEqual(readCount, 3)
+ g.TAssertEqual(writeCount, 2)
+ g.TAssertEqual(set, expected3)
+ })
+}
+
+func test_everyNthCall() {
+ g.TestStart("everyNthCall()")
+
+ g.Testing("0 (incorrect) would make fn never be called", func() {
+ never := everyNthCall[int](0, func(int) {
+ panic("unreachable")
+ })
+ for i := 0; i < 100; i++ {
+ never(i)
+ }
+ })
+
+ g.Testing("the first call is delayed to be the nth", func() {
+ count := 0
+ values := []string{}
+ fn := everyNthCall[string](3, func(s string) {
+ count++
+ values = append(values, s)
+ })
+
+ fn("1 ignored")
+ g.TAssertEqual(count, 0)
+ fn("2 ignored")
+ g.TAssertEqual(count, 0)
+ fn("3 not ignored")
+ g.TAssertEqual(count, 1)
+
+ fn("4 ignored")
+ fn("5 ignored")
+ g.TAssertEqual(count, 1)
+
+ fn("6 not ignored")
+ fn("7 ignored")
+ fn("8 ignored")
+ g.TAssertEqual(count, 2)
+
+ fn("9 not ignored")
+ g.TAssertEqual(count, 3)
+
+ expected := []string{
+ "3 not ignored",
+ "6 not ignored",
+ "9 not ignored",
+ }
+ g.TAssertEqual(values, expected)
+ })
+}
+
+func test_runReaper() {
+ g.TestStart("runReaper()")
+
+ g.Testing("debounce reapClosedWaiters `reaperSkipCount` times", func() {
+ set := subscriptionsSetM{}
+
+ var (
+ readCount = 0
+ writeCount = 0
+ )
+ readFn := func(fn func(subscriptionsSetM) error) error {
+ readCount++
+ return fn(set)
+ }
+ writeFn := func(fn func(subscriptionsSetM) error) error {
+ writeCount++
+ return fn(set)
+ }
+
+ var iterCount int
+ onPing := func(fn func(struct{})) {
+ for i := 0; i < iterCount; i++ {
+ fn(struct{}{})
+ }
+ }
+
+ iterCount = reaperSkipCount - 1
+ runReaper(onPing, readFn, writeFn)
+ g.TAssertEqual(readCount, 0)
+ g.TAssertEqual(writeCount, 0)
+
+ iterCount = reaperSkipCount
+ runReaper(onPing, readFn, writeFn)
+ g.TAssertEqual(readCount, 1)
+ g.TAssertEqual(writeCount, 0)
+ })
+}
+
+func test_NewWithPrefix() {
+ g.TestStart("NewWithPrefix()")
+
+ g.Testing("we get an error with a bad prefix", func() {
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ defer db.Close()
+
+ _, err = NewWithPrefix(db, "a bad prefix")
+ g.TAssertEqual(err, g.ErrBadSQLTablePrefix)
+ })
+
+ g.Testing("otherwise we have a queueT and no errors", func() {
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ defer db.Close()
+
+ queue, err := NewWithPrefix(db, "good")
+ g.TErrorIf(err)
+ queue.Close()
+
+ g.TAssertEqual(queue.(queueT).pinger.closed(), true)
+ })
+}
+
+func test_New() {
+ g.TestStart("New()")
+
+ g.Testing("smoke test that we get a queueT", func() {
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ defer db.Close()
+
+ queue, err := New(db)
+ g.TErrorIf(err)
+ queue.Close()
+
+ g.TAssertEqual(queue.(queueT).pinger.closed(), true)
+ })
+}
+
+func test_asPublicMessage() {
+ g.TestStart("asPublicMessage()")
+
+ g.Testing("it picks the correct fields 🤷", func() {
+ input := messageT{
+ uuid: guuid.New(),
+ timestamp: time.Now(),
+ topic: "topic",
+ flowID: guuid.New(),
+ payload: []byte("payload"),
+ }
+
+ expected := Message{
+ ID: input.uuid,
+ Timestamp: input.timestamp,
+ Topic: input.topic,
+ FlowID: input.flowID,
+ Payload: input.payload,
+ }
+
+ given := asPublicMessage(input)
+ g.TAssertEqual(given, expected)
+ })
+}
+
+func test_queueT_Publish() {
+ g.TestStart("queueT.Publish()")
+
+ const (
+ topic = "queueT.Publish() topic"
+ payloadStr = "queueT.Publish() payload"
+ )
+ var (
+ flowID = guuid.New()
+ payload = []byte(payloadStr)
+ unsent = UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ )
+
+ db, err := sql.Open("acude", ":memory:")
+ g.TErrorIf(err)
+ defer db.Close()
+
+ queue, err := New(db)
+ g.TErrorIf(err)
+ defer queue.Close()
+
+
+ g.Testing("it just publishes and returns", func() {
+ message, err := queue.Publish(unsent)
+ g.TErrorIf(err)
+
+ g.TAssertEqual(message.Timestamp == time.Time{}, false)
+ g.TAssertEqual(message.Topic, topic)
+ g.TAssertEqual(message.FlowID, flowID)
+ g.TAssertEqual(message.Payload, payload)
+ })
+
+ queue.Close()
+ g.TAssertEqual(queue.(queueT).pinger.closed(), true)
+}
+
+func test_registerConsumerFn() {
+ g.TestStart("registerConsumerFn()")
+
+ g.Testing("adds a new topicSubscriptionT{} if needed", func() {
+ consumer := consumerT{
+ data: consumerDataT{
+ topic: "topic",
+ name: "consumer",
+ },
+ }
+
+ set := subscriptionsSetM{}
+
+ expected := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumer,
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ registerConsumerFn(consumer)(set)
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("otherwise it just uses what exists", func() {
+ flowID := guuid.New()
+
+ consumer := consumerT{
+ data: consumerDataT{
+ topic: "topic",
+ name: "consumer",
+ },
+ }
+
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "other-consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{},
+ },
+ },
+ }
+
+ expected := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumer,
+ "other-consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{},
+ },
+ },
+ }
+
+ registerConsumerFn(consumer)(set)
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("overwrites existing consumer if desired", func() {
+ close1 := func() {}
+ consumer1 := consumerT{
+ data: consumerDataT{
+ topic: "topic",
+ name: "consumer",
+ },
+ close: &close1,
+ }
+
+ close2 := func() {}
+ consumer2 := consumerT{
+ data: consumerDataT{
+ topic: "topic",
+ name: "consumer",
+ },
+ close: &close2,
+ }
+
+ set := subscriptionsSetM{}
+
+ expected1 := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumer1,
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ expected2 := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumer2,
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ registerConsumerFn(consumer1)(set)
+ g.TAssertEqual(set, expected1)
+ g.TAssertEqual(reflect.DeepEqual(set, expected2), false)
+
+ registerConsumerFn(consumer2)(set)
+ g.TAssertEqual(set, expected2)
+ g.TAssertEqual(reflect.DeepEqual(set, expected1), false)
+ })
+}
+
+func test_registerWaiterFn() {
+ g.TestStart("registerWaiterFn()")
+
+ g.Testing("adds a new topicSubscriptionT{} if needed", func() {
+ flowID := guuid.New()
+
+ waiter := waiterT{
+ data: waiterDataT{
+ topic: "topic",
+ flowID: flowID,
+ name: "waiter",
+ },
+ }
+
+ set := subscriptionsSetM{}
+
+ expected := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{
+ "waiter": waiter,
+ },
+ },
+ },
+ }
+
+ registerWaiterFn(waiter)(set)
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("adds a new waiters map if needed", func() {
+ flowID := guuid.New()
+
+ waiter := waiterT{
+ data: waiterDataT{
+ topic: "topic",
+ flowID: flowID,
+ name: "waiter",
+ },
+ }
+
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ expected := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{
+ "waiter": waiter,
+ },
+ },
+ },
+ }
+
+ registerWaiterFn(waiter)(set)
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("otherwise it just uses what exists", func() {
+ flowID := guuid.New()
+
+ waiter := waiterT{
+ data: waiterDataT{
+ topic: "topic",
+ flowID: flowID,
+ name: "waiter",
+ },
+ }
+
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{
+ "other-waiter": waiterT{},
+ },
+ },
+ },
+ }
+
+ expected := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{
+ "waiter": waiter,
+ "other-waiter": waiterT{},
+ },
+ },
+ },
+ }
+
+ registerWaiterFn(waiter)(set)
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("overwrites existing waiter if desired", func() {
+ flowID := guuid.New()
+
+ close1 := func() {}
+ waiter1 := waiterT{
+ data: waiterDataT{
+ topic: "topic",
+ flowID: flowID,
+ name: "waiter",
+ },
+ close: &close1,
+ }
+
+ close2 := func() {}
+ waiter2 := waiterT{
+ data: waiterDataT{
+ topic: "topic",
+ flowID: flowID,
+ name: "waiter",
+ },
+ close: &close2,
+ }
+
+ set := subscriptionsSetM{}
+
+ expected1 := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{
+ "waiter": waiter1,
+ },
+ },
+ },
+ }
+
+ expected2 := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{
+ "waiter": waiter2,
+ },
+ },
+ },
+ }
+
+ registerWaiterFn(waiter1)(set)
+ g.TAssertEqual(set, expected1)
+ g.TAssertEqual(reflect.DeepEqual(set, expected2), false)
+
+ registerWaiterFn(waiter2)(set)
+ g.TAssertEqual(set, expected2)
+ g.TAssertEqual(reflect.DeepEqual(set, expected1), false)
+ })
+}
+
+func test_makeConsumeOneFn() {
+ g.TestStart("makeConsumeOneFn()")
+
+ savedLogger := slog.Default()
+
+ s := new(strings.Builder)
+ g.SetLoggerOutput(s)
+
+ var (
+ successCount int
+ errorCount int
+ callbackErr error
+ successFnErr error
+ errorFnErr error
+ messages []Message
+ successNames []string
+ successIDs []guuid.UUID
+ errorNames []string
+ errorIDs []guuid.UUID
+ deadletterIDs []guuid.UUID
+ )
+
+ data := consumerDataT{
+ topic: "topic",
+ name: "name",
+ }
+
+ callback := func(message Message) error {
+ messages = append(messages, message)
+ return callbackErr
+ }
+
+ successFn := func(name string, messageID guuid.UUID) error {
+ successCount++
+ successNames = append(successNames, name)
+ successIDs = append(successIDs, messageID)
+ return successFnErr
+ }
+
+ errorFn := func(
+ name string,
+ messageID guuid.UUID,
+ deadletterID guuid.UUID,
+ ) error {
+ errorCount++
+ errorNames = append(errorNames, name)
+ errorIDs = append(errorIDs, messageID)
+ deadletterIDs = append(deadletterIDs, deadletterID)
+
+ return errorFnErr
+ }
+
+ consumeOneFn := makeConsumeOneFn(
+ data,
+ callback,
+ successFn,
+ errorFn,
+ )
+
+ message1 := messageT{ uuid: guuid.New() }
+ message2 := messageT{ uuid: guuid.New() }
+ message3 := messageT{ uuid: guuid.New() }
+ message4 := messageT{ uuid: guuid.New() }
+
+
+ g.Testing("error from successFn() is propagated", func() {
+ err := consumeOneFn(message1)
+ g.TErrorIf(err)
+ g.TAssertEqual(successCount, 1)
+ g.TAssertEqual(errorCount, 0)
+
+ successFnErr = errors.New("successFn() error")
+ err = consumeOneFn(message2)
+ g.TAssertEqual(err, successFnErr)
+ g.TAssertEqual(successCount, 2)
+ g.TAssertEqual(errorCount, 0)
+
+ g.TAssertEqual(s.String(), "")
+ })
+
+ g.Testing("error from callback() is logged and dropped", func() {
+ *s = strings.Builder{}
+
+ callbackErr = errors.New("callback() error")
+ err := consumeOneFn(message3)
+ g.TErrorIf(err)
+ g.TAssertEqual(successCount, 2)
+ g.TAssertEqual(errorCount, 1)
+ g.TAssertEqual(s.String() == "", false)
+ })
+
+ g.Testing("error from errorFn() is propagated", func() {
+ *s = strings.Builder{}
+
+ errorFnErr = errors.New("errorFn() error")
+ err := consumeOneFn(message4)
+ g.TAssertEqual(err, errorFnErr)
+ g.TAssertEqual(successCount, 2)
+ g.TAssertEqual(errorCount, 2)
+ g.TAssertEqual(s.String() == "", false)
+ })
+
+ g.Testing("calls happened with the expected arguments", func() {
+ expectedMessages := []Message{
+ asPublicMessage(message1),
+ asPublicMessage(message2),
+ asPublicMessage(message3),
+ asPublicMessage(message4),
+ }
+
+ g.TAssertEqual(messages, expectedMessages)
+ g.TAssertEqual(successNames, []string{ "name", "name" })
+ g.TAssertEqual(errorNames, []string{ "name", "name" })
+ g.TAssertEqual(successIDs, []guuid.UUID{
+ message1.uuid,
+ message2.uuid,
+ })
+ g.TAssertEqual(errorIDs, []guuid.UUID{
+ message3.uuid,
+ message4.uuid,
+ })
+ g.TAssertEqual(deadletterIDs[0] == message3.uuid, false)
+ g.TAssertEqual(deadletterIDs[1] == message4.uuid, false)
+ })
+
+ slog.SetDefault(savedLogger)
+}
+
+func test_makeConsumeAllFn() {
+ g.TestStart("makeConsumeAllFn()")
+
+ savedLogger := slog.Default()
+
+ s := new(strings.Builder)
+ g.SetLoggerOutput(s)
+
+ data := consumerDataT{}
+
+ consumeOneFn := func(messageT) error {
+ return nil
+ }
+
+ var eachFnErr error
+ eachFn := func(string, string, func(messageT) error) error {
+ return eachFnErr
+ }
+
+ consumeAllFn := makeConsumeAllFn(data, consumeOneFn, eachFn)
+
+
+ g.Testing("silent when eachFn() returns no error", func() {
+ consumeAllFn(struct{}{})
+ g.TAssertEqual(s.String(), "")
+ })
+
+ g.Testing("logs warning otherwise", func() {
+ eachFnErr = errors.New("eachFn() error")
+ consumeAllFn(struct{}{})
+ g.TAssertEqual(s.String() == "", false)
+ })
+
+ slog.SetDefault(savedLogger)
+}
+
+func test_makeWaitFn() {
+ g.TestStart("makeWaitFn()")
+
+ g.Testing("all it does is send the data and close things", func() {
+ callCount := 0
+ closeFn := func() {
+ callCount++
+ }
+
+ c := make(chan []byte, 1)
+ waitFn := makeWaitFn(c, closeFn)
+
+ payload := []byte("payload")
+ waitFn(payload)
+ given := <- c
+
+ g.TAssertEqual(given, payload)
+ g.TAssertEqual(callCount, 1)
+ })
+
+ g.Testing("you can call it twice for cases of race condition", func() {
+ callCount := 0
+ closeFn := func() {
+ callCount++
+ }
+
+ c := make(chan []byte, 1)
+ waitFn := makeWaitFn(c, closeFn)
+
+ payload := []byte("something")
+ waitFn(payload)
+ waitFn(payload)
+ given1 := <- c
+ given2 := <- c
+
+ g.TAssertEqual(given1, payload)
+ g.TAssertEqual(given2, []byte(nil))
+ })
+}
+
+func test_runConsumer() {
+ g.TestStart("runConsumer()")
+
+ g.Testing("calls consumeAllFn() at least one", func() {
+ onPing := func(fn func(struct{})) {}
+
+ count := 0
+ consumeAllFn := func(struct{}) {
+ count++
+ }
+
+ runConsumer(onPing, consumeAllFn)
+ g.TAssertEqual(count, 1)
+ })
+
+ g.Testing("can call consumeAllFn() (onPing + 1) times", func() {
+ onPing := func(fn func(struct{})) {
+ fn(struct{}{})
+ fn(struct{}{})
+ fn(struct{}{})
+ }
+
+ count := 0
+ consumeAllFn := func(struct{}) {
+ count++
+ }
+
+ runConsumer(onPing, consumeAllFn)
+ g.TAssertEqual(count, 4)
+ })
+}
+
+func test_tryFinding() {
+ g.TestStart("tryFinding()")
+
+ g.Testing("noop in case of failure", func() {
+ myErr := errors.New("find() error")
+ findFn := func(string, guuid.UUID) (messageT, error) {
+ return messageT{}, myErr
+ }
+
+ count := 0
+ waitFn := func([]byte) {
+ count++
+ }
+
+
+ tryFinding(findFn, "topic", guuid.New(), waitFn)
+ g.TAssertEqual(count, 0)
+ })
+
+ g.Testing("calls waitFn in case of success", func() {
+ payload := []byte("find() payload")
+
+ findFn := func(string, guuid.UUID) (messageT, error) {
+ return messageT{ payload: payload }, nil
+ }
+
+ payloads := [][]byte{}
+ waitFn := func(payload []byte) {
+ payloads = append(payloads, payload)
+ }
+
+
+ tryFinding(findFn, "topic", guuid.New(), waitFn)
+ g.TAssertEqual(payloads, [][]byte{ payload })
+ })
+}
+
+func test_queueT_Subscribe() {
+ g.TestStart("queueT.Subscribe()")
+
+ set := subscriptionsSetM{}
+ consumed := []guuid.UUID{}
+ messages := []messageT{
+ messageT{ uuid: guuid.New() },
+ messageT{ uuid: guuid.New() },
+ }
+
+ var takeErr error
+ queue := queueT{
+ queries: queriesT{
+ take: func(string, string) error {
+ return takeErr
+ },
+ pending: func(
+ topic string,
+ consumer string,
+ fn func(messageT) error,
+ ) error {
+ for _, message := range messages {
+ consumed = append(
+ consumed,
+ message.uuid,
+ )
+ _ = fn(message)
+ }
+ return nil
+ },
+ commit: func(
+ consumer string,
+ messageID guuid.UUID,
+ ) error {
+ return nil
+ },
+ toDead: func(string, guuid.UUID, guuid.UUID) error {
+ g.Unreachable()
+ return nil
+ },
+ },
+ subscriptions: subscriptionsT{
+ write: func(fn func(subscriptionsSetM) error) error {
+ return fn(set)
+ },
+ },
+ }
+
+
+ g.Testing("registers our callback in the set and calls it", func() {
+ var wg sync.WaitGroup
+ wg.Add(2)
+
+ queue.Subscribe("topic", "consumer-1", func(Message) error {
+ wg.Done()
+ return nil
+ })
+ defer queue.Unsubscribe("topic", "consumer-1")
+
+ wg.Wait()
+ g.TAssertEqual(consumed, []guuid.UUID{
+ messages[0].uuid,
+ messages[1].uuid,
+ })
+ })
+
+ g.Testing("our callback also gets called when pinged", func() {
+ consumed = []guuid.UUID{}
+
+ var wg sync.WaitGroup
+ wg.Add(4)
+
+ queue.Subscribe("topic", "consumer-2", func(m Message) error {
+ wg.Done()
+ return nil
+ })
+ defer queue.Unsubscribe("topic", "consumer-2")
+
+ consumer := set["topic"].consumers["consumer-2"]
+ consumer.pinger.tryPing(struct{}{})
+
+ wg.Wait()
+
+ g.TAssertEqual(consumed, []guuid.UUID{
+ messages[0].uuid,
+ messages[1].uuid,
+ messages[0].uuid,
+ messages[1].uuid,
+ })
+ })
+
+ g.Testing("we try to own the topic", func() {
+ takeErr = errors.New("queueT.Subscribe() 1")
+
+ err := queue.Subscribe("topic", "name", func(Message) error {
+ g.Unreachable()
+ return nil
+ })
+
+ g.TAssertEqual(err, takeErr)
+ takeErr = nil
+ })
+
+ g.Testing("if we can't own the topic, we don't get registered", func() {
+ takeErr = errors.New("queueT.Subscribe() 2")
+
+ err := queue.Subscribe("topic", "name", func(Message) error {
+ g.Unreachable()
+ return nil
+ })
+ g.TErrorNil(err)
+
+ expected := subscriptionsSetM{}
+ g.TAssertEqual(set, expected)
+
+ takeErr = nil
+ })
+}
+
+func test_queueT_WaitFor() {
+ g.TestStart("queueT.WaitFor()")
+
+ findErr := errors.New("find() error")
+ message := messageT{
+ payload: []byte("queueT.WaitFor() payload"),
+ }
+
+ set := subscriptionsSetM{}
+
+ queue := queueT{
+ queries: queriesT{
+ find: func(
+ topic string,
+ flowID guuid.UUID,
+ ) (messageT, error) {
+ return message, findErr
+ },
+ },
+ subscriptions: subscriptionsT{
+ write: func(fn func(subscriptionsSetM) error) error {
+ return fn(set)
+ },
+ read: func(fn func(subscriptionsSetM) error) error {
+ return fn(set)
+ },
+ },
+ }
+
+
+ g.Testing("registers the waiter in the set", func() {
+ flowID := guuid.New()
+
+ defer queue.WaitFor("topic", flowID, "waiter-1").Close()
+
+ expected := waiterDataT{
+ topic: "topic",
+ flowID: flowID,
+ name: "waiter-1",
+ }
+
+ g.TAssertEqual(
+ set["topic"].waiters[flowID]["waiter-1"].data,
+ expected,
+ )
+ })
+
+ g.Testing("the channel gets a message when waiter is pinged", func() {
+ flowID := guuid.New()
+ payload := []byte("sent payload")
+
+ w := queue.WaitFor("topic", flowID, "waiter-2")
+ defer w.Close()
+
+ waiter := set["topic"].waiters[flowID]["waiter-2"]
+ waiter.pinger.tryPing(payload)
+
+ given := <- w.Channel
+
+ g.TAssertEqual(given, payload)
+ g.TAssertEqual((*waiter.closed)(), true)
+ })
+
+ g.Testing("we can also WaitFor() after publishing the message", func() {
+ findErr = nil
+ flowID := guuid.New()
+
+ w := queue.WaitFor("topic", flowID, "waiter-3")
+ defer w.Close()
+
+ given := <- w.Channel
+
+ waiter := set["topic"].waiters[flowID]["waiter-3"]
+ waiter.pinger.tryPing([]byte("ignored"))
+
+ g.TAssertEqual(given, message.payload)
+ g.TAssertEqual((*waiter.closed)(), true)
+ })
+
+ g.Testing("if the data already exists we get it immediatelly", func() {
+ flowID := guuid.New()
+
+ w := queue.WaitFor("topic", flowID, "waiter-4")
+ defer w.Close()
+
+ waiter := set["topic"].waiters[flowID]["waiter-4"]
+ g.TAssertEqual((*waiter.closed)(), true)
+
+ given := <- w.Channel
+ g.TAssertEqual(given, message.payload)
+ })
+}
+
+func test_unsubscribeIfExistsFn() {
+ g.TestStart("unsubscribeIfExistsFn()")
+
+ g.Testing("noop on empty set", func() {
+ set := subscriptionsSetM{}
+ expected := subscriptionsSetM{}
+ unsubscribeIfExistsFn("topic", "consumer")(set)
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("noop on missing topic", func() {
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{},
+ }
+
+ expected := subscriptionsSetM{
+ "topic": topicSubscriptionT{},
+ }
+
+ unsubscribeIfExistsFn("other-topic", "consumer")(set)
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("noop on missing consumer", func() {
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ },
+ }
+
+ expected := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{},
+ },
+ },
+ }
+
+ unsubscribeIfExistsFn("topic", "other-consumer")(set)
+ g.TAssertEqual(set, expected)
+ })
+
+ g.Testing("closes consumer and removes it from set", func() {
+ flowID := guuid.New()
+
+ count := 0
+ close := func() {
+ count++
+ }
+
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{
+ close: &close,
+ },
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{},
+ },
+ },
+ }
+
+ expected := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID: map[string]waiterT{},
+ },
+ },
+ }
+
+ unsubscribeIfExistsFn("topic", "consumer")(set)
+ g.TAssertEqual(set, expected)
+ g.TAssertEqual(count, 1)
+ })
+
+ g.Testing("empty topics are also removed", func() {
+ count := 0
+ close := func() {
+ count++
+ }
+
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{
+ close: &close,
+ },
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ expected := subscriptionsSetM{}
+
+ unsubscribeIfExistsFn("topic", "consumer")(set)
+ g.TAssertEqual(set, expected)
+ g.TAssertEqual(count, 1)
+ })
+}
+
+func test_queueT_Unsubscribe() {
+ g.TestStart("queueT.Unsubscribe()")
+
+ g.Testing("calls unsubscribesIfExists() via writeFn()", func() {
+ closed := false
+ close := func() {
+ closed = true
+ }
+
+ set := subscriptionsSetM{
+ "topic": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer": consumerT{
+ close: &close,
+ },
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ expected := subscriptionsSetM{}
+
+ queue := queueT{
+ subscriptions: subscriptionsT{
+ write: func(
+ fn func(subscriptionsSetM) error,
+ ) error {
+ return fn(set)
+ },
+ },
+ }
+
+ queue.Unsubscribe("topic", "consumer")
+ g.TAssertEqual(set, expected)
+ g.TAssertEqual(closed, true)
+ })
+}
+
+func test_cleanSubscriptions() {
+ g.TestStart("cleanSubscriptions()")
+
+ g.Testing("all consumers and waiters get close()'d", func() {
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+ flowID3 := guuid.New()
+
+ type pairT struct{
+ closed func() bool
+ fn func()
+ }
+
+ mkclose := func() pairT {
+ closed := false
+ return pairT{
+ closed: func() bool {
+ return closed
+ },
+ fn: func() {
+ closed = true
+ },
+ }
+ }
+
+ c := mkclose()
+ g.TAssertEqual(c.closed(), false)
+ c.fn()
+ var x bool = c.closed()
+ g.TAssertEqual(x, true)
+
+ close1 := mkclose()
+ close2 := mkclose()
+ close3 := mkclose()
+ close4 := mkclose()
+ close5 := mkclose()
+ close6 := mkclose()
+ close7 := mkclose()
+ close8 := mkclose()
+
+ set := subscriptionsSetM{
+ "topic-1": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer-1": consumerT{
+ close: &close1.fn,
+ },
+ "consumer-2": consumerT{
+ close: &close2.fn,
+ },
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID1: map[string]waiterT{
+ "waiter-1": waiterT{
+ close: &close3.fn,
+ },
+ },
+ flowID2: map[string]waiterT{
+ "waiter-2": waiterT{
+ close: &close4.fn,
+ },
+ "waiter-3": waiterT{
+ close: &close5.fn,
+ },
+ },
+ },
+ },
+ "topic-2": topicSubscriptionT{
+ consumers: map[string]consumerT{
+ "consumer-3": consumerT{
+ close: &close6.fn,
+ },
+ },
+ waiters: map[guuid.UUID]map[string]waiterT{
+ flowID3: map[string]waiterT{
+ "waiter-4": waiterT{
+ close: &close7.fn,
+ },
+ },
+ },
+ },
+ "topic-3": topicSubscriptionT{
+ consumers: map[string]consumerT{},
+ waiters: map[guuid.UUID]map[string]waiterT{},
+ },
+ }
+
+ cleanSubscriptions(set)
+
+ given := []bool{
+ close1.closed(),
+ close2.closed(),
+ close3.closed(),
+ close4.closed(),
+ close5.closed(),
+ close6.closed(),
+ close7.closed(),
+ close8.closed(),
+ }
+
+ expected := []bool{
+ true,
+ true,
+ true,
+ true,
+ true,
+ true,
+ true,
+ false,
+ }
+
+ g.TAssertEqualI(given, expected)
+ })
+}
+
+func test_queueT_Close() {
+ g.TestStart("queueT.Close()")
+
+ g.Testing("clean pinger, subscriptions and queries", func() {
+ var (
+ pingerCount = 0
+ subscriptionsCount = 0
+ queriesCount = 0
+
+ subscriptionsErr = errors.New("subscriptionsT{} error")
+ queriesErr = errors.New("queriesT{} error")
+ )
+ queue := queueT{
+ queries: queriesT{
+ close: func() error{
+ queriesCount++
+ return queriesErr
+ },
+ },
+ subscriptions: subscriptionsT{
+ write: func(
+ func(subscriptionsSetM) error,
+ ) error {
+ subscriptionsCount++
+ return subscriptionsErr
+ },
+ },
+ pinger: pingerT[struct{}]{
+ close: func() {
+ pingerCount++
+ },
+ },
+ }
+
+ err := queue.Close()
+ g.TAssertEqual(err, g.WrapErrors(subscriptionsErr, queriesErr))
+ g.TAssertEqual(pingerCount, 1)
+ g.TAssertEqual(subscriptionsCount, 1)
+ g.TAssertEqual(queriesCount, 1)
+ })
+}
+
+
+func test_topicGetopt() {
+ g.TestStart("topicGetopt()")
+
+ g.Testing("checks for required positional argument", func() {
+ var w strings.Builder
+ argsIn := argsT{
+ args: []string{},
+ }
+
+ argsOut, ok := topicGetopt(argsIn, &w)
+ g.TAssertEqual(w.String(), "Missing TOPIC.\n")
+ g.TAssertEqual(ok, false)
+ g.TAssertEqual(argsOut, argsIn)
+ })
+
+ g.Testing("success otherwise", func() {
+ var w strings.Builder
+ argsIn := argsT{
+ args: []string{"a topic"},
+ }
+
+ argsOut, ok := topicGetopt(argsIn, &w)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(ok, true)
+ argsIn.topic = "a topic"
+ g.TAssertEqual(argsOut, argsIn)
+ })
+}
+
+func test_topicConsumerGetopt() {
+ g.TestStart("topicConsumerGetopt()")
+
+ g.Testing("checks for TOPIC argument", func() {
+ var w strings.Builder
+ argsIn := argsT{
+ args: []string{},
+ }
+
+ argsOut, ok := topicConsumerGetopt(argsIn, &w)
+ g.TAssertEqual(w.String(), "Missing TOPIC.\n")
+ g.TAssertEqual(ok, false)
+ g.TAssertEqual(argsOut, argsIn)
+ })
+
+ g.Testing("we get an error on unsupported flag", func() {
+ var w strings.Builder
+ argsIn := argsT{
+ args: []string{"-Z"},
+ }
+
+ const message = "flag provided but not defined: -Z\n"
+ argsOut, ok := topicConsumerGetopt(argsIn, &w)
+ g.TAssertEqual(w.String(), message)
+ g.TAssertEqual(ok, false)
+ g.TAssertEqual(argsOut, argsIn)
+ })
+
+ g.Testing("we also get an error on incorrect usage of flags", func() {
+ var w strings.Builder
+ argsIn := argsT{
+ args: []string{"-C"},
+ }
+
+ const message = "flag needs an argument: -C\n"
+ argsOut, ok := topicConsumerGetopt(argsIn, &w)
+ g.TAssertEqual(w.String(), message)
+ g.TAssertEqual(ok, false)
+ g.TAssertEqual(argsOut, argsIn)
+ })
+
+ g.Testing("we can customize the CONSUMER", func() {
+ var w strings.Builder
+ argsIn := argsT{
+ args: []string{"-C", "custom consumer", "this topic"},
+ }
+
+ argsOut, ok := topicConsumerGetopt(argsIn, &w)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(ok, true)
+ argsIn.topic = "this topic"
+ argsIn.consumer = "custom consumer"
+ g.TAssertEqual(argsOut, argsIn)
+ })
+
+ g.Testing("otherwise we get the default one", func() {
+ var w strings.Builder
+ argsIn := argsT{
+ args: []string{"T"},
+ }
+
+ argsOut, ok := topicConsumerGetopt(argsIn, &w)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(ok, true)
+ argsIn.topic = "T"
+ argsIn.consumer = "default-consumer"
+ g.TAssertEqual(argsOut, argsIn)
+ })
+}
+
+func test_inExec() {
+ g.TestStart("inExec()")
+
+ const (
+ topic = "inExec topic"
+ payloadStr = "inExec payload"
+ )
+ var (
+ payload = []byte(payloadStr)
+ args = argsT{
+ topic: topic,
+ }
+ )
+
+ var (
+ publishErr error
+ messages []messageT
+ id int64 = 0
+ )
+ queries := queriesT{
+ publish: func(
+ unsent UnsentMessage,
+ messageID guuid.UUID,
+ ) (messageT, error) {
+ if publishErr != nil {
+ return messageT{}, publishErr
+ }
+
+ id++
+ now := time.Now()
+ message := messageT{
+ id: id,
+ timestamp: now,
+ uuid: messageID,
+ topic: unsent.Topic,
+ flowID: unsent.FlowID,
+ payload: unsent.Payload,
+ }
+ messages = append(messages, message)
+ return message, nil
+ },
+ }
+
+
+ g.Testing("messageID to output when successful", func() {
+ r := bytes.NewReader(payload)
+ var w strings.Builder
+ rc, err := inExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(messages[0].topic, topic)
+ g.TAssertEqual(messages[0].payload, payload)
+ g.TAssertEqual(rc, 0)
+ g.TAssertEqual(w.String(), messages[0].uuid.String() + "\n")
+ })
+
+ g.Testing("if reading fails, we return the error", func() {
+ var r *os.File
+ var w strings.Builder
+ rc, err := inExec(args, queries, r, &w)
+ g.TAssertEqual(
+ err.Error(),
+ "invalid argument",
+ )
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ })
+
+ g.Testing("if publishing fails, we return the error", func() {
+ publishErr = errors.New("publish() error")
+ r := strings.NewReader("read but not published")
+ var w strings.Builder
+ rc, err := inExec(args, queries, r, &w)
+ g.TAssertEqual(err, publishErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ })
+}
+
+func test_outExec() {
+ g.TestStart("outExec()")
+
+ const (
+ topic = "outExec topic"
+ consumer = "outExec consumer"
+ )
+ var (
+ r = strings.NewReader("")
+ args = argsT{
+ topic: topic,
+ consumer: consumer,
+ }
+ )
+
+ var (
+ takeErr error
+ nextErr error
+ messages []messageT
+ id int64 = 0
+ )
+ queries := queriesT{
+ take: func(string, string) error {
+ return takeErr
+ },
+ next: func(string, string) (messageT, error) {
+ if nextErr != nil {
+ return messageT{}, nextErr
+ }
+
+ if len(messages) == 0 {
+ return messageT{}, sql.ErrNoRows
+ }
+
+ return messages[0], nil
+ },
+ }
+ pub := func(payload []byte) {
+ id++
+ now := time.Now()
+ message := messageT{
+ id: id,
+ timestamp: now,
+ uuid: guuid.New(),
+ topic: topic,
+ flowID: guuid.New(),
+ payload: payload,
+ }
+ messages = append(messages, message)
+ }
+
+
+ g.Testing("exit code 1 when we can't take", func() {
+ takeErr = errors.New("outExec() take error")
+
+ var w strings.Builder
+
+ rc, err := outExec(args, queries, r, &w)
+ g.TAssertEqual(err, takeErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+
+ takeErr = nil
+ })
+
+ g.Testing("exit code 3 when no message is available", func() {
+ var w strings.Builder
+
+ rc, err := outExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 3)
+ })
+
+ g.Testing("we get the same message until we commit", func() {
+ var (
+ w1 strings.Builder
+ w2 strings.Builder
+ w3 strings.Builder
+ )
+ args := argsT{
+ topic: topic,
+ consumer: consumer,
+ }
+
+ pub([]byte("first payload"))
+ pub([]byte("second payload"))
+
+ rc1, err1 := outExec(args, queries, r, &w1)
+ rc2, err2 := outExec(args, queries, r, &w2)
+ messages = messages[1:]
+ rc3, err3 := outExec(args, queries, r, &w3)
+
+ g.TErrorIf(g.SomeError(err1, err2, err3))
+ g.TAssertEqual(w1.String(), "first payload\n")
+ g.TAssertEqual(w2.String(), "first payload\n")
+ g.TAssertEqual(w3.String(), "second payload\n")
+ g.TAssertEqual(rc1, 0)
+ g.TAssertEqual(rc2, 0)
+ g.TAssertEqual(rc3, 0)
+ })
+
+ g.Testing("we propagate the error when the query fails", func() {
+ nextErr = errors.New("next() error")
+ var w strings.Builder
+ rc, err := outExec(args, queries, r, &w)
+ g.TAssertEqual(err, nextErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ })
+}
+
+func test_commitExec() {
+ g.TestStart("commitExec()")
+
+ const (
+ topic = "commitExec topic"
+ consumer = "commitExec consumer"
+ )
+ var (
+ r = strings.NewReader("")
+ args = argsT{
+ topic: topic,
+ consumer: consumer,
+ }
+ )
+
+ var (
+ takeErr error
+ nextErr error
+ commitErr error
+ messages []messageT
+ id int64 = 0
+ )
+ queries := queriesT{
+ take: func(string, string) error {
+ return takeErr
+ },
+ next: func(string, string) (messageT, error) {
+ if nextErr != nil {
+ return messageT{}, nextErr
+ }
+
+ if len(messages) == 0 {
+ return messageT{}, sql.ErrNoRows
+ }
+
+ return messages[0], nil
+ },
+ commit: func(string, guuid.UUID) error {
+ if commitErr != nil {
+ return commitErr
+ }
+
+ messages = messages[1:]
+ return nil
+ },
+ }
+ pub := func(payload []byte) {
+ id++
+ now := time.Now()
+ message := messageT{
+ id: id,
+ timestamp: now,
+ uuid: guuid.New(),
+ topic: topic,
+ flowID: guuid.New(),
+ payload: payload,
+ }
+ messages = append(messages, message)
+ }
+
+
+ g.Testing("error when we can't take", func() {
+ takeErr = errors.New("commitExec() take error")
+
+ var w strings.Builder
+
+ rc, err := commitExec(args, queries, r, &w)
+ g.TAssertEqual(err, takeErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+
+ takeErr = nil
+ })
+
+ g.Testing("error when there is nothing to commit", func() {
+ var w strings.Builder
+
+ rc, err := commitExec(args, queries, r, &w)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ })
+
+ g.Testing("messages get committed in order", func() {
+ var w strings.Builder
+
+ pub([]byte("first payload"))
+ pub([]byte("second payload"))
+ pub([]byte("third payload"))
+
+ message1 := messages[0]
+ g.TAssertEqual(message1.payload, []byte("first payload"))
+
+ rc, err := commitExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(rc, 0)
+
+ message2 := messages[0]
+ g.TAssertEqual(message2.payload, []byte("second payload"))
+
+ rc, err = commitExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(rc, 0)
+
+ message3 := messages[0]
+ g.TAssertEqual(message3.payload, []byte("third payload"))
+
+ rc, err = commitExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(rc, 0)
+
+ g.TAssertEqual(len(messages), 0)
+ })
+
+ g.Testing("when next() query fails, we propagate its result", func() {
+ nextErr = errors.New("next() error")
+ var w strings.Builder
+ rc, err := commitExec(args, queries, r, &w)
+ g.TAssertEqual(err, nextErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ nextErr = nil
+ })
+
+ g.Testing("we also propagate the error on commit() failure", func() {
+ commitErr = errors.New("commit() error")
+ pub([]byte{})
+ var w strings.Builder
+ rc, err := commitExec(args, queries, r, &w)
+ g.TAssertEqual(err, commitErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ })
+}
+
+func test_deadExec() {
+ g.TestStart("deadExec()")
+
+ const (
+ topic = "deadExec topic"
+ consumer = "deadExec consumer"
+ )
+ var (
+ r = strings.NewReader("")
+ args = argsT{
+ topic: topic,
+ consumer: consumer,
+ }
+ )
+
+ var (
+ takeErr error
+ nextErr error
+ toDeadErr error
+ messages []messageT
+ id int64 = 0
+ )
+ queries := queriesT{
+ take: func(string, string) error {
+ return takeErr
+ },
+ next: func(string, string) (messageT, error) {
+ if nextErr != nil {
+ return messageT{}, nextErr
+ }
+
+ if len(messages) == 0 {
+ return messageT{}, sql.ErrNoRows
+ }
+
+ return messages[0], nil
+ },
+ toDead: func(
+ _ string,
+ _ guuid.UUID,
+ deadletterID guuid.UUID,
+ ) error {
+ if toDeadErr != nil {
+ return toDeadErr
+ }
+
+ messages = messages[1:]
+ return nil
+ },
+ }
+ pub := func(payload []byte) {
+ id++
+ now := time.Now()
+ message := messageT{
+ id: id,
+ timestamp: now,
+ uuid: guuid.New(),
+ topic: topic,
+ flowID: guuid.New(),
+ payload: payload,
+ }
+ messages = append(messages, message)
+ }
+
+
+ g.Testing("error when we can't take", func() {
+ takeErr = errors.New("deadExec() take error")
+
+ var w strings.Builder
+
+ rc, err := deadExec(args, queries, r, &w)
+ g.TAssertEqual(err, takeErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+
+ takeErr = nil
+ })
+
+ g.Testing("error when there is nothing to mark as dead", func() {
+ var w strings.Builder
+
+ rc, err := deadExec(args, queries, r, &w)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ })
+
+ g.Testing("the latest message becomes a deadletter", func() {
+ var w strings.Builder
+
+ pub([]byte("first payload"))
+ pub([]byte("second payload"))
+
+ message1 := messages[0]
+ g.TAssertEqual(message1.payload, []byte("first payload"))
+
+ rc, err := deadExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(rc, 0)
+
+ message2 := messages[0]
+ g.TAssertEqual(message2.payload, []byte("second payload"))
+
+ rc, err = deadExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(rc, 0)
+
+ g.TAssertEqual(len(messages), 0)
+ })
+
+ g.Testing("next() error is propagated", func() {
+ nextErr = errors.New("next() error")
+ var w strings.Builder
+ rc, err := deadExec(args, queries, r, &w)
+ g.TAssertEqual(err, nextErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ nextErr = nil
+ })
+
+ g.Testing("toDead() error is propagated", func() {
+ toDeadErr = errors.New("toDead() error")
+ pub([]byte{})
+ var w strings.Builder
+ rc, err := deadExec(args, queries, r, &w)
+ g.TAssertEqual(err, toDeadErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ })
+}
+
+func test_listDeadExec() {
+ g.TestStart("listDeadExec()")
+
+ const (
+ topic = "listDeadExec topic"
+ consumer = "listDeadExec consumer"
+ )
+ var (
+ r = strings.NewReader("")
+ args = argsT{
+ topic: topic,
+ consumer: consumer,
+ }
+ )
+
+ var (
+ messages []messageT
+ deadletters []deadletterT
+ id int64 = 0
+ errorIndex = -1
+ allDeadErr = errors.New("allDead() error")
+ )
+ queries := queriesT{
+ allDead: func(
+ _ string,
+ _ string,
+ callback func(deadletterT, messageT) error,
+ ) error {
+ for i, deadletter := range deadletters {
+ if i == errorIndex {
+ return allDeadErr
+ }
+
+ callback(deadletter, messageT{})
+ }
+
+ return nil
+ },
+ }
+ pub := func() {
+ payload := []byte("ignored payload for this test")
+ id++
+ now := time.Now()
+ message := messageT{
+ id: id,
+ timestamp: now,
+ uuid: guuid.New(),
+ topic: topic,
+ flowID: guuid.New(),
+ payload: payload,
+ }
+ messages = append(messages, message)
+ }
+ commit := func() {
+ messages = messages[1:]
+ }
+ dead := func() {
+ message := messages[0]
+ now := time.Now()
+ deadletter := deadletterT{
+ uuid: guuid.New(),
+ timestamp: now,
+ consumer: consumer,
+ messageID: message.uuid,
+ }
+
+ messages = messages[1:]
+ deadletters = append(deadletters, deadletter)
+ }
+ replay := func() {
+ pub()
+ deadletters = deadletters[1:]
+ }
+
+
+ g.Testing("nothing is shown if topic is empty", func() {
+ var w strings.Builder
+
+ rc, err := listDeadExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 0)
+ })
+
+ g.Testing("deadletters are printed in order", func() {
+ var w strings.Builder
+
+ pub()
+ pub()
+ pub()
+
+ rc, err := listDeadExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 0)
+
+ dead()
+ commit()
+ dead()
+
+ expected := fmt.Sprintf(
+ "%s\t%s\t%s\n%s\t%s\t%s\n",
+ deadletters[0].uuid.String(),
+ deadletters[0].timestamp.Format(time.RFC3339),
+ deadletters[0].consumer,
+ deadletters[1].uuid.String(),
+ deadletters[1].timestamp.Format(time.RFC3339),
+ deadletters[1].consumer,
+ )
+
+ rc, err = listDeadExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(w.String(), expected)
+ g.TAssertEqual(rc, 0)
+ })
+
+ g.Testing("deadletters disappear after being replayed", func() {
+ var (
+ w1 strings.Builder
+ w2 strings.Builder
+ )
+
+ replay()
+
+ deadletter := deadletters[0]
+ expected := fmt.Sprintf(
+ "%s\t%s\t%s\n",
+ deadletter.uuid.String(),
+ deadletter.timestamp.Format(time.RFC3339),
+ deadletter.consumer,
+ )
+
+ rc, err := listDeadExec(args, queries, r, &w1)
+ g.TErrorIf(err)
+ g.TAssertEqual(w1.String(), expected)
+ g.TAssertEqual(rc, 0)
+
+ replay()
+
+ rc, err = listDeadExec(args, queries, r, &w2)
+ g.TErrorIf(err)
+ g.TAssertEqual(w2.String(), "")
+ g.TAssertEqual(rc, 0)
+ })
+
+ g.Testing("a database failure interrupts the output", func() {
+ var w strings.Builder
+
+ pub()
+ pub()
+ pub()
+ dead()
+ dead()
+ dead()
+
+ deadletter := deadletters[0]
+ expected := fmt.Sprintf(
+ "%s\t%s\t%s\n",
+ deadletter.uuid.String(),
+ deadletter.timestamp.Format(time.RFC3339),
+ deadletter.consumer,
+ )
+
+ errorIndex = 1
+ rc, err := listDeadExec(args, queries, r, &w)
+ g.TAssertEqual(err, allDeadErr)
+ g.TAssertEqual(w.String(), expected)
+ g.TAssertEqual(rc, 1)
+ })
+}
+
+func test_replayExec() {
+ g.TestStart("replayExec()")
+
+ const (
+ topic = "replayExec topic"
+ consumer = "replayExec consumer"
+ )
+ var (
+ w strings.Builder
+ r = strings.NewReader("")
+ args = argsT{
+ topic: topic,
+ consumer: consumer,
+ }
+ )
+
+ var (
+ oneDeadErr error
+ replayErr error
+ messages []messageT
+ deadletters []deadletterT
+ deadMessages []messageT
+ id int64 = 0
+ )
+ queries := queriesT{
+ oneDead: func(string, string) (deadletterT, error) {
+ if oneDeadErr != nil {
+ return deadletterT{}, oneDeadErr
+ }
+
+ if len(deadletters) == 0 {
+ return deadletterT{}, sql.ErrNoRows
+ }
+
+ return deadletters[0], nil
+ },
+ replay: func(guuid.UUID, guuid.UUID) (messageT, error) {
+ if replayErr != nil {
+ return messageT{}, replayErr
+ }
+
+ message := deadMessages[0]
+ messages = append(messages, message)
+ deadletters = deadletters[1:]
+ deadMessages = deadMessages[1:]
+ return message, nil
+ },
+ }
+ pub := func(payload []byte) {
+ id++
+ now := time.Now()
+ message := messageT{
+ id: id,
+ timestamp: now,
+ uuid: guuid.New(),
+ topic: topic,
+ flowID: guuid.New(),
+ payload: payload,
+ }
+ messages = append(messages, message)
+ }
+ commit := func() {
+ messages = messages[1:]
+ }
+ dead := func() {
+ message := messages[0]
+ deadletter := deadletterT{ uuid: guuid.New() }
+
+ messages = messages[1:]
+ deadletters = append(deadletters, deadletter)
+ deadMessages = append(deadMessages, message)
+ }
+ next := func() string {
+ return string(messages[0].payload)
+ }
+
+
+ g.Testing("error when there is nothing to replay", func() {
+ rc, err := replayExec(args, queries, r, &w)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+
+ pub([]byte("first payload"))
+ pub([]byte("second payload"))
+ pub([]byte("third payload"))
+ pub([]byte("fourth payload"))
+
+ rc, err = replayExec(args, queries, r, &w)
+ g.TAssertEqual(err, sql.ErrNoRows)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ })
+
+ g.Testing("deadletters are replayed in order", func() {
+ dead()
+ commit()
+ dead()
+ commit()
+
+ rc, err := replayExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 0)
+ g.TAssertEqual(next(), "first payload")
+
+ commit()
+
+ rc, err = replayExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 0)
+ g.TAssertEqual(next(), "third payload")
+ })
+
+ g.Testing("oneDead() error is forwarded", func() {
+ oneDeadErr = errors.New("oneDead() error")
+ rc, err := replayExec(args, queries, r, &w)
+ g.TAssertEqual(err, oneDeadErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ oneDeadErr = nil
+ })
+
+ g.Testing("replay() error is also forwarded", func() {
+ pub([]byte{})
+ dead()
+
+ replayErr = errors.New("replay() error")
+ rc, err := replayExec(args, queries, r, &w)
+ g.TAssertEqual(err, replayErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ })
+}
+
+func test_sizeExec() {
+ g.TestStart("sizeExec()")
+
+ const (
+ topic = "sizeExec topic"
+ consumer = "sizeExec consumer"
+ )
+ var (
+ r = strings.NewReader("")
+ args = argsT{
+ topic: topic,
+ consumer: consumer,
+ }
+ )
+
+ var sizeErr error
+ queries := queriesT{
+ size: func(string) (int, error) {
+ if sizeErr != nil {
+ return -1, sizeErr
+ }
+
+ return 123, nil
+ },
+ }
+
+
+ g.Testing("it propagates the error when the query fails", func() {
+ sizeErr = errors.New("size() error")
+ var w strings.Builder
+ rc, err := sizeExec(args, queries, r, &w)
+ g.TAssertEqual(err, sizeErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ sizeErr = nil
+ })
+
+ g.Testing("otherwise it just prints what is was given", func() {
+ var w strings.Builder
+ rc, err := sizeExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(w.String(), "123\n")
+ g.TAssertEqual(rc, 0)
+ })
+}
+
+func test_countExec() {
+ g.TestStart("countExec()")
+
+ const (
+ topic = "countExec topic"
+ consumer = "countExec consumer"
+ )
+ var (
+ r = strings.NewReader("")
+ args = argsT{
+ topic: topic,
+ consumer: consumer,
+ }
+ )
+
+ var countErr error
+ queries := queriesT{
+ count: func(string, string) (int, error) {
+ if countErr != nil {
+ return -1, countErr
+ }
+
+ return 2222, nil
+ },
+ }
+
+
+ g.Testing("it propagates the query error", func() {
+ countErr = errors.New("count() error")
+ var w strings.Builder
+ rc, err := countExec(args, queries, r, &w)
+ g.TAssertEqual(err, countErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ countErr = nil
+ })
+
+ g.Testing("otherwise it prints the given count", func() {
+ var w strings.Builder
+ rc, err := countExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(w.String(), "2222\n")
+ g.TAssertEqual(rc, 0)
+ })
+}
+
+func test_hasDataExec() {
+ g.TestStart("hasData()")
+
+ const (
+ topic = "hasData topic"
+ consumer = "hasData consumer"
+ )
+ var (
+ w strings.Builder
+ r = strings.NewReader("")
+ args = argsT{
+ topic: topic,
+ consumer: consumer,
+ }
+ )
+
+ hasData := true
+ var hasDataErr error
+ queries := queriesT{
+ hasData: func(string, string) (bool, error) {
+ if hasDataErr != nil {
+ return false, hasDataErr
+ }
+
+ return hasData, nil
+ },
+ }
+
+
+ g.Testing("it propagates the query error", func() {
+ hasDataErr = errors.New("hasData() error")
+ rc, err := hasDataExec(args, queries, r, &w)
+ g.TAssertEqual(err, hasDataErr)
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 1)
+ hasDataErr = nil
+ })
+
+ g.Testing("otherwise if just returns (not prints) the flag", func() {
+ hasData = true
+ rc, err := hasDataExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(rc, 0)
+
+ hasData = false
+ rc, err = hasDataExec(args, queries, r, &w)
+ g.TErrorIf(err)
+ g.TAssertEqual(rc, 1)
+
+ g.TAssertEqual(w.String(), "")
+ })
+}
+
+func test_usage() {
+ g.TestStart("usage()")
+
+ g.Testing("it just writes to io.Writer", func() {
+ var w strings.Builder
+ usage("xxx", &w)
+ const message =
+ "Usage: xxx [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n"
+ g.TAssertEqual(w.String(), message)
+ })
+
+ g.Testing("noop on io.Discard for io.Writer", func() {
+ usage("AN ERROR IF SEEN ANYWHERE!", io.Discard)
+ })
+}
+
+func test_getopt() {
+ g.TestStart("getopt()")
+
+ const warning = "Missing COMMAND.\n"
+ const usage = "Usage: $0 [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n"
+
+ execFn := func(argsT, queriesT, io.Reader, io.Writer) (int, error) {
+ return 0, nil
+ }
+ commandsMap := map[string]commandT {
+ "good": commandT{
+ name: "good",
+ getopt: func(args argsT, _ io.Writer) (argsT, bool) {
+ return args, true
+ },
+ exec: execFn,
+ },
+ "bad": commandT{
+ name: "bad",
+ getopt: func(args argsT, w io.Writer) (argsT, bool) {
+ if len(args.args) == 0 {
+ fmt.Fprintln(
+ w,
+ "no required arg",
+ )
+ return args, false
+ }
+
+ if args.args[0] != "required" {
+ fmt.Fprintln(
+ w,
+ "not correct one",
+ )
+ return args, false
+ }
+
+ args.topic = "a topic"
+ args.consumer = "a consumer"
+ return args, true
+ },
+ exec: execFn,
+ },
+ }
+
+
+ g.Testing("we suppress the default error message", func() {
+ var w strings.Builder
+ argv := []string{"$0", "-h"}
+ _, _, rc := getopt(argv, commandsMap, &w)
+
+ g.TAssertEqual(w.String(), usage)
+ g.TAssertEqual(rc, 2)
+ })
+
+ g.Testing("we get an error on unsupported flag", func() {
+ var w strings.Builder
+ argv := []string{"$0", "-X"}
+ _, _, rc := getopt(argv, commandsMap, &w)
+
+ const message = "flag provided but not defined: -X\n"
+ g.TAssertEqual(w.String(), message + usage)
+ g.TAssertEqual(rc, 2)
+ })
+
+ g.Testing("we also get an error on incorrect usage of flags", func() {
+ var w strings.Builder
+ argv := []string{"$0", "-f"}
+ _, _, rc := getopt(argv, commandsMap, &w)
+
+ const message = "flag needs an argument: -f\n"
+ g.TAssertEqual(w.String(), message + usage)
+ g.TAssertEqual(rc, 2)
+ })
+
+ g.Testing("error when not given a command", func() {
+ var w strings.Builder
+ argv := []string{"$0"}
+ _, _, rc := getopt(argv, commandsMap, &w)
+
+ g.TAssertEqual(w.String(), warning + usage)
+ g.TAssertEqual(rc, 2)
+ })
+
+ g.Testing("error on unknown command", func() {
+ var w strings.Builder
+ argv := []string{"$0", "unknown"}
+ _, _, rc := getopt(argv, commandsMap, &w)
+
+ const message = `Bad COMMAND: "unknown".` + "\n"
+ g.TAssertEqual(w.String(), message + usage)
+ g.TAssertEqual(rc, 2)
+ })
+
+ g.Testing("checks the command usage", func() {
+ var (
+ w1 strings.Builder
+ w2 strings.Builder
+ w3 strings.Builder
+ )
+
+ argv1 := []string{"$0", "bad"}
+ argv2 := []string{"$0", "bad", "arg"}
+ argv3 := []string{"$0", "bad", "required"}
+ _, _, rc1 := getopt(argv1, commandsMap, &w1)
+ _, _, rc2 := getopt(argv2, commandsMap, &w2)
+ args, command, rc3 := getopt(argv3, commandsMap, &w3)
+ expectedArgs := argsT{
+ databasePath: "q.db",
+ prefix: "q",
+ command: "bad",
+ allArgs: argv3,
+ args: argv3[2:],
+ topic: "a topic",
+ consumer: "a consumer",
+ }
+
+ g.TAssertEqual(w1.String(), "no required arg\n" + usage)
+ g.TAssertEqual(w2.String(), "not correct one\n" + usage)
+ g.TAssertEqual(w3.String(), "")
+ g.TAssertEqual(rc1, 2)
+ g.TAssertEqual(rc2, 2)
+ g.TAssertEqual(rc3, 0)
+ g.TAssertEqual(args, expectedArgs)
+ g.TAssertEqual(command.name, "bad")
+ })
+
+ g.Testing("when given a command we the default values", func() {
+ var w strings.Builder
+ args, command, rc := getopt(
+ []string{"$0", "good"},
+ commandsMap,
+ &w,
+ )
+ expectedArgs := argsT{
+ databasePath: "q.db",
+ prefix: "q",
+ command: "good",
+ allArgs: []string{"$0", "good"},
+ args: []string{},
+ }
+
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 0)
+ g.TAssertEqual(args, expectedArgs)
+ g.TAssertEqual(command.name, "good")
+ })
+
+ g.Testing("we can customize both values", func() {
+ var w strings.Builder
+ argv := []string{"$0", "-f", "F", "-p", "P", "good"}
+ args, command, rc := getopt(argv, commandsMap, &w)
+ expectedArgs := argsT{
+ databasePath: "F",
+ prefix: "P",
+ command: "good",
+ allArgs: argv,
+ args: []string{},
+ }
+
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 0)
+ g.TAssertEqual(args, expectedArgs)
+ g.TAssertEqual(command.name, "good")
+ })
+
+ g.Testing("a command can have its own commands and options", func() {
+ var w strings.Builder
+ argv := []string{"$0", "-f", "F", "good", "-f", "-f", "SUB"}
+ args, command, rc := getopt(argv, commandsMap, &w)
+ expectedArgs := argsT{
+ databasePath: "F",
+ prefix: "q",
+ command: "good",
+ allArgs: argv,
+ args: []string{"-f", "-f", "SUB"},
+ }
+
+ g.TAssertEqual(w.String(), "")
+ g.TAssertEqual(rc, 0)
+ g.TAssertEqual(args, expectedArgs)
+ g.TAssertEqual(command.name, "good")
+ })
+}
+
+func test_runCommand() {
+ g.TestStart("runCommand()")
+
+ const usage = "Usage: $0 [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n"
+
+
+ g.Testing("returns an error on bad prefix", func() {
+ stdin := strings.NewReader("")
+ var (
+ stdout strings.Builder
+ stderr strings.Builder
+ )
+ args := argsT{
+ prefix: "a bad prefix",
+ command: "in",
+ allArgs: []string{"$0"},
+ args: []string{"some topic name"},
+ }
+ rc := runCommand(args, commands["in"], stdin, &stdout, &stderr)
+
+ g.TAssertEqual(rc, 1)
+ g.TAssertEqual(stdout.String(), "")
+ g.TAssertEqual(stderr.String(), "Invalid table prefix\n")
+ })
+
+ g.Testing("otherwise it build a queueT and calls command", func() {
+ stdin := strings.NewReader("")
+ var (
+ stdout1 strings.Builder
+ stdout2 strings.Builder
+ stderr1 strings.Builder
+ stderr2 strings.Builder
+ )
+ args1 := argsT{
+ prefix: defaultPrefix,
+ command: "good",
+ }
+ args2 := argsT{
+ prefix: defaultPrefix,
+ command: "bad",
+ }
+ myErr := errors.New("an error")
+ good := commandT{
+ exec: func(
+ argsT,
+ queriesT,
+ io.Reader,
+ io.Writer,
+ ) (int, error) {
+ return 0, nil
+ },
+ }
+ bad := commandT{
+ exec: func(
+ _ argsT,
+ _ queriesT,
+ _ io.Reader,
+ w io.Writer,
+ ) (int, error) {
+ fmt.Fprintf(w, "some text\n")
+ return 1, myErr
+ },
+ }
+ rc1 := runCommand(args1, good, stdin, &stdout1, &stderr1)
+ rc2 := runCommand(args2, bad, stdin, &stdout2, &stderr2)
+
+ g.TAssertEqual(stdout1.String(), "")
+ g.TAssertEqual(stdout2.String(), "some text\n")
+ g.TAssertEqual(stderr1.String(), "")
+ g.TAssertEqual(stderr2.String(), "an error\n")
+ g.TAssertEqual(rc1, 0)
+ g.TAssertEqual(rc2, 1)
+ })
+}
+
+
+func dumpQueries() {
+ queries := []struct{name string; fn func(string) queryT}{
+ { "createTables", createTablesSQL },
+ { "take", takeSQL },
+ { "publish", publishSQL },
+ { "find", findSQL },
+ { "pending", pendingSQL },
+ { "commit", commitSQL },
+ { "toDead", toDeadSQL },
+ { "replay", replaySQL },
+ { "oneDead", oneDeadSQL },
+ { "allDead", allDeadSQL },
+ { "size", sizeSQL },
+ { "count", countSQL },
+ { "hasData", hasDataSQL },
+ }
+ for _, query := range queries {
+ q := query.fn(defaultPrefix)
+ fmt.Printf("\n-- %s.sql:", query.name)
+ fmt.Printf("\n-- write: %s\n", q.write)
+ fmt.Printf("\n-- read: %s\n", q.read)
+ fmt.Printf("\n-- owner: %s\n", q.owner)
+ }
+}
+
+
+
+func MainTest() {
+ if os.Getenv("TESTING_DUMP_SQL_QUERIES") != "" {
+ dumpQueries()
+ return
+ }
+
+ g.Init()
+ test_defaultPrefix()
+ test_inTx()
+ test_createTables()
+ test_takeStmt()
+ test_publishStmt()
+ test_findStmt()
+ test_nextStmt()
+ test_messageEach()
+ test_pendingStmt()
+ test_commitStmt()
+ test_toDeadStmt()
+ test_replayStmt()
+ test_oneDeadStmt()
+ test_deadletterEach()
+ test_allDeadStmt()
+ test_sizeStmt()
+ test_countStmt()
+ test_hasDataStmt()
+ test_initDB()
+ test_queriesTclose()
+ test_newPinger()
+ test_makeSubscriptionsFunc()
+ test_makeNotifyFn()
+ test_collectClosedWaiters()
+ test_trimEmptyLeaves()
+ test_deleteIfEmpty()
+ test_deleteEmptyTopics()
+ test_removeClosedWaiter()
+ test_reapClosedWaiters()
+ test_everyNthCall()
+ test_runReaper()
+ test_NewWithPrefix()
+ test_New()
+ test_asPublicMessage()
+ test_queueT_Publish()
+ test_registerConsumerFn()
+ test_registerWaiterFn()
+ test_makeConsumeOneFn()
+ test_makeConsumeAllFn()
+ test_makeWaitFn()
+ test_runConsumer()
+ test_tryFinding()
+ test_queueT_Subscribe()
+ test_queueT_WaitFor()
+ test_unsubscribeIfExistsFn()
+ test_queueT_Unsubscribe()
+ test_cleanSubscriptions()
+ test_queueT_Close()
+ test_topicGetopt()
+ test_topicConsumerGetopt()
+ test_inExec()
+ test_outExec()
+ test_commitExec()
+ test_deadExec()
+ test_listDeadExec()
+ test_replayExec()
+ test_sizeExec()
+ test_countExec()
+ test_hasDataExec()
+ test_usage()
+ test_getopt()
+ test_runCommand()
+}