diff options
Diffstat (limited to 'tests/q.go')
-rw-r--r-- | tests/q.go | 5776 |
1 files changed, 5776 insertions, 0 deletions
diff --git a/tests/q.go b/tests/q.go new file mode 100644 index 0000000..6b9e422 --- /dev/null +++ b/tests/q.go @@ -0,0 +1,5776 @@ +package q + +import ( + "bytes" + "database/sql" + "errors" + "fmt" + "io" + "log/slog" + "os" + "reflect" + "sort" + "strings" + "sync" + "time" + + "acudego" + "guuid" + g "gobang" +) + + + +func test_defaultPrefix() { + g.TestStart("defaultPrefix") + + g.Testing("the defaultPrefix is valid", func() { + g.TErrorIf(g.ValidateSQLTablePrefix(defaultPrefix)) + }) +} + +func test_inTx() { + /* + // FIXME + g.TestStart("inTx()") + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + defer db.Close() + + + g.Testing("when fn() errors, we propagate it", func() { + myError := errors.New("to be propagated") + err := inTx(db, func(tx *sql.Tx) error { + return myError + }) + g.TAssertEqual(err, myError) + }) + + g.Testing("on nil error we get nil", func() { + err := inTx(db, func(tx *sql.Tx) error { + return nil + }) + g.TErrorIf(err) + }) + */ +} + +func test_createTables() { + g.TestStart("createTables()") + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + defer db.Close() + + + g.Testing("tables exist afterwards", func() { + tmpl := ` + SELECT id FROM "%s_messages" LIMIT 1; + ` + q := fmt.Sprintf(tmpl, defaultPrefix) + + _, err := db.Exec(q) + g.TErrorNil(err) + + err = createTables(db, defaultPrefix) + g.TErrorIf(err) + + _, err = db.Exec(q) + g.TErrorIf(err) + }) + + g.Testing("we can do it multiple times", func() { + g.TErrorIf(g.SomeError( + createTables(db, defaultPrefix), + createTables(db, defaultPrefix), + createTables(db, defaultPrefix), + )) + }) +} + +func test_takeStmt() { + g.TestStart("takeStmt()") + + const ( + topic = "take() topic" + consumer = "take() consumer" + prefix = defaultPrefix + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + g.TErrorIf(takeErr) + defer g.SomeFnError( + takeClose, + db.Close, + ) + + const tmpl = ` + SELECT owner_id from "%s_owners" + WHERE + topic = ? AND + consumer = ?; + ` + sqlOwner := fmt.Sprintf(tmpl, prefix) + + + g.Testing("when there is no owner, we become it", func() { + var ownerID int + err := db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) + g.TAssertEqual(err, sql.ErrNoRows) + + err = take(topic, consumer) + g.TErrorIf(err) + + err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) + g.TErrorIf(err) + g.TAssertEqual(ownerID, instanceID) + }) + + g.Testing("if there is already an owner, we overtake it", func() { + otherID := instanceID + 1 + + take, takeClose, takeErr := takeStmt(db, prefix, otherID) + g.TErrorIf(takeErr) + defer takeClose() + + var ownerID int + err := db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) + g.TErrorIf(err) + g.TAssertEqual(ownerID, instanceID) + + err = take(topic, consumer) + g.TErrorIf(err) + + err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) + g.TErrorIf(err) + g.TAssertEqual(ownerID, otherID) + }) + g.Testing("no error if closed more than once", func() { + g.TErrorIf(g.SomeError( + takeClose(), + takeClose(), + takeClose(), + )) + }) +} + +func test_publishStmt() { + g.TestStart("publishStmt()") + + const ( + topic = "publish() topic" + payloadStr = "publish() payload" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + publishErr, + )) + defer g.SomeFnError( + publishClose, + db.Close, + ) + + + g.Testing("we can publish a message", func() { + messageID := guuid.New() + message, err := publish(unsent, messageID) + g.TErrorIf(err) + + g.TAssertEqual(message.id, int64(1)) + g.TAssertEqual(message.uuid, messageID) + g.TAssertEqual(message.topic, topic) + g.TAssertEqual(message.flowID, flowID) + g.TAssertEqual(message.payload, payload) + }) + + g.Testing("we can publish the same message repeatedly", func() { + messageID1 := guuid.New() + messageID2 := guuid.New() + message1, err1 := publish(unsent, messageID1) + message2, err2 := publish(unsent, messageID2) + g.TErrorIf(g.SomeError(err1, err2)) + + g.TAssertEqual(message1.id, message2.id - 1) + g.TAssertEqual(message1.topic, message2.topic) + g.TAssertEqual(message1.flowID, message2.flowID) + g.TAssertEqual(message1.payload, message2.payload) + + g.TAssertEqual(message1.uuid, messageID1) + g.TAssertEqual(message2.uuid, messageID2) + }) + + g.Testing("publishing a message with the same UUID errors", func() { + messageID := guuid.New() + message1, err1 := publish(unsent, messageID) + _, err2 := publish(unsent, messageID) + g.TErrorIf(err1) + + g.TAssertEqual(message1.uuid, messageID) + g.TAssertEqual(message1.topic, topic) + g.TAssertEqual(message1.flowID, flowID) + g.TAssertEqual(message1.payload, payload) + + g.TAssertEqual( + err2.(acudego.Error).ExtendedCode, + acudego.ErrConstraintUnique, + ) + }) + + g.Testing("no actual closing occurs", func() { + g.TErrorIf(g.SomeError( + publishClose(), + publishClose(), + publishClose(), + )) + }) +} + +func test_findStmt() { + g.TestStart("findStmt()") + + const ( + topic = "find() topic" + payloadStr = "find() payload" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + find, findClose, findErr := findStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + publishErr, + findErr, + )) + defer g.SomeFnError( + publishClose, + findClose, + db.Close, + ) + + pub := func(flowID guuid.UUID) guuid.UUID { + unsentWithFlowID := unsent + unsentWithFlowID.FlowID = flowID + messageID := guuid.New() + _, err := publish(unsentWithFlowID, messageID) + g.TErrorIf(err) + return messageID + } + + + g.Testing("we can find a message by topic and flowID", func() { + flowID := guuid.New() + messageID := pub(flowID) + message, err := find(topic, flowID) + g.TErrorIf(err) + + g.TAssertEqual(message.uuid, messageID) + g.TAssertEqual(message.topic, topic) + g.TAssertEqual(message.flowID, flowID) + g.TAssertEqual(message.payload, payload) + }) + + g.Testing("a non-existent message gives us an error", func() { + message, err := find(topic, guuid.New()) + g.TAssertEqual(message, messageT{}) + g.TAssertEqual(err, sql.ErrNoRows) + }) + + g.Testing("findig twice yields the exact same message", func() { + flowID := guuid.New() + messageID := pub(flowID) + message1, err1 := find(topic, flowID) + message2, err2 := find(topic, flowID) + g.TErrorIf(g.SomeError(err1, err2)) + + g.TAssertEqual(message1.uuid, messageID) + g.TAssertEqual(message1, message2) + }) + + g.Testing("returns the latest entry if multiple are available", func() { + flowID := guuid.New() + + _ , err0 := find(topic, flowID) + pub(flowID) + message1, err1 := find(topic, flowID) + pub(flowID) + message2, err2 := find(topic, flowID) + + g.TAssertEqual(err0, sql.ErrNoRows) + g.TErrorIf(g.SomeError(err1, err2)) + g.TAssertEqual(message1.uuid == message2.uuid, false) + g.TAssertEqual(message1.id < message2.id, true) + }) + + g.Testing("no error if closed more than once", func() { + g.TErrorIf(g.SomeError( + findClose(), + findClose(), + findClose(), + )) + }) +} + +func test_nextStmt() { + g.TestStart("nextStmt()") + + const ( + topic = "next() topic" + payloadStr = "next() payload" + consumer = "next() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + next, nextClose, nextErr := nextStmt(db, prefix, instanceID) + commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + nextErr, + commitErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + nextClose, + commitClose, + db.Close, + ) + + pub := func(topic string) messageT { + g.TErrorIf(take(topic, consumer)) + + unsentWithTopic := unsent + unsentWithTopic.Topic = topic + message, err := publish(unsentWithTopic, guuid.New()) + g.TErrorIf(err) + return message + } + + + g.Testing("we get an error on empty topic", func() { + _, err := next(topic, consumer) + g.TAssertEqual(err, sql.ErrNoRows) + }) + + g.Testing("we don't get messages from other topics", func() { + pub("other topic") + _, err := next(topic, consumer) + g.TAssertEqual(err, sql.ErrNoRows) + }) + + g.Testing("we can get the next message", func() { + expectedMessage := pub(topic) + pub(topic) + pub(topic) + message, err := next(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(message, expectedMessage) + }) + + g.Testing("we keep getting the next until we commit", func() { + message1, err1 := next(topic, consumer) + message2, err2 := next(topic, consumer) + g.TErrorIf(commit(consumer, message1.uuid)) + message3, err3 := next(topic, consumer) + g.TErrorIf(g.SomeError(err1, err2, err3)) + + g.TAssertEqual(message1, message2) + g.TAssertEqual(message2.uuid != message3.uuid, true) + }) + + g.Testing("each consumer has its own next message", func() { + g.TErrorIf(take(topic, "other consumer")) + message1, err1 := next(topic, consumer) + message2, err2 := next(topic, "other consumer") + g.TErrorIf(g.SomeError(err1, err2)) + g.TAssertEqual(message1.uuid != message2.uuid, true) + }) + + g.Testing("error when we're not the owner", func() { + otherID := instanceID + 1 + + take, takeClose, takeErr := takeStmt(db, prefix, otherID) + g.TErrorIf(takeErr) + defer takeClose() + + _, err := next(topic, consumer) + g.TErrorIf(err) + + err = take(topic, consumer) + g.TErrorIf(err) + + _, err = next(topic, consumer) + g.TAssertEqual(err, fmt.Errorf( + notOwnerErrorFmt, + otherID, + topic, + consumer, + instanceID, + )) + }) + + g.Testing("we can close more than once", func() { + g.TErrorIf(g.SomeError( + nextClose(), + nextClose(), + nextClose(), + )) + }) +} + +func test_messageEach() { + g.TestStart("messageEach()") + + const ( + topic = "messageEach() topic" + payloadStr = "messageEach() payload" + consumer = "messageEach() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + pendingErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + pendingClose, + db.Close, + ) + + pub := func() guuid.UUID { + message, err := publish(unsent, guuid.New()) + g.TErrorIf(err) + return message.uuid + } + g.TErrorIf(take(topic, consumer)) + + + g.Testing("not called on empty set", func() { + rows, err := pending(topic, consumer) + g.TErrorIf(err) + + messageEach(rows, func(messageT) error { + g.Unreachable() + return nil + }) + }) + + g.Testing("the callback is called once for each entry", func() { + messageIDs := []guuid.UUID{ + pub(), + pub(), + pub(), + } + + rows, err := pending(topic, consumer) + g.TErrorIf(err) + + var collectedIDs []guuid.UUID + err = messageEach(rows, func(message messageT) error { + collectedIDs = append(collectedIDs, message.uuid) + return nil + }) + g.TErrorIf(err) + + g.TAssertEqual(collectedIDs, messageIDs) + }) + + g.Testing("we halt if the timestamp is ill-formatted", func() { + messageID := pub() + message_id_bytes := messageID[:] + pub() + pub() + pub() + + const tmplUpdate = ` + UPDATE "%s_messages" + SET timestamp = '01/01/1970' + WHERE uuid = ?; + ` + sqlUpdate := fmt.Sprintf(tmplUpdate, prefix) + _, err := db.Exec(sqlUpdate, message_id_bytes) + g.TErrorIf(err) + + rows, err := pending(topic, consumer) + g.TErrorIf(err) + + n := 0 + err = messageEach(rows, func(messageT) error { + n++ + return nil + }) + + g.TAssertEqual( + err, + &time.ParseError{ + Layout: time.RFC3339Nano, + Value: "01/01/1970", + LayoutElem: "2006", + ValueElem: "01/01/1970", + Message: "", + }, + ) + g.TAssertEqual(n, 3) + + const tmplDelete = ` + DELETE FROM "%s_messages" + WHERE uuid = ?; + ` + sqlDelete := fmt.Sprintf(tmplDelete, prefix) + _, err = db.Exec(sqlDelete, message_id_bytes) + g.TErrorIf(err) + }) + + g.Testing("we halt if the callback returns an error", func() { + myErr := errors.New("callback error early return") + + rows1, err1 := pending(topic, consumer) + g.TErrorIf(err1) + + n1 := 0 + err1 = messageEach(rows1, func(messageT) error { + n1++ + if n1 == 4 { + return myErr + } + return nil + }) + + rows2, err2 := pending(topic, consumer) + g.TErrorIf(err2) + + n2 := 0 + err2 = messageEach(rows2, func(messageT) error { + n2++ + return nil + }) + + g.TAssertEqual(err1, myErr) + g.TErrorIf(err2) + g.TAssertEqual(n1, 4) + g.TAssertEqual(n2, 6) + }) + + g.Testing("noop when given nil for *sql.Rows", func() { + err := messageEach(nil, func(messageT) error { + g.Unreachable() + return nil + }) + g.TErrorIf(err) + }) +} + +func test_pendingStmt() { + g.TestStart("pendingStmt()") + + const ( + topic = "pending() topic" + payloadStr = "pending() payload" + consumer = "pending() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID) + commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + pendingErr, + commitErr, + toDeadErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + pendingClose, + commitClose, + toDeadClose, + db.Close, + ) + + pub := func(topic string) messageT { + g.TErrorIf(take(topic, consumer)) + unsentWithTopic := unsent + unsentWithTopic.Topic = topic + message, err := publish(unsentWithTopic, guuid.New()) + g.TErrorIf(err) + return message + } + g.TErrorIf(take(topic, consumer)) + + collectPending := func(topic string, consumer string) []messageT { + rows, err := pending(topic, consumer) + g.TErrorIf(err) + + var messages []messageT + err = messageEach(rows, func(message messageT) error { + messages = append(messages, message) + return nil + }) + g.TErrorIf(err) + return messages + } + + + g.Testing("an empty database has 0 pending items", func() { + g.TAssertEqual(len(collectPending(topic, consumer)), 0) + }) + + g.Testing("after publishing we get all messages", func() { + expected := []messageT{ + pub(topic), + pub(topic), + } + + g.TAssertEqualI(collectPending(topic, consumer), expected) + }) + + g.Testing("we get the same messages when calling again", func() { + messages1 := collectPending(topic, consumer) + messages2 := collectPending(topic, consumer) + g.TAssertEqual(len(messages1), 2) + g.TAssertEqualI(messages1, messages2) + }) + + g.Testing("we don't get messages from other topics", func() { + pub("other topic") + + g.TAssertEqual(len(collectPending(topic, consumer)), 2) + g.TAssertEqual(len(collectPending("other topic", consumer)), 1) + }) + + g.Testing("after others commit, pending still returns them", func() { + g.TErrorIf(take(topic, "other consumer")) + messages1 := collectPending(topic, consumer) + g.TAssertEqual(len(messages1), 2) + g.TErrorIf( + commit("other consumer", messages1[0].uuid), + ) + + messages2 := collectPending(topic, consumer) + g.TAssertEqualI(messages1, messages2) + }) + + g.Testing("committing other topic doesn't change current", func() { + messages1 := collectPending(topic, consumer) + g.TAssertEqual(len(messages1), 2) + + message := pub("other topic") + + g.TErrorIf(commit(consumer, message.uuid)) + + messages2 := collectPending(topic, consumer) + g.TAssertEqualI(messages1, messages2) + }) + + g.Testing("after commiting, pending doesn't return them again", func() { + messages1 := collectPending(topic, consumer) + g.TAssertEqual(len(messages1), 2) + + g.TErrorIf(commit(consumer, messages1[0].uuid)) + + messages2 := collectPending(topic, consumer) + g.TAssertEqual(len(messages2), 1) + g.TAssertEqual(messages2[0], messages1[1]) + + g.TErrorIf(commit(consumer, messages1[1].uuid)) + + messages3 := collectPending(topic, consumer) + g.TAssertEqual(len(messages3), 0) + }) + + g.Testing("on deadletter, pending also doesn't return them", func() { + messages0 := collectPending(topic, consumer) + g.TAssertEqual(len(messages0), 0) + + message1 := pub(topic) + message2 := pub(topic) + + messages1 := collectPending(topic, consumer) + g.TAssertEqual(len(messages1), 2) + + err = toDead(consumer, message1.uuid, guuid.New()) + g.TErrorIf(err) + + messages2 := collectPending(topic, consumer) + g.TAssertEqual(len(messages2), 1) + g.TAssertEqual(messages2[0], message2) + + err = toDead(consumer, message2.uuid, guuid.New()) + g.TErrorIf(err) + + messages3 := collectPending(topic, consumer) + g.TAssertEqual(len(messages3), 0) + }) + + g.Testing("if commits are unordered, pending is still sorted", func() { + message1 := pub(topic) + message2 := pub(topic) + message3 := pub(topic) + + g.TAssertEqual(collectPending(topic, consumer), []messageT{ + message1, + message2, + message3, + }) + + g.TErrorIf(commit(consumer, message2.uuid)) + g.TAssertEqual(collectPending(topic, consumer), []messageT{ + message1, + message3, + }) + + g.TErrorIf(commit(consumer, message1.uuid)) + g.TAssertEqual(collectPending(topic, consumer), []messageT{ + message3, + }) + + g.TErrorIf(commit(consumer, message3.uuid)) + g.TAssertEqual(len(collectPending(topic, consumer)), 0) + }) + + g.Testing("when we're not the owners we get nothing", func() { + otherID := instanceID + 1 + + take, takeClose, takeErr := takeStmt(db, prefix, otherID) + g.TErrorIf(takeErr) + defer takeClose() + + message1 := pub(topic) + message2 := pub(topic) + message3 := pub(topic) + message4 := pub(topic) + message5 := pub(topic) + + expected := []messageT{ + message1, + message2, + message3, + message4, + message5, + } + + g.TAssertEqual(collectPending(topic, consumer), expected) + + err := take(topic, consumer) + g.TErrorIf(err) + + rows, err := pending(topic, consumer) + g.TErrorIf(err) + + err = messageEach(rows, func(messageT) error { + g.Unreachable() + return nil + }) + g.TErrorIf(err) + }) + + g.Testing("no actual closing occurs", func() { + g.TErrorIf(g.SomeError( + pendingClose(), + pendingClose(), + pendingClose(), + )) + }) +} + +func test_commitStmt() { + g.TestStart("commitStmt()") + + const ( + topic = "commit() topic" + payloadStr = "commit() payload" + consumer = "commit() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + commitErr, + toDeadErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + commitClose, + toDeadClose, + db.Close, + ) + + pub := func(topic string) guuid.UUID { + g.TErrorIf(take(topic, consumer)) + + unsentWithTopic := unsent + unsentWithTopic.Topic = topic + message, err := publish(unsentWithTopic, guuid.New()) + g.TErrorIf(err) + return message.uuid + } + + cmt := func(consumer string, messageID guuid.UUID) error { + g.TErrorIf(take(topic, consumer)) + + return commit(consumer, messageID) + } + + + g.Testing("we can't commit twice", func() { + messageID := pub(topic) + + err1 := cmt(consumer, messageID) + err2 := cmt(consumer, messageID) + g.TErrorIf(err1) + g.TAssertEqual( + err2.(acudego.Error).ExtendedCode, + acudego.ErrConstraintUnique, + ) + }) + + g.Testing("we can't commit non-existent messages", func() { + err := cmt(consumer, guuid.New()) + g.TAssertEqual(err, sql.ErrNoRows) + }) + + g.Testing("multiple consumers may commit a message", func() { + messageID := pub(topic) + + g.TErrorIf(g.SomeError( + cmt(consumer, messageID), + cmt("other consumer", messageID), + cmt("yet another consumer", messageID), + )) + }) + + g.Testing("a consumer can commit to multiple topics", func() { + messageID1 := pub(topic) + messageID2 := pub("other topic") + messageID3 := pub("yet another topic") + + g.TErrorIf(g.SomeError( + cmt(consumer, messageID1), + cmt(consumer, messageID2), + cmt(consumer, messageID3), + )) + }) + + g.Testing("a consumer can consume many messages from a topic", func() { + messageID1 := pub(topic) + messageID2 := pub(topic) + messageID3 := pub(topic) + + g.TErrorIf(g.SomeError( + cmt(consumer, messageID1), + cmt(consumer, messageID2), + cmt(consumer, messageID3), + )) + }) + + g.Testing("we can't commit a dead message", func() { + messageID := pub(topic) + + err1 := toDead(consumer, messageID, guuid.New()) + err2 := cmt(consumer, messageID) + g.TErrorIf(err1) + g.TAssertEqual( + err2.(acudego.Error).ExtendedCode, + acudego.ErrConstraintUnique, + ) + }) + + g.Testing("error if we don't own the topic/consumer", func() { + otherID := instanceID + 1 + take, takeClose, takeErr := takeStmt(db, prefix, otherID) + g.TErrorIf(takeErr) + defer takeClose() + + messageID := pub(topic) + + err := take(topic, consumer) + g.TErrorIf(err) + + err = commit(consumer, messageID) + g.TAssertEqual(err, fmt.Errorf( + noLongerOwnerErrorFmt, + instanceID, + topic, + consumer, + otherID, + )) + }) + + g.Testing("no actual closing occurs", func() { + g.TErrorIf(g.SomeError( + commitClose(), + commitClose(), + commitClose(), + )) + }) +} + +func test_toDeadStmt() { + g.TestStart("toDeadStmt()") + + const ( + topic = "toDead() topic" + payloadStr = "toDead() payload" + consumer = "toDead() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + commitErr, + toDeadErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + commitClose, + toDeadClose, + db.Close, + ) + + pub := func(topic string) guuid.UUID { + g.TErrorIf(take(topic, consumer)) + + unsentWithTopic := unsent + unsentWithTopic.Topic = topic + message, err := publish(unsentWithTopic, guuid.New()) + g.TErrorIf(err) + return message.uuid + } + + asDead := func( + consumer string, + messageID guuid.UUID, + deadletterID guuid.UUID, + ) error { + g.TErrorIf(take(topic, consumer)) + return toDead(consumer, messageID, deadletterID) + } + + + g.Testing("we can't mark as dead twice", func() { + messageID := pub(topic) + + err1 := asDead(consumer, messageID, guuid.New()) + err2 := asDead(consumer, messageID, guuid.New()) + g.TErrorIf(err1) + g.TAssertEqual( + err2.(acudego.Error).ExtendedCode, + acudego.ErrConstraintUnique, + ) + }) + + g.Testing("we can't reuse a deadletter id", func() { + messageID1 := pub(topic) + messageID2 := pub(topic) + deadletterID := guuid.New() + + err1 := asDead(consumer, messageID1, deadletterID) + err2 := asDead(consumer, messageID2, deadletterID) + g.TErrorIf(err1) + g.TAssertEqual( + err2.(acudego.Error).ExtendedCode, + acudego.ErrConstraintUnique, + ) + + }) + + g.Testing("we can't mark as dead non-existent messages", func() { + err := asDead(consumer, guuid.New(), guuid.New()) + g.TAssertEqual(err, sql.ErrNoRows) + }) + + g.Testing("multiple consumers may mark a message as dead", func() { + messageID := pub(topic) + + g.TErrorIf(g.SomeError( + asDead(consumer, messageID, guuid.New()), + asDead("another consumer", messageID, guuid.New()), + asDead("yet another consumer", messageID, guuid.New()), + )) + }) + + g.Testing("a consumer can mark as dead in multiple topics", func() { + messageID1 := pub(topic) + messageID2 := pub("other topic") + messageID3 := pub("yet other topic") + + g.TErrorIf(g.SomeError( + asDead(consumer, messageID1, guuid.New()), + asDead(consumer, messageID2, guuid.New()), + asDead(consumer, messageID3, guuid.New()), + )) + }) + + g.Testing("a consumer can produce many deadletters in a topic", func() { + messageID1 := pub(topic) + messageID2 := pub(topic) + messageID3 := pub(topic) + + g.TErrorIf(g.SomeError( + asDead(consumer, messageID1, guuid.New()), + asDead(consumer, messageID2, guuid.New()), + asDead(consumer, messageID3, guuid.New()), + )) + }) + + g.Testing("a consumer can intercalate commits and deadletters", func() { + messageID1 := pub(topic) + messageID2 := pub(topic) + messageID3 := pub(topic) + messageID4 := pub(topic) + messageID5 := pub(topic) + + g.TErrorIf(g.SomeError( + asDead(consumer, messageID1, guuid.New()), + commit(consumer, messageID2), + commit(consumer, messageID3), + asDead(consumer, messageID4, guuid.New()), + commit(consumer, messageID5), + )) + }) + + g.Testing("we can't mark a committed message as dead", func() { + messageID := pub(topic) + + err1 := commit(consumer, messageID) + err2 := asDead(consumer, messageID, guuid.New()) + g.TErrorIf(err1) + g.TAssertEqual( + err2.(acudego.Error).ExtendedCode, + acudego.ErrConstraintUnique, + ) + }) + + g.Testing("error if we don't own the message's consumer/topic", func() { + otherID := instanceID + 1 + messageID1 := pub(topic) + messageID2 := pub(topic) + + take, takeClose, takeErr := takeStmt(db, prefix, otherID) + g.TErrorIf(takeErr) + defer takeClose() + + err := toDead(consumer, messageID1, guuid.New()) + g.TErrorIf(err) + + err = take(topic, consumer) + g.TErrorIf(err) + + err = toDead(consumer, messageID2, guuid.New()) + g.TAssertEqual(err, fmt.Errorf( + noLongerOwnerErrorFmt, + instanceID, + topic, + consumer, + otherID, + )) + }) + + g.Testing("no actual closing occurs", func() { + g.TErrorIf(g.SomeError( + toDeadClose(), + toDeadClose(), + toDeadClose(), + )) + }) +} + +func test_replayStmt() { + g.TestStart("replayStmt()") + + const ( + topic = "replay() topic" + payloadStr = "replay() payload" + consumer = "replay() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + replay, replayClose, replayErr := replayStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + toDeadErr, + replayErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + toDeadClose, + replayClose, + db.Close, + ) + + pub := func() messageT { + message, err := publish(unsent, guuid.New()) + g.TErrorIf(err) + return message + } + g.TErrorIf(take(topic, consumer)) + + + g.Testing("we can replay a message", func() { + message := pub() + deadletterID := guuid.New() + replayedID := guuid.New() + + err1 := toDead(consumer, message.uuid, deadletterID) + replayed, err2 := replay(deadletterID, replayedID) + g.TErrorIf(g.SomeError(err1, err2)) + + g.TAssertEqual(replayed.uuid, replayedID) + g.TAssertEqual(replayed.id == message.id, false) + g.TAssertEqual(replayed.uuid == message.uuid, false) + }) + + g.Testing("a replayed message keeps its payload", func() { + message := pub() + deadletterID := guuid.New() + err := toDead(consumer, message.uuid, deadletterID) + g.TErrorIf(err) + + replayed, err := replay(deadletterID, guuid.New()) + g.TErrorIf(err) + g.TAssertEqual(message.flowID, replayed.flowID) + g.TAssertEqual(message.payload, replayed.payload) + }) + + g.Testing("we can't replay a dead message twice", func() { + message := pub() + deadletterID := guuid.New() + + err := toDead(consumer, message.uuid, deadletterID) + g.TErrorIf(err) + + _, err1 := replay(deadletterID, guuid.New()) + _, err2 := replay(deadletterID, guuid.New()) + g.TErrorIf(err1) + g.TAssertEqual( + err2.(acudego.Error).ExtendedCode, + acudego.ErrConstraintUnique, + ) + }) + + g.Testing("we cant replay non-existent messages", func() { + _, err := replay(guuid.New(), guuid.New()) + g.TAssertEqual( + err.(acudego.Error).ExtendedCode, + acudego.ErrConstraintNotNull, + ) + }) + + g.Testing("messages can die and then be replayed many times", func() { + message := pub() + deadletterID1 := guuid.New() + deadletterID2 := guuid.New() + + err := toDead(consumer, message.uuid, deadletterID1) + g.TErrorIf(err) + + replayed1, err := replay(deadletterID1, guuid.New()) + g.TErrorIf(err) + + err = toDead(consumer, replayed1.uuid, deadletterID2) + g.TErrorIf(err) + + replayed2, err := replay(deadletterID2, guuid.New()) + g.TErrorIf(err) + + g.TAssertEqual(message.flowID, replayed1.flowID) + g.TAssertEqual(replayed1.flowID, replayed2.flowID) + }) + + g.Testing("no actual closing occurs", func() { + g.TErrorIf(g.SomeError( + replayClose(), + replayClose(), + replayClose(), + )) + }) +} + +func test_oneDeadStmt() { + g.TestStart("oneDeadStmt()") + + const ( + topic = "oneDead() topic" + payloadStr = "oneDead() payload" + consumer = "oneDead() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + replay, replayClose, replayErr := replayStmt(db, prefix, instanceID) + oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + toDeadErr, + replayErr, + oneDeadErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + toDeadClose, + replayClose, + oneDeadClose, + db.Close, + ) + + pub := func(topic string) guuid.UUID { + g.TErrorIf(take(topic, consumer)) + + unsentWithTopic := unsent + unsentWithTopic.Topic = topic + message, err := publish(unsentWithTopic, guuid.New()) + g.TErrorIf(err) + return message.uuid + } + + + g.Testing("error on missing deadletters", func() { + _, err := oneDead(topic, consumer) + g.TAssertEqual(err, sql.ErrNoRows) + }) + + g.Testing("deadletters on other topics don't show for us", func() { + err := toDead(consumer, pub("other topic"), guuid.New()) + g.TErrorIf(err) + + _, err = oneDead(topic, consumer) + g.TAssertEqual(err, sql.ErrNoRows) + }) + + g.Testing("deadletters for other consumers don't show for use", func() { + g.TErrorIf(take(topic, "other consumer")) + err := toDead("other consumer", pub(topic), guuid.New()) + g.TErrorIf(err) + + _, err = oneDead(topic, consumer) + g.TAssertEqual(err, sql.ErrNoRows) + }) + + g.Testing("after being replayed deadletters aren't returned", func() { + messageID1 := guuid.New() + messageID2 := guuid.New() + messageID3 := guuid.New() + + err1 := toDead(consumer, pub(topic), messageID1) + err2 := toDead(consumer, pub(topic), messageID2) + err3 := toDead(consumer, pub(topic), messageID3) + g.TErrorIf(g.SomeError(err1, err2, err3)) + + deadletter, err := oneDead(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(deadletter.uuid, messageID1) + + _, err = replay(messageID2, guuid.New()) + g.TErrorIf(err) + + deadletter, err = oneDead(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(deadletter.uuid, messageID1) + + _, err = replay(messageID1, guuid.New()) + g.TErrorIf(err) + + deadletter, err = oneDead(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(deadletter.uuid, messageID3) + + _, err = replay(messageID3, guuid.New()) + g.TErrorIf(err) + + _, err = oneDead(topic, consumer) + g.TAssertEqual(err, sql.ErrNoRows) + }) +} + +func test_deadletterEach() { + g.TestStart("deadletterEach") + + const ( + topic = "deadletterEach() topic" + payloadStr = "deadletterEach() payload" + consumer = "deadletterEach() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + toDeadErr, + allDeadErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + toDeadClose, + allDeadClose, + db.Close, + ) + + pub := func() guuid.UUID { + message, err := publish(unsent, guuid.New()) + g.TErrorIf(err) + return message.uuid + } + + dead := func(messageID guuid.UUID) guuid.UUID { + deadletterID := guuid.New() + err := toDead(consumer, messageID, deadletterID) + g.TErrorIf(err) + + return deadletterID + } + g.TErrorIf(take(topic, consumer)) + + + g.Testing("not called on empty set", func() { + rows, err := allDead(topic, consumer) + g.TErrorIf(err) + + n := 0 + deadletterEach(rows, func(deadletterT, messageT) error { + n++ + return nil + }) + }) + + g.Testing("the callback is called once for each entry", func() { + expected := []guuid.UUID{ + dead(pub()), + dead(pub()), + dead(pub()), + } + + rows, err := allDead(topic, consumer) + g.TErrorIf(err) + + var deadletterIDs []guuid.UUID + deadletterEach(rows, func( + deadletter deadletterT, + _ messageT, + ) error { + deadletterIDs = append(deadletterIDs, deadletter.uuid) + return nil + }) + + g.TAssertEqual(deadletterIDs, expected) + }) + + g.Testing("we halt if the timestamp is ill-formatted", func() { + messageID := pub() + message_id_bytes := messageID[:] + dead(messageID) + dead(pub()) + dead(pub()) + dead(pub()) + dead(pub()) + + const tmplUpdate = ` + UPDATE "%s_offsets" + SET timestamp = '01-01-1970' + WHERE message_id IN ( + SELECT id FROM "%s_messages" WHERE uuid = ? + ); + ` + sqlUpdate := fmt.Sprintf(tmplUpdate, prefix, prefix) + _, err := db.Exec(sqlUpdate, message_id_bytes) + g.TErrorIf(err) + + rows, err := allDead(topic, consumer) + g.TErrorIf(err) + + n := 0 + err = deadletterEach(rows, func(deadletterT, messageT) error { + n++ + return nil + }) + + g.TAssertEqual( + err, + &time.ParseError{ + Layout: time.RFC3339Nano, + Value: "01-01-1970", + LayoutElem: "2006", + ValueElem: "01-01-1970", + Message: "", + }, + ) + g.TAssertEqual(n, 3) + + const tmplDelete = ` + DELETE FROM "%s_offsets" + WHERE message_id IN ( + SELECT id FROM "%s_messages" WHERE uuid = ? + ); + ` + sqlDelete := fmt.Sprintf(tmplDelete, prefix, prefix) + _, err = db.Exec(sqlDelete, message_id_bytes) + g.TErrorIf(err) + }) + + g.Testing("we halt if the callback returns an error", func() { + myErr := errors.New("early return error") + + rows1, err1 := allDead(topic, consumer) + g.TErrorIf(err1) + + n1 := 0 + err1 = deadletterEach(rows1, func(deadletterT, messageT) error { + n1++ + if n1 == 1 { + return myErr + } + return nil + }) + + rows2, err2 := allDead(topic, consumer) + g.TErrorIf(err2) + + n2 := 0 + err = deadletterEach(rows2, func(deadletterT, messageT) error { + n2++ + return nil + }) + + g.TAssertEqual(err1, myErr) + g.TErrorIf(err2) + g.TAssertEqual(n1, 1) + g.TAssertEqual(n2, 7) + }) +} + +func test_allDeadStmt() { + g.TestStart("allDeadStmt()") + + const ( + topic = "allDead() topic" + payloadStr = "allDead() payload" + consumer = "allDead() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + replay, replayClose, replayErr := replayStmt(db, prefix, instanceID) + allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + toDeadErr, + replayErr, + allDeadErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + toDeadClose, + replayClose, + allDeadClose, + db.Close, + ) + + pub := func(topic string) guuid.UUID { + g.TErrorIf(take(topic, consumer)) + + unsentWithTopic := unsent + unsentWithTopic.Topic = topic + message, err := publish(unsentWithTopic, guuid.New()) + g.TErrorIf(err) + return message.uuid + } + + collectAll := func( + topic string, + consumer string, + ) ([]deadletterT, []messageT) { + var ( + deadletters []deadletterT + messages []messageT + ) + eachFn := func( + deadletter deadletterT, + message messageT, + ) error { + deadletters = append(deadletters, deadletter) + messages = append(messages, message) + return nil + } + + rows, err := allDead(topic, consumer) + g.TErrorIf(err) + + err = deadletterEach(rows, eachFn) + g.TErrorIf(err) + + return deadletters, messages + } + + + g.Testing("no entry on empty deadletters", func() { + deadletterIDs, _ := collectAll(topic, consumer) + g.TAssertEqual(len(deadletterIDs), 0) + }) + + g.Testing("deadletters on other topics don't show up", func() { + err := toDead(consumer, pub("other topic"), guuid.New()) + g.TErrorIf(err) + + deadletters, _ := collectAll(topic, consumer) + g.TAssertEqual(len(deadletters), 0) + }) + + g.Testing("deadletters of other consumers don't show up", func() { + g.TErrorIf(take(topic, "other consumer")) + err := toDead("other consumer", pub(topic), guuid.New()) + g.TErrorIf(err) + + deadletterIDs, _ := collectAll(topic, consumer) + g.TAssertEqual(len(deadletterIDs), 0) + }) + + g.Testing("deadletters are given in order", func() { + deadletterIDs := []guuid.UUID{ + guuid.New(), + guuid.New(), + guuid.New(), + } + messageIDs := []guuid.UUID{ + pub(topic), + pub(topic), + pub(topic), + } + + err1 := toDead(consumer, messageIDs[0], deadletterIDs[0]) + err2 := toDead(consumer, messageIDs[1], deadletterIDs[1]) + err3 := toDead(consumer, messageIDs[2], deadletterIDs[2]) + g.TErrorIf(g.SomeError(err1, err2, err3)) + + deadletters, messages := collectAll(topic, consumer) + g.TAssertEqual(deadletters[0].uuid, deadletterIDs[0]) + g.TAssertEqual(deadletters[1].uuid, deadletterIDs[1]) + g.TAssertEqual(deadletters[2].uuid, deadletterIDs[2]) + g.TAssertEqual( messages[0].uuid, messageIDs[0]) + g.TAssertEqual( messages[1].uuid, messageIDs[1]) + g.TAssertEqual( messages[2].uuid, messageIDs[2]) + g.TAssertEqual(len(deadletters), 3) + g.TAssertEqual(len(messages), 3) + }) + + g.Testing("after being replayed, they stop appearing", func() { + deadletters, _ := collectAll(topic, consumer) + g.TAssertEqual(len(deadletters), 3) + + _, err := replay(deadletters[0].uuid, guuid.New()) + g.TErrorIf(err) + collecteds, _ := collectAll(topic, consumer) + g.TAssertEqual(len(collecteds), 2) + + _, err = replay(deadletters[1].uuid, guuid.New()) + g.TErrorIf(err) + collecteds, _ = collectAll(topic, consumer) + g.TAssertEqual(len(collecteds), 1) + + _, err = replay(deadletters[2].uuid, guuid.New()) + g.TErrorIf(err) + collecteds, _ = collectAll(topic, consumer) + g.TAssertEqual(len(collecteds), 0) + }) +} + +func test_sizeStmt() { + g.TestStart("sizeStmt()") + + const ( + topic = "size() topic" + payloadStr = "size() payload" + consumer = "size() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + replay, replayClose, replayErr := replayStmt(db, prefix, instanceID) + oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID) + size, sizeClose, sizeErr := sizeStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + toDeadErr, + replayErr, + oneDeadErr, + sizeErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + toDeadClose, + replayClose, + sizeClose, + oneDeadClose, + db.Close, + ) + + pub := func(topic string) guuid.UUID { + g.TErrorIf(take(topic, consumer)) + + unsentWithTopic := unsent + unsentWithTopic.Topic = topic + message, err := publish(unsentWithTopic, guuid.New()) + g.TErrorIf(err) + return message.uuid + } + + + g.Testing("0 on empty topic", func() { + n, err := size(topic) + g.TErrorIf(err) + g.TAssertEqual(n, 0) + }) + + g.Testing("other topics don't fall into our count", func() { + pub("other topic") + + n, err := size(topic) + g.TErrorIf(err) + g.TAssertEqual(n, 0) + }) + + g.Testing("otherwise we just get the sum", func() { + pub(topic) + pub(topic) + pub(topic) + pub(topic) + pub(topic) + + n, err := size(topic) + g.TErrorIf(err) + g.TAssertEqual(n, 5) + }) + + g.Testing("deadletters aren't taken into account", func() { + sixthMessageID := pub(topic) + err := toDead(consumer, sixthMessageID, guuid.New()) + g.TErrorIf(err) + + n, err := size(topic) + g.TErrorIf(err) + g.TAssertEqual(n, 6) + }) + + g.Testing("after replay, deadletters increases the size", func() { + deadletter, err := oneDead(topic, consumer) + g.TErrorIf(err) + + _, err = replay(deadletter.uuid, guuid.New()) + g.TErrorIf(err) + + n, err := size(topic) + g.TErrorIf(err) + g.TAssertEqual(n, 7) + }) +} + +func test_countStmt() { + g.TestStart("countStmt()") + + const ( + topic = "count() topic" + payloadStr = "count() payload" + consumer = "count() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + next, nextClose, nextErr := nextStmt(db, prefix, instanceID) + commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + count, countClose, countErr := countStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + nextErr, + commitErr, + toDeadErr, + countErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + nextClose, + commitClose, + toDeadClose, + countClose, + db.Close, + ) + + pub := func(topic string) guuid.UUID { + g.TErrorIf(take(topic, consumer)) + + unsentWithTopic := unsent + unsentWithTopic.Topic = topic + message, err := publish(unsentWithTopic, guuid.New()) + g.TErrorIf(err) + return message.uuid + } + + + g.Testing("0 on empty topic", func() { + n, err := count(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(n, 0) + }) + + g.Testing("other topics don't add to our count", func() { + err := commit(consumer, pub("other topic")) + g.TErrorIf(err) + + n, err := count(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(n, 0) + }) + + g.Testing("other consumers don't influence our count", func() { + g.TErrorIf(take(topic, "other consumer")) + err := commit("other consumer", pub(topic)) + g.TErrorIf(err) + + n, err := count(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(n, 0) + }) + + g.Testing("unconsumed messages don't count", func() { + pub(topic) + pub(topic) + pub(topic) + + n, err := count(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(n, 0) + }) + + g.Testing("consumed messages do count", func() { + message, err := next(topic, consumer) + g.TErrorIf(err) + err = commit(consumer, message.uuid) + g.TErrorIf(err) + + message, err = next(topic, consumer) + g.TErrorIf(err) + err = commit(consumer, message.uuid) + g.TErrorIf(err) + + message, err = next(topic, consumer) + g.TErrorIf(err) + err = commit(consumer, message.uuid) + g.TErrorIf(err) + + n, err := count(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(n, 3) + }) + + g.Testing("deadletters count as consumed", func() { + message, err := next(topic, consumer) + g.TErrorIf(err) + + err = toDead(consumer, message.uuid, guuid.New()) + g.TErrorIf(err) + + n, err := count(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(n, 4) + }) +} + +func test_hasDataStmt() { + g.TestStart("hasDataStmt()") + + const ( + topic = "hasData() topic" + payloadStr = "hasData() payload" + consumer = "hasData() consumer" + prefix = defaultPrefix + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + g.TErrorIf(createTables(db, prefix)) + + instanceID := os.Getpid() + take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + next, nextClose, nextErr := nextStmt(db, prefix, instanceID) + commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) + toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + hasData, hasDataClose, hasDataErr := hasDataStmt(db, prefix, instanceID) + g.TErrorIf(g.SomeError( + takeErr, + publishErr, + nextErr, + commitErr, + toDeadErr, + hasDataErr, + )) + defer g.SomeFnError( + takeClose, + publishClose, + nextClose, + commitClose, + toDeadClose, + hasDataClose, + db.Close, + ) + + pub := func(topic string) guuid.UUID { + g.TErrorIf(take(topic, consumer)) + + unsentWithTopic := unsent + unsentWithTopic.Topic = topic + message, err := publish(unsentWithTopic, guuid.New()) + g.TErrorIf(err) + return message.uuid + } + + + g.Testing("false on empty topic", func() { + has, err := hasData(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(has, false) + }) + + g.Testing("other topics don't change the response", func() { + pub("other topic") + + has, err := hasData(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(has, false) + }) + + g.Testing("published messages flip the flag", func() { + pub(topic) + has, err := hasData(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(has, true) + }) + + g.Testing("other consumers don't influence us", func() { + g.TErrorIf(take(topic, "other consumer")) + message, err := next(topic, "other consumer") + g.TErrorIf(err) + + err = commit("other consumer", message.uuid) + g.TErrorIf(err) + + has, err := hasData(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(has, true) + }) + + g.Testing("consuming messages unflips the result", func() { + message, err := next(topic, consumer) + g.TErrorIf(err) + + err = commit(consumer, message.uuid) + g.TErrorIf(err) + + has, err := hasData(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(has, false) + }) + + g.Testing("same for deadletters", func() { + has0, err := hasData(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(has0, false) + + messageID1 := pub(topic) + messageID2 := pub(topic) + + has1, err := hasData(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(has1, true) + + err = toDead(consumer, messageID1, guuid.New()) + g.TErrorIf(err) + err = commit(consumer, messageID2) + g.TErrorIf(err) + + has2, err := hasData(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(has2, false) + }) +} + +func test_initDB() { + g.TestStart("initDB()") + + const ( + topic = "initDB() topic" + payloadStr = "initDB() payload" + consumer = "initDB() consumer" + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + defer db.Close() + + var messages []messageT + notifyFn := func(message messageT) { + messages = append(messages, message) + } + + instanceID := os.Getpid() + queries, err := initDB(db, defaultPrefix, notifyFn, instanceID) + g.TErrorIf(err) + defer queries.close() + + g.TErrorIf(queries.take(topic, consumer)) + + + g.Testing("we can perform all the wrapped operations", func() { + messageID := guuid.New() + newMessageID := guuid.New() + deadletterID := guuid.New() + + messageV1, err := queries.publish(unsent, messageID) + g.TErrorIf(err) + + messageV2, err := queries.next(topic, consumer) + g.TErrorIf(err) + + var messagesV3 []messageT + pendingFn := func(message messageT) error { + messagesV3 = append(messagesV3, message) + return nil + } + err = queries.pending(topic, consumer, pendingFn) + g.TErrorIf(err) + g.TAssertEqual(len(messagesV3), 1) + messageV3 := messagesV3[0] + + err = queries.toDead(consumer, messageID, deadletterID) + g.TErrorIf(err) + + deadletterV1, err := queries.oneDead(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(deadletterV1.uuid, deadletterID) + + var ( + deadlettersV2 []deadletterT + messagesV4 []messageT + ) + deadletterFn := func( + deadletter deadletterT, + message messageT, + ) error { + deadlettersV2 = append(deadlettersV2, deadletter) + messagesV4 = append(messagesV4, message) + return nil + } + err = queries.allDead(topic, consumer, deadletterFn) + g.TErrorIf(err) + g.TAssertEqual(len(deadlettersV2), 1) + g.TAssertEqual(deadlettersV2[0].uuid, deadletterID) + g.TAssertEqual(len(messagesV4), 1) + messageV4 := messagesV4[0] + + g.TAssertEqual(messageV1, messageV2) + g.TAssertEqual(messageV1, messageV3) + g.TAssertEqual(messageV1, messageV4) + + newMessageV1, err := queries.replay(deadletterID, newMessageID) + g.TErrorIf(err) + + err = queries.commit(consumer, newMessageID) + g.TErrorIf(err) + + newMessageV0 := messageV1 + newMessageV0.id = newMessageV1.id + newMessageV0.uuid = newMessageID + newMessageV0.timestamp = newMessageV1.timestamp + g.TAssertEqual(newMessageV1, newMessageV0) + + size, err := queries.size(topic) + g.TErrorIf(err) + g.TAssertEqual(size, 2) + + count, err := queries.count(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(count, 2) + + hasData, err := queries.hasData(topic, consumer) + g.TErrorIf(err) + g.TAssertEqual(hasData, false) + }) +} + +func test_queriesTclose() { + g.TestStart("queriesT.close()") + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + defer db.Close() + + instanceID := os.Getpid() + queries, err := initDB(db, defaultPrefix, func(messageT) {}, instanceID) + g.TErrorIf(err) + + + g.Testing("after closing, we can't run queries", func() { + unsent := UnsentMessage{ Payload: []byte{}, } + _, err := queries.publish(unsent, guuid.New()) + g.TErrorIf(err) + g.TErrorIf(db.Close()) + + err = queries.close() + g.TErrorIf(err) + + _, err = queries.publish(unsent, guuid.New()) + g.TAssertEqual( + err.Error(), + "sql: database is closed", + ) + }) + + g.Testing("closing mre than once does not error", func() { + g.TErrorIf(g.SomeError( + queries.close(), + queries.close(), + )) + }) +} + + +func test_newPinger() { + g.TestStart("newPinger()") + + g.Testing("onPing() on a closed pinger is a noop", func() { + pinger := newPinger[string]() + pinger.close() + pinger.onPing(func(s string) { + panic(s) + }) + pinger.tryPing("ignored") + }) + + g.Testing("onPing() on a closed pinger with data gets that", func() { + pinger := newPinger[string]() + pinger.tryPing("received") + pinger.tryPing("ignored") + g.TAssertEqual(pinger.closed(), false) + + pinger.close() + g.TAssertEqual(pinger.closed(), true) + + c := make(chan string) + count := 0 + go pinger.onPing(func(s string) { + if count > 0 { + panic(s) + } + count++ + + c <- s + }) + + given := <- c + g.TAssertEqual(given, "received") + }) + + g.Testing("when onPing is late, we loose messages", func() { + pinger := newPinger[string]() + pinger.tryPing("first seen") + pinger.tryPing("second dropped") + pinger.tryPing("third dropped") + + c := make(chan string) + go pinger.onPing(func(s string) { + c <- s + }) + s := <- c + + close(c) + pinger.close() + g.TAssertEqual(s, "first seen") + }) + + g.Testing("if onPing is on time, it may not loose", func() { + pinger := newPinger[string]() + pinger.tryPing("first seen") + + c := make(chan string) + go pinger.onPing(func(s string) { + c <- s + }) + + s1 := <- c + pinger.tryPing("second seen") + s2 := <- c + + close(c) + pinger.close() + g.TAssertEqual(s1, "first seen") + g.TAssertEqual(s2, "second seen") + }) + + g.Testing("if onPing takes too long, it still looses messages", func() { + pinger := newPinger[string]() + pinger.tryPing("first seen") + + c1 := make(chan string) + c2 := make(chan struct{}) + go pinger.onPing(func(s string) { + c1 <- s + c2 <- struct{}{} + }) + + s1 := <- c1 + pinger.tryPing("second seen") + pinger.tryPing("third dropped") + <- c2 + s2 := <- c1 + <- c2 + + close(c2) + close(c1) + pinger.close() + g.TAssertEqual(s1, "first seen") + g.TAssertEqual(s2, "second seen") + }) +} + +func test_makeSubscriptionsFunc() { + g.TestStart("makeSubscriptionsFunc()") + + g.Testing("we can have multiple readers", func() { + subscriptions := makeSubscriptionsFuncs() + + var ( + readStarted sync.WaitGroup + readFinished sync.WaitGroup + ) + c := make(chan struct{}) + readFn := func(subscriptionsSetM) error { + readStarted.Done() + <- c + return nil + } + addReader := func() { + readStarted.Add(1) + readFinished.Add(1) + go func() { + subscriptions.read(readFn) + readFinished.Done() + }() + } + + addReader() + addReader() + addReader() + readStarted.Wait() + c <- struct{}{} + c <- struct{}{} + c <- struct{}{} + readFinished.Wait() + }) + + g.Testing("writers are exclusive", func() { + subscriptions := makeSubscriptionsFuncs() + + var ( + readStarted sync.WaitGroup + readFinished sync.WaitGroup + writeWillStart sync.WaitGroup + writeFinished sync.WaitGroup + ) + c := make(chan string) + readFn := func(subscriptionsSetM) error { + readStarted.Done() + c <- "reader" + return nil + } + addReader := func() { + readStarted.Add(1) + readFinished.Add(1) + go func() { + subscriptions.read(readFn) + readFinished.Done() + }() + } + + writeFn := func(subscriptionsSetM) error { + c <- "writer" + return nil + } + addWriter := func() { + writeWillStart.Add(1) + writeFinished.Add(1) + go func() { + writeWillStart.Done() + subscriptions.write(writeFn) + writeFinished.Done() + }() + } + + addReader() + addReader() + addReader() + readStarted.Wait() + addWriter() + writeWillStart.Wait() + + g.TAssertEqual(<-c, "reader") + g.TAssertEqual(<-c, "reader") + g.TAssertEqual(<-c, "reader") + + readFinished.Wait() + g.TAssertEqual(<-c, "writer") + writeFinished.Wait() + }) +} + +func test_makeNotifyFn() { + g.TestStart("makeNotifyFn()") + + g.Testing("when topic is nil only top pinger gets pinged", func() { + pinger1 := newPinger[struct{}]() + pinger2 := newPinger[[]byte]() + defer pinger1.close() + defer pinger2.close() + + go pinger1.onPing(func(struct{}) { + panic("consumer pinger") + }) + go pinger2.onPing(func(payload []byte) { + panic("waiter pinger") + }) + + flowID := guuid.New() + + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer-1": consumerT{ + pinger: pinger1, + }, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{ + "waiter-1": waiterT{ + pinger: pinger2, + }, + }, + }, + }, + } + subsFn := func(fn func(subscriptionsSetM) error) error { + return fn(set) + } + topPinger := newPinger[struct{}]() + defer topPinger.close() + + var wg sync.WaitGroup + wg.Add(1) + go topPinger.onPing(func(struct{}) { + wg.Done() + }) + + notifyFn := makeNotifyFn(subsFn, topPinger) + + message := messageT{ + uuid: guuid.New(), + topic: "nobody is subscribed to this one", + payload: []byte("nobody with get this payload"), + } + notifyFn(message) + wg.Wait() + }) + + g.Testing("otherwise all interested subscribers get pinged", func() { + const topic = "the topic name" + + pinger1 := newPinger[struct{}]() + pinger2 := newPinger[[]byte]() + defer pinger1.close() + defer pinger2.close() + + var wg sync.WaitGroup + go pinger1.onPing(func(struct{}) { + wg.Done() + }) + go pinger2.onPing(func([]byte) { + wg.Done() + }) + wg.Add(2) + + flowID := guuid.New() + + set := subscriptionsSetM{ + topic: topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer-1": consumerT{ + pinger: pinger1, + }, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{ + "waiter-1": waiterT{ + pinger: pinger2, + }, + }, + }, + }, + } + + subsFn := func(fn func(subscriptionsSetM) error) error { + return fn(set) + } + + topPinger := newPinger[struct{}]() + defer topPinger.close() + go topPinger.onPing(func(struct{}) { + wg.Done() + }) + wg.Add(1) + + notifyFn := makeNotifyFn(subsFn, topPinger) + + message := messageT{ + uuid: guuid.New(), + topic: topic, + flowID: flowID, + payload: []byte("ignored in this test"), + } + notifyFn(message) + wg.Wait() + }) +} + +func test_collectClosedWaiters() { + g.TestStart("collectClosedWaiter()") + + g.Testing("collects all the reports to be closed", func() { + flowID1 := guuid.New() + flowID2 := guuid.New() + flowID3 := guuid.New() + flowID4 := guuid.New() + flowID5 := guuid.New() + + mkwaiter := func(closed bool) waiterT { + fn := func() bool { + return closed + } + return waiterT{ + closed: &fn, + } + } + + set := subscriptionsSetM{ + "topic-1": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{ + "waiter-1": mkwaiter(false), + "waiter-2": mkwaiter(true), + "waiter-3": mkwaiter(true), + }, + flowID2: map[string]waiterT{ + "waiter-4": mkwaiter(true), + "waiter-5": mkwaiter(false), + }, + }, + }, + "topic-2": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID3: map[string]waiterT{ + "waiter-1": mkwaiter(false), + "waiter-2": mkwaiter(false), + }, + flowID4: map[string]waiterT{ + "waiter-3": mkwaiter(true), + "waiter-4": mkwaiter(true), + "waiter-5": mkwaiter(true), + "waiter-6": mkwaiter(true), + }, + }, + }, + "topic-3": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID5: map[string]waiterT{}, + }, + }, + "topic-4": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + expected := map[string]map[guuid.UUID][]string{ + "topic-1": map[guuid.UUID][]string{ + flowID1: []string{ + "waiter-2", + "waiter-3", + }, + flowID2: []string{ + "waiter-4", + }, + }, + "topic-2": map[guuid.UUID][]string{ + flowID3: []string{}, + flowID4: []string{ + "waiter-3", + "waiter-4", + "waiter-5", + "waiter-6", + }, + }, + "topic-3": map[guuid.UUID][]string{ + flowID5: []string{}, + }, + "topic-4": map[guuid.UUID][]string{}, + } + + given := collectClosedWaiters(set) + + // sort names for equality + for _, waitersIndex := range given { + for _, names := range waitersIndex { + sort.Strings(names) + } + } + g.TAssertEqual(given, expected) + }) +} + +func test_trimEmptyLeaves() { + g.TestStart("trimEmptyLeaves()") + + g.Testing("noop on an empty index", func() { + input := map[string]map[guuid.UUID][]string{} + expected := map[string]map[guuid.UUID][]string{} + + trimEmptyLeaves(input) + g.TAssertEqual(input, expected) + }) + + g.Testing("simplifies tree when it can", func() { + flowID1 := guuid.New() + flowID2 := guuid.New() + flowID3 := guuid.New() + flowID4 := guuid.New() + + input := map[string]map[guuid.UUID][]string{ + "topic-1": map[guuid.UUID][]string{ + flowID1: []string{ + "waiter-1", + }, + flowID2: []string{}, + }, + "topic-2": map[guuid.UUID][]string{ + flowID3: []string{}, + flowID4: []string{}, + }, + "topic-3": map[guuid.UUID][]string{}, + } + + expected := map[string]map[guuid.UUID][]string{ + "topic-1": map[guuid.UUID][]string{ + flowID1: []string{ + "waiter-1", + }, + }, + } + + trimEmptyLeaves(input) + g.TAssertEqual(input, expected) + }) + + g.Testing("fully prune tree if possible", func() { + flowID1 := guuid.New() + flowID2 := guuid.New() + flowID3 := guuid.New() + flowID4 := guuid.New() + flowID5 := guuid.New() + + input := map[string]map[guuid.UUID][]string{ + "topic-1": map[guuid.UUID][]string{}, + "topic-2": map[guuid.UUID][]string{}, + "topic-3": map[guuid.UUID][]string{}, + "topic-4": map[guuid.UUID][]string{ + flowID1: []string{}, + }, + "topic-5": map[guuid.UUID][]string{ + flowID2: []string{}, + flowID3: []string{}, + flowID4: []string{}, + flowID5: []string{}, + }, + } + + expected := map[string]map[guuid.UUID][]string{} + + trimEmptyLeaves(input) + g.TAssertEqual(input, expected) + }) +} + +func test_deleteIfEmpty() { + g.TestStart("deleteIfEmpty()") + + g.Testing("noop if there are consumers", func() { + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + }, + } + + expected1 := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + }, + } + expected2 := subscriptionsSetM{} + + deleteIfEmpty(set, "topic") + g.TAssertEqual(set, expected1) + + delete(set["topic"].consumers, "consumer") + deleteIfEmpty(set, "topic") + g.TAssertEqual(set, expected2) + }) + + g.Testing("noop if there are waiters", func() { + flowID := guuid.New() + + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: nil, + }, + }, + } + + expected1 := subscriptionsSetM{ + "topic": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: nil, + }, + }, + } + expected2 := subscriptionsSetM{} + + deleteIfEmpty(set, "topic") + g.TAssertEqual(set, expected1) + + delete(set["topic"].waiters, flowID) + deleteIfEmpty(set, "topic") + g.TAssertEqual(set, expected2) + }) + + g.Testing("otherwise deletes when empty", func() { + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + expected := subscriptionsSetM{} + + deleteIfEmpty(set, "topic") + g.TAssertEqual(set, expected) + }) + + g.Testing("unrelated topics are left untouched", func() { + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + expected := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + deleteIfEmpty(set, "another-topic") + g.TAssertEqual(set, expected) + }) +} + +func test_deleteEmptyTopics() { + g.TestStart("deleteEmptyTopics()") + + g.Testing("cleans up all empty topics from the set", func() { + flowID1 := guuid.New() + flowID2 := guuid.New() + + set := subscriptionsSetM{ + "empty": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + "has-consumers": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + "has-waiters": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: nil, + }, + }, + "has-both": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID2: nil, + }, + }, + "has-neither": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + expected := subscriptionsSetM{ + "has-consumers": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + "has-waiters": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: nil, + }, + }, + "has-both": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID2: nil, + }, + }, + } + + deleteEmptyTopics(set) + g.TAssertEqual(set, expected) + }) +} + +func test_removeClosedWaiter() { + g.TestStart("removeClosedWaiter()") + + g.Testing("removes from set all that we request", func() { + flowID0 := guuid.New() + flowID1 := guuid.New() + flowID2 := guuid.New() + flowID3 := guuid.New() + + set := subscriptionsSetM{ + "topic-1": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{ + "waiter-1": waiterT{}, + "waiter-2": waiterT{}, + }, + flowID2: map[string]waiterT{ + "waiter-3": waiterT{}, + "waiter-4": waiterT{}, + "waiter-5": waiterT{}, + }, + }, + }, + "topic-2": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID3: map[string]waiterT{ + "waiter-6": waiterT{}, + "waiter-7": waiterT{}, + "waiter-8": waiterT{}, + }, + }, + }, + "topic-3": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + input := map[string]map[guuid.UUID][]string{ + "topic-0": map[guuid.UUID][]string{ + flowID0: []string{ + "waiter-0", + }, + }, + "topic-1": map[guuid.UUID][]string{ + flowID1: []string{ + "waiter-2", + }, + flowID2: []string{ + "waiter-3", + "waiter-4", + "waiter-5", + }, + }, + "topic-2": map[guuid.UUID][]string{ + flowID3: []string{ + "waiter-6", + "waiter-7", + "waiter-8", + }, + }, + } + + expected := subscriptionsSetM{ + "topic-1": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{ + "waiter-1": waiterT{}, + }, + }, + }, + } + + removeClosedWaiters(set, input) + g.TAssertEqual(set, expected) + }) + + g.Testing("empty flowIDs from input GET LEAKED", func() { + flowID1 := guuid.New() + flowID2 := guuid.New() + + input := map[string]map[guuid.UUID][]string{ + "topic-2": map[guuid.UUID][]string{ + flowID2: []string{ + "waiter", + }, + }, + } + + set := subscriptionsSetM{ + "topic-1": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{}, + }, + }, + "topic-2": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID2: map[string]waiterT{ + "waiter": waiterT{}, + }, + }, + }, + } + + expected := subscriptionsSetM{ + "topic-1": topicSubscriptionT{ + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{}, + }, + }, + } + + removeClosedWaiters(set, input) + g.TAssertEqual(set, expected) + }) +} + +func test_reapClosedWaiters() { + g.TestStart("reapClosedWaiters()") + + g.Testing("trimEmptyLeaves() prevents removal of empty flows", func() { + var ( + set subscriptionsSetM + readCount = 0 + writeCount = 0 + ) + readFn := func(fn func(subscriptionsSetM) error) error { + readCount++ + return fn(set) + } + writeFn := func(fn func(subscriptionsSetM) error) error { + writeCount++ + return fn(set) + } + + openFn := func() bool { + return false + } + closedFn := func() bool { + return true + } + open := waiterT{ closed: &openFn } + closed := waiterT{ closed: &closedFn } + flowID1 := guuid.New() + flowID2 := guuid.New() + + set = subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{ + "waiter-1": open, + "waiter-2": open, + "waiter-3": open, + }, + flowID2: map[string]waiterT{ + "waiter-4": open, + }, + }, + }, + } + + expected1 := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{ + "waiter-1": open, + "waiter-2": open, + "waiter-3": open, + }, + flowID2: map[string]waiterT{ + "waiter-4": open, + }, + }, + }, + } + + expected2 := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{ + "waiter-2": open, + "waiter-3": open, + }, + flowID2: map[string]waiterT{ + "waiter-4": open, + }, + }, + }, + } + + expected3 := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{ + "waiter-2": open, + "waiter-3": open, + }, + }, + }, + } + + reapClosedWaiters(readFn, writeFn) + g.TAssertEqual(readCount, 1) + g.TAssertEqual(writeCount, 0) + g.TAssertEqual(set, expected1) + + set["topic"].waiters[flowID1]["waiter-1"] = closed + reapClosedWaiters(readFn, writeFn) + g.TAssertEqual(readCount, 2) + g.TAssertEqual(writeCount, 1) + g.TAssertEqual(set, expected2) + + set["topic"].waiters[flowID2]["waiter-4"] = closed + reapClosedWaiters(readFn, writeFn) + g.TAssertEqual(readCount, 3) + g.TAssertEqual(writeCount, 2) + g.TAssertEqual(set, expected3) + }) +} + +func test_everyNthCall() { + g.TestStart("everyNthCall()") + + g.Testing("0 (incorrect) would make fn never be called", func() { + never := everyNthCall[int](0, func(int) { + panic("unreachable") + }) + for i := 0; i < 100; i++ { + never(i) + } + }) + + g.Testing("the first call is delayed to be the nth", func() { + count := 0 + values := []string{} + fn := everyNthCall[string](3, func(s string) { + count++ + values = append(values, s) + }) + + fn("1 ignored") + g.TAssertEqual(count, 0) + fn("2 ignored") + g.TAssertEqual(count, 0) + fn("3 not ignored") + g.TAssertEqual(count, 1) + + fn("4 ignored") + fn("5 ignored") + g.TAssertEqual(count, 1) + + fn("6 not ignored") + fn("7 ignored") + fn("8 ignored") + g.TAssertEqual(count, 2) + + fn("9 not ignored") + g.TAssertEqual(count, 3) + + expected := []string{ + "3 not ignored", + "6 not ignored", + "9 not ignored", + } + g.TAssertEqual(values, expected) + }) +} + +func test_runReaper() { + g.TestStart("runReaper()") + + g.Testing("debounce reapClosedWaiters `reaperSkipCount` times", func() { + set := subscriptionsSetM{} + + var ( + readCount = 0 + writeCount = 0 + ) + readFn := func(fn func(subscriptionsSetM) error) error { + readCount++ + return fn(set) + } + writeFn := func(fn func(subscriptionsSetM) error) error { + writeCount++ + return fn(set) + } + + var iterCount int + onPing := func(fn func(struct{})) { + for i := 0; i < iterCount; i++ { + fn(struct{}{}) + } + } + + iterCount = reaperSkipCount - 1 + runReaper(onPing, readFn, writeFn) + g.TAssertEqual(readCount, 0) + g.TAssertEqual(writeCount, 0) + + iterCount = reaperSkipCount + runReaper(onPing, readFn, writeFn) + g.TAssertEqual(readCount, 1) + g.TAssertEqual(writeCount, 0) + }) +} + +func test_NewWithPrefix() { + g.TestStart("NewWithPrefix()") + + g.Testing("we get an error with a bad prefix", func() { + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + defer db.Close() + + _, err = NewWithPrefix(db, "a bad prefix") + g.TAssertEqual(err, g.ErrBadSQLTablePrefix) + }) + + g.Testing("otherwise we have a queueT and no errors", func() { + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + defer db.Close() + + queue, err := NewWithPrefix(db, "good") + g.TErrorIf(err) + queue.Close() + + g.TAssertEqual(queue.(queueT).pinger.closed(), true) + }) +} + +func test_New() { + g.TestStart("New()") + + g.Testing("smoke test that we get a queueT", func() { + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + defer db.Close() + + queue, err := New(db) + g.TErrorIf(err) + queue.Close() + + g.TAssertEqual(queue.(queueT).pinger.closed(), true) + }) +} + +func test_asPublicMessage() { + g.TestStart("asPublicMessage()") + + g.Testing("it picks the correct fields 🤷", func() { + input := messageT{ + uuid: guuid.New(), + timestamp: time.Now(), + topic: "topic", + flowID: guuid.New(), + payload: []byte("payload"), + } + + expected := Message{ + ID: input.uuid, + Timestamp: input.timestamp, + Topic: input.topic, + FlowID: input.flowID, + Payload: input.payload, + } + + given := asPublicMessage(input) + g.TAssertEqual(given, expected) + }) +} + +func test_queueT_Publish() { + g.TestStart("queueT.Publish()") + + const ( + topic = "queueT.Publish() topic" + payloadStr = "queueT.Publish() payload" + ) + var ( + flowID = guuid.New() + payload = []byte(payloadStr) + unsent = UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + ) + + db, err := sql.Open("acude", ":memory:") + g.TErrorIf(err) + defer db.Close() + + queue, err := New(db) + g.TErrorIf(err) + defer queue.Close() + + + g.Testing("it just publishes and returns", func() { + message, err := queue.Publish(unsent) + g.TErrorIf(err) + + g.TAssertEqual(message.Timestamp == time.Time{}, false) + g.TAssertEqual(message.Topic, topic) + g.TAssertEqual(message.FlowID, flowID) + g.TAssertEqual(message.Payload, payload) + }) + + queue.Close() + g.TAssertEqual(queue.(queueT).pinger.closed(), true) +} + +func test_registerConsumerFn() { + g.TestStart("registerConsumerFn()") + + g.Testing("adds a new topicSubscriptionT{} if needed", func() { + consumer := consumerT{ + data: consumerDataT{ + topic: "topic", + name: "consumer", + }, + } + + set := subscriptionsSetM{} + + expected := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumer, + }, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + registerConsumerFn(consumer)(set) + g.TAssertEqual(set, expected) + }) + + g.Testing("otherwise it just uses what exists", func() { + flowID := guuid.New() + + consumer := consumerT{ + data: consumerDataT{ + topic: "topic", + name: "consumer", + }, + } + + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "other-consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{}, + }, + }, + } + + expected := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumer, + "other-consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{}, + }, + }, + } + + registerConsumerFn(consumer)(set) + g.TAssertEqual(set, expected) + }) + + g.Testing("overwrites existing consumer if desired", func() { + close1 := func() {} + consumer1 := consumerT{ + data: consumerDataT{ + topic: "topic", + name: "consumer", + }, + close: &close1, + } + + close2 := func() {} + consumer2 := consumerT{ + data: consumerDataT{ + topic: "topic", + name: "consumer", + }, + close: &close2, + } + + set := subscriptionsSetM{} + + expected1 := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumer1, + }, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + expected2 := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumer2, + }, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + registerConsumerFn(consumer1)(set) + g.TAssertEqual(set, expected1) + g.TAssertEqual(reflect.DeepEqual(set, expected2), false) + + registerConsumerFn(consumer2)(set) + g.TAssertEqual(set, expected2) + g.TAssertEqual(reflect.DeepEqual(set, expected1), false) + }) +} + +func test_registerWaiterFn() { + g.TestStart("registerWaiterFn()") + + g.Testing("adds a new topicSubscriptionT{} if needed", func() { + flowID := guuid.New() + + waiter := waiterT{ + data: waiterDataT{ + topic: "topic", + flowID: flowID, + name: "waiter", + }, + } + + set := subscriptionsSetM{} + + expected := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{ + "waiter": waiter, + }, + }, + }, + } + + registerWaiterFn(waiter)(set) + g.TAssertEqual(set, expected) + }) + + g.Testing("adds a new waiters map if needed", func() { + flowID := guuid.New() + + waiter := waiterT{ + data: waiterDataT{ + topic: "topic", + flowID: flowID, + name: "waiter", + }, + } + + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + expected := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{ + "waiter": waiter, + }, + }, + }, + } + + registerWaiterFn(waiter)(set) + g.TAssertEqual(set, expected) + }) + + g.Testing("otherwise it just uses what exists", func() { + flowID := guuid.New() + + waiter := waiterT{ + data: waiterDataT{ + topic: "topic", + flowID: flowID, + name: "waiter", + }, + } + + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{ + "other-waiter": waiterT{}, + }, + }, + }, + } + + expected := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{ + "waiter": waiter, + "other-waiter": waiterT{}, + }, + }, + }, + } + + registerWaiterFn(waiter)(set) + g.TAssertEqual(set, expected) + }) + + g.Testing("overwrites existing waiter if desired", func() { + flowID := guuid.New() + + close1 := func() {} + waiter1 := waiterT{ + data: waiterDataT{ + topic: "topic", + flowID: flowID, + name: "waiter", + }, + close: &close1, + } + + close2 := func() {} + waiter2 := waiterT{ + data: waiterDataT{ + topic: "topic", + flowID: flowID, + name: "waiter", + }, + close: &close2, + } + + set := subscriptionsSetM{} + + expected1 := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{ + "waiter": waiter1, + }, + }, + }, + } + + expected2 := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{ + "waiter": waiter2, + }, + }, + }, + } + + registerWaiterFn(waiter1)(set) + g.TAssertEqual(set, expected1) + g.TAssertEqual(reflect.DeepEqual(set, expected2), false) + + registerWaiterFn(waiter2)(set) + g.TAssertEqual(set, expected2) + g.TAssertEqual(reflect.DeepEqual(set, expected1), false) + }) +} + +func test_makeConsumeOneFn() { + g.TestStart("makeConsumeOneFn()") + + savedLogger := slog.Default() + + s := new(strings.Builder) + g.SetLoggerOutput(s) + + var ( + successCount int + errorCount int + callbackErr error + successFnErr error + errorFnErr error + messages []Message + successNames []string + successIDs []guuid.UUID + errorNames []string + errorIDs []guuid.UUID + deadletterIDs []guuid.UUID + ) + + data := consumerDataT{ + topic: "topic", + name: "name", + } + + callback := func(message Message) error { + messages = append(messages, message) + return callbackErr + } + + successFn := func(name string, messageID guuid.UUID) error { + successCount++ + successNames = append(successNames, name) + successIDs = append(successIDs, messageID) + return successFnErr + } + + errorFn := func( + name string, + messageID guuid.UUID, + deadletterID guuid.UUID, + ) error { + errorCount++ + errorNames = append(errorNames, name) + errorIDs = append(errorIDs, messageID) + deadletterIDs = append(deadletterIDs, deadletterID) + + return errorFnErr + } + + consumeOneFn := makeConsumeOneFn( + data, + callback, + successFn, + errorFn, + ) + + message1 := messageT{ uuid: guuid.New() } + message2 := messageT{ uuid: guuid.New() } + message3 := messageT{ uuid: guuid.New() } + message4 := messageT{ uuid: guuid.New() } + + + g.Testing("error from successFn() is propagated", func() { + err := consumeOneFn(message1) + g.TErrorIf(err) + g.TAssertEqual(successCount, 1) + g.TAssertEqual(errorCount, 0) + + successFnErr = errors.New("successFn() error") + err = consumeOneFn(message2) + g.TAssertEqual(err, successFnErr) + g.TAssertEqual(successCount, 2) + g.TAssertEqual(errorCount, 0) + + g.TAssertEqual(s.String(), "") + }) + + g.Testing("error from callback() is logged and dropped", func() { + *s = strings.Builder{} + + callbackErr = errors.New("callback() error") + err := consumeOneFn(message3) + g.TErrorIf(err) + g.TAssertEqual(successCount, 2) + g.TAssertEqual(errorCount, 1) + g.TAssertEqual(s.String() == "", false) + }) + + g.Testing("error from errorFn() is propagated", func() { + *s = strings.Builder{} + + errorFnErr = errors.New("errorFn() error") + err := consumeOneFn(message4) + g.TAssertEqual(err, errorFnErr) + g.TAssertEqual(successCount, 2) + g.TAssertEqual(errorCount, 2) + g.TAssertEqual(s.String() == "", false) + }) + + g.Testing("calls happened with the expected arguments", func() { + expectedMessages := []Message{ + asPublicMessage(message1), + asPublicMessage(message2), + asPublicMessage(message3), + asPublicMessage(message4), + } + + g.TAssertEqual(messages, expectedMessages) + g.TAssertEqual(successNames, []string{ "name", "name" }) + g.TAssertEqual(errorNames, []string{ "name", "name" }) + g.TAssertEqual(successIDs, []guuid.UUID{ + message1.uuid, + message2.uuid, + }) + g.TAssertEqual(errorIDs, []guuid.UUID{ + message3.uuid, + message4.uuid, + }) + g.TAssertEqual(deadletterIDs[0] == message3.uuid, false) + g.TAssertEqual(deadletterIDs[1] == message4.uuid, false) + }) + + slog.SetDefault(savedLogger) +} + +func test_makeConsumeAllFn() { + g.TestStart("makeConsumeAllFn()") + + savedLogger := slog.Default() + + s := new(strings.Builder) + g.SetLoggerOutput(s) + + data := consumerDataT{} + + consumeOneFn := func(messageT) error { + return nil + } + + var eachFnErr error + eachFn := func(string, string, func(messageT) error) error { + return eachFnErr + } + + consumeAllFn := makeConsumeAllFn(data, consumeOneFn, eachFn) + + + g.Testing("silent when eachFn() returns no error", func() { + consumeAllFn(struct{}{}) + g.TAssertEqual(s.String(), "") + }) + + g.Testing("logs warning otherwise", func() { + eachFnErr = errors.New("eachFn() error") + consumeAllFn(struct{}{}) + g.TAssertEqual(s.String() == "", false) + }) + + slog.SetDefault(savedLogger) +} + +func test_makeWaitFn() { + g.TestStart("makeWaitFn()") + + g.Testing("all it does is send the data and close things", func() { + callCount := 0 + closeFn := func() { + callCount++ + } + + c := make(chan []byte, 1) + waitFn := makeWaitFn(c, closeFn) + + payload := []byte("payload") + waitFn(payload) + given := <- c + + g.TAssertEqual(given, payload) + g.TAssertEqual(callCount, 1) + }) + + g.Testing("you can call it twice for cases of race condition", func() { + callCount := 0 + closeFn := func() { + callCount++ + } + + c := make(chan []byte, 1) + waitFn := makeWaitFn(c, closeFn) + + payload := []byte("something") + waitFn(payload) + waitFn(payload) + given1 := <- c + given2 := <- c + + g.TAssertEqual(given1, payload) + g.TAssertEqual(given2, []byte(nil)) + }) +} + +func test_runConsumer() { + g.TestStart("runConsumer()") + + g.Testing("calls consumeAllFn() at least one", func() { + onPing := func(fn func(struct{})) {} + + count := 0 + consumeAllFn := func(struct{}) { + count++ + } + + runConsumer(onPing, consumeAllFn) + g.TAssertEqual(count, 1) + }) + + g.Testing("can call consumeAllFn() (onPing + 1) times", func() { + onPing := func(fn func(struct{})) { + fn(struct{}{}) + fn(struct{}{}) + fn(struct{}{}) + } + + count := 0 + consumeAllFn := func(struct{}) { + count++ + } + + runConsumer(onPing, consumeAllFn) + g.TAssertEqual(count, 4) + }) +} + +func test_tryFinding() { + g.TestStart("tryFinding()") + + g.Testing("noop in case of failure", func() { + myErr := errors.New("find() error") + findFn := func(string, guuid.UUID) (messageT, error) { + return messageT{}, myErr + } + + count := 0 + waitFn := func([]byte) { + count++ + } + + + tryFinding(findFn, "topic", guuid.New(), waitFn) + g.TAssertEqual(count, 0) + }) + + g.Testing("calls waitFn in case of success", func() { + payload := []byte("find() payload") + + findFn := func(string, guuid.UUID) (messageT, error) { + return messageT{ payload: payload }, nil + } + + payloads := [][]byte{} + waitFn := func(payload []byte) { + payloads = append(payloads, payload) + } + + + tryFinding(findFn, "topic", guuid.New(), waitFn) + g.TAssertEqual(payloads, [][]byte{ payload }) + }) +} + +func test_queueT_Subscribe() { + g.TestStart("queueT.Subscribe()") + + set := subscriptionsSetM{} + consumed := []guuid.UUID{} + messages := []messageT{ + messageT{ uuid: guuid.New() }, + messageT{ uuid: guuid.New() }, + } + + var takeErr error + queue := queueT{ + queries: queriesT{ + take: func(string, string) error { + return takeErr + }, + pending: func( + topic string, + consumer string, + fn func(messageT) error, + ) error { + for _, message := range messages { + consumed = append( + consumed, + message.uuid, + ) + _ = fn(message) + } + return nil + }, + commit: func( + consumer string, + messageID guuid.UUID, + ) error { + return nil + }, + toDead: func(string, guuid.UUID, guuid.UUID) error { + g.Unreachable() + return nil + }, + }, + subscriptions: subscriptionsT{ + write: func(fn func(subscriptionsSetM) error) error { + return fn(set) + }, + }, + } + + + g.Testing("registers our callback in the set and calls it", func() { + var wg sync.WaitGroup + wg.Add(2) + + queue.Subscribe("topic", "consumer-1", func(Message) error { + wg.Done() + return nil + }) + defer queue.Unsubscribe("topic", "consumer-1") + + wg.Wait() + g.TAssertEqual(consumed, []guuid.UUID{ + messages[0].uuid, + messages[1].uuid, + }) + }) + + g.Testing("our callback also gets called when pinged", func() { + consumed = []guuid.UUID{} + + var wg sync.WaitGroup + wg.Add(4) + + queue.Subscribe("topic", "consumer-2", func(m Message) error { + wg.Done() + return nil + }) + defer queue.Unsubscribe("topic", "consumer-2") + + consumer := set["topic"].consumers["consumer-2"] + consumer.pinger.tryPing(struct{}{}) + + wg.Wait() + + g.TAssertEqual(consumed, []guuid.UUID{ + messages[0].uuid, + messages[1].uuid, + messages[0].uuid, + messages[1].uuid, + }) + }) + + g.Testing("we try to own the topic", func() { + takeErr = errors.New("queueT.Subscribe() 1") + + err := queue.Subscribe("topic", "name", func(Message) error { + g.Unreachable() + return nil + }) + + g.TAssertEqual(err, takeErr) + takeErr = nil + }) + + g.Testing("if we can't own the topic, we don't get registered", func() { + takeErr = errors.New("queueT.Subscribe() 2") + + err := queue.Subscribe("topic", "name", func(Message) error { + g.Unreachable() + return nil + }) + g.TErrorNil(err) + + expected := subscriptionsSetM{} + g.TAssertEqual(set, expected) + + takeErr = nil + }) +} + +func test_queueT_WaitFor() { + g.TestStart("queueT.WaitFor()") + + findErr := errors.New("find() error") + message := messageT{ + payload: []byte("queueT.WaitFor() payload"), + } + + set := subscriptionsSetM{} + + queue := queueT{ + queries: queriesT{ + find: func( + topic string, + flowID guuid.UUID, + ) (messageT, error) { + return message, findErr + }, + }, + subscriptions: subscriptionsT{ + write: func(fn func(subscriptionsSetM) error) error { + return fn(set) + }, + read: func(fn func(subscriptionsSetM) error) error { + return fn(set) + }, + }, + } + + + g.Testing("registers the waiter in the set", func() { + flowID := guuid.New() + + defer queue.WaitFor("topic", flowID, "waiter-1").Close() + + expected := waiterDataT{ + topic: "topic", + flowID: flowID, + name: "waiter-1", + } + + g.TAssertEqual( + set["topic"].waiters[flowID]["waiter-1"].data, + expected, + ) + }) + + g.Testing("the channel gets a message when waiter is pinged", func() { + flowID := guuid.New() + payload := []byte("sent payload") + + w := queue.WaitFor("topic", flowID, "waiter-2") + defer w.Close() + + waiter := set["topic"].waiters[flowID]["waiter-2"] + waiter.pinger.tryPing(payload) + + given := <- w.Channel + + g.TAssertEqual(given, payload) + g.TAssertEqual((*waiter.closed)(), true) + }) + + g.Testing("we can also WaitFor() after publishing the message", func() { + findErr = nil + flowID := guuid.New() + + w := queue.WaitFor("topic", flowID, "waiter-3") + defer w.Close() + + given := <- w.Channel + + waiter := set["topic"].waiters[flowID]["waiter-3"] + waiter.pinger.tryPing([]byte("ignored")) + + g.TAssertEqual(given, message.payload) + g.TAssertEqual((*waiter.closed)(), true) + }) + + g.Testing("if the data already exists we get it immediatelly", func() { + flowID := guuid.New() + + w := queue.WaitFor("topic", flowID, "waiter-4") + defer w.Close() + + waiter := set["topic"].waiters[flowID]["waiter-4"] + g.TAssertEqual((*waiter.closed)(), true) + + given := <- w.Channel + g.TAssertEqual(given, message.payload) + }) +} + +func test_unsubscribeIfExistsFn() { + g.TestStart("unsubscribeIfExistsFn()") + + g.Testing("noop on empty set", func() { + set := subscriptionsSetM{} + expected := subscriptionsSetM{} + unsubscribeIfExistsFn("topic", "consumer")(set) + g.TAssertEqual(set, expected) + }) + + g.Testing("noop on missing topic", func() { + set := subscriptionsSetM{ + "topic": topicSubscriptionT{}, + } + + expected := subscriptionsSetM{ + "topic": topicSubscriptionT{}, + } + + unsubscribeIfExistsFn("other-topic", "consumer")(set) + g.TAssertEqual(set, expected) + }) + + g.Testing("noop on missing consumer", func() { + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + }, + } + + expected := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{}, + }, + }, + } + + unsubscribeIfExistsFn("topic", "other-consumer")(set) + g.TAssertEqual(set, expected) + }) + + g.Testing("closes consumer and removes it from set", func() { + flowID := guuid.New() + + count := 0 + close := func() { + count++ + } + + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{ + close: &close, + }, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{}, + }, + }, + } + + expected := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID: map[string]waiterT{}, + }, + }, + } + + unsubscribeIfExistsFn("topic", "consumer")(set) + g.TAssertEqual(set, expected) + g.TAssertEqual(count, 1) + }) + + g.Testing("empty topics are also removed", func() { + count := 0 + close := func() { + count++ + } + + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{ + close: &close, + }, + }, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + expected := subscriptionsSetM{} + + unsubscribeIfExistsFn("topic", "consumer")(set) + g.TAssertEqual(set, expected) + g.TAssertEqual(count, 1) + }) +} + +func test_queueT_Unsubscribe() { + g.TestStart("queueT.Unsubscribe()") + + g.Testing("calls unsubscribesIfExists() via writeFn()", func() { + closed := false + close := func() { + closed = true + } + + set := subscriptionsSetM{ + "topic": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer": consumerT{ + close: &close, + }, + }, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + expected := subscriptionsSetM{} + + queue := queueT{ + subscriptions: subscriptionsT{ + write: func( + fn func(subscriptionsSetM) error, + ) error { + return fn(set) + }, + }, + } + + queue.Unsubscribe("topic", "consumer") + g.TAssertEqual(set, expected) + g.TAssertEqual(closed, true) + }) +} + +func test_cleanSubscriptions() { + g.TestStart("cleanSubscriptions()") + + g.Testing("all consumers and waiters get close()'d", func() { + flowID1 := guuid.New() + flowID2 := guuid.New() + flowID3 := guuid.New() + + type pairT struct{ + closed func() bool + fn func() + } + + mkclose := func() pairT { + closed := false + return pairT{ + closed: func() bool { + return closed + }, + fn: func() { + closed = true + }, + } + } + + c := mkclose() + g.TAssertEqual(c.closed(), false) + c.fn() + var x bool = c.closed() + g.TAssertEqual(x, true) + + close1 := mkclose() + close2 := mkclose() + close3 := mkclose() + close4 := mkclose() + close5 := mkclose() + close6 := mkclose() + close7 := mkclose() + close8 := mkclose() + + set := subscriptionsSetM{ + "topic-1": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer-1": consumerT{ + close: &close1.fn, + }, + "consumer-2": consumerT{ + close: &close2.fn, + }, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID1: map[string]waiterT{ + "waiter-1": waiterT{ + close: &close3.fn, + }, + }, + flowID2: map[string]waiterT{ + "waiter-2": waiterT{ + close: &close4.fn, + }, + "waiter-3": waiterT{ + close: &close5.fn, + }, + }, + }, + }, + "topic-2": topicSubscriptionT{ + consumers: map[string]consumerT{ + "consumer-3": consumerT{ + close: &close6.fn, + }, + }, + waiters: map[guuid.UUID]map[string]waiterT{ + flowID3: map[string]waiterT{ + "waiter-4": waiterT{ + close: &close7.fn, + }, + }, + }, + }, + "topic-3": topicSubscriptionT{ + consumers: map[string]consumerT{}, + waiters: map[guuid.UUID]map[string]waiterT{}, + }, + } + + cleanSubscriptions(set) + + given := []bool{ + close1.closed(), + close2.closed(), + close3.closed(), + close4.closed(), + close5.closed(), + close6.closed(), + close7.closed(), + close8.closed(), + } + + expected := []bool{ + true, + true, + true, + true, + true, + true, + true, + false, + } + + g.TAssertEqualI(given, expected) + }) +} + +func test_queueT_Close() { + g.TestStart("queueT.Close()") + + g.Testing("clean pinger, subscriptions and queries", func() { + var ( + pingerCount = 0 + subscriptionsCount = 0 + queriesCount = 0 + + subscriptionsErr = errors.New("subscriptionsT{} error") + queriesErr = errors.New("queriesT{} error") + ) + queue := queueT{ + queries: queriesT{ + close: func() error{ + queriesCount++ + return queriesErr + }, + }, + subscriptions: subscriptionsT{ + write: func( + func(subscriptionsSetM) error, + ) error { + subscriptionsCount++ + return subscriptionsErr + }, + }, + pinger: pingerT[struct{}]{ + close: func() { + pingerCount++ + }, + }, + } + + err := queue.Close() + g.TAssertEqual(err, g.WrapErrors(subscriptionsErr, queriesErr)) + g.TAssertEqual(pingerCount, 1) + g.TAssertEqual(subscriptionsCount, 1) + g.TAssertEqual(queriesCount, 1) + }) +} + + +func test_topicGetopt() { + g.TestStart("topicGetopt()") + + g.Testing("checks for required positional argument", func() { + var w strings.Builder + argsIn := argsT{ + args: []string{}, + } + + argsOut, ok := topicGetopt(argsIn, &w) + g.TAssertEqual(w.String(), "Missing TOPIC.\n") + g.TAssertEqual(ok, false) + g.TAssertEqual(argsOut, argsIn) + }) + + g.Testing("success otherwise", func() { + var w strings.Builder + argsIn := argsT{ + args: []string{"a topic"}, + } + + argsOut, ok := topicGetopt(argsIn, &w) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(ok, true) + argsIn.topic = "a topic" + g.TAssertEqual(argsOut, argsIn) + }) +} + +func test_topicConsumerGetopt() { + g.TestStart("topicConsumerGetopt()") + + g.Testing("checks for TOPIC argument", func() { + var w strings.Builder + argsIn := argsT{ + args: []string{}, + } + + argsOut, ok := topicConsumerGetopt(argsIn, &w) + g.TAssertEqual(w.String(), "Missing TOPIC.\n") + g.TAssertEqual(ok, false) + g.TAssertEqual(argsOut, argsIn) + }) + + g.Testing("we get an error on unsupported flag", func() { + var w strings.Builder + argsIn := argsT{ + args: []string{"-Z"}, + } + + const message = "flag provided but not defined: -Z\n" + argsOut, ok := topicConsumerGetopt(argsIn, &w) + g.TAssertEqual(w.String(), message) + g.TAssertEqual(ok, false) + g.TAssertEqual(argsOut, argsIn) + }) + + g.Testing("we also get an error on incorrect usage of flags", func() { + var w strings.Builder + argsIn := argsT{ + args: []string{"-C"}, + } + + const message = "flag needs an argument: -C\n" + argsOut, ok := topicConsumerGetopt(argsIn, &w) + g.TAssertEqual(w.String(), message) + g.TAssertEqual(ok, false) + g.TAssertEqual(argsOut, argsIn) + }) + + g.Testing("we can customize the CONSUMER", func() { + var w strings.Builder + argsIn := argsT{ + args: []string{"-C", "custom consumer", "this topic"}, + } + + argsOut, ok := topicConsumerGetopt(argsIn, &w) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(ok, true) + argsIn.topic = "this topic" + argsIn.consumer = "custom consumer" + g.TAssertEqual(argsOut, argsIn) + }) + + g.Testing("otherwise we get the default one", func() { + var w strings.Builder + argsIn := argsT{ + args: []string{"T"}, + } + + argsOut, ok := topicConsumerGetopt(argsIn, &w) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(ok, true) + argsIn.topic = "T" + argsIn.consumer = "default-consumer" + g.TAssertEqual(argsOut, argsIn) + }) +} + +func test_inExec() { + g.TestStart("inExec()") + + const ( + topic = "inExec topic" + payloadStr = "inExec payload" + ) + var ( + payload = []byte(payloadStr) + args = argsT{ + topic: topic, + } + ) + + var ( + publishErr error + messages []messageT + id int64 = 0 + ) + queries := queriesT{ + publish: func( + unsent UnsentMessage, + messageID guuid.UUID, + ) (messageT, error) { + if publishErr != nil { + return messageT{}, publishErr + } + + id++ + now := time.Now() + message := messageT{ + id: id, + timestamp: now, + uuid: messageID, + topic: unsent.Topic, + flowID: unsent.FlowID, + payload: unsent.Payload, + } + messages = append(messages, message) + return message, nil + }, + } + + + g.Testing("messageID to output when successful", func() { + r := bytes.NewReader(payload) + var w strings.Builder + rc, err := inExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(messages[0].topic, topic) + g.TAssertEqual(messages[0].payload, payload) + g.TAssertEqual(rc, 0) + g.TAssertEqual(w.String(), messages[0].uuid.String() + "\n") + }) + + g.Testing("if reading fails, we return the error", func() { + var r *os.File + var w strings.Builder + rc, err := inExec(args, queries, r, &w) + g.TAssertEqual( + err.Error(), + "invalid argument", + ) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + }) + + g.Testing("if publishing fails, we return the error", func() { + publishErr = errors.New("publish() error") + r := strings.NewReader("read but not published") + var w strings.Builder + rc, err := inExec(args, queries, r, &w) + g.TAssertEqual(err, publishErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + }) +} + +func test_outExec() { + g.TestStart("outExec()") + + const ( + topic = "outExec topic" + consumer = "outExec consumer" + ) + var ( + r = strings.NewReader("") + args = argsT{ + topic: topic, + consumer: consumer, + } + ) + + var ( + takeErr error + nextErr error + messages []messageT + id int64 = 0 + ) + queries := queriesT{ + take: func(string, string) error { + return takeErr + }, + next: func(string, string) (messageT, error) { + if nextErr != nil { + return messageT{}, nextErr + } + + if len(messages) == 0 { + return messageT{}, sql.ErrNoRows + } + + return messages[0], nil + }, + } + pub := func(payload []byte) { + id++ + now := time.Now() + message := messageT{ + id: id, + timestamp: now, + uuid: guuid.New(), + topic: topic, + flowID: guuid.New(), + payload: payload, + } + messages = append(messages, message) + } + + + g.Testing("exit code 1 when we can't take", func() { + takeErr = errors.New("outExec() take error") + + var w strings.Builder + + rc, err := outExec(args, queries, r, &w) + g.TAssertEqual(err, takeErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + + takeErr = nil + }) + + g.Testing("exit code 3 when no message is available", func() { + var w strings.Builder + + rc, err := outExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 3) + }) + + g.Testing("we get the same message until we commit", func() { + var ( + w1 strings.Builder + w2 strings.Builder + w3 strings.Builder + ) + args := argsT{ + topic: topic, + consumer: consumer, + } + + pub([]byte("first payload")) + pub([]byte("second payload")) + + rc1, err1 := outExec(args, queries, r, &w1) + rc2, err2 := outExec(args, queries, r, &w2) + messages = messages[1:] + rc3, err3 := outExec(args, queries, r, &w3) + + g.TErrorIf(g.SomeError(err1, err2, err3)) + g.TAssertEqual(w1.String(), "first payload\n") + g.TAssertEqual(w2.String(), "first payload\n") + g.TAssertEqual(w3.String(), "second payload\n") + g.TAssertEqual(rc1, 0) + g.TAssertEqual(rc2, 0) + g.TAssertEqual(rc3, 0) + }) + + g.Testing("we propagate the error when the query fails", func() { + nextErr = errors.New("next() error") + var w strings.Builder + rc, err := outExec(args, queries, r, &w) + g.TAssertEqual(err, nextErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + }) +} + +func test_commitExec() { + g.TestStart("commitExec()") + + const ( + topic = "commitExec topic" + consumer = "commitExec consumer" + ) + var ( + r = strings.NewReader("") + args = argsT{ + topic: topic, + consumer: consumer, + } + ) + + var ( + takeErr error + nextErr error + commitErr error + messages []messageT + id int64 = 0 + ) + queries := queriesT{ + take: func(string, string) error { + return takeErr + }, + next: func(string, string) (messageT, error) { + if nextErr != nil { + return messageT{}, nextErr + } + + if len(messages) == 0 { + return messageT{}, sql.ErrNoRows + } + + return messages[0], nil + }, + commit: func(string, guuid.UUID) error { + if commitErr != nil { + return commitErr + } + + messages = messages[1:] + return nil + }, + } + pub := func(payload []byte) { + id++ + now := time.Now() + message := messageT{ + id: id, + timestamp: now, + uuid: guuid.New(), + topic: topic, + flowID: guuid.New(), + payload: payload, + } + messages = append(messages, message) + } + + + g.Testing("error when we can't take", func() { + takeErr = errors.New("commitExec() take error") + + var w strings.Builder + + rc, err := commitExec(args, queries, r, &w) + g.TAssertEqual(err, takeErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + + takeErr = nil + }) + + g.Testing("error when there is nothing to commit", func() { + var w strings.Builder + + rc, err := commitExec(args, queries, r, &w) + g.TAssertEqual(err, sql.ErrNoRows) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + }) + + g.Testing("messages get committed in order", func() { + var w strings.Builder + + pub([]byte("first payload")) + pub([]byte("second payload")) + pub([]byte("third payload")) + + message1 := messages[0] + g.TAssertEqual(message1.payload, []byte("first payload")) + + rc, err := commitExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(rc, 0) + + message2 := messages[0] + g.TAssertEqual(message2.payload, []byte("second payload")) + + rc, err = commitExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(rc, 0) + + message3 := messages[0] + g.TAssertEqual(message3.payload, []byte("third payload")) + + rc, err = commitExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(rc, 0) + + g.TAssertEqual(len(messages), 0) + }) + + g.Testing("when next() query fails, we propagate its result", func() { + nextErr = errors.New("next() error") + var w strings.Builder + rc, err := commitExec(args, queries, r, &w) + g.TAssertEqual(err, nextErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + nextErr = nil + }) + + g.Testing("we also propagate the error on commit() failure", func() { + commitErr = errors.New("commit() error") + pub([]byte{}) + var w strings.Builder + rc, err := commitExec(args, queries, r, &w) + g.TAssertEqual(err, commitErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + }) +} + +func test_deadExec() { + g.TestStart("deadExec()") + + const ( + topic = "deadExec topic" + consumer = "deadExec consumer" + ) + var ( + r = strings.NewReader("") + args = argsT{ + topic: topic, + consumer: consumer, + } + ) + + var ( + takeErr error + nextErr error + toDeadErr error + messages []messageT + id int64 = 0 + ) + queries := queriesT{ + take: func(string, string) error { + return takeErr + }, + next: func(string, string) (messageT, error) { + if nextErr != nil { + return messageT{}, nextErr + } + + if len(messages) == 0 { + return messageT{}, sql.ErrNoRows + } + + return messages[0], nil + }, + toDead: func( + _ string, + _ guuid.UUID, + deadletterID guuid.UUID, + ) error { + if toDeadErr != nil { + return toDeadErr + } + + messages = messages[1:] + return nil + }, + } + pub := func(payload []byte) { + id++ + now := time.Now() + message := messageT{ + id: id, + timestamp: now, + uuid: guuid.New(), + topic: topic, + flowID: guuid.New(), + payload: payload, + } + messages = append(messages, message) + } + + + g.Testing("error when we can't take", func() { + takeErr = errors.New("deadExec() take error") + + var w strings.Builder + + rc, err := deadExec(args, queries, r, &w) + g.TAssertEqual(err, takeErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + + takeErr = nil + }) + + g.Testing("error when there is nothing to mark as dead", func() { + var w strings.Builder + + rc, err := deadExec(args, queries, r, &w) + g.TAssertEqual(err, sql.ErrNoRows) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + }) + + g.Testing("the latest message becomes a deadletter", func() { + var w strings.Builder + + pub([]byte("first payload")) + pub([]byte("second payload")) + + message1 := messages[0] + g.TAssertEqual(message1.payload, []byte("first payload")) + + rc, err := deadExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(rc, 0) + + message2 := messages[0] + g.TAssertEqual(message2.payload, []byte("second payload")) + + rc, err = deadExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(rc, 0) + + g.TAssertEqual(len(messages), 0) + }) + + g.Testing("next() error is propagated", func() { + nextErr = errors.New("next() error") + var w strings.Builder + rc, err := deadExec(args, queries, r, &w) + g.TAssertEqual(err, nextErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + nextErr = nil + }) + + g.Testing("toDead() error is propagated", func() { + toDeadErr = errors.New("toDead() error") + pub([]byte{}) + var w strings.Builder + rc, err := deadExec(args, queries, r, &w) + g.TAssertEqual(err, toDeadErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + }) +} + +func test_listDeadExec() { + g.TestStart("listDeadExec()") + + const ( + topic = "listDeadExec topic" + consumer = "listDeadExec consumer" + ) + var ( + r = strings.NewReader("") + args = argsT{ + topic: topic, + consumer: consumer, + } + ) + + var ( + messages []messageT + deadletters []deadletterT + id int64 = 0 + errorIndex = -1 + allDeadErr = errors.New("allDead() error") + ) + queries := queriesT{ + allDead: func( + _ string, + _ string, + callback func(deadletterT, messageT) error, + ) error { + for i, deadletter := range deadletters { + if i == errorIndex { + return allDeadErr + } + + callback(deadletter, messageT{}) + } + + return nil + }, + } + pub := func() { + payload := []byte("ignored payload for this test") + id++ + now := time.Now() + message := messageT{ + id: id, + timestamp: now, + uuid: guuid.New(), + topic: topic, + flowID: guuid.New(), + payload: payload, + } + messages = append(messages, message) + } + commit := func() { + messages = messages[1:] + } + dead := func() { + message := messages[0] + now := time.Now() + deadletter := deadletterT{ + uuid: guuid.New(), + timestamp: now, + consumer: consumer, + messageID: message.uuid, + } + + messages = messages[1:] + deadletters = append(deadletters, deadletter) + } + replay := func() { + pub() + deadletters = deadletters[1:] + } + + + g.Testing("nothing is shown if topic is empty", func() { + var w strings.Builder + + rc, err := listDeadExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 0) + }) + + g.Testing("deadletters are printed in order", func() { + var w strings.Builder + + pub() + pub() + pub() + + rc, err := listDeadExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 0) + + dead() + commit() + dead() + + expected := fmt.Sprintf( + "%s\t%s\t%s\n%s\t%s\t%s\n", + deadletters[0].uuid.String(), + deadletters[0].timestamp.Format(time.RFC3339), + deadletters[0].consumer, + deadletters[1].uuid.String(), + deadletters[1].timestamp.Format(time.RFC3339), + deadletters[1].consumer, + ) + + rc, err = listDeadExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(w.String(), expected) + g.TAssertEqual(rc, 0) + }) + + g.Testing("deadletters disappear after being replayed", func() { + var ( + w1 strings.Builder + w2 strings.Builder + ) + + replay() + + deadletter := deadletters[0] + expected := fmt.Sprintf( + "%s\t%s\t%s\n", + deadletter.uuid.String(), + deadletter.timestamp.Format(time.RFC3339), + deadletter.consumer, + ) + + rc, err := listDeadExec(args, queries, r, &w1) + g.TErrorIf(err) + g.TAssertEqual(w1.String(), expected) + g.TAssertEqual(rc, 0) + + replay() + + rc, err = listDeadExec(args, queries, r, &w2) + g.TErrorIf(err) + g.TAssertEqual(w2.String(), "") + g.TAssertEqual(rc, 0) + }) + + g.Testing("a database failure interrupts the output", func() { + var w strings.Builder + + pub() + pub() + pub() + dead() + dead() + dead() + + deadletter := deadletters[0] + expected := fmt.Sprintf( + "%s\t%s\t%s\n", + deadletter.uuid.String(), + deadletter.timestamp.Format(time.RFC3339), + deadletter.consumer, + ) + + errorIndex = 1 + rc, err := listDeadExec(args, queries, r, &w) + g.TAssertEqual(err, allDeadErr) + g.TAssertEqual(w.String(), expected) + g.TAssertEqual(rc, 1) + }) +} + +func test_replayExec() { + g.TestStart("replayExec()") + + const ( + topic = "replayExec topic" + consumer = "replayExec consumer" + ) + var ( + w strings.Builder + r = strings.NewReader("") + args = argsT{ + topic: topic, + consumer: consumer, + } + ) + + var ( + oneDeadErr error + replayErr error + messages []messageT + deadletters []deadletterT + deadMessages []messageT + id int64 = 0 + ) + queries := queriesT{ + oneDead: func(string, string) (deadletterT, error) { + if oneDeadErr != nil { + return deadletterT{}, oneDeadErr + } + + if len(deadletters) == 0 { + return deadletterT{}, sql.ErrNoRows + } + + return deadletters[0], nil + }, + replay: func(guuid.UUID, guuid.UUID) (messageT, error) { + if replayErr != nil { + return messageT{}, replayErr + } + + message := deadMessages[0] + messages = append(messages, message) + deadletters = deadletters[1:] + deadMessages = deadMessages[1:] + return message, nil + }, + } + pub := func(payload []byte) { + id++ + now := time.Now() + message := messageT{ + id: id, + timestamp: now, + uuid: guuid.New(), + topic: topic, + flowID: guuid.New(), + payload: payload, + } + messages = append(messages, message) + } + commit := func() { + messages = messages[1:] + } + dead := func() { + message := messages[0] + deadletter := deadletterT{ uuid: guuid.New() } + + messages = messages[1:] + deadletters = append(deadletters, deadletter) + deadMessages = append(deadMessages, message) + } + next := func() string { + return string(messages[0].payload) + } + + + g.Testing("error when there is nothing to replay", func() { + rc, err := replayExec(args, queries, r, &w) + g.TAssertEqual(err, sql.ErrNoRows) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + + pub([]byte("first payload")) + pub([]byte("second payload")) + pub([]byte("third payload")) + pub([]byte("fourth payload")) + + rc, err = replayExec(args, queries, r, &w) + g.TAssertEqual(err, sql.ErrNoRows) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + }) + + g.Testing("deadletters are replayed in order", func() { + dead() + commit() + dead() + commit() + + rc, err := replayExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 0) + g.TAssertEqual(next(), "first payload") + + commit() + + rc, err = replayExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 0) + g.TAssertEqual(next(), "third payload") + }) + + g.Testing("oneDead() error is forwarded", func() { + oneDeadErr = errors.New("oneDead() error") + rc, err := replayExec(args, queries, r, &w) + g.TAssertEqual(err, oneDeadErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + oneDeadErr = nil + }) + + g.Testing("replay() error is also forwarded", func() { + pub([]byte{}) + dead() + + replayErr = errors.New("replay() error") + rc, err := replayExec(args, queries, r, &w) + g.TAssertEqual(err, replayErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + }) +} + +func test_sizeExec() { + g.TestStart("sizeExec()") + + const ( + topic = "sizeExec topic" + consumer = "sizeExec consumer" + ) + var ( + r = strings.NewReader("") + args = argsT{ + topic: topic, + consumer: consumer, + } + ) + + var sizeErr error + queries := queriesT{ + size: func(string) (int, error) { + if sizeErr != nil { + return -1, sizeErr + } + + return 123, nil + }, + } + + + g.Testing("it propagates the error when the query fails", func() { + sizeErr = errors.New("size() error") + var w strings.Builder + rc, err := sizeExec(args, queries, r, &w) + g.TAssertEqual(err, sizeErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + sizeErr = nil + }) + + g.Testing("otherwise it just prints what is was given", func() { + var w strings.Builder + rc, err := sizeExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(w.String(), "123\n") + g.TAssertEqual(rc, 0) + }) +} + +func test_countExec() { + g.TestStart("countExec()") + + const ( + topic = "countExec topic" + consumer = "countExec consumer" + ) + var ( + r = strings.NewReader("") + args = argsT{ + topic: topic, + consumer: consumer, + } + ) + + var countErr error + queries := queriesT{ + count: func(string, string) (int, error) { + if countErr != nil { + return -1, countErr + } + + return 2222, nil + }, + } + + + g.Testing("it propagates the query error", func() { + countErr = errors.New("count() error") + var w strings.Builder + rc, err := countExec(args, queries, r, &w) + g.TAssertEqual(err, countErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + countErr = nil + }) + + g.Testing("otherwise it prints the given count", func() { + var w strings.Builder + rc, err := countExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(w.String(), "2222\n") + g.TAssertEqual(rc, 0) + }) +} + +func test_hasDataExec() { + g.TestStart("hasData()") + + const ( + topic = "hasData topic" + consumer = "hasData consumer" + ) + var ( + w strings.Builder + r = strings.NewReader("") + args = argsT{ + topic: topic, + consumer: consumer, + } + ) + + hasData := true + var hasDataErr error + queries := queriesT{ + hasData: func(string, string) (bool, error) { + if hasDataErr != nil { + return false, hasDataErr + } + + return hasData, nil + }, + } + + + g.Testing("it propagates the query error", func() { + hasDataErr = errors.New("hasData() error") + rc, err := hasDataExec(args, queries, r, &w) + g.TAssertEqual(err, hasDataErr) + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 1) + hasDataErr = nil + }) + + g.Testing("otherwise if just returns (not prints) the flag", func() { + hasData = true + rc, err := hasDataExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(rc, 0) + + hasData = false + rc, err = hasDataExec(args, queries, r, &w) + g.TErrorIf(err) + g.TAssertEqual(rc, 1) + + g.TAssertEqual(w.String(), "") + }) +} + +func test_usage() { + g.TestStart("usage()") + + g.Testing("it just writes to io.Writer", func() { + var w strings.Builder + usage("xxx", &w) + const message = + "Usage: xxx [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n" + g.TAssertEqual(w.String(), message) + }) + + g.Testing("noop on io.Discard for io.Writer", func() { + usage("AN ERROR IF SEEN ANYWHERE!", io.Discard) + }) +} + +func test_getopt() { + g.TestStart("getopt()") + + const warning = "Missing COMMAND.\n" + const usage = "Usage: $0 [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n" + + execFn := func(argsT, queriesT, io.Reader, io.Writer) (int, error) { + return 0, nil + } + commandsMap := map[string]commandT { + "good": commandT{ + name: "good", + getopt: func(args argsT, _ io.Writer) (argsT, bool) { + return args, true + }, + exec: execFn, + }, + "bad": commandT{ + name: "bad", + getopt: func(args argsT, w io.Writer) (argsT, bool) { + if len(args.args) == 0 { + fmt.Fprintln( + w, + "no required arg", + ) + return args, false + } + + if args.args[0] != "required" { + fmt.Fprintln( + w, + "not correct one", + ) + return args, false + } + + args.topic = "a topic" + args.consumer = "a consumer" + return args, true + }, + exec: execFn, + }, + } + + + g.Testing("we suppress the default error message", func() { + var w strings.Builder + argv := []string{"$0", "-h"} + _, _, rc := getopt(argv, commandsMap, &w) + + g.TAssertEqual(w.String(), usage) + g.TAssertEqual(rc, 2) + }) + + g.Testing("we get an error on unsupported flag", func() { + var w strings.Builder + argv := []string{"$0", "-X"} + _, _, rc := getopt(argv, commandsMap, &w) + + const message = "flag provided but not defined: -X\n" + g.TAssertEqual(w.String(), message + usage) + g.TAssertEqual(rc, 2) + }) + + g.Testing("we also get an error on incorrect usage of flags", func() { + var w strings.Builder + argv := []string{"$0", "-f"} + _, _, rc := getopt(argv, commandsMap, &w) + + const message = "flag needs an argument: -f\n" + g.TAssertEqual(w.String(), message + usage) + g.TAssertEqual(rc, 2) + }) + + g.Testing("error when not given a command", func() { + var w strings.Builder + argv := []string{"$0"} + _, _, rc := getopt(argv, commandsMap, &w) + + g.TAssertEqual(w.String(), warning + usage) + g.TAssertEqual(rc, 2) + }) + + g.Testing("error on unknown command", func() { + var w strings.Builder + argv := []string{"$0", "unknown"} + _, _, rc := getopt(argv, commandsMap, &w) + + const message = `Bad COMMAND: "unknown".` + "\n" + g.TAssertEqual(w.String(), message + usage) + g.TAssertEqual(rc, 2) + }) + + g.Testing("checks the command usage", func() { + var ( + w1 strings.Builder + w2 strings.Builder + w3 strings.Builder + ) + + argv1 := []string{"$0", "bad"} + argv2 := []string{"$0", "bad", "arg"} + argv3 := []string{"$0", "bad", "required"} + _, _, rc1 := getopt(argv1, commandsMap, &w1) + _, _, rc2 := getopt(argv2, commandsMap, &w2) + args, command, rc3 := getopt(argv3, commandsMap, &w3) + expectedArgs := argsT{ + databasePath: "q.db", + prefix: "q", + command: "bad", + allArgs: argv3, + args: argv3[2:], + topic: "a topic", + consumer: "a consumer", + } + + g.TAssertEqual(w1.String(), "no required arg\n" + usage) + g.TAssertEqual(w2.String(), "not correct one\n" + usage) + g.TAssertEqual(w3.String(), "") + g.TAssertEqual(rc1, 2) + g.TAssertEqual(rc2, 2) + g.TAssertEqual(rc3, 0) + g.TAssertEqual(args, expectedArgs) + g.TAssertEqual(command.name, "bad") + }) + + g.Testing("when given a command we the default values", func() { + var w strings.Builder + args, command, rc := getopt( + []string{"$0", "good"}, + commandsMap, + &w, + ) + expectedArgs := argsT{ + databasePath: "q.db", + prefix: "q", + command: "good", + allArgs: []string{"$0", "good"}, + args: []string{}, + } + + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 0) + g.TAssertEqual(args, expectedArgs) + g.TAssertEqual(command.name, "good") + }) + + g.Testing("we can customize both values", func() { + var w strings.Builder + argv := []string{"$0", "-f", "F", "-p", "P", "good"} + args, command, rc := getopt(argv, commandsMap, &w) + expectedArgs := argsT{ + databasePath: "F", + prefix: "P", + command: "good", + allArgs: argv, + args: []string{}, + } + + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 0) + g.TAssertEqual(args, expectedArgs) + g.TAssertEqual(command.name, "good") + }) + + g.Testing("a command can have its own commands and options", func() { + var w strings.Builder + argv := []string{"$0", "-f", "F", "good", "-f", "-f", "SUB"} + args, command, rc := getopt(argv, commandsMap, &w) + expectedArgs := argsT{ + databasePath: "F", + prefix: "q", + command: "good", + allArgs: argv, + args: []string{"-f", "-f", "SUB"}, + } + + g.TAssertEqual(w.String(), "") + g.TAssertEqual(rc, 0) + g.TAssertEqual(args, expectedArgs) + g.TAssertEqual(command.name, "good") + }) +} + +func test_runCommand() { + g.TestStart("runCommand()") + + const usage = "Usage: $0 [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n" + + + g.Testing("returns an error on bad prefix", func() { + stdin := strings.NewReader("") + var ( + stdout strings.Builder + stderr strings.Builder + ) + args := argsT{ + prefix: "a bad prefix", + command: "in", + allArgs: []string{"$0"}, + args: []string{"some topic name"}, + } + rc := runCommand(args, commands["in"], stdin, &stdout, &stderr) + + g.TAssertEqual(rc, 1) + g.TAssertEqual(stdout.String(), "") + g.TAssertEqual(stderr.String(), "Invalid table prefix\n") + }) + + g.Testing("otherwise it build a queueT and calls command", func() { + stdin := strings.NewReader("") + var ( + stdout1 strings.Builder + stdout2 strings.Builder + stderr1 strings.Builder + stderr2 strings.Builder + ) + args1 := argsT{ + prefix: defaultPrefix, + command: "good", + } + args2 := argsT{ + prefix: defaultPrefix, + command: "bad", + } + myErr := errors.New("an error") + good := commandT{ + exec: func( + argsT, + queriesT, + io.Reader, + io.Writer, + ) (int, error) { + return 0, nil + }, + } + bad := commandT{ + exec: func( + _ argsT, + _ queriesT, + _ io.Reader, + w io.Writer, + ) (int, error) { + fmt.Fprintf(w, "some text\n") + return 1, myErr + }, + } + rc1 := runCommand(args1, good, stdin, &stdout1, &stderr1) + rc2 := runCommand(args2, bad, stdin, &stdout2, &stderr2) + + g.TAssertEqual(stdout1.String(), "") + g.TAssertEqual(stdout2.String(), "some text\n") + g.TAssertEqual(stderr1.String(), "") + g.TAssertEqual(stderr2.String(), "an error\n") + g.TAssertEqual(rc1, 0) + g.TAssertEqual(rc2, 1) + }) +} + + +func dumpQueries() { + queries := []struct{name string; fn func(string) queryT}{ + { "createTables", createTablesSQL }, + { "take", takeSQL }, + { "publish", publishSQL }, + { "find", findSQL }, + { "pending", pendingSQL }, + { "commit", commitSQL }, + { "toDead", toDeadSQL }, + { "replay", replaySQL }, + { "oneDead", oneDeadSQL }, + { "allDead", allDeadSQL }, + { "size", sizeSQL }, + { "count", countSQL }, + { "hasData", hasDataSQL }, + } + for _, query := range queries { + q := query.fn(defaultPrefix) + fmt.Printf("\n-- %s.sql:", query.name) + fmt.Printf("\n-- write: %s\n", q.write) + fmt.Printf("\n-- read: %s\n", q.read) + fmt.Printf("\n-- owner: %s\n", q.owner) + } +} + + + +func MainTest() { + if os.Getenv("TESTING_DUMP_SQL_QUERIES") != "" { + dumpQueries() + return + } + + g.Init() + test_defaultPrefix() + test_inTx() + test_createTables() + test_takeStmt() + test_publishStmt() + test_findStmt() + test_nextStmt() + test_messageEach() + test_pendingStmt() + test_commitStmt() + test_toDeadStmt() + test_replayStmt() + test_oneDeadStmt() + test_deadletterEach() + test_allDeadStmt() + test_sizeStmt() + test_countStmt() + test_hasDataStmt() + test_initDB() + test_queriesTclose() + test_newPinger() + test_makeSubscriptionsFunc() + test_makeNotifyFn() + test_collectClosedWaiters() + test_trimEmptyLeaves() + test_deleteIfEmpty() + test_deleteEmptyTopics() + test_removeClosedWaiter() + test_reapClosedWaiters() + test_everyNthCall() + test_runReaper() + test_NewWithPrefix() + test_New() + test_asPublicMessage() + test_queueT_Publish() + test_registerConsumerFn() + test_registerWaiterFn() + test_makeConsumeOneFn() + test_makeConsumeAllFn() + test_makeWaitFn() + test_runConsumer() + test_tryFinding() + test_queueT_Subscribe() + test_queueT_WaitFor() + test_unsubscribeIfExistsFn() + test_queueT_Unsubscribe() + test_cleanSubscriptions() + test_queueT_Close() + test_topicGetopt() + test_topicConsumerGetopt() + test_inExec() + test_outExec() + test_commitExec() + test_deadExec() + test_listDeadExec() + test_replayExec() + test_sizeExec() + test_countExec() + test_hasDataExec() + test_usage() + test_getopt() + test_runCommand() +} |