package fiinha import ( "bytes" "database/sql" "errors" "fmt" "io" "log/slog" "os" "reflect" "sort" "strings" "sync" "time" "golite" "uuid" g "gobang" ) var instanceID = os.Getpid() func test_defaultPrefix() { g.TestStart("defaultPrefix") g.Testing("the defaultPrefix is valid", func() { g.TErrorIf(g.ValidateSQLTablePrefix(defaultPrefix)) }) } func test_tryRollback() { g.TestStart("tryRollback()") myErr := errors.New("bottom error") db, err := sql.Open(golite.DriverName, golite.InMemory) g.TErrorIf(err) defer db.Close() g.Testing("the error is propagated if rollback doesn't fail", func() { tx, err := db.Begin() g.TErrorIf(err) err = tryRollback(tx, myErr) g.TAssertEqual(err, myErr) }) g.Testing("a wrapped error when rollback fails", func() { tx, err := db.Begin() g.TErrorIf(err) err = tx.Commit() g.TErrorIf(err) err = tryRollback(tx, myErr) g.TAssertEqual(reflect.DeepEqual(err, myErr), false) g.TAssertEqual(errors.Is(err, myErr), true) }) } func test_inTx() { g.TestStart("inTx()") db, err := sql.Open(golite.DriverName, golite.InMemory) g.TErrorIf(err) defer db.Close() g.Testing("when fn() errors, we propagate it", func() { myErr := errors.New("to be propagated") err := inTx(db, func(tx *sql.Tx) error { return myErr }) g.TAssertEqual(err, myErr) }) g.Testing("on nil error we get nil", func() { err := inTx(db, func(tx *sql.Tx) error { return nil }) g.TErrorIf(err) }) } func test_serialized() { // FIXME } func test_execSerialized() { // FIXME } func test_createTables() { g.TestStart("createTables()") const ( dbpath = golite.InMemory prefix = defaultPrefix ) db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) defer db.Close() g.Testing("tables exist afterwards", func() { const tmpl_read = ` SELECT id FROM "%s_messages" LIMIT 1; ` qRead := fmt.Sprintf(tmpl_read, prefix) _, err := db.Exec(qRead) g.TErrorNil(err) err = createTables(db, prefix) g.TErrorIf(err) _, err = db.Exec(qRead) g.TErrorIf(err) }) g.Testing("we can do it multiple times", func() { g.TErrorIf(g.SomeError( createTables(db, prefix), createTables(db, prefix), createTables(db, prefix), )) }) } func test_takeStmt() { g.TestStart("takeStmt()") const ( topic = "take() topic" consumer = "take() consumer" dbpath = golite.InMemory prefix = defaultPrefix ) db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) cfg := dbconfigT{ shared: db, dbpath: dbpath, prefix: prefix, instanceID: instanceID, } take, takeClose, takeErr := takeStmt(cfg) g.TErrorIf(takeErr) defer g.SomeFnError( takeClose, db.Close, ) const tmpl = ` SELECT owner_id from "%s_owners" WHERE topic = ? AND consumer = ?; ` sqlOwner := fmt.Sprintf(tmpl, prefix) g.Testing("when there is no owner, we become it", func() { var ownerID int err := db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) g.TAssertEqual(err, sql.ErrNoRows) err = take(topic, consumer) g.TErrorIf(err) err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) g.TErrorIf(err) g.TAssertEqual(ownerID, instanceID) }) g.Testing("if there is already an owner, we overtake it", func() { otherCfg := cfg otherCfg.instanceID = instanceID + 1 take, takeClose, takeErr := takeStmt(otherCfg) g.TErrorIf(takeErr) defer takeClose() var ownerID int err := db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) g.TErrorIf(err) g.TAssertEqual(ownerID, instanceID) err = take(topic, consumer) g.TErrorIf(err) err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) g.TErrorIf(err) g.TAssertEqual(ownerID, otherCfg.instanceID) }) g.Testing("no error if closed more than once", func() { g.TErrorIf(g.SomeError( takeClose(), takeClose(), takeClose(), )) }) } func test_publishStmt() { g.TestStart("publishStmt()") const ( topic = "publish() topic" payloadStr = "publish() payload" dbpath = golite.InMemory prefix = defaultPrefix ) var ( flowID = uuid.New() payload = []byte(payloadStr) unsent = UnsentMessage{ Topic: topic, FlowID: flowID, Payload: payload, } ) db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) cfg := dbconfigT{ shared: db, dbpath: dbpath, prefix: prefix, instanceID: instanceID, } publish, publishClose, publishErr := publishStmt(cfg) g.TErrorIf(publishErr) defer g.SomeFnError( publishClose, db.Close, ) g.Testing("we can publish a message", func() { messageID := uuid.New() message, err := publish(unsent, messageID) g.TErrorIf(err) g.TAssertEqual(message.id, int64(1)) g.TAssertEqual(message.uuid, messageID) g.TAssertEqual(message.topic, topic) g.TAssertEqual(message.flowID, flowID) g.TAssertEqual(message.payload, payload) }) g.Testing("we can publish the same message repeatedly", func() { messageID1 := uuid.New() messageID2 := uuid.New() message1, err1 := publish(unsent, messageID1) message2, err2 := publish(unsent, messageID2) g.TErrorIf(g.SomeError(err1, err2)) g.TAssertEqual(message1.id, message2.id - 1) g.TAssertEqual(message1.topic, message2.topic) g.TAssertEqual(message1.flowID, message2.flowID) g.TAssertEqual(message1.payload, message2.payload) g.TAssertEqual(message1.uuid, messageID1) g.TAssertEqual(message2.uuid, messageID2) }) g.Testing("publishing a message with the same UUID errors", func() { messageID := uuid.New() message1, err1 := publish(unsent, messageID) _, err2 := publish(unsent, messageID) g.TErrorIf(err1) g.TAssertEqual(message1.uuid, messageID) g.TAssertEqual(message1.topic, topic) g.TAssertEqual(message1.flowID, flowID) g.TAssertEqual(message1.payload, payload) g.TAssertEqual( err2.(golite.Error).ExtendedCode, golite.ErrConstraintUnique, ) }) g.Testing("no actual closing occurs", func() { g.TErrorIf(g.SomeError( publishClose(), publishClose(), publishClose(), )) }) } func test_findStmt() { g.TestStart("findStmt()") const ( topic = "find() topic" payloadStr = "find() payload" dbpath = golite.InMemory prefix = defaultPrefix ) var ( flowID = uuid.New() payload = []byte(payloadStr) unsent = UnsentMessage{ Topic: topic, FlowID: flowID, Payload: payload, } ) db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) cfg := dbconfigT{ shared: db, dbpath: dbpath, prefix: prefix, instanceID: instanceID, } publish, publishClose, publishErr := publishStmt(cfg) find, findClose, findErr := findStmt(cfg) g.TErrorIf(g.SomeError( publishErr, findErr, )) defer g.SomeFnError( publishClose, findClose, db.Close, ) pub := func(flowID uuid.UUID) uuid.UUID { unsentWithFlowID := unsent unsentWithFlowID.FlowID = flowID messageID := uuid.New() _, err := publish(unsentWithFlowID, messageID) g.TErrorIf(err) return messageID } g.Testing("we can find a message by topic and flowID", func() { flowID := uuid.New() messageID := pub(flowID) message, err := find(topic, flowID) g.TErrorIf(err) g.TAssertEqual(message.uuid, messageID) g.TAssertEqual(message.topic, topic) g.TAssertEqual(message.flowID, flowID) g.TAssertEqual(message.payload, payload) }) g.Testing("a non-existent message gives us an error", func() { message, err := find(topic, uuid.New()) g.TAssertEqual(message, messageT{}) g.TAssertEqual(err, sql.ErrNoRows) }) g.Testing("findig twice yields the exact same message", func() { flowID := uuid.New() messageID := pub(flowID) message1, err1 := find(topic, flowID) message2, err2 := find(topic, flowID) g.TErrorIf(g.SomeError(err1, err2)) g.TAssertEqual(message1.uuid, messageID) g.TAssertEqual(message1, message2) }) g.Testing("returns the latest entry if multiple are available", func() { flowID := uuid.New() _ , err0 := find(topic, flowID) pub(flowID) message1, err1 := find(topic, flowID) pub(flowID) message2, err2 := find(topic, flowID) g.TAssertEqual(err0, sql.ErrNoRows) g.TErrorIf(g.SomeError(err1, err2)) g.TAssertEqual(message1.uuid == message2.uuid, false) g.TAssertEqual(message1.id < message2.id, true) }) g.Testing("no error if closed more than once", func() { g.TErrorIf(g.SomeError( findClose(), findClose(), findClose(), )) }) } func test_nextStmt() { g.TestStart("nextStmt()") const ( topic = "next() topic" payloadStr = "next() payload" consumer = "next() consumer" dbpath = golite.InMemory prefix = defaultPrefix ) var ( flowID = uuid.New() payload = []byte(payloadStr) unsent = UnsentMessage{ Topic: topic, FlowID: flowID, Payload: payload, } ) db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) cfg := dbconfigT{ shared: db, dbpath: dbpath, prefix: prefix, instanceID: instanceID, } take, takeClose, takeErr := takeStmt(cfg) publish, publishClose, publishErr := publishStmt(cfg) next, nextClose, nextErr := nextStmt(cfg) commit, commitClose, commitErr := commitStmt(cfg) g.TErrorIf(takeErr) g.TErrorIf(publishErr) g.TErrorIf(nextErr) g.TErrorIf(commitErr) g.TErrorIf(g.SomeError( takeErr, publishErr, nextErr, commitErr, )) defer g.SomeFnError( takeClose, publishClose, nextClose, commitClose, db.Close, ) pub := func(topic string) messageT { g.TErrorIf(take(topic, consumer)) unsentWithTopic := unsent unsentWithTopic.Topic = topic message, err := publish(unsentWithTopic, uuid.New()) g.TErrorIf(err) return message } g.Testing("we get an error on empty topic", func() { _, err := next(topic, consumer) g.TAssertEqual(err, sql.ErrNoRows) }) g.Testing("we don't get messages from other topics", func() { pub("other topic") _, err := next(topic, consumer) g.TAssertEqual(err, sql.ErrNoRows) }) g.Testing("we can get the next message", func() { expectedMessage := pub(topic) pub(topic) pub(topic) message, err := next(topic, consumer) g.TErrorIf(err) g.TAssertEqual(message, expectedMessage) }) g.Testing("we keep getting the next until we commit", func() { message1, err1 := next(topic, consumer) message2, err2 := next(topic, consumer) g.TErrorIf(commit(consumer, message1.uuid)) message3, err3 := next(topic, consumer) g.TErrorIf(g.SomeError(err1, err2, err3)) g.TAssertEqual(message1, message2) g.TAssertEqual(message2.uuid != message3.uuid, true) }) g.Testing("each consumer has its own next message", func() { g.TErrorIf(take(topic, "other consumer")) message1, err1 := next(topic, consumer) message2, err2 := next(topic, "other consumer") g.TErrorIf(g.SomeError(err1, err2)) g.TAssertEqual(message1.uuid != message2.uuid, true) }) g.Testing("error when we're not the owner", func() { otherCfg := cfg otherCfg.instanceID = instanceID + 1 take, takeClose, takeErr := takeStmt(otherCfg) g.TErrorIf(takeErr) defer takeClose() _, err := next(topic, consumer) g.TErrorIf(err) err = take(topic, consumer) g.TErrorIf(err) _, err = next(topic, consumer) g.TAssertEqual(err, fmt.Errorf( notOwnerErrorFmt, otherCfg.instanceID, topic, consumer, instanceID, )) }) g.Testing("we can close more than once", func() { g.TErrorIf(g.SomeError( nextClose(), nextClose(), nextClose(), )) }) } func test_messageEach() { g.TestStart("messageEach()") const ( topic = "messageEach() topic" payloadStr = "messageEach() payload" consumer = "messageEach() consumer" dbpath = golite.InMemory prefix = defaultPrefix ) var ( flowID = uuid.New() payload = []byte(payloadStr) unsent = UnsentMessage{ Topic: topic, FlowID: flowID, Payload: payload, } ) db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) cfg := dbconfigT{ shared: db, dbpath: dbpath, prefix: prefix, instanceID: instanceID, } take, takeClose, takeErr := takeStmt(cfg) publish, publishClose, publishErr := publishStmt(cfg) pending, pendingClose, pendingErr := pendingStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, pendingErr, )) defer g.SomeFnError( takeClose, publishClose, pendingClose, db.Close, ) pub := func() uuid.UUID { message, err := publish(unsent, uuid.New()) g.TErrorIf(err) return message.uuid } g.TErrorIf(take(topic, consumer)) g.Testing("not called on empty set", func() { rows, err := pending(topic, consumer) g.TErrorIf(err) messageEach(rows, func(messageT) error { g.Unreachable() return nil }) }) g.Testing("the callback is called once for each entry", func() { messageIDs := []uuid.UUID{ pub(), pub(), pub(), } rows, err := pending(topic, consumer) g.TErrorIf(err) var collectedIDs []uuid.UUID err = messageEach(rows, func(message messageT) error { collectedIDs = append(collectedIDs, message.uuid) return nil }) g.TErrorIf(err) g.TAssertEqual(collectedIDs, messageIDs) }) g.Testing("we halt if the timestamp is ill-formatted", func() { messageID := pub() message_id_bytes := messageID[:] pub() pub() pub() const tmplUpdate = ` UPDATE "%s_messages" SET timestamp = '01/01/1970' WHERE uuid = ?; ` sqlUpdate := fmt.Sprintf(tmplUpdate, prefix) _, err := db.Exec(sqlUpdate, message_id_bytes) g.TErrorIf(err) rows, err := pending(topic, consumer) g.TErrorIf(err) n := 0 err = messageEach(rows, func(messageT) error { n++ return nil }) g.TAssertEqual( err, &time.ParseError{ Layout: time.RFC3339Nano, Value: "01/01/1970", LayoutElem: "2006", ValueElem: "01/01/1970", Message: "", }, ) g.TAssertEqual(n, 3) const tmplDelete = ` DELETE FROM "%s_messages" WHERE uuid = ?; ` sqlDelete := fmt.Sprintf(tmplDelete, prefix) _, err = db.Exec(sqlDelete, message_id_bytes) g.TErrorIf(err) }) g.Testing("we halt if the callback returns an error", func() { myErr := errors.New("callback error early return") rows1, err1 := pending(topic, consumer) g.TErrorIf(err1) n1 := 0 err1 = messageEach(rows1, func(messageT) error { n1++ if n1 == 4 { return myErr } return nil }) rows2, err2 := pending(topic, consumer) g.TErrorIf(err2) n2 := 0 err2 = messageEach(rows2, func(messageT) error { n2++ return nil }) g.TAssertEqual(err1, myErr) g.TErrorIf(err2) g.TAssertEqual(n1, 4) g.TAssertEqual(n2, 6) }) g.Testing("noop when given nil for *sql.Rows", func() { err := messageEach(nil, func(messageT) error { g.Unreachable() return nil }) g.TErrorIf(err) }) } func test_pendingStmt() { g.TestStart("pendingStmt()") const ( topic = "pending() topic" payloadStr = "pending() payload" consumer = "pending() consumer" dbpath = golite.InMemory prefix = defaultPrefix ) var ( flowID = uuid.New() payload = []byte(payloadStr) unsent = UnsentMessage{ Topic: topic, FlowID: flowID, Payload: payload, } ) db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) cfg := dbconfigT{ shared: db, dbpath: dbpath, prefix: prefix, instanceID: instanceID, } take, takeClose, takeErr := takeStmt(cfg) publish, publishClose, publishErr := publishStmt(cfg) pending, pendingClose, pendingErr := pendingStmt(cfg) commit, commitClose, commitErr := commitStmt(cfg) toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, pendingErr, commitErr, toDeadErr, )) defer g.SomeFnError( takeClose, publishClose, pendingClose, commitClose, toDeadClose, db.Close, ) pub := func(topic string) messageT { g.TErrorIf(take(topic, consumer)) unsentWithTopic := unsent unsentWithTopic.Topic = topic message, err := publish(unsentWithTopic, uuid.New()) g.TErrorIf(err) return message } g.TErrorIf(take(topic, consumer)) collectPending := func(topic string, consumer string) []messageT { rows, err := pending(topic, consumer) g.TErrorIf(err) var messages []messageT err = messageEach(rows, func(message messageT) error { messages = append(messages, message) return nil }) g.TErrorIf(err) return messages } g.Testing("an empty database has 0 pending items", func() { g.TAssertEqual(len(collectPending(topic, consumer)), 0) }) g.Testing("after publishing we get all messages", func() { expected := []messageT{ pub(topic), pub(topic), } g.TAssertEqualI(collectPending(topic, consumer), expected) }) g.Testing("we get the same messages when calling again", func() { messages1 := collectPending(topic, consumer) messages2 := collectPending(topic, consumer) g.TAssertEqual(len(messages1), 2) g.TAssertEqualI(messages1, messages2) }) g.Testing("we don't get messages from other topics", func() { pub("other topic") g.TAssertEqual(len(collectPending(topic, consumer)), 2) g.TAssertEqual(len(collectPending("other topic", consumer)), 1) }) g.Testing("after others commit, pending still returns them", func() { g.TErrorIf(take(topic, "other consumer")) messages1 := collectPending(topic, consumer) g.TAssertEqual(len(messages1), 2) g.TErrorIf( commit("other consumer", messages1[0].uuid), ) messages2 := collectPending(topic, consumer) g.TAssertEqualI(messages1, messages2) }) g.Testing("committing other topic doesn't change current", func() { messages1 := collectPending(topic, consumer) g.TAssertEqual(len(messages1), 2) message := pub("other topic") g.TErrorIf(commit(consumer, message.uuid)) messages2 := collectPending(topic, consumer) g.TAssertEqualI(messages1, messages2) }) g.Testing("after commiting, pending doesn't return them again", func() { messages1 := collectPending(topic, consumer) g.TAssertEqual(len(messages1), 2) g.TErrorIf(commit(consumer, messages1[0].uuid)) messages2 := collectPending(topic, consumer) g.TAssertEqual(len(messages2), 1) g.TAssertEqual(messages2[0], messages1[1]) g.TErrorIf(commit(consumer, messages1[1].uuid)) messages3 := collectPending(topic, consumer) g.TAssertEqual(len(messages3), 0) }) g.Testing("on deadletter, pending also doesn't return them", func() { messages0 := collectPending(topic, consumer) g.TAssertEqual(len(messages0), 0) message1 := pub(topic) message2 := pub(topic) messages1 := collectPending(topic, consumer) g.TAssertEqual(len(messages1), 2) err = toDead(consumer, message1.uuid, uuid.New()) g.TErrorIf(err) messages2 := collectPending(topic, consumer) g.TAssertEqual(len(messages2), 1) g.TAssertEqual(messages2[0], message2) err = toDead(consumer, message2.uuid, uuid.New()) g.TErrorIf(err) messages3 := collectPending(topic, consumer) g.TAssertEqual(len(messages3), 0) }) g.Testing("if commits are unordered, pending is still sorted", func() { message1 := pub(topic) message2 := pub(topic) message3 := pub(topic) g.TAssertEqual(collectPending(topic, consumer), []messageT{ message1, message2, message3, }) g.TErrorIf(commit(consumer, message2.uuid)) g.TAssertEqual(collectPending(topic, consumer), []messageT{ message1, message3, }) g.TErrorIf(commit(consumer, message1.uuid)) g.TAssertEqual(collectPending(topic, consumer), []messageT{ message3, }) g.TErrorIf(commit(consumer, message3.uuid)) g.TAssertEqual(len(collectPending(topic, consumer)), 0) }) g.Testing("when we're not the owners we get nothing", func() { otherCfg := cfg otherCfg.instanceID = instanceID + 1 take, takeClose, takeErr := takeStmt(otherCfg) g.TErrorIf(takeErr) defer takeClose() message1 := pub(topic) message2 := pub(topic) message3 := pub(topic) message4 := pub(topic) message5 := pub(topic) expected := []messageT{ message1, message2, message3, message4, message5, } g.TAssertEqual(collectPending(topic, consumer), expected) err := take(topic, consumer) g.TErrorIf(err) rows, err := pending(topic, consumer) g.TErrorIf(err) err = messageEach(rows, func(messageT) error { g.Unreachable() return nil }) g.TErrorIf(err) }) g.Testing("no actual closing occurs", func() { g.TErrorIf(g.SomeError( pendingClose(), pendingClose(), pendingClose(), )) }) } func test_commitStmt() { g.TestStart("commitStmt()") const ( topic = "commit() topic" payloadStr = "commit() payload" consumer = "commit() consumer" dbpath = golite.InMemory prefix = defaultPrefix ) var ( flowID = uuid.New() payload = []byte(payloadStr) unsent = UnsentMessage{ Topic: topic, FlowID: flowID, Payload: payload, } ) db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) cfg := dbconfigT{ shared: db, dbpath: dbpath, prefix: prefix, instanceID: instanceID, } take, takeClose, takeErr := takeStmt(cfg) publish, publishClose, publishErr := publishStmt(cfg) commit, commitClose, commitErr := commitStmt(cfg) toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, commitErr, toDeadErr, )) defer g.SomeFnError( takeClose, publishClose, commitClose, toDeadClose, db.Close, ) pub := func(topic string) uuid.UUID { g.TErrorIf(take(topic, consumer)) unsentWithTopic := unsent unsentWithTopic.Topic = topic message, err := publish(unsentWithTopic, uuid.New()) g.TErrorIf(err) return message.uuid } cmt := func(consumer string, messageID uuid.UUID) error { g.TErrorIf(take(topic, consumer)) return commit(consumer, messageID) } g.Testing("we can't commit twice", func() { messageID := pub(topic) err1 := cmt(consumer, messageID) err2 := cmt(consumer, messageID) g.TErrorIf(err1) g.TAssertEqual( err2.(golite.Error).ExtendedCode, golite.ErrConstraintUnique, ) }) g.Testing("we can't commit non-existent messages", func() { err := cmt(consumer, uuid.New()) g.TAssertEqual( err.(golite.Error).ExtendedCode, golite.ErrConstraintNotNull, ) }) g.Testing("multiple consumers may commit a message", func() { messageID := pub(topic) g.TErrorIf(g.SomeError( cmt(consumer, messageID), cmt("other consumer", messageID), cmt("yet another consumer", messageID), )) }) g.Testing("a consumer can commit to multiple topics", func() { messageID1 := pub(topic) messageID2 := pub("other topic") messageID3 := pub("yet another topic") g.TErrorIf(g.SomeError( cmt(consumer, messageID1), cmt(consumer, messageID2), cmt(consumer, messageID3), )) }) g.Testing("a consumer can consume many messages from a topic", func() { messageID1 := pub(topic) messageID2 := pub(topic) messageID3 := pub(topic) g.TErrorIf(g.SomeError( cmt(consumer, messageID1), cmt(consumer, messageID2), cmt(consumer, messageID3), )) }) g.Testing("we can't commit a dead message", func() { messageID := pub(topic) err1 := toDead(consumer, messageID, uuid.New()) err2 := cmt(consumer, messageID) g.TErrorIf(err1) g.TAssertEqual( err2.(golite.Error).ExtendedCode, golite.ErrConstraintUnique, ) }) g.Testing("error if we don't own the topic/consumer", func() { otherCfg := cfg otherCfg.instanceID = instanceID + 1 take, takeClose, takeErr := takeStmt(otherCfg) g.TErrorIf(takeErr) defer takeClose() messageID := pub(topic) err := take(topic, consumer) g.TErrorIf(err) err = commit(consumer, messageID) g.TAssertEqual( err.(golite.Error).ExtendedCode, golite.ErrConstraintTrigger, ) }) g.Testing("no actual closing occurs", func() { g.TErrorIf(g.SomeError( commitClose(), commitClose(), commitClose(), )) }) } func test_toDeadStmt() { g.TestStart("toDeadStmt()") const ( topic = "toDead() topic" payloadStr = "toDead() payload" consumer = "toDead() consumer" dbpath = golite.InMemory prefix = defaultPrefix ) var ( flowID = uuid.New() payload = []byte(payloadStr) unsent = UnsentMessage{ Topic: topic, FlowID: flowID, Payload: payload, } ) db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) cfg := dbconfigT{ shared: db, dbpath: dbpath, prefix: prefix, instanceID: instanceID, } take, takeClose, takeErr := takeStmt(cfg) publish, publishClose, publishErr := publishStmt(cfg) commit, commitClose, commitErr := commitStmt(cfg) toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, commitErr, toDeadErr, )) defer g.SomeFnError( takeClose, publishClose, commitClose, toDeadClose, db.Close, ) pub := func(topic string) uuid.UUID { g.TErrorIf(take(topic, consumer)) unsentWithTopic := unsent unsentWithTopic.Topic = topic message, err := publish(unsentWithTopic, uuid.New()) g.TErrorIf(err) return message.uuid } asDead := func( consumer string, messageID uuid.UUID, deadletterID uuid.UUID, ) error { g.TErrorIf(take(topic, consumer)) return toDead(consumer, messageID, deadletterID) } g.Testing("we can't mark as dead twice", func() { messageID := pub(topic) err1 := asDead(consumer, messageID, uuid.New()) err2 := asDead(consumer, messageID, uuid.New()) g.TErrorIf(err1) g.TAssertEqual( err2.(golite.Error).ExtendedCode, golite.ErrConstraintUnique, ) }) g.Testing("we can't reuse a deadletter id", func() { messageID1 := pub(topic) messageID2 := pub(topic) deadletterID := uuid.New() err1 := asDead(consumer, messageID1, deadletterID) err2 := asDead(consumer, messageID2, deadletterID) g.TErrorIf(err1) g.TAssertEqual( err2.(golite.Error).ExtendedCode, golite.ErrConstraintUnique, ) }) g.Testing("we can't mark as dead non-existent messages", func() { err := asDead(consumer, uuid.New(), uuid.New()) g.TAssertEqual( err.(golite.Error).ExtendedCode, golite.ErrConstraintNotNull, ) }) g.Testing("multiple consumers may mark a message as dead", func() { messageID := pub(topic) g.TErrorIf(g.SomeError( asDead(consumer, messageID, uuid.New()), asDead("another consumer", messageID, uuid.New()), asDead("yet another consumer", messageID, uuid.New()), )) }) g.Testing("a consumer can mark as dead in multiple topics", func() { messageID1 := pub(topic) messageID2 := pub("other topic") messageID3 := pub("yet other topic") g.TErrorIf(g.SomeError( asDead(consumer, messageID1, uuid.New()), asDead(consumer, messageID2, uuid.New()), asDead(consumer, messageID3, uuid.New()), )) }) g.Testing("a consumer can produce many deadletters in a topic", func() { messageID1 := pub(topic) messageID2 := pub(topic) messageID3 := pub(topic) g.TErrorIf(g.SomeError( asDead(consumer, messageID1, uuid.New()), asDead(consumer, messageID2, uuid.New()), asDead(consumer, messageID3, uuid.New()), )) }) g.Testing("a consumer can intercalate commits and deadletters", func() { messageID1 := pub(topic) messageID2 := pub(topic) messageID3 := pub(topic) messageID4 := pub(topic) messageID5 := pub(topic) g.TErrorIf(g.SomeError( asDead(consumer, messageID1, uuid.New()), commit(consumer, messageID2), commit(consumer, messageID3), asDead(consumer, messageID4, uuid.New()), commit(consumer, messageID5), )) }) g.Testing("we can't mark a committed message as dead", func() { messageID := pub(topic) err1 := commit(consumer, messageID) err2 := asDead(consumer, messageID, uuid.New()) g.TErrorIf(err1) g.TAssertEqual( err2.(golite.Error).ExtendedCode, golite.ErrConstraintUnique, ) }) g.Testing("error if we don't own the message's consumer/topic", func() { otherCfg := cfg otherCfg.instanceID = instanceID + 1 messageID1 := pub(topic) messageID2 := pub(topic) take, takeClose, takeErr := takeStmt(otherCfg) g.TErrorIf(takeErr) defer takeClose() err := toDead(consumer, messageID1, uuid.New()) g.TErrorIf(err) err = take(topic, consumer) g.TErrorIf(err) err = toDead(consumer, messageID2, uuid.New()) g.TAssertEqual( err.(golite.Error).ExtendedCode, golite.ErrConstraintTrigger, ) }) g.Testing("no actual closing occurs", func() { g.TErrorIf(g.SomeError( toDeadClose(), toDeadClose(), toDeadClose(), )) }) } func test_replayStmt() { g.TestStart("replayStmt()") const ( topic = "replay() topic" payloadStr = "replay() payload" consumer = "replay() consumer" dbpath = golite.InMemory prefix = defaultPrefix ) var ( flowID = uuid.New() payload = []byte(payloadStr) unsent = UnsentMessage{ Topic: topic, FlowID: flowID, Payload: payload, } ) db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) cfg := dbconfigT{ shared: db, dbpath: dbpath, prefix: prefix, instanceID: instanceID, } take, takeClose, takeErr := takeStmt(cfg) publish, publishClose, publishErr := publishStmt(cfg) toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) replay, replayClose, replayErr := replayStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, toDeadErr, replayErr, )) defer g.SomeFnError( takeClose, publishClose, toDeadClose, replayClose, db.Close, ) pub := func() messageT { message, err := publish(unsent, uuid.New()) g.TErrorIf(err) return message } g.TErrorIf(take(topic, consumer)) g.Testing("we can replay a message", func() { message := pub() deadletterID := uuid.New() replayedID := uuid.New() err1 := toDead(consumer, message.uuid, deadletterID) replayed, err2 := replay(deadletterID, replayedID) g.TErrorIf(g.SomeError(err1, err2)) g.TAssertEqual(replayed.uuid, replayedID) g.TAssertEqual(replayed.id == message.id, false) g.TAssertEqual(replayed.uuid == message.uuid, false) }) g.Testing("a replayed message keeps its payload", func() { message := pub() deadletterID := uuid.New() err := toDead(consumer, message.uuid, deadletterID) g.TErrorIf(err) replayed, err := replay(deadletterID, uuid.New()) g.TErrorIf(err) g.TAssertEqual(message.flowID, replayed.flowID) g.TAssertEqual(message.payload, replayed.payload) }) g.Testing("we can't replay a dead message twice", func() { message := pub() deadletterID := uuid.New() err := toDead(consumer, message.uuid, deadletterID) g.TErrorIf(err) _, err1 := replay(deadletterID, uuid.New()) _, err2 := replay(deadletterID, uuid.New()) g.TErrorIf(err1) g.TAssertEqual( err2.(golite.Error).ExtendedCode, golite.ErrConstraintUnique, ) }) g.Testing("we cant replay non-existent messages", func() { _, err := replay(uuid.New(), uuid.New()) g.TAssertEqual( err.(golite.Error).ExtendedCode, golite.ErrConstraintNotNull, ) }) g.Testing("messages can die and then be replayed many times", func() { message := pub() deadletterID1 := uuid.New() deadletterID2 := uuid.New() err := toDead(consumer, message.uuid, deadletterID1) g.TErrorIf(err) replayed1, err := replay(deadletterID1, uuid.New()) g.TErrorIf(err) err = toDead(consumer, replayed1.uuid, deadletterID2) g.TErrorIf(err) replayed2, err := replay(deadletterID2, uuid.New()) g.TErrorIf(err) g.TAssertEqual(message.flowID, replayed1.flowID) g.TAssertEqual(replayed1.flowID, replayed2.flowID) }) g.Testing("no actual closing occurs", func() { g.TErrorIf(g.SomeError( replayClose(), replayClose(), replayClose(), )) }) } func test_oneDeadStmt() { g.TestStart("oneDeadStmt()") const ( topic = "oneDead() topic" payloadStr = "oneDead() payload" consumer = "oneDead() consumer" dbpath = golite.InMemory prefix = defaultPrefix ) var ( flowID = uuid.New() payload = []byte(payloadStr) unsent = UnsentMessage{ Topic: topic, FlowID: flowID, Payload: payload, } ) db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) cfg := dbconfigT{ shared: db, dbpath: dbpath, prefix: prefix, instanceID: instanceID, } take, takeClose, takeErr := takeStmt(cfg) publish, publishClose, publishErr := publishStmt(cfg) toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) replay, replayClose, replayErr := replayStmt(cfg) oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, toDeadErr, replayErr, oneDeadErr, )) defer g.SomeFnError( takeClose, publishClose, toDeadClose, replayClose, oneDeadClose, db.Close, ) pub := func(topic string) uuid.UUID { g.TErrorIf(take(topic, consumer)) unsentWithTopic := unsent unsentWithTopic.Topic = topic message, err := publish(unsentWithTopic, uuid.New()) g.TErrorIf(err) return message.uuid } g.Testing("error on missing deadletters", func() { _, err := oneDead(topic, consumer) g.TAssertEqual(err, sql.ErrNoRows) }) g.Testing("deadletters on other topics don't show for us", func() { err := toDead(consumer, pub("other topic"), uuid.New()) g.TErrorIf(err) _, err = oneDead(topic, consumer) g.TAssertEqual(err, sql.ErrNoRows) }) g.Testing("deadletters for other consumers don't show for use", func() { g.TErrorIf(take(topic, "other consumer")) err := toDead("other consumer", pub(topic), uuid.New()) g.TErrorIf(err) _, err = oneDead(topic, consumer) g.TAssertEqual(err, sql.ErrNoRows) }) g.Testing("after being replayed deadletters aren't returned", func() { messageID1 := uuid.New() messageID2 := uuid.New() messageID3 := uuid.New() err1 := toDead(consumer, pub(topic), messageID1) err2 := toDead(consumer, pub(topic), messageID2) err3 := toDead(consumer, pub(topic), messageID3) g.TErrorIf(g.SomeError(err1, err2, err3)) deadletter, err := oneDead(topic, consumer) g.TErrorIf(err) g.TAssertEqual(deadletter.uuid, messageID1) _, err = replay(messageID2, uuid.New()) g.TErrorIf(err) deadletter, err = oneDead(topic, consumer) g.TErrorIf(err) g.TAssertEqual(deadletter.uuid, messageID1) _, err = replay(messageID1, uuid.New()) g.TErrorIf(err) deadletter, err = oneDead(topic, consumer) g.TErrorIf(err) g.TAssertEqual(deadletter.uuid, messageID3) _, err = replay(messageID3, uuid.New()) g.TErrorIf(err) _, err = oneDead(topic, consumer) g.TAssertEqual(err, sql.ErrNoRows) }) } func test_deadletterEach() { g.TestStart("deadletterEach") const ( topic = "deadletterEach() topic" payloadStr = "deadletterEach() payload" consumer = "deadletterEach() consumer" dbpath = golite.InMemory prefix = defaultPrefix ) var ( flowID = uuid.New() payload = []byte(payloadStr) unsent = UnsentMessage{ Topic: topic, FlowID: flowID, Payload: payload, } ) db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) cfg := dbconfigT{ shared: db, dbpath: dbpath, prefix: prefix, instanceID: instanceID, } take, takeClose, takeErr := takeStmt(cfg) publish, publishClose, publishErr := publishStmt(cfg) toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) allDead, allDeadClose, allDeadErr := allDeadStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, toDeadErr, allDeadErr, )) defer g.SomeFnError( takeClose, publishClose, toDeadClose, allDeadClose, db.Close, ) pub := func() uuid.UUID { message, err := publish(unsent, uuid.New()) g.TErrorIf(err) return message.uuid } dead := func(messageID uuid.UUID) uuid.UUID { deadletterID := uuid.New() err := toDead(consumer, messageID, deadletterID) g.TErrorIf(err) return deadletterID } g.TErrorIf(take(topic, consumer)) g.Testing("not called on empty set", func() { rows, err := allDead(topic, consumer) g.TErrorIf(err) n := 0 deadletterEach(rows, func(deadletterT, messageT) error { n++ return nil }) }) g.Testing("the callback is called once for each entry", func() { expected := []uuid.UUID{ dead(pub()), dead(pub()), dead(pub()), } rows, err := allDead(topic, consumer) g.TErrorIf(err) var deadletterIDs []uuid.UUID deadletterEach(rows, func( deadletter deadletterT, _ messageT, ) error { deadletterIDs = append(deadletterIDs, deadletter.uuid) return nil }) g.TAssertEqual(deadletterIDs, expected) }) g.Testing("we halt if the timestamp is ill-formatted", func() { messageID := pub() message_id_bytes := messageID[:] dead(messageID) dead(pub()) dead(pub()) dead(pub()) dead(pub()) const tmplUpdate = ` UPDATE "%s_offsets" SET timestamp = '01-01-1970' WHERE message_id IN ( SELECT id FROM "%s_messages" WHERE uuid = ? ); ` sqlUpdate := fmt.Sprintf(tmplUpdate, prefix, prefix) _, err := db.Exec(sqlUpdate, message_id_bytes) g.TErrorIf(err) rows, err := allDead(topic, consumer) g.TErrorIf(err) n := 0 err = deadletterEach(rows, func(deadletterT, messageT) error { n++ return nil }) g.TAssertEqual( err, &time.ParseError{ Layout: time.RFC3339Nano, Value: "01-01-1970", LayoutElem: "2006", ValueElem: "01-01-1970", Message: "", }, ) g.TAssertEqual(n, 3) const tmplDelete = ` DELETE FROM "%s_offsets" WHERE message_id IN ( SELECT id FROM "%s_messages" WHERE uuid = ? ); ` sqlDelete := fmt.Sprintf(tmplDelete, prefix, prefix) _, err = db.Exec(sqlDelete, message_id_bytes) g.TErrorIf(err) }) g.Testing("we halt if the callback returns an error", func() { myErr := errors.New("early return error") rows1, err1 := allDead(topic, consumer) g.TErrorIf(err1) n1 := 0 err1 = deadletterEach(rows1, func(deadletterT, messageT) error { n1++ if n1 == 1 { return myErr } return nil }) rows2, err2 := allDead(topic, consumer) g.TErrorIf(err2) n2 := 0 err = deadletterEach(rows2, func(deadletterT, messageT) error { n2++ return nil }) g.TAssertEqual(err1, myErr) g.TErrorIf(err2) g.TAssertEqual(n1, 1) g.TAssertEqual(n2, 7) }) } func test_allDeadStmt() { g.TestStart("allDeadStmt()") const ( topic = "allDead() topic" payloadStr = "allDead() payload" consumer = "allDead() consumer" dbpath = golite.InMemory prefix = defaultPrefix ) var ( flowID = uuid.New() payload = []byte(payloadStr) unsent = UnsentMessage{ Topic: topic, FlowID: flowID, Payload: payload, } ) db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) cfg := dbconfigT{ shared: db, dbpath: dbpath, prefix: prefix, instanceID: instanceID, } take, takeClose, takeErr := takeStmt(cfg) publish, publishClose, publishErr := publishStmt(cfg) toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) replay, replayClose, replayErr := replayStmt(cfg) allDead, allDeadClose, allDeadErr := allDeadStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, toDeadErr, replayErr, allDeadErr, )) defer g.SomeFnError( takeClose, publishClose, toDeadClose, replayClose, allDeadClose, db.Close, ) pub := func(topic string) uuid.UUID { g.TErrorIf(take(topic, consumer)) unsentWithTopic := unsent unsentWithTopic.Topic = topic message, err := publish(unsentWithTopic, uuid.New()) g.TErrorIf(err) return message.uuid } collectAll := func( topic string, consumer string, ) ([]deadletterT, []messageT) { var ( deadletters []deadletterT messages []messageT ) eachFn := func( deadletter deadletterT, message messageT, ) error { deadletters = append(deadletters, deadletter) messages = append(messages, message) return nil } rows, err := allDead(topic, consumer) g.TErrorIf(err) err = deadletterEach(rows, eachFn) g.TErrorIf(err) return deadletters, messages } g.Testing("no entry on empty deadletters", func() { deadletterIDs, _ := collectAll(topic, consumer) g.TAssertEqual(len(deadletterIDs), 0) }) g.Testing("deadletters on other topics don't show up", func() { err := toDead(consumer, pub("other topic"), uuid.New()) g.TErrorIf(err) deadletters, _ := collectAll(topic, consumer) g.TAssertEqual(len(deadletters), 0) }) g.Testing("deadletters of other consumers don't show up", func() { g.TErrorIf(take(topic, "other consumer")) err := toDead("other consumer", pub(topic), uuid.New()) g.TErrorIf(err) deadletterIDs, _ := collectAll(topic, consumer) g.TAssertEqual(len(deadletterIDs), 0) }) g.Testing("deadletters are given in order", func() { deadletterIDs := []uuid.UUID{ uuid.New(), uuid.New(), uuid.New(), } messageIDs := []uuid.UUID{ pub(topic), pub(topic), pub(topic), } err1 := toDead(consumer, messageIDs[0], deadletterIDs[0]) err2 := toDead(consumer, messageIDs[1], deadletterIDs[1]) err3 := toDead(consumer, messageIDs[2], deadletterIDs[2]) g.TErrorIf(g.SomeError(err1, err2, err3)) deadletters, messages := collectAll(topic, consumer) g.TAssertEqual(deadletters[0].uuid, deadletterIDs[0]) g.TAssertEqual(deadletters[1].uuid, deadletterIDs[1]) g.TAssertEqual(deadletters[2].uuid, deadletterIDs[2]) g.TAssertEqual( messages[0].uuid, messageIDs[0]) g.TAssertEqual( messages[1].uuid, messageIDs[1]) g.TAssertEqual( messages[2].uuid, messageIDs[2]) g.TAssertEqual(len(deadletters), 3) g.TAssertEqual(len(messages), 3) }) g.Testing("after being replayed, they stop appearing", func() { deadletters, _ := collectAll(topic, consumer) g.TAssertEqual(len(deadletters), 3) _, err := replay(deadletters[0].uuid, uuid.New()) g.TErrorIf(err) collecteds, _ := collectAll(topic, consumer) g.TAssertEqual(len(collecteds), 2) _, err = replay(deadletters[1].uuid, uuid.New()) g.TErrorIf(err) collecteds, _ = collectAll(topic, consumer) g.TAssertEqual(len(collecteds), 1) _, err = replay(deadletters[2].uuid, uuid.New()) g.TErrorIf(err) collecteds, _ = collectAll(topic, consumer) g.TAssertEqual(len(collecteds), 0) }) } func test_sizeStmt() { g.TestStart("sizeStmt()") const ( topic = "size() topic" payloadStr = "size() payload" consumer = "size() consumer" dbpath = golite.InMemory prefix = defaultPrefix ) var ( flowID = uuid.New() payload = []byte(payloadStr) unsent = UnsentMessage{ Topic: topic, FlowID: flowID, Payload: payload, } ) db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) cfg := dbconfigT{ shared: db, dbpath: dbpath, prefix: prefix, instanceID: instanceID, } take, takeClose, takeErr := takeStmt(cfg) publish, publishClose, publishErr := publishStmt(cfg) toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) replay, replayClose, replayErr := replayStmt(cfg) oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg) size, sizeClose, sizeErr := sizeStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, toDeadErr, replayErr, oneDeadErr, sizeErr, )) defer g.SomeFnError( takeClose, publishClose, toDeadClose, replayClose, sizeClose, oneDeadClose, db.Close, ) pub := func(topic string) uuid.UUID { g.TErrorIf(take(topic, consumer)) unsentWithTopic := unsent unsentWithTopic.Topic = topic message, err := publish(unsentWithTopic, uuid.New()) g.TErrorIf(err) return message.uuid } g.Testing("0 on empty topic", func() { n, err := size(topic) g.TErrorIf(err) g.TAssertEqual(n, 0) }) g.Testing("other topics don't fall into our count", func() { pub("other topic") n, err := size(topic) g.TErrorIf(err) g.TAssertEqual(n, 0) }) g.Testing("otherwise we just get the sum", func() { pub(topic) pub(topic) pub(topic) pub(topic) pub(topic) n, err := size(topic) g.TErrorIf(err) g.TAssertEqual(n, 5) }) g.Testing("deadletters aren't taken into account", func() { sixthMessageID := pub(topic) err := toDead(consumer, sixthMessageID, uuid.New()) g.TErrorIf(err) n, err := size(topic) g.TErrorIf(err) g.TAssertEqual(n, 6) }) g.Testing("after replay, deadletters increases the size", func() { deadletter, err := oneDead(topic, consumer) g.TErrorIf(err) _, err = replay(deadletter.uuid, uuid.New()) g.TErrorIf(err) n, err := size(topic) g.TErrorIf(err) g.TAssertEqual(n, 7) }) } func test_countStmt() { g.TestStart("countStmt()") const ( topic = "count() topic" payloadStr = "count() payload" consumer = "count() consumer" dbpath = golite.InMemory prefix = defaultPrefix ) var ( flowID = uuid.New() payload = []byte(payloadStr) unsent = UnsentMessage{ Topic: topic, FlowID: flowID, Payload: payload, } ) db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) cfg := dbconfigT{ shared: db, dbpath: dbpath, prefix: prefix, instanceID: instanceID, } take, takeClose, takeErr := takeStmt(cfg) publish, publishClose, publishErr := publishStmt(cfg) next, nextClose, nextErr := nextStmt(cfg) commit, commitClose, commitErr := commitStmt(cfg) toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) count, countClose, countErr := countStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, nextErr, commitErr, toDeadErr, countErr, )) defer g.SomeFnError( takeClose, publishClose, nextClose, commitClose, toDeadClose, countClose, db.Close, ) pub := func(topic string) uuid.UUID { g.TErrorIf(take(topic, consumer)) unsentWithTopic := unsent unsentWithTopic.Topic = topic message, err := publish(unsentWithTopic, uuid.New()) g.TErrorIf(err) return message.uuid } g.Testing("0 on empty topic", func() { n, err := count(topic, consumer) g.TErrorIf(err) g.TAssertEqual(n, 0) }) g.Testing("other topics don't add to our count", func() { err := commit(consumer, pub("other topic")) g.TErrorIf(err) n, err := count(topic, consumer) g.TErrorIf(err) g.TAssertEqual(n, 0) }) g.Testing("other consumers don't influence our count", func() { g.TErrorIf(take(topic, "other consumer")) err := commit("other consumer", pub(topic)) g.TErrorIf(err) n, err := count(topic, consumer) g.TErrorIf(err) g.TAssertEqual(n, 0) }) g.Testing("unconsumed messages don't count", func() { pub(topic) pub(topic) pub(topic) n, err := count(topic, consumer) g.TErrorIf(err) g.TAssertEqual(n, 0) }) g.Testing("consumed messages do count", func() { message, err := next(topic, consumer) g.TErrorIf(err) err = commit(consumer, message.uuid) g.TErrorIf(err) message, err = next(topic, consumer) g.TErrorIf(err) err = commit(consumer, message.uuid) g.TErrorIf(err) message, err = next(topic, consumer) g.TErrorIf(err) err = commit(consumer, message.uuid) g.TErrorIf(err) n, err := count(topic, consumer) g.TErrorIf(err) g.TAssertEqual(n, 3) }) g.Testing("deadletters count as consumed", func() { message, err := next(topic, consumer) g.TErrorIf(err) err = toDead(consumer, message.uuid, uuid.New()) g.TErrorIf(err) n, err := count(topic, consumer) g.TErrorIf(err) g.TAssertEqual(n, 4) }) } func test_hasDataStmt() { g.TestStart("hasDataStmt()") const ( topic = "hasData() topic" payloadStr = "hasData() payload" consumer = "hasData() consumer" dbpath = golite.InMemory prefix = defaultPrefix ) var ( flowID = uuid.New() payload = []byte(payloadStr) unsent = UnsentMessage{ Topic: topic, FlowID: flowID, Payload: payload, } ) db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) cfg := dbconfigT{ shared: db, dbpath: dbpath, prefix: prefix, instanceID: instanceID, } take, takeClose, takeErr := takeStmt(cfg) publish, publishClose, publishErr := publishStmt(cfg) next, nextClose, nextErr := nextStmt(cfg) commit, commitClose, commitErr := commitStmt(cfg) toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) hasData, hasDataClose, hasDataErr := hasDataStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, nextErr, commitErr, toDeadErr, hasDataErr, )) defer g.SomeFnError( takeClose, publishClose, nextClose, commitClose, toDeadClose, hasDataClose, db.Close, ) pub := func(topic string) uuid.UUID { g.TErrorIf(take(topic, consumer)) unsentWithTopic := unsent unsentWithTopic.Topic = topic message, err := publish(unsentWithTopic, uuid.New()) g.TErrorIf(err) return message.uuid } g.Testing("false on empty topic", func() { has, err := hasData(topic, consumer) g.TErrorIf(err) g.TAssertEqual(has, false) }) g.Testing("other topics don't change the response", func() { pub("other topic") has, err := hasData(topic, consumer) g.TErrorIf(err) g.TAssertEqual(has, false) }) g.Testing("published messages flip the flag", func() { pub(topic) has, err := hasData(topic, consumer) g.TErrorIf(err) g.TAssertEqual(has, true) }) g.Testing("other consumers don't influence us", func() { g.TErrorIf(take(topic, "other consumer")) message, err := next(topic, "other consumer") g.TErrorIf(err) err = commit("other consumer", message.uuid) g.TErrorIf(err) has, err := hasData(topic, consumer) g.TErrorIf(err) g.TAssertEqual(has, true) }) g.Testing("consuming messages unflips the result", func() { message, err := next(topic, consumer) g.TErrorIf(err) err = commit(consumer, message.uuid) g.TErrorIf(err) has, err := hasData(topic, consumer) g.TErrorIf(err) g.TAssertEqual(has, false) }) g.Testing("same for deadletters", func() { has0, err := hasData(topic, consumer) g.TErrorIf(err) g.TAssertEqual(has0, false) messageID1 := pub(topic) messageID2 := pub(topic) has1, err := hasData(topic, consumer) g.TErrorIf(err) g.TAssertEqual(has1, true) err = toDead(consumer, messageID1, uuid.New()) g.TErrorIf(err) err = commit(consumer, messageID2) g.TErrorIf(err) has2, err := hasData(topic, consumer) g.TErrorIf(err) g.TAssertEqual(has2, false) }) } func test_initDB() { g.TestStart("initDB()") const ( topic = "initDB() topic" payloadStr = "initDB() payload" consumer = "initDB() consumer" dbpath = golite.InMemory prefix = defaultPrefix ) var ( flowID = uuid.New() payload = []byte(payloadStr) unsent = UnsentMessage{ Topic: topic, FlowID: flowID, Payload: payload, } ) var messages []messageT notifyFn := func(message messageT) { messages = append(messages, message) } queries, err := initDB(dbpath, prefix, notifyFn, instanceID) g.TErrorIf(err) defer queries.close() g.TErrorIf(queries.take(topic, consumer)) g.Testing("we can perform all the wrapped operations", func() { messageID := uuid.New() newMessageID := uuid.New() deadletterID := uuid.New() messageV1, err := queries.publish(unsent, messageID) g.TErrorIf(err) messageV2, err := queries.next(topic, consumer) g.TErrorIf(err) var messagesV3 []messageT pendingFn := func(message messageT) error { messagesV3 = append(messagesV3, message) return nil } err = queries.pending(topic, consumer, pendingFn) g.TErrorIf(err) g.TAssertEqual(len(messagesV3), 1) messageV3 := messagesV3[0] err = queries.toDead(consumer, messageID, deadletterID) g.TErrorIf(err) deadletterV1, err := queries.oneDead(topic, consumer) g.TErrorIf(err) g.TAssertEqual(deadletterV1.uuid, deadletterID) var ( deadlettersV2 []deadletterT messagesV4 []messageT ) deadletterFn := func( deadletter deadletterT, message messageT, ) error { deadlettersV2 = append(deadlettersV2, deadletter) messagesV4 = append(messagesV4, message) return nil } err = queries.allDead(topic, consumer, deadletterFn) g.TErrorIf(err) g.TAssertEqual(len(deadlettersV2), 1) g.TAssertEqual(deadlettersV2[0].uuid, deadletterID) g.TAssertEqual(len(messagesV4), 1) messageV4 := messagesV4[0] g.TAssertEqual(messageV1, messageV2) g.TAssertEqual(messageV1, messageV3) g.TAssertEqual(messageV1, messageV4) newMessageV1, err := queries.replay(deadletterID, newMessageID) g.TErrorIf(err) err = queries.commit(consumer, newMessageID) g.TErrorIf(err) newMessageV0 := messageV1 newMessageV0.id = newMessageV1.id newMessageV0.uuid = newMessageID newMessageV0.timestamp = newMessageV1.timestamp g.TAssertEqual(newMessageV1, newMessageV0) size, err := queries.size(topic) g.TErrorIf(err) g.TAssertEqual(size, 2) count, err := queries.count(topic, consumer) g.TErrorIf(err) g.TAssertEqual(count, 2) hasData, err := queries.hasData(topic, consumer) g.TErrorIf(err) g.TAssertEqual(hasData, false) }) } func test_queriesTclose() { g.TestStart("queriesT.close()") const ( dbpath = golite.InMemory prefix = defaultPrefix ) notifyFn := func(messageT) {} queries, err := initDB(dbpath, prefix, notifyFn, instanceID) g.TErrorIf(err) g.Testing("closing more than once does not error", func() { g.TErrorIf(g.SomeError( queries.close(), queries.close(), )) }) } func test_newPinger() { g.TestStart("newPinger()") g.Testing("onPing() on a closed pinger is a noop", func() { pinger := newPinger[string]() pinger.close() pinger.onPing(func(s string) { panic(s) }) pinger.tryPing("ignored") }) g.Testing("onPing() on a closed pinger with data gets that", func() { pinger := newPinger[string]() pinger.tryPing("received") pinger.tryPing("ignored") g.TAssertEqual(pinger.closed(), false) pinger.close() g.TAssertEqual(pinger.closed(), true) c := make(chan string) count := 0 go pinger.onPing(func(s string) { if count > 0 { panic(s) } count++ c <- s }) given := <- c g.TAssertEqual(given, "received") }) g.Testing("when onPing is late, we loose messages", func() { pinger := newPinger[string]() pinger.tryPing("first seen") pinger.tryPing("second dropped") pinger.tryPing("third dropped") c := make(chan string) go pinger.onPing(func(s string) { c <- s }) s := <- c close(c) pinger.close() g.TAssertEqual(s, "first seen") }) g.Testing("if onPing is on time, it may not loose", func() { pinger := newPinger[string]() pinger.tryPing("first seen") c := make(chan string) go pinger.onPing(func(s string) { c <- s }) s1 := <- c pinger.tryPing("second seen") s2 := <- c close(c) pinger.close() g.TAssertEqual(s1, "first seen") g.TAssertEqual(s2, "second seen") }) g.Testing("if onPing takes too long, it still looses messages", func() { pinger := newPinger[string]() pinger.tryPing("first seen") c1 := make(chan string) c2 := make(chan struct{}) go pinger.onPing(func(s string) { c1 <- s c2 <- struct{}{} }) s1 := <- c1 pinger.tryPing("second seen") pinger.tryPing("third dropped") <- c2 s2 := <- c1 <- c2 close(c2) close(c1) pinger.close() g.TAssertEqual(s1, "first seen") g.TAssertEqual(s2, "second seen") }) } func test_makeSubscriptionsFunc() { g.TestStart("makeSubscriptionsFunc()") g.Testing("we can have multiple readers", func() { subscriptions := makeSubscriptionsFuncs() var ( readStarted sync.WaitGroup readFinished sync.WaitGroup ) c := make(chan struct{}) readFn := func(subscriptionsSetM) error { readStarted.Done() <- c return nil } addReader := func() { readStarted.Add(1) readFinished.Add(1) go func() { subscriptions.read(readFn) readFinished.Done() }() } addReader() addReader() addReader() readStarted.Wait() c <- struct{}{} c <- struct{}{} c <- struct{}{} readFinished.Wait() }) g.Testing("writers are exclusive", func() { subscriptions := makeSubscriptionsFuncs() var ( readStarted sync.WaitGroup readFinished sync.WaitGroup writeWillStart sync.WaitGroup writeFinished sync.WaitGroup ) c := make(chan string) readFn := func(subscriptionsSetM) error { readStarted.Done() c <- "reader" return nil } addReader := func() { readStarted.Add(1) readFinished.Add(1) go func() { subscriptions.read(readFn) readFinished.Done() }() } writeFn := func(subscriptionsSetM) error { c <- "writer" return nil } addWriter := func() { writeWillStart.Add(1) writeFinished.Add(1) go func() { writeWillStart.Done() subscriptions.write(writeFn) writeFinished.Done() }() } addReader() addReader() addReader() readStarted.Wait() addWriter() writeWillStart.Wait() g.TAssertEqual(<-c, "reader") g.TAssertEqual(<-c, "reader") g.TAssertEqual(<-c, "reader") readFinished.Wait() g.TAssertEqual(<-c, "writer") writeFinished.Wait() }) } func test_makeNotifyFn() { g.TestStart("makeNotifyFn()") g.Testing("when topic is nil only top pinger gets pinged", func() { pinger1 := newPinger[struct{}]() pinger2 := newPinger[[]byte]() defer pinger1.close() defer pinger2.close() go pinger1.onPing(func(struct{}) { panic("consumer pinger") }) go pinger2.onPing(func(payload []byte) { panic("waiter pinger") }) flowID := uuid.New() set := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer-1": consumerT{ pinger: pinger1, }, }, waiters: map[uuid.UUID]map[string]waiterT{ flowID: map[string]waiterT{ "waiter-1": waiterT{ pinger: pinger2, }, }, }, }, } subsFn := func(fn func(subscriptionsSetM) error) error { return fn(set) } topPinger := newPinger[struct{}]() defer topPinger.close() var wg sync.WaitGroup wg.Add(1) go topPinger.onPing(func(struct{}) { wg.Done() }) notifyFn := makeNotifyFn(subsFn, topPinger) message := messageT{ uuid: uuid.New(), topic: "nobody is subscribed to this one", payload: []byte("nobody with get this payload"), } notifyFn(message) wg.Wait() }) g.Testing("otherwise all interested subscribers get pinged", func() { const topic = "the topic name" pinger1 := newPinger[struct{}]() pinger2 := newPinger[[]byte]() defer pinger1.close() defer pinger2.close() var wg sync.WaitGroup go pinger1.onPing(func(struct{}) { wg.Done() }) go pinger2.onPing(func([]byte) { wg.Done() }) wg.Add(2) flowID := uuid.New() set := subscriptionsSetM{ topic: topicSubscriptionT{ consumers: map[string]consumerT{ "consumer-1": consumerT{ pinger: pinger1, }, }, waiters: map[uuid.UUID]map[string]waiterT{ flowID: map[string]waiterT{ "waiter-1": waiterT{ pinger: pinger2, }, }, }, }, } subsFn := func(fn func(subscriptionsSetM) error) error { return fn(set) } topPinger := newPinger[struct{}]() defer topPinger.close() go topPinger.onPing(func(struct{}) { wg.Done() }) wg.Add(1) notifyFn := makeNotifyFn(subsFn, topPinger) message := messageT{ uuid: uuid.New(), topic: topic, flowID: flowID, payload: []byte("ignored in this test"), } notifyFn(message) wg.Wait() }) } func test_collectClosedWaiters() { g.TestStart("collectClosedWaiter()") g.Testing("collects all the reports to be closed", func() { flowID1 := uuid.New() flowID2 := uuid.New() flowID3 := uuid.New() flowID4 := uuid.New() flowID5 := uuid.New() mkwaiter := func(closed bool) waiterT { fn := func() bool { return closed } return waiterT{ closed: &fn, } } set := subscriptionsSetM{ "topic-1": topicSubscriptionT{ waiters: map[uuid.UUID]map[string]waiterT{ flowID1: map[string]waiterT{ "waiter-1": mkwaiter(false), "waiter-2": mkwaiter(true), "waiter-3": mkwaiter(true), }, flowID2: map[string]waiterT{ "waiter-4": mkwaiter(true), "waiter-5": mkwaiter(false), }, }, }, "topic-2": topicSubscriptionT{ waiters: map[uuid.UUID]map[string]waiterT{ flowID3: map[string]waiterT{ "waiter-1": mkwaiter(false), "waiter-2": mkwaiter(false), }, flowID4: map[string]waiterT{ "waiter-3": mkwaiter(true), "waiter-4": mkwaiter(true), "waiter-5": mkwaiter(true), "waiter-6": mkwaiter(true), }, }, }, "topic-3": topicSubscriptionT{ waiters: map[uuid.UUID]map[string]waiterT{ flowID5: map[string]waiterT{}, }, }, "topic-4": topicSubscriptionT{ waiters: map[uuid.UUID]map[string]waiterT{}, }, } expected := map[string]map[uuid.UUID][]string{ "topic-1": map[uuid.UUID][]string{ flowID1: []string{ "waiter-2", "waiter-3", }, flowID2: []string{ "waiter-4", }, }, "topic-2": map[uuid.UUID][]string{ flowID3: []string{}, flowID4: []string{ "waiter-3", "waiter-4", "waiter-5", "waiter-6", }, }, "topic-3": map[uuid.UUID][]string{ flowID5: []string{}, }, "topic-4": map[uuid.UUID][]string{}, } given := collectClosedWaiters(set) // sort names for equality for _, waitersIndex := range given { for _, names := range waitersIndex { sort.Strings(names) } } g.TAssertEqual(given, expected) }) } func test_trimEmptyLeaves() { g.TestStart("trimEmptyLeaves()") g.Testing("noop on an empty index", func() { input := map[string]map[uuid.UUID][]string{} expected := map[string]map[uuid.UUID][]string{} trimEmptyLeaves(input) g.TAssertEqual(input, expected) }) g.Testing("simplifies tree when it can", func() { flowID1 := uuid.New() flowID2 := uuid.New() flowID3 := uuid.New() flowID4 := uuid.New() input := map[string]map[uuid.UUID][]string{ "topic-1": map[uuid.UUID][]string{ flowID1: []string{ "waiter-1", }, flowID2: []string{}, }, "topic-2": map[uuid.UUID][]string{ flowID3: []string{}, flowID4: []string{}, }, "topic-3": map[uuid.UUID][]string{}, } expected := map[string]map[uuid.UUID][]string{ "topic-1": map[uuid.UUID][]string{ flowID1: []string{ "waiter-1", }, }, } trimEmptyLeaves(input) g.TAssertEqual(input, expected) }) g.Testing("fully prune tree if possible", func() { flowID1 := uuid.New() flowID2 := uuid.New() flowID3 := uuid.New() flowID4 := uuid.New() flowID5 := uuid.New() input := map[string]map[uuid.UUID][]string{ "topic-1": map[uuid.UUID][]string{}, "topic-2": map[uuid.UUID][]string{}, "topic-3": map[uuid.UUID][]string{}, "topic-4": map[uuid.UUID][]string{ flowID1: []string{}, }, "topic-5": map[uuid.UUID][]string{ flowID2: []string{}, flowID3: []string{}, flowID4: []string{}, flowID5: []string{}, }, } expected := map[string]map[uuid.UUID][]string{} trimEmptyLeaves(input) g.TAssertEqual(input, expected) }) } func test_deleteIfEmpty() { g.TestStart("deleteIfEmpty()") g.Testing("noop if there are consumers", func() { set := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumerT{}, }, }, } expected1 := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumerT{}, }, }, } expected2 := subscriptionsSetM{} deleteIfEmpty(set, "topic") g.TAssertEqual(set, expected1) delete(set["topic"].consumers, "consumer") deleteIfEmpty(set, "topic") g.TAssertEqual(set, expected2) }) g.Testing("noop if there are waiters", func() { flowID := uuid.New() set := subscriptionsSetM{ "topic": topicSubscriptionT{ waiters: map[uuid.UUID]map[string]waiterT{ flowID: nil, }, }, } expected1 := subscriptionsSetM{ "topic": topicSubscriptionT{ waiters: map[uuid.UUID]map[string]waiterT{ flowID: nil, }, }, } expected2 := subscriptionsSetM{} deleteIfEmpty(set, "topic") g.TAssertEqual(set, expected1) delete(set["topic"].waiters, flowID) deleteIfEmpty(set, "topic") g.TAssertEqual(set, expected2) }) g.Testing("otherwise deletes when empty", func() { set := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{}, waiters: map[uuid.UUID]map[string]waiterT{}, }, } expected := subscriptionsSetM{} deleteIfEmpty(set, "topic") g.TAssertEqual(set, expected) }) g.Testing("unrelated topics are left untouched", func() { set := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{}, waiters: map[uuid.UUID]map[string]waiterT{}, }, } expected := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{}, waiters: map[uuid.UUID]map[string]waiterT{}, }, } deleteIfEmpty(set, "another-topic") g.TAssertEqual(set, expected) }) } func test_deleteEmptyTopics() { g.TestStart("deleteEmptyTopics()") g.Testing("cleans up all empty topics from the set", func() { flowID1 := uuid.New() flowID2 := uuid.New() set := subscriptionsSetM{ "empty": topicSubscriptionT{ consumers: map[string]consumerT{}, waiters: map[uuid.UUID]map[string]waiterT{}, }, "has-consumers": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumerT{}, }, waiters: map[uuid.UUID]map[string]waiterT{}, }, "has-waiters": topicSubscriptionT{ consumers: map[string]consumerT{}, waiters: map[uuid.UUID]map[string]waiterT{ flowID1: nil, }, }, "has-both": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumerT{}, }, waiters: map[uuid.UUID]map[string]waiterT{ flowID2: nil, }, }, "has-neither": topicSubscriptionT{ consumers: map[string]consumerT{}, waiters: map[uuid.UUID]map[string]waiterT{}, }, } expected := subscriptionsSetM{ "has-consumers": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumerT{}, }, waiters: map[uuid.UUID]map[string]waiterT{}, }, "has-waiters": topicSubscriptionT{ consumers: map[string]consumerT{}, waiters: map[uuid.UUID]map[string]waiterT{ flowID1: nil, }, }, "has-both": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumerT{}, }, waiters: map[uuid.UUID]map[string]waiterT{ flowID2: nil, }, }, } deleteEmptyTopics(set) g.TAssertEqual(set, expected) }) } func test_removeClosedWaiter() { g.TestStart("removeClosedWaiter()") g.Testing("removes from set all that we request", func() { flowID0 := uuid.New() flowID1 := uuid.New() flowID2 := uuid.New() flowID3 := uuid.New() set := subscriptionsSetM{ "topic-1": topicSubscriptionT{ waiters: map[uuid.UUID]map[string]waiterT{ flowID1: map[string]waiterT{ "waiter-1": waiterT{}, "waiter-2": waiterT{}, }, flowID2: map[string]waiterT{ "waiter-3": waiterT{}, "waiter-4": waiterT{}, "waiter-5": waiterT{}, }, }, }, "topic-2": topicSubscriptionT{ waiters: map[uuid.UUID]map[string]waiterT{ flowID3: map[string]waiterT{ "waiter-6": waiterT{}, "waiter-7": waiterT{}, "waiter-8": waiterT{}, }, }, }, "topic-3": topicSubscriptionT{ waiters: map[uuid.UUID]map[string]waiterT{}, }, } input := map[string]map[uuid.UUID][]string{ "topic-0": map[uuid.UUID][]string{ flowID0: []string{ "waiter-0", }, }, "topic-1": map[uuid.UUID][]string{ flowID1: []string{ "waiter-2", }, flowID2: []string{ "waiter-3", "waiter-4", "waiter-5", }, }, "topic-2": map[uuid.UUID][]string{ flowID3: []string{ "waiter-6", "waiter-7", "waiter-8", }, }, } expected := subscriptionsSetM{ "topic-1": topicSubscriptionT{ waiters: map[uuid.UUID]map[string]waiterT{ flowID1: map[string]waiterT{ "waiter-1": waiterT{}, }, }, }, } removeClosedWaiters(set, input) g.TAssertEqual(set, expected) }) g.Testing("empty flowIDs from input GET LEAKED", func() { flowID1 := uuid.New() flowID2 := uuid.New() input := map[string]map[uuid.UUID][]string{ "topic-2": map[uuid.UUID][]string{ flowID2: []string{ "waiter", }, }, } set := subscriptionsSetM{ "topic-1": topicSubscriptionT{ waiters: map[uuid.UUID]map[string]waiterT{ flowID1: map[string]waiterT{}, }, }, "topic-2": topicSubscriptionT{ waiters: map[uuid.UUID]map[string]waiterT{ flowID2: map[string]waiterT{ "waiter": waiterT{}, }, }, }, } expected := subscriptionsSetM{ "topic-1": topicSubscriptionT{ waiters: map[uuid.UUID]map[string]waiterT{ flowID1: map[string]waiterT{}, }, }, } removeClosedWaiters(set, input) g.TAssertEqual(set, expected) }) } func test_reapClosedWaiters() { g.TestStart("reapClosedWaiters()") g.Testing("trimEmptyLeaves() prevents removal of empty flows", func() { var ( set subscriptionsSetM readCount = 0 writeCount = 0 ) readFn := func(fn func(subscriptionsSetM) error) error { readCount++ return fn(set) } writeFn := func(fn func(subscriptionsSetM) error) error { writeCount++ return fn(set) } openFn := func() bool { return false } closedFn := func() bool { return true } open := waiterT{ closed: &openFn } closed := waiterT{ closed: &closedFn } flowID1 := uuid.New() flowID2 := uuid.New() set = subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{}, waiters: map[uuid.UUID]map[string]waiterT{ flowID1: map[string]waiterT{ "waiter-1": open, "waiter-2": open, "waiter-3": open, }, flowID2: map[string]waiterT{ "waiter-4": open, }, }, }, } expected1 := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{}, waiters: map[uuid.UUID]map[string]waiterT{ flowID1: map[string]waiterT{ "waiter-1": open, "waiter-2": open, "waiter-3": open, }, flowID2: map[string]waiterT{ "waiter-4": open, }, }, }, } expected2 := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{}, waiters: map[uuid.UUID]map[string]waiterT{ flowID1: map[string]waiterT{ "waiter-2": open, "waiter-3": open, }, flowID2: map[string]waiterT{ "waiter-4": open, }, }, }, } expected3 := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{}, waiters: map[uuid.UUID]map[string]waiterT{ flowID1: map[string]waiterT{ "waiter-2": open, "waiter-3": open, }, }, }, } reapClosedWaiters(readFn, writeFn) g.TAssertEqual(readCount, 1) g.TAssertEqual(writeCount, 0) g.TAssertEqual(set, expected1) set["topic"].waiters[flowID1]["waiter-1"] = closed reapClosedWaiters(readFn, writeFn) g.TAssertEqual(readCount, 2) g.TAssertEqual(writeCount, 1) g.TAssertEqual(set, expected2) set["topic"].waiters[flowID2]["waiter-4"] = closed reapClosedWaiters(readFn, writeFn) g.TAssertEqual(readCount, 3) g.TAssertEqual(writeCount, 2) g.TAssertEqual(set, expected3) }) } func test_everyNthCall() { g.TestStart("everyNthCall()") g.Testing("0 (incorrect) would make fn never be called", func() { never := everyNthCall[int](0, func(int) { panic("unreachable") }) for i := 0; i < 100; i++ { never(i) } }) g.Testing("the first call is delayed to be the nth", func() { count := 0 values := []string{} fn := everyNthCall[string](3, func(s string) { count++ values = append(values, s) }) fn("1 ignored") g.TAssertEqual(count, 0) fn("2 ignored") g.TAssertEqual(count, 0) fn("3 not ignored") g.TAssertEqual(count, 1) fn("4 ignored") fn("5 ignored") g.TAssertEqual(count, 1) fn("6 not ignored") fn("7 ignored") fn("8 ignored") g.TAssertEqual(count, 2) fn("9 not ignored") g.TAssertEqual(count, 3) expected := []string{ "3 not ignored", "6 not ignored", "9 not ignored", } g.TAssertEqual(values, expected) }) } func test_runReaper() { g.TestStart("runReaper()") g.Testing("debounce reapClosedWaiters `reaperSkipCount` times", func() { set := subscriptionsSetM{} var ( readCount = 0 writeCount = 0 ) readFn := func(fn func(subscriptionsSetM) error) error { readCount++ return fn(set) } writeFn := func(fn func(subscriptionsSetM) error) error { writeCount++ return fn(set) } var iterCount int onPing := func(fn func(struct{})) { for i := 0; i < iterCount; i++ { fn(struct{}{}) } } iterCount = reaperSkipCount - 1 runReaper(onPing, readFn, writeFn) g.TAssertEqual(readCount, 0) g.TAssertEqual(writeCount, 0) iterCount = reaperSkipCount runReaper(onPing, readFn, writeFn) g.TAssertEqual(readCount, 1) g.TAssertEqual(writeCount, 0) }) } func test_NewWithPrefix() { g.TestStart("NewWithPrefix()") g.Testing("we get an error with a bad prefix", func() { _, err := NewWithPrefix(golite.InMemory, "a bad prefix") g.TAssertEqual(err, g.ErrBadSQLTablePrefix) }) g.Testing("otherwise we have a queueT and no errors", func() { queue, err := NewWithPrefix(golite.InMemory, "good") g.TErrorIf(err) queue.Close() g.TAssertEqual(queue.(queueT).pinger.closed(), true) }) } func test_New() { g.TestStart("New()") g.Testing("smoke test that we get a queueT", func() { queue, err := New(golite.InMemory) g.TErrorIf(err) queue.Close() g.TAssertEqual(queue.(queueT).pinger.closed(), true) }) } func test_asPublicMessage() { g.TestStart("asPublicMessage()") g.Testing("it picks the correct fields 🤷", func() { input := messageT{ uuid: uuid.New(), timestamp: time.Now(), topic: "topic", flowID: uuid.New(), payload: []byte("payload"), } expected := Message{ ID: input.uuid, Timestamp: input.timestamp, Topic: input.topic, FlowID: input.flowID, Payload: input.payload, } given := asPublicMessage(input) g.TAssertEqual(given, expected) }) } func test_queueT_Publish() { g.TestStart("queueT.Publish()") const ( topic = "queueT.Publish() topic" payloadStr = "queueT.Publish() payload" dbpath = golite.InMemory ) var ( flowID = uuid.New() payload = []byte(payloadStr) unsent = UnsentMessage{ Topic: topic, FlowID: flowID, Payload: payload, } ) queue, err := New(dbpath) g.TErrorIf(err) defer queue.Close() g.Testing("it just publishes and returns", func() { message, err := queue.Publish(unsent) g.TErrorIf(err) g.TAssertEqual(message.Timestamp == time.Time{}, false) g.TAssertEqual(message.Topic, topic) g.TAssertEqual(message.FlowID, flowID) g.TAssertEqual(message.Payload, payload) }) queue.Close() g.TAssertEqual(queue.(queueT).pinger.closed(), true) } func test_registerConsumerFn() { g.TestStart("registerConsumerFn()") g.Testing("adds a new topicSubscriptionT{} if needed", func() { consumer := consumerT{ data: consumerDataT{ topic: "topic", name: "consumer", }, } set := subscriptionsSetM{} expected := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumer, }, waiters: map[uuid.UUID]map[string]waiterT{}, }, } registerConsumerFn(consumer)(set) g.TAssertEqual(set, expected) }) g.Testing("otherwise it just uses what exists", func() { flowID := uuid.New() consumer := consumerT{ data: consumerDataT{ topic: "topic", name: "consumer", }, } set := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{ "other-consumer": consumerT{}, }, waiters: map[uuid.UUID]map[string]waiterT{ flowID: map[string]waiterT{}, }, }, } expected := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumer, "other-consumer": consumerT{}, }, waiters: map[uuid.UUID]map[string]waiterT{ flowID: map[string]waiterT{}, }, }, } registerConsumerFn(consumer)(set) g.TAssertEqual(set, expected) }) g.Testing("overwrites existing consumer if desired", func() { close1 := func() {} consumer1 := consumerT{ data: consumerDataT{ topic: "topic", name: "consumer", }, close: &close1, } close2 := func() {} consumer2 := consumerT{ data: consumerDataT{ topic: "topic", name: "consumer", }, close: &close2, } set := subscriptionsSetM{} expected1 := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumer1, }, waiters: map[uuid.UUID]map[string]waiterT{}, }, } expected2 := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumer2, }, waiters: map[uuid.UUID]map[string]waiterT{}, }, } registerConsumerFn(consumer1)(set) g.TAssertEqual(set, expected1) g.TAssertEqual(reflect.DeepEqual(set, expected2), false) registerConsumerFn(consumer2)(set) g.TAssertEqual(set, expected2) g.TAssertEqual(reflect.DeepEqual(set, expected1), false) }) } func test_registerWaiterFn() { g.TestStart("registerWaiterFn()") g.Testing("adds a new topicSubscriptionT{} if needed", func() { flowID := uuid.New() waiter := waiterT{ data: waiterDataT{ topic: "topic", flowID: flowID, name: "waiter", }, } set := subscriptionsSetM{} expected := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{}, waiters: map[uuid.UUID]map[string]waiterT{ flowID: map[string]waiterT{ "waiter": waiter, }, }, }, } registerWaiterFn(waiter)(set) g.TAssertEqual(set, expected) }) g.Testing("adds a new waiters map if needed", func() { flowID := uuid.New() waiter := waiterT{ data: waiterDataT{ topic: "topic", flowID: flowID, name: "waiter", }, } set := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumerT{}, }, waiters: map[uuid.UUID]map[string]waiterT{}, }, } expected := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumerT{}, }, waiters: map[uuid.UUID]map[string]waiterT{ flowID: map[string]waiterT{ "waiter": waiter, }, }, }, } registerWaiterFn(waiter)(set) g.TAssertEqual(set, expected) }) g.Testing("otherwise it just uses what exists", func() { flowID := uuid.New() waiter := waiterT{ data: waiterDataT{ topic: "topic", flowID: flowID, name: "waiter", }, } set := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumerT{}, }, waiters: map[uuid.UUID]map[string]waiterT{ flowID: map[string]waiterT{ "other-waiter": waiterT{}, }, }, }, } expected := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumerT{}, }, waiters: map[uuid.UUID]map[string]waiterT{ flowID: map[string]waiterT{ "waiter": waiter, "other-waiter": waiterT{}, }, }, }, } registerWaiterFn(waiter)(set) g.TAssertEqual(set, expected) }) g.Testing("overwrites existing waiter if desired", func() { flowID := uuid.New() close1 := func() {} waiter1 := waiterT{ data: waiterDataT{ topic: "topic", flowID: flowID, name: "waiter", }, close: &close1, } close2 := func() {} waiter2 := waiterT{ data: waiterDataT{ topic: "topic", flowID: flowID, name: "waiter", }, close: &close2, } set := subscriptionsSetM{} expected1 := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{}, waiters: map[uuid.UUID]map[string]waiterT{ flowID: map[string]waiterT{ "waiter": waiter1, }, }, }, } expected2 := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{}, waiters: map[uuid.UUID]map[string]waiterT{ flowID: map[string]waiterT{ "waiter": waiter2, }, }, }, } registerWaiterFn(waiter1)(set) g.TAssertEqual(set, expected1) g.TAssertEqual(reflect.DeepEqual(set, expected2), false) registerWaiterFn(waiter2)(set) g.TAssertEqual(set, expected2) g.TAssertEqual(reflect.DeepEqual(set, expected1), false) }) } func test_makeConsumeOneFn() { g.TestStart("makeConsumeOneFn()") savedLogger := slog.Default() s := new(strings.Builder) g.SetLoggerOutput(s) var ( successCount int errorCount int callbackErr error successFnErr error errorFnErr error messages []Message successNames []string successIDs []uuid.UUID errorNames []string errorIDs []uuid.UUID deadletterIDs []uuid.UUID ) data := consumerDataT{ topic: "topic", name: "name", } callback := func(message Message) error { messages = append(messages, message) return callbackErr } successFn := func(name string, messageID uuid.UUID) error { successCount++ successNames = append(successNames, name) successIDs = append(successIDs, messageID) return successFnErr } errorFn := func( name string, messageID uuid.UUID, deadletterID uuid.UUID, ) error { errorCount++ errorNames = append(errorNames, name) errorIDs = append(errorIDs, messageID) deadletterIDs = append(deadletterIDs, deadletterID) return errorFnErr } consumeOneFn := makeConsumeOneFn( data, callback, successFn, errorFn, ) message1 := messageT{ uuid: uuid.New() } message2 := messageT{ uuid: uuid.New() } message3 := messageT{ uuid: uuid.New() } message4 := messageT{ uuid: uuid.New() } g.Testing("error from successFn() is propagated", func() { err := consumeOneFn(message1) g.TErrorIf(err) g.TAssertEqual(successCount, 1) g.TAssertEqual(errorCount, 0) successFnErr = errors.New("successFn() error") err = consumeOneFn(message2) g.TAssertEqual(err, successFnErr) g.TAssertEqual(successCount, 2) g.TAssertEqual(errorCount, 0) g.TAssertEqual(s.String(), "") }) g.Testing("error from callback() is logged and dropped", func() { *s = strings.Builder{} callbackErr = errors.New("callback() error") err := consumeOneFn(message3) g.TErrorIf(err) g.TAssertEqual(successCount, 2) g.TAssertEqual(errorCount, 1) g.TAssertEqual(s.String() == "", false) }) g.Testing("error from errorFn() is propagated", func() { *s = strings.Builder{} errorFnErr = errors.New("errorFn() error") err := consumeOneFn(message4) g.TAssertEqual(err, errorFnErr) g.TAssertEqual(successCount, 2) g.TAssertEqual(errorCount, 2) g.TAssertEqual(s.String() == "", false) }) g.Testing("calls happened with the expected arguments", func() { expectedMessages := []Message{ asPublicMessage(message1), asPublicMessage(message2), asPublicMessage(message3), asPublicMessage(message4), } g.TAssertEqual(messages, expectedMessages) g.TAssertEqual(successNames, []string{ "name", "name" }) g.TAssertEqual(errorNames, []string{ "name", "name" }) g.TAssertEqual(successIDs, []uuid.UUID{ message1.uuid, message2.uuid, }) g.TAssertEqual(errorIDs, []uuid.UUID{ message3.uuid, message4.uuid, }) g.TAssertEqual(deadletterIDs[0] == message3.uuid, false) g.TAssertEqual(deadletterIDs[1] == message4.uuid, false) }) slog.SetDefault(savedLogger) } func test_makeConsumeAllFn() { g.TestStart("makeConsumeAllFn()") savedLogger := slog.Default() s := new(strings.Builder) g.SetLoggerOutput(s) data := consumerDataT{} consumeOneFn := func(messageT) error { return nil } var eachFnErr error eachFn := func(string, string, func(messageT) error) error { return eachFnErr } consumeAllFn := makeConsumeAllFn(data, consumeOneFn, eachFn) g.Testing("silent when eachFn() returns no error", func() { consumeAllFn(struct{}{}) g.TAssertEqual(s.String(), "") }) g.Testing("logs warning otherwise", func() { eachFnErr = errors.New("eachFn() error") consumeAllFn(struct{}{}) g.TAssertEqual(s.String() == "", false) }) slog.SetDefault(savedLogger) } func test_makeWaitFn() { g.TestStart("makeWaitFn()") g.Testing("all it does is send the data and close things", func() { callCount := 0 closeFn := func() { callCount++ } c := make(chan []byte, 1) waitFn := makeWaitFn(c, closeFn) payload := []byte("payload") waitFn(payload) given := <- c g.TAssertEqual(given, payload) g.TAssertEqual(callCount, 1) }) g.Testing("you can call it twice for cases of race condition", func() { callCount := 0 closeFn := func() { callCount++ } c := make(chan []byte, 1) waitFn := makeWaitFn(c, closeFn) payload := []byte("something") waitFn(payload) waitFn(payload) given1 := <- c given2 := <- c g.TAssertEqual(given1, payload) g.TAssertEqual(given2, []byte(nil)) }) } func test_runConsumer() { g.TestStart("runConsumer()") g.Testing("calls consumeAllFn() at least one", func() { onPing := func(fn func(struct{})) {} count := 0 consumeAllFn := func(struct{}) { count++ } runConsumer(onPing, consumeAllFn) g.TAssertEqual(count, 1) }) g.Testing("can call consumeAllFn() (onPing + 1) times", func() { onPing := func(fn func(struct{})) { fn(struct{}{}) fn(struct{}{}) fn(struct{}{}) } count := 0 consumeAllFn := func(struct{}) { count++ } runConsumer(onPing, consumeAllFn) g.TAssertEqual(count, 4) }) } func test_tryFinding() { g.TestStart("tryFinding()") g.Testing("noop in case of failure", func() { myErr := errors.New("find() error") findFn := func(string, uuid.UUID) (messageT, error) { return messageT{}, myErr } count := 0 waitFn := func([]byte) { count++ } tryFinding(findFn, "topic", uuid.New(), waitFn) g.TAssertEqual(count, 0) }) g.Testing("calls waitFn in case of success", func() { payload := []byte("find() payload") findFn := func(string, uuid.UUID) (messageT, error) { return messageT{ payload: payload }, nil } payloads := [][]byte{} waitFn := func(payload []byte) { payloads = append(payloads, payload) } tryFinding(findFn, "topic", uuid.New(), waitFn) g.TAssertEqual(payloads, [][]byte{ payload }) }) } func test_queueT_Subscribe() { g.TestStart("queueT.Subscribe()") set := subscriptionsSetM{} consumed := []uuid.UUID{} messages := []messageT{ messageT{ uuid: uuid.New() }, messageT{ uuid: uuid.New() }, } var takeErr error queue := queueT{ queries: queriesT{ take: func(string, string) error { return takeErr }, pending: func( topic string, consumer string, fn func(messageT) error, ) error { for _, message := range messages { consumed = append( consumed, message.uuid, ) _ = fn(message) } return nil }, commit: func( consumer string, messageID uuid.UUID, ) error { return nil }, toDead: func(string, uuid.UUID, uuid.UUID) error { g.Unreachable() return nil }, }, subscriptions: subscriptionsT{ write: func(fn func(subscriptionsSetM) error) error { return fn(set) }, }, } g.Testing("registers our callback in the set and calls it", func() { var wg sync.WaitGroup wg.Add(2) queue.Subscribe("topic", "consumer-1", func(Message) error { wg.Done() return nil }) defer queue.Unsubscribe("topic", "consumer-1") wg.Wait() g.TAssertEqual(consumed, []uuid.UUID{ messages[0].uuid, messages[1].uuid, }) }) g.Testing("our callback also gets called when pinged", func() { consumed = []uuid.UUID{} var wg sync.WaitGroup wg.Add(4) queue.Subscribe("topic", "consumer-2", func(m Message) error { wg.Done() return nil }) defer queue.Unsubscribe("topic", "consumer-2") consumer := set["topic"].consumers["consumer-2"] consumer.pinger.tryPing(struct{}{}) wg.Wait() g.TAssertEqual(consumed, []uuid.UUID{ messages[0].uuid, messages[1].uuid, messages[0].uuid, messages[1].uuid, }) }) g.Testing("we try to own the topic", func() { takeErr = errors.New("queueT.Subscribe() 1") err := queue.Subscribe("topic", "name", func(Message) error { g.Unreachable() return nil }) g.TAssertEqual(err, takeErr) takeErr = nil }) g.Testing("if we can't own the topic, we don't get registered", func() { takeErr = errors.New("queueT.Subscribe() 2") err := queue.Subscribe("topic", "name", func(Message) error { g.Unreachable() return nil }) g.TErrorNil(err) expected := subscriptionsSetM{} g.TAssertEqual(set, expected) takeErr = nil }) } func test_queueT_WaitFor() { g.TestStart("queueT.WaitFor()") findErr := errors.New("find() error") message := messageT{ payload: []byte("queueT.WaitFor() payload"), } set := subscriptionsSetM{} queue := queueT{ queries: queriesT{ find: func( topic string, flowID uuid.UUID, ) (messageT, error) { return message, findErr }, }, subscriptions: subscriptionsT{ write: func(fn func(subscriptionsSetM) error) error { return fn(set) }, read: func(fn func(subscriptionsSetM) error) error { return fn(set) }, }, } g.Testing("registers the waiter in the set", func() { flowID := uuid.New() defer queue.WaitFor("topic", flowID, "waiter-1").Close() expected := waiterDataT{ topic: "topic", flowID: flowID, name: "waiter-1", } g.TAssertEqual( set["topic"].waiters[flowID]["waiter-1"].data, expected, ) }) g.Testing("the channel gets a message when waiter is pinged", func() { flowID := uuid.New() payload := []byte("sent payload") w := queue.WaitFor("topic", flowID, "waiter-2") defer w.Close() waiter := set["topic"].waiters[flowID]["waiter-2"] waiter.pinger.tryPing(payload) given := <- w.Channel g.TAssertEqual(given, payload) g.TAssertEqual((*waiter.closed)(), true) }) g.Testing("we can also WaitFor() after publishing the message", func() { findErr = nil flowID := uuid.New() w := queue.WaitFor("topic", flowID, "waiter-3") defer w.Close() given := <- w.Channel waiter := set["topic"].waiters[flowID]["waiter-3"] waiter.pinger.tryPing([]byte("ignored")) g.TAssertEqual(given, message.payload) g.TAssertEqual((*waiter.closed)(), true) }) g.Testing("if the data already exists we get it immediatelly", func() { flowID := uuid.New() w := queue.WaitFor("topic", flowID, "waiter-4") defer w.Close() waiter := set["topic"].waiters[flowID]["waiter-4"] g.TAssertEqual((*waiter.closed)(), true) given := <- w.Channel g.TAssertEqual(given, message.payload) }) } func test_unsubscribeIfExistsFn() { g.TestStart("unsubscribeIfExistsFn()") g.Testing("noop on empty set", func() { set := subscriptionsSetM{} expected := subscriptionsSetM{} unsubscribeIfExistsFn("topic", "consumer")(set) g.TAssertEqual(set, expected) }) g.Testing("noop on missing topic", func() { set := subscriptionsSetM{ "topic": topicSubscriptionT{}, } expected := subscriptionsSetM{ "topic": topicSubscriptionT{}, } unsubscribeIfExistsFn("other-topic", "consumer")(set) g.TAssertEqual(set, expected) }) g.Testing("noop on missing consumer", func() { set := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumerT{}, }, }, } expected := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumerT{}, }, }, } unsubscribeIfExistsFn("topic", "other-consumer")(set) g.TAssertEqual(set, expected) }) g.Testing("closes consumer and removes it from set", func() { flowID := uuid.New() count := 0 close := func() { count++ } set := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumerT{ close: &close, }, }, waiters: map[uuid.UUID]map[string]waiterT{ flowID: map[string]waiterT{}, }, }, } expected := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{}, waiters: map[uuid.UUID]map[string]waiterT{ flowID: map[string]waiterT{}, }, }, } unsubscribeIfExistsFn("topic", "consumer")(set) g.TAssertEqual(set, expected) g.TAssertEqual(count, 1) }) g.Testing("empty topics are also removed", func() { count := 0 close := func() { count++ } set := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumerT{ close: &close, }, }, waiters: map[uuid.UUID]map[string]waiterT{}, }, } expected := subscriptionsSetM{} unsubscribeIfExistsFn("topic", "consumer")(set) g.TAssertEqual(set, expected) g.TAssertEqual(count, 1) }) } func test_queueT_Unsubscribe() { g.TestStart("queueT.Unsubscribe()") g.Testing("calls unsubscribesIfExists() via writeFn()", func() { closed := false close := func() { closed = true } set := subscriptionsSetM{ "topic": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer": consumerT{ close: &close, }, }, waiters: map[uuid.UUID]map[string]waiterT{}, }, } expected := subscriptionsSetM{} queue := queueT{ subscriptions: subscriptionsT{ write: func( fn func(subscriptionsSetM) error, ) error { return fn(set) }, }, } queue.Unsubscribe("topic", "consumer") g.TAssertEqual(set, expected) g.TAssertEqual(closed, true) }) } func test_cleanSubscriptions() { g.TestStart("cleanSubscriptions()") g.Testing("all consumers and waiters get close()'d", func() { flowID1 := uuid.New() flowID2 := uuid.New() flowID3 := uuid.New() type pairT struct{ closed func() bool fn func() } mkclose := func() pairT { closed := false return pairT{ closed: func() bool { return closed }, fn: func() { closed = true }, } } c := mkclose() g.TAssertEqual(c.closed(), false) c.fn() var x bool = c.closed() g.TAssertEqual(x, true) close1 := mkclose() close2 := mkclose() close3 := mkclose() close4 := mkclose() close5 := mkclose() close6 := mkclose() close7 := mkclose() close8 := mkclose() set := subscriptionsSetM{ "topic-1": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer-1": consumerT{ close: &close1.fn, }, "consumer-2": consumerT{ close: &close2.fn, }, }, waiters: map[uuid.UUID]map[string]waiterT{ flowID1: map[string]waiterT{ "waiter-1": waiterT{ close: &close3.fn, }, }, flowID2: map[string]waiterT{ "waiter-2": waiterT{ close: &close4.fn, }, "waiter-3": waiterT{ close: &close5.fn, }, }, }, }, "topic-2": topicSubscriptionT{ consumers: map[string]consumerT{ "consumer-3": consumerT{ close: &close6.fn, }, }, waiters: map[uuid.UUID]map[string]waiterT{ flowID3: map[string]waiterT{ "waiter-4": waiterT{ close: &close7.fn, }, }, }, }, "topic-3": topicSubscriptionT{ consumers: map[string]consumerT{}, waiters: map[uuid.UUID]map[string]waiterT{}, }, } cleanSubscriptions(set) given := []bool{ close1.closed(), close2.closed(), close3.closed(), close4.closed(), close5.closed(), close6.closed(), close7.closed(), close8.closed(), } expected := []bool{ true, true, true, true, true, true, true, false, } g.TAssertEqualI(given, expected) }) } func test_queueT_Close() { g.TestStart("queueT.Close()") g.Testing("clean pinger, subscriptions and queries", func() { var ( pingerCount = 0 subscriptionsCount = 0 queriesCount = 0 subscriptionsErr = errors.New("subscriptionsT{} error") queriesErr = errors.New("queriesT{} error") ) queue := queueT{ queries: queriesT{ close: func() error{ queriesCount++ return queriesErr }, }, subscriptions: subscriptionsT{ write: func( func(subscriptionsSetM) error, ) error { subscriptionsCount++ return subscriptionsErr }, }, pinger: pingerT[struct{}]{ close: func() { pingerCount++ }, }, } err := queue.Close() g.TAssertEqual(err, g.WrapErrors(subscriptionsErr, queriesErr)) g.TAssertEqual(pingerCount, 1) g.TAssertEqual(subscriptionsCount, 1) g.TAssertEqual(queriesCount, 1) }) } func test_topicGetopt() { g.TestStart("topicGetopt()") g.Testing("checks for required positional argument", func() { var w strings.Builder argsIn := argsT{ args: []string{}, } argsOut, ok := topicGetopt(argsIn, &w) g.TAssertEqual(w.String(), "Missing TOPIC.\n") g.TAssertEqual(ok, false) g.TAssertEqual(argsOut, argsIn) }) g.Testing("success otherwise", func() { var w strings.Builder argsIn := argsT{ args: []string{"a topic"}, } argsOut, ok := topicGetopt(argsIn, &w) g.TAssertEqual(w.String(), "") g.TAssertEqual(ok, true) argsIn.topic = "a topic" g.TAssertEqual(argsOut, argsIn) }) } func test_topicConsumerGetopt() { g.TestStart("topicConsumerGetopt()") g.Testing("checks for TOPIC argument", func() { var w strings.Builder argsIn := argsT{ args: []string{}, } argsOut, ok := topicConsumerGetopt(argsIn, &w) g.TAssertEqual(w.String(), "Missing TOPIC.\n") g.TAssertEqual(ok, false) g.TAssertEqual(argsOut, argsIn) }) g.Testing("we get an error on unsupported flag", func() { var w strings.Builder argsIn := argsT{ args: []string{"-Z"}, } const message = "flag provided but not defined: -Z\n" argsOut, ok := topicConsumerGetopt(argsIn, &w) g.TAssertEqual(w.String(), message) g.TAssertEqual(ok, false) g.TAssertEqual(argsOut, argsIn) }) g.Testing("we also get an error on incorrect usage of flags", func() { var w strings.Builder argsIn := argsT{ args: []string{"-C"}, } const message = "flag needs an argument: -C\n" argsOut, ok := topicConsumerGetopt(argsIn, &w) g.TAssertEqual(w.String(), message) g.TAssertEqual(ok, false) g.TAssertEqual(argsOut, argsIn) }) g.Testing("we can customize the CONSUMER", func() { var w strings.Builder argsIn := argsT{ args: []string{"-C", "custom consumer", "this topic"}, } argsOut, ok := topicConsumerGetopt(argsIn, &w) g.TAssertEqual(w.String(), "") g.TAssertEqual(ok, true) argsIn.topic = "this topic" argsIn.consumer = "custom consumer" g.TAssertEqual(argsOut, argsIn) }) g.Testing("otherwise we get the default one", func() { var w strings.Builder argsIn := argsT{ args: []string{"T"}, } argsOut, ok := topicConsumerGetopt(argsIn, &w) g.TAssertEqual(w.String(), "") g.TAssertEqual(ok, true) argsIn.topic = "T" argsIn.consumer = "default-consumer" g.TAssertEqual(argsOut, argsIn) }) } func test_inExec() { g.TestStart("inExec()") const ( topic = "inExec topic" payloadStr = "inExec payload" ) var ( payload = []byte(payloadStr) args = argsT{ topic: topic, } ) var ( publishErr error messages []messageT id int64 = 0 ) queries := queriesT{ publish: func( unsent UnsentMessage, messageID uuid.UUID, ) (messageT, error) { if publishErr != nil { return messageT{}, publishErr } id++ now := time.Now() message := messageT{ id: id, timestamp: now, uuid: messageID, topic: unsent.Topic, flowID: unsent.FlowID, payload: unsent.Payload, } messages = append(messages, message) return message, nil }, } g.Testing("messageID to output when successful", func() { r := bytes.NewReader(payload) var w strings.Builder rc, err := inExec(args, queries, r, &w) g.TErrorIf(err) g.TAssertEqual(messages[0].topic, topic) g.TAssertEqual(messages[0].payload, payload) g.TAssertEqual(rc, 0) g.TAssertEqual(w.String(), messages[0].uuid.String() + "\n") }) g.Testing("if reading fails, we return the error", func() { var r *os.File var w strings.Builder rc, err := inExec(args, queries, r, &w) g.TAssertEqual( err.Error(), "invalid argument", ) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) }) g.Testing("if publishing fails, we return the error", func() { publishErr = errors.New("publish() error") r := strings.NewReader("read but not published") var w strings.Builder rc, err := inExec(args, queries, r, &w) g.TAssertEqual(err, publishErr) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) }) } func test_outExec() { g.TestStart("outExec()") const ( topic = "outExec topic" consumer = "outExec consumer" ) var ( r = strings.NewReader("") args = argsT{ topic: topic, consumer: consumer, } ) var ( takeErr error nextErr error messages []messageT id int64 = 0 ) queries := queriesT{ take: func(string, string) error { return takeErr }, next: func(string, string) (messageT, error) { if nextErr != nil { return messageT{}, nextErr } if len(messages) == 0 { return messageT{}, sql.ErrNoRows } return messages[0], nil }, } pub := func(payload []byte) { id++ now := time.Now() message := messageT{ id: id, timestamp: now, uuid: uuid.New(), topic: topic, flowID: uuid.New(), payload: payload, } messages = append(messages, message) } g.Testing("exit code 1 when we can't take", func() { takeErr = errors.New("outExec() take error") var w strings.Builder rc, err := outExec(args, queries, r, &w) g.TAssertEqual(err, takeErr) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) takeErr = nil }) g.Testing("exit code 3 when no message is available", func() { var w strings.Builder rc, err := outExec(args, queries, r, &w) g.TErrorIf(err) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 3) }) g.Testing("we get the same message until we commit", func() { var ( w1 strings.Builder w2 strings.Builder w3 strings.Builder ) args := argsT{ topic: topic, consumer: consumer, } pub([]byte("first payload")) pub([]byte("second payload")) rc1, err1 := outExec(args, queries, r, &w1) rc2, err2 := outExec(args, queries, r, &w2) messages = messages[1:] rc3, err3 := outExec(args, queries, r, &w3) g.TErrorIf(g.SomeError(err1, err2, err3)) g.TAssertEqual(w1.String(), "first payload\n") g.TAssertEqual(w2.String(), "first payload\n") g.TAssertEqual(w3.String(), "second payload\n") g.TAssertEqual(rc1, 0) g.TAssertEqual(rc2, 0) g.TAssertEqual(rc3, 0) }) g.Testing("we propagate the error when the query fails", func() { nextErr = errors.New("next() error") var w strings.Builder rc, err := outExec(args, queries, r, &w) g.TAssertEqual(err, nextErr) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) }) } func test_commitExec() { g.TestStart("commitExec()") const ( topic = "commitExec topic" consumer = "commitExec consumer" ) var ( r = strings.NewReader("") args = argsT{ topic: topic, consumer: consumer, } ) var ( takeErr error nextErr error commitErr error messages []messageT id int64 = 0 ) queries := queriesT{ take: func(string, string) error { return takeErr }, next: func(string, string) (messageT, error) { if nextErr != nil { return messageT{}, nextErr } if len(messages) == 0 { return messageT{}, sql.ErrNoRows } return messages[0], nil }, commit: func(string, uuid.UUID) error { if commitErr != nil { return commitErr } messages = messages[1:] return nil }, } pub := func(payload []byte) { id++ now := time.Now() message := messageT{ id: id, timestamp: now, uuid: uuid.New(), topic: topic, flowID: uuid.New(), payload: payload, } messages = append(messages, message) } g.Testing("error when we can't take", func() { takeErr = errors.New("commitExec() take error") var w strings.Builder rc, err := commitExec(args, queries, r, &w) g.TAssertEqual(err, takeErr) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) takeErr = nil }) g.Testing("error when there is nothing to commit", func() { var w strings.Builder rc, err := commitExec(args, queries, r, &w) g.TAssertEqual(err, sql.ErrNoRows) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) }) g.Testing("messages get committed in order", func() { var w strings.Builder pub([]byte("first payload")) pub([]byte("second payload")) pub([]byte("third payload")) message1 := messages[0] g.TAssertEqual(message1.payload, []byte("first payload")) rc, err := commitExec(args, queries, r, &w) g.TErrorIf(err) g.TAssertEqual(rc, 0) message2 := messages[0] g.TAssertEqual(message2.payload, []byte("second payload")) rc, err = commitExec(args, queries, r, &w) g.TErrorIf(err) g.TAssertEqual(rc, 0) message3 := messages[0] g.TAssertEqual(message3.payload, []byte("third payload")) rc, err = commitExec(args, queries, r, &w) g.TErrorIf(err) g.TAssertEqual(rc, 0) g.TAssertEqual(len(messages), 0) }) g.Testing("when next() query fails, we propagate its result", func() { nextErr = errors.New("next() error") var w strings.Builder rc, err := commitExec(args, queries, r, &w) g.TAssertEqual(err, nextErr) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) nextErr = nil }) g.Testing("we also propagate the error on commit() failure", func() { commitErr = errors.New("commit() error") pub([]byte{}) var w strings.Builder rc, err := commitExec(args, queries, r, &w) g.TAssertEqual(err, commitErr) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) }) } func test_deadExec() { g.TestStart("deadExec()") const ( topic = "deadExec topic" consumer = "deadExec consumer" ) var ( r = strings.NewReader("") args = argsT{ topic: topic, consumer: consumer, } ) var ( takeErr error nextErr error toDeadErr error messages []messageT id int64 = 0 ) queries := queriesT{ take: func(string, string) error { return takeErr }, next: func(string, string) (messageT, error) { if nextErr != nil { return messageT{}, nextErr } if len(messages) == 0 { return messageT{}, sql.ErrNoRows } return messages[0], nil }, toDead: func( _ string, _ uuid.UUID, deadletterID uuid.UUID, ) error { if toDeadErr != nil { return toDeadErr } messages = messages[1:] return nil }, } pub := func(payload []byte) { id++ now := time.Now() message := messageT{ id: id, timestamp: now, uuid: uuid.New(), topic: topic, flowID: uuid.New(), payload: payload, } messages = append(messages, message) } g.Testing("error when we can't take", func() { takeErr = errors.New("deadExec() take error") var w strings.Builder rc, err := deadExec(args, queries, r, &w) g.TAssertEqual(err, takeErr) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) takeErr = nil }) g.Testing("error when there is nothing to mark as dead", func() { var w strings.Builder rc, err := deadExec(args, queries, r, &w) g.TAssertEqual(err, sql.ErrNoRows) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) }) g.Testing("the latest message becomes a deadletter", func() { var w strings.Builder pub([]byte("first payload")) pub([]byte("second payload")) message1 := messages[0] g.TAssertEqual(message1.payload, []byte("first payload")) rc, err := deadExec(args, queries, r, &w) g.TErrorIf(err) g.TAssertEqual(rc, 0) message2 := messages[0] g.TAssertEqual(message2.payload, []byte("second payload")) rc, err = deadExec(args, queries, r, &w) g.TErrorIf(err) g.TAssertEqual(rc, 0) g.TAssertEqual(len(messages), 0) }) g.Testing("next() error is propagated", func() { nextErr = errors.New("next() error") var w strings.Builder rc, err := deadExec(args, queries, r, &w) g.TAssertEqual(err, nextErr) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) nextErr = nil }) g.Testing("toDead() error is propagated", func() { toDeadErr = errors.New("toDead() error") pub([]byte{}) var w strings.Builder rc, err := deadExec(args, queries, r, &w) g.TAssertEqual(err, toDeadErr) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) }) } func test_listDeadExec() { g.TestStart("listDeadExec()") const ( topic = "listDeadExec topic" consumer = "listDeadExec consumer" ) var ( r = strings.NewReader("") args = argsT{ topic: topic, consumer: consumer, } ) var ( messages []messageT deadletters []deadletterT id int64 = 0 errorIndex = -1 allDeadErr = errors.New("allDead() error") ) queries := queriesT{ allDead: func( _ string, _ string, callback func(deadletterT, messageT) error, ) error { for i, deadletter := range deadletters { if i == errorIndex { return allDeadErr } callback(deadletter, messageT{}) } return nil }, } pub := func() { payload := []byte("ignored payload for this test") id++ now := time.Now() message := messageT{ id: id, timestamp: now, uuid: uuid.New(), topic: topic, flowID: uuid.New(), payload: payload, } messages = append(messages, message) } commit := func() { messages = messages[1:] } dead := func() { message := messages[0] now := time.Now() deadletter := deadletterT{ uuid: uuid.New(), timestamp: now, consumer: consumer, messageID: message.uuid, } messages = messages[1:] deadletters = append(deadletters, deadletter) } replay := func() { pub() deadletters = deadletters[1:] } g.Testing("nothing is shown if topic is empty", func() { var w strings.Builder rc, err := listDeadExec(args, queries, r, &w) g.TErrorIf(err) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 0) }) g.Testing("deadletters are printed in order", func() { var w strings.Builder pub() pub() pub() rc, err := listDeadExec(args, queries, r, &w) g.TErrorIf(err) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 0) dead() commit() dead() expected := fmt.Sprintf( "%s\t%s\t%s\n%s\t%s\t%s\n", deadletters[0].uuid.String(), deadletters[0].timestamp.Format(time.RFC3339), deadletters[0].consumer, deadletters[1].uuid.String(), deadletters[1].timestamp.Format(time.RFC3339), deadletters[1].consumer, ) rc, err = listDeadExec(args, queries, r, &w) g.TErrorIf(err) g.TAssertEqual(w.String(), expected) g.TAssertEqual(rc, 0) }) g.Testing("deadletters disappear after being replayed", func() { var ( w1 strings.Builder w2 strings.Builder ) replay() deadletter := deadletters[0] expected := fmt.Sprintf( "%s\t%s\t%s\n", deadletter.uuid.String(), deadletter.timestamp.Format(time.RFC3339), deadletter.consumer, ) rc, err := listDeadExec(args, queries, r, &w1) g.TErrorIf(err) g.TAssertEqual(w1.String(), expected) g.TAssertEqual(rc, 0) replay() rc, err = listDeadExec(args, queries, r, &w2) g.TErrorIf(err) g.TAssertEqual(w2.String(), "") g.TAssertEqual(rc, 0) }) g.Testing("a database failure interrupts the output", func() { var w strings.Builder pub() pub() pub() dead() dead() dead() deadletter := deadletters[0] expected := fmt.Sprintf( "%s\t%s\t%s\n", deadletter.uuid.String(), deadletter.timestamp.Format(time.RFC3339), deadletter.consumer, ) errorIndex = 1 rc, err := listDeadExec(args, queries, r, &w) g.TAssertEqual(err, allDeadErr) g.TAssertEqual(w.String(), expected) g.TAssertEqual(rc, 1) }) } func test_replayExec() { g.TestStart("replayExec()") const ( topic = "replayExec topic" consumer = "replayExec consumer" ) var ( w strings.Builder r = strings.NewReader("") args = argsT{ topic: topic, consumer: consumer, } ) var ( oneDeadErr error replayErr error messages []messageT deadletters []deadletterT deadMessages []messageT id int64 = 0 ) queries := queriesT{ oneDead: func(string, string) (deadletterT, error) { if oneDeadErr != nil { return deadletterT{}, oneDeadErr } if len(deadletters) == 0 { return deadletterT{}, sql.ErrNoRows } return deadletters[0], nil }, replay: func(uuid.UUID, uuid.UUID) (messageT, error) { if replayErr != nil { return messageT{}, replayErr } message := deadMessages[0] messages = append(messages, message) deadletters = deadletters[1:] deadMessages = deadMessages[1:] return message, nil }, } pub := func(payload []byte) { id++ now := time.Now() message := messageT{ id: id, timestamp: now, uuid: uuid.New(), topic: topic, flowID: uuid.New(), payload: payload, } messages = append(messages, message) } commit := func() { messages = messages[1:] } dead := func() { message := messages[0] deadletter := deadletterT{ uuid: uuid.New() } messages = messages[1:] deadletters = append(deadletters, deadletter) deadMessages = append(deadMessages, message) } next := func() string { return string(messages[0].payload) } g.Testing("error when there is nothing to replay", func() { rc, err := replayExec(args, queries, r, &w) g.TAssertEqual(err, sql.ErrNoRows) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) pub([]byte("first payload")) pub([]byte("second payload")) pub([]byte("third payload")) pub([]byte("fourth payload")) rc, err = replayExec(args, queries, r, &w) g.TAssertEqual(err, sql.ErrNoRows) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) }) g.Testing("deadletters are replayed in order", func() { dead() commit() dead() commit() rc, err := replayExec(args, queries, r, &w) g.TErrorIf(err) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 0) g.TAssertEqual(next(), "first payload") commit() rc, err = replayExec(args, queries, r, &w) g.TErrorIf(err) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 0) g.TAssertEqual(next(), "third payload") }) g.Testing("oneDead() error is forwarded", func() { oneDeadErr = errors.New("oneDead() error") rc, err := replayExec(args, queries, r, &w) g.TAssertEqual(err, oneDeadErr) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) oneDeadErr = nil }) g.Testing("replay() error is also forwarded", func() { pub([]byte{}) dead() replayErr = errors.New("replay() error") rc, err := replayExec(args, queries, r, &w) g.TAssertEqual(err, replayErr) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) }) } func test_sizeExec() { g.TestStart("sizeExec()") const ( topic = "sizeExec topic" consumer = "sizeExec consumer" ) var ( r = strings.NewReader("") args = argsT{ topic: topic, consumer: consumer, } ) var sizeErr error queries := queriesT{ size: func(string) (int, error) { if sizeErr != nil { return -1, sizeErr } return 123, nil }, } g.Testing("it propagates the error when the query fails", func() { sizeErr = errors.New("size() error") var w strings.Builder rc, err := sizeExec(args, queries, r, &w) g.TAssertEqual(err, sizeErr) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) sizeErr = nil }) g.Testing("otherwise it just prints what is was given", func() { var w strings.Builder rc, err := sizeExec(args, queries, r, &w) g.TErrorIf(err) g.TAssertEqual(w.String(), "123\n") g.TAssertEqual(rc, 0) }) } func test_countExec() { g.TestStart("countExec()") const ( topic = "countExec topic" consumer = "countExec consumer" ) var ( r = strings.NewReader("") args = argsT{ topic: topic, consumer: consumer, } ) var countErr error queries := queriesT{ count: func(string, string) (int, error) { if countErr != nil { return -1, countErr } return 2222, nil }, } g.Testing("it propagates the query error", func() { countErr = errors.New("count() error") var w strings.Builder rc, err := countExec(args, queries, r, &w) g.TAssertEqual(err, countErr) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) countErr = nil }) g.Testing("otherwise it prints the given count", func() { var w strings.Builder rc, err := countExec(args, queries, r, &w) g.TErrorIf(err) g.TAssertEqual(w.String(), "2222\n") g.TAssertEqual(rc, 0) }) } func test_hasDataExec() { g.TestStart("hasData()") const ( topic = "hasData topic" consumer = "hasData consumer" ) var ( w strings.Builder r = strings.NewReader("") args = argsT{ topic: topic, consumer: consumer, } ) hasData := true var hasDataErr error queries := queriesT{ hasData: func(string, string) (bool, error) { if hasDataErr != nil { return false, hasDataErr } return hasData, nil }, } g.Testing("it propagates the query error", func() { hasDataErr = errors.New("hasData() error") rc, err := hasDataExec(args, queries, r, &w) g.TAssertEqual(err, hasDataErr) g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 1) hasDataErr = nil }) g.Testing("otherwise if just returns (not prints) the flag", func() { hasData = true rc, err := hasDataExec(args, queries, r, &w) g.TErrorIf(err) g.TAssertEqual(rc, 0) hasData = false rc, err = hasDataExec(args, queries, r, &w) g.TErrorIf(err) g.TAssertEqual(rc, 1) g.TAssertEqual(w.String(), "") }) } func test_usage() { g.TestStart("usage()") g.Testing("it just writes to io.Writer", func() { var w strings.Builder usage("xxx", &w) const message = "Usage: xxx [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n" g.TAssertEqual(w.String(), message) }) g.Testing("noop on io.Discard for io.Writer", func() { usage("AN ERROR IF SEEN ANYWHERE!", io.Discard) }) } func test_getopt() { g.TestStart("getopt()") const warning = "Missing COMMAND.\n" const usage = "Usage: $0 [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n" commandsMap := map[string]commandT{ "good": commandT{ name: "good", getopt: func(args argsT, _ io.Writer) (argsT, bool) { return args, true }, }, "bad": commandT{ name: "bad", getopt: func(args argsT, w io.Writer) (argsT, bool) { if len(args.args) == 0 { fmt.Fprintln(w, "no required arg") return args, false } if args.args[0] != "required" { fmt.Fprintln(w, "not correct one") return args, false } args.topic = "a topic" args.consumer = "a consumer" return args, true }, }, } g.Testing("we suppress the default error message", func() { var w strings.Builder argv := []string{"$0", "-h"} _, _, rc := getopt(argv, commandsMap, &w) g.TAssertEqual(w.String(), usage) g.TAssertEqual(rc, 2) }) g.Testing("we get an error on unsupported flag", func() { var w strings.Builder argv := []string{"$0", "-X"} _, _, rc := getopt(argv, commandsMap, &w) const message = "flag provided but not defined: -X\n" g.TAssertEqual(w.String(), message + usage) g.TAssertEqual(rc, 2) }) g.Testing("we also get an error on incorrect usage of flags", func() { var w strings.Builder argv := []string{"$0", "-f"} _, _, rc := getopt(argv, commandsMap, &w) const message = "flag needs an argument: -f\n" g.TAssertEqual(w.String(), message + usage) g.TAssertEqual(rc, 2) }) g.Testing("error when not given a command", func() { var w strings.Builder argv := []string{"$0"} _, _, rc := getopt(argv, commandsMap, &w) g.TAssertEqual(w.String(), warning + usage) g.TAssertEqual(rc, 2) }) g.Testing("error on unknown command", func() { var w strings.Builder argv := []string{"$0", "unknown"} _, _, rc := getopt(argv, commandsMap, &w) const message = `Bad COMMAND: "unknown".` + "\n" g.TAssertEqual(w.String(), message + usage) g.TAssertEqual(rc, 2) }) g.Testing("checks the command usage", func() { var ( w1 strings.Builder w2 strings.Builder w3 strings.Builder ) argv1 := []string{"$0", "bad"} argv2 := []string{"$0", "bad", "arg"} argv3 := []string{"$0", "bad", "required"} _, _, rc1 := getopt(argv1, commandsMap, &w1) _, _, rc2 := getopt(argv2, commandsMap, &w2) args, command, rc3 := getopt(argv3, commandsMap, &w3) expectedArgs := argsT{ databasePath: "fiinha.db", prefix: "fiinha", command: "bad", allArgs: argv3, args: argv3[2:], topic: "a topic", consumer: "a consumer", } g.TAssertEqual(w1.String(), "no required arg\n" + usage) g.TAssertEqual(w2.String(), "not correct one\n" + usage) g.TAssertEqual(w3.String(), "") g.TAssertEqual(rc1, 2) g.TAssertEqual(rc2, 2) g.TAssertEqual(rc3, 0) g.TAssertEqual(args, expectedArgs) g.TAssertEqual(command.name, "bad") }) g.Testing("when given a command we the default values", func() { var w strings.Builder args, command, rc := getopt( []string{"$0", "good"}, commandsMap, &w, ) expectedArgs := argsT{ databasePath: "fiinha.db", prefix: "fiinha", command: "good", allArgs: []string{"$0", "good"}, args: []string{}, } g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 0) g.TAssertEqual(args, expectedArgs) g.TAssertEqual(command.name, "good") }) g.Testing("we can customize both values", func() { var w strings.Builder argv := []string{"$0", "-f", "F", "-p", "P", "good"} args, command, rc := getopt(argv, commandsMap, &w) expectedArgs := argsT{ databasePath: "F", prefix: "P", command: "good", allArgs: argv, args: []string{}, } g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 0) g.TAssertEqual(args, expectedArgs) g.TAssertEqual(command.name, "good") }) g.Testing("a command can have its own commands and options", func() { var w strings.Builder argv := []string{"$0", "-f", "F", "good", "-f", "-f", "SUB"} args, command, rc := getopt(argv, commandsMap, &w) expectedArgs := argsT{ databasePath: "F", prefix: "fiinha", command: "good", allArgs: argv, args: []string{"-f", "-f", "SUB"}, } g.TAssertEqual(w.String(), "") g.TAssertEqual(rc, 0) g.TAssertEqual(args, expectedArgs) g.TAssertEqual(command.name, "good") }) } func test_runCommand() { g.TestStart("runCommand()") g.Testing("returns an error on bad prefix", func() { stdin := strings.NewReader("") var ( stdout strings.Builder stderr strings.Builder ) args := argsT{ prefix: "a bad prefix", command: "in", allArgs: []string{"$0"}, args: []string{"some topic name"}, } rc := runCommand(args, commands["in"], stdin, &stdout, &stderr) g.TAssertEqual(rc, 1) g.TAssertEqual(stdout.String(), "") g.TAssertEqual(stderr.String(), "Invalid table prefix\n") }) g.Testing("otherwise it build a queueT and calls command", func() { stdin := strings.NewReader("") var ( stdout1 strings.Builder stdout2 strings.Builder stderr1 strings.Builder stderr2 strings.Builder ) args1 := argsT{ prefix: defaultPrefix, command: "good", } args2 := argsT{ prefix: defaultPrefix, command: "bad", } myErr := errors.New("an error") good := commandT{ exec: func( _ argsT, _ queriesT, _ io.Reader, w io.Writer, ) (int, error) { fmt.Fprintf(w, "good text\n") return 0, nil }, } bad := commandT{ exec: func( _ argsT, _ queriesT, _ io.Reader, w io.Writer, ) (int, error) { fmt.Fprintf(w, "bad text\n") return 123, myErr }, } rc1 := runCommand(args1, good, stdin, &stdout1, &stderr1) rc2 := runCommand(args2, bad, stdin, &stdout2, &stderr2) g.TAssertEqual(stdout1.String(), "good text\n") g.TAssertEqual(stdout2.String(), "bad text\n") g.TAssertEqual(stderr1.String(), "") g.TAssertEqual(stderr2.String(), "an error\n") g.TAssertEqual(rc1, 0) g.TAssertEqual(rc2, 123) }) } func test_commands() { g.TestStart("commands") g.Testing("ensure map key and name are in sync", func() { for key, command := range commands { g.TAssertEqual(command.name, key) } }) } func dumpQueries() { queries := []struct{name string; fn func(string) queryT}{ { "createTables", createTablesSQL }, { "take", takeSQL }, { "publish", publishSQL }, { "find", findSQL }, { "next", nextSQL }, { "pending", pendingSQL }, { "commit", commitSQL }, { "toDead", toDeadSQL }, { "replay", replaySQL }, { "oneDead", oneDeadSQL }, { "allDead", allDeadSQL }, { "size", sizeSQL }, { "count", countSQL }, { "hasData", hasDataSQL }, } for _, query := range queries { q := query.fn(defaultPrefix) fmt.Printf("\n-- %s.sql:", query.name) fmt.Printf("\n-- write:%s\n", q.write) fmt.Printf("\n-- read:%s\n", q.read) fmt.Printf("\n-- owner:%s\n", q.owner) } } func MainTest() { if os.Getenv("TESTING_DUMP_SQL_QUERIES") != "" { dumpQueries() return } g.Init() test_defaultPrefix() test_serialized() test_execSerialized() test_tryRollback() test_inTx() test_createTables() test_takeStmt() test_publishStmt() test_findStmt() test_nextStmt() test_messageEach() test_pendingStmt() test_commitStmt() test_toDeadStmt() test_replayStmt() test_oneDeadStmt() test_deadletterEach() test_allDeadStmt() test_sizeStmt() test_countStmt() test_hasDataStmt() test_initDB() test_queriesTclose() test_newPinger() test_makeSubscriptionsFunc() test_makeNotifyFn() test_collectClosedWaiters() test_trimEmptyLeaves() test_deleteIfEmpty() test_deleteEmptyTopics() test_removeClosedWaiter() test_reapClosedWaiters() test_everyNthCall() test_runReaper() test_NewWithPrefix() test_New() test_asPublicMessage() test_queueT_Publish() test_registerConsumerFn() test_registerWaiterFn() test_makeConsumeOneFn() test_makeConsumeAllFn() test_makeWaitFn() test_runConsumer() test_tryFinding() test_queueT_Subscribe() test_queueT_WaitFor() test_unsubscribeIfExistsFn() test_queueT_Unsubscribe() test_cleanSubscriptions() test_queueT_Close() test_topicGetopt() test_topicConsumerGetopt() test_inExec() test_outExec() test_commitExec() test_deadExec() test_listDeadExec() test_replayExec() test_sizeExec() test_countExec() test_hasDataExec() test_usage() test_getopt() test_runCommand() test_commands() }