diff options
Diffstat (limited to 'tests/q.go')
-rw-r--r-- | tests/q.go | 453 |
1 files changed, 288 insertions, 165 deletions
@@ -21,6 +21,10 @@ import ( +var instanceID = os.Getpid() + + + func test_defaultPrefix() { g.TestStart("defaultPrefix") @@ -29,13 +33,46 @@ func test_defaultPrefix() { }) } -func test_tryRollback() { +func test_serialized() { // FIXME } -func test_inTx() { - /* +func test_execSerialized() { // FIXME +} + +func test_tryRollback() { + g.TestStart("tryRollback()") + + myErr := errors.New("bottom error") + + db, err := sql.Open(golite.DriverName, golite.InMemory) + g.TErrorIf(err) + defer db.Close() + + + g.Testing("the error is propagated if rollback doesn't fail", func() { + tx, err := db.Begin() + g.TErrorIf(err) + + err = tryRollback(tx, myErr) + g.TAssertEqual(err, myErr) + }) + + g.Testing("a wrapped error when rollback fails", func() { + tx, err := db.Begin() + g.TErrorIf(err) + + err = tx.Commit() + g.TErrorIf(err) + + err = tryRollback(tx, myErr) + g.TAssertEqual(reflect.DeepEqual(err, myErr), false) + g.TAssertEqual(errors.Is(err, myErr), true) + }) +} + +func test_inTx() { g.TestStart("inTx()") db, err := sql.Open(golite.DriverName, golite.InMemory) @@ -44,11 +81,11 @@ func test_inTx() { g.Testing("when fn() errors, we propagate it", func() { - myError := errors.New("to be propagated") + myErr := errors.New("to be propagated") err := inTx(db, func(tx *sql.Tx) error { - return myError + return myErr }) - g.TAssertEqual(err, myError) + g.TAssertEqual(err, myErr) }) g.Testing("on nil error we get nil", func() { @@ -57,13 +94,17 @@ func test_inTx() { }) g.TErrorIf(err) }) - */ } func test_createTables() { g.TestStart("createTables()") - db, err := sql.Open(golite.DriverName, golite.InMemory) + const ( + dbpath = golite.InMemory + prefix = defaultPrefix + ) + + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) defer db.Close() @@ -72,12 +113,12 @@ func test_createTables() { const tmpl_read = ` SELECT id FROM "%s_messages" LIMIT 1; ` - qRead := fmt.Sprintf(tmpl_read, defaultPrefix) + qRead := fmt.Sprintf(tmpl_read, prefix) _, err := db.Exec(qRead) g.TErrorNil(err) - err = createTables(db, defaultPrefix) + err = createTables(db, prefix) g.TErrorIf(err) _, err = db.Exec(qRead) @@ -86,9 +127,9 @@ func test_createTables() { g.Testing("we can do it multiple times", func() { g.TErrorIf(g.SomeError( - createTables(db, defaultPrefix), - createTables(db, defaultPrefix), - createTables(db, defaultPrefix), + createTables(db, prefix), + createTables(db, prefix), + createTables(db, prefix), )) }) } @@ -99,15 +140,21 @@ func test_takeStmt() { const ( topic = "take() topic" consumer = "take() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) g.TErrorIf(takeErr) defer g.SomeFnError( takeClose, @@ -137,9 +184,10 @@ func test_takeStmt() { }) g.Testing("if there is already an owner, we overtake it", func() { - otherID := instanceID + 1 + otherCfg := cfg + otherCfg.instanceID = instanceID + 1 - take, takeClose, takeErr := takeStmt(db, prefix, otherID) + take, takeClose, takeErr := takeStmt(otherCfg) g.TErrorIf(takeErr) defer takeClose() @@ -153,7 +201,7 @@ func test_takeStmt() { err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) g.TErrorIf(err) - g.TAssertEqual(ownerID, otherID) + g.TAssertEqual(ownerID, otherCfg.instanceID) }) g.Testing("no error if closed more than once", func() { g.TErrorIf(g.SomeError( @@ -170,6 +218,7 @@ func test_publishStmt() { const ( topic = "publish() topic" payloadStr = "publish() payload" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -182,12 +231,17 @@ func test_publishStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + publish, publishClose, publishErr := publishStmt(cfg) g.TErrorIf(publishErr) defer g.SomeFnError( publishClose, @@ -255,6 +309,7 @@ func test_findStmt() { const ( topic = "find() topic" payloadStr = "find() payload" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -267,13 +322,18 @@ func test_findStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - find, findClose, findErr := findStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + publish, publishClose, publishErr := publishStmt(cfg) + find, findClose, findErr := findStmt(cfg) g.TErrorIf(g.SomeError( publishErr, findErr, @@ -354,6 +414,7 @@ func test_nextStmt() { topic = "next() topic" payloadStr = "next() payload" consumer = "next() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -366,15 +427,24 @@ func test_nextStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - next, nextClose, nextErr := nextStmt(db, prefix, instanceID) - commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + next, nextClose, nextErr := nextStmt(cfg) + commit, commitClose, commitErr := commitStmt(cfg) + g.TErrorIf(takeErr) + g.TErrorIf(publishErr) + g.TErrorIf(nextErr) + g.TErrorIf(commitErr) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -440,9 +510,10 @@ func test_nextStmt() { }) g.Testing("error when we're not the owner", func() { - otherID := instanceID + 1 + otherCfg := cfg + otherCfg.instanceID = instanceID + 1 - take, takeClose, takeErr := takeStmt(db, prefix, otherID) + take, takeClose, takeErr := takeStmt(otherCfg) g.TErrorIf(takeErr) defer takeClose() @@ -455,7 +526,7 @@ func test_nextStmt() { _, err = next(topic, consumer) g.TAssertEqual(err, fmt.Errorf( notOwnerErrorFmt, - otherID, + otherCfg.instanceID, topic, consumer, instanceID, @@ -478,6 +549,7 @@ func test_messageEach() { topic = "messageEach() topic" payloadStr = "messageEach() payload" consumer = "messageEach() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -490,14 +562,19 @@ func test_messageEach() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + pending, pendingClose, pendingErr := pendingStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -640,6 +717,7 @@ func test_pendingStmt() { topic = "pending() topic" payloadStr = "pending() payload" consumer = "pending() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -652,16 +730,21 @@ func test_pendingStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID) - commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + pending, pendingClose, pendingErr := pendingStmt(cfg) + commit, commitClose, commitErr := commitStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -820,9 +903,10 @@ func test_pendingStmt() { }) g.Testing("when we're not the owners we get nothing", func() { - otherID := instanceID + 1 + otherCfg := cfg + otherCfg.instanceID = instanceID + 1 - take, takeClose, takeErr := takeStmt(db, prefix, otherID) + take, takeClose, takeErr := takeStmt(otherCfg) g.TErrorIf(takeErr) defer takeClose() @@ -871,6 +955,7 @@ func test_commitStmt() { topic = "commit() topic" payloadStr = "commit() payload" consumer = "commit() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -883,15 +968,20 @@ func test_commitStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + commit, commitClose, commitErr := commitStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -937,7 +1027,10 @@ func test_commitStmt() { g.Testing("we can't commit non-existent messages", func() { err := cmt(consumer, guuid.New()) - g.TAssertEqual(err, sql.ErrNoRows) + g.TAssertEqual( + err.(golite.Error).ExtendedCode, + golite.ErrConstraintNotNull, + ) }) g.Testing("multiple consumers may commit a message", func() { @@ -987,8 +1080,10 @@ func test_commitStmt() { }) g.Testing("error if we don't own the topic/consumer", func() { - otherID := instanceID + 1 - take, takeClose, takeErr := takeStmt(db, prefix, otherID) + otherCfg := cfg + otherCfg.instanceID = instanceID + 1 + + take, takeClose, takeErr := takeStmt(otherCfg) g.TErrorIf(takeErr) defer takeClose() @@ -998,13 +1093,10 @@ func test_commitStmt() { g.TErrorIf(err) err = commit(consumer, messageID) - g.TAssertEqual(err, fmt.Errorf( - noLongerOwnerErrorFmt, - instanceID, - topic, - consumer, - otherID, - )) + g.TAssertEqual( + err.(golite.Error).ExtendedCode, + golite.ErrConstraintTrigger, + ) }) g.Testing("no actual closing occurs", func() { @@ -1023,6 +1115,7 @@ func test_toDeadStmt() { topic = "toDead() topic" payloadStr = "toDead() payload" consumer = "toDead() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -1035,15 +1128,20 @@ func test_toDeadStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + commit, commitClose, commitErr := commitStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -1107,7 +1205,10 @@ func test_toDeadStmt() { g.Testing("we can't mark as dead non-existent messages", func() { err := asDead(consumer, guuid.New(), guuid.New()) - g.TAssertEqual(err, sql.ErrNoRows) + g.TAssertEqual( + err.(golite.Error).ExtendedCode, + golite.ErrConstraintNotNull, + ) }) g.Testing("multiple consumers may mark a message as dead", func() { @@ -1173,11 +1274,13 @@ func test_toDeadStmt() { }) g.Testing("error if we don't own the message's consumer/topic", func() { - otherID := instanceID + 1 + otherCfg := cfg + otherCfg.instanceID = instanceID + 1 + messageID1 := pub(topic) messageID2 := pub(topic) - take, takeClose, takeErr := takeStmt(db, prefix, otherID) + take, takeClose, takeErr := takeStmt(otherCfg) g.TErrorIf(takeErr) defer takeClose() @@ -1188,13 +1291,10 @@ func test_toDeadStmt() { g.TErrorIf(err) err = toDead(consumer, messageID2, guuid.New()) - g.TAssertEqual(err, fmt.Errorf( - noLongerOwnerErrorFmt, - instanceID, - topic, - consumer, - otherID, - )) + g.TAssertEqual( + err.(golite.Error).ExtendedCode, + golite.ErrConstraintTrigger, + ) }) g.Testing("no actual closing occurs", func() { @@ -1213,6 +1313,7 @@ func test_replayStmt() { topic = "replay() topic" payloadStr = "replay() payload" consumer = "replay() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -1225,15 +1326,20 @@ func test_replayStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) - replay, replayClose, replayErr := replayStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) + replay, replayClose, replayErr := replayStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -1343,6 +1449,7 @@ func test_oneDeadStmt() { topic = "oneDead() topic" payloadStr = "oneDead() payload" consumer = "oneDead() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -1355,16 +1462,21 @@ func test_oneDeadStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) - replay, replayClose, replayErr := replayStmt(db, prefix, instanceID) - oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) + replay, replayClose, replayErr := replayStmt(cfg) + oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -1457,6 +1569,7 @@ func test_deadletterEach() { topic = "deadletterEach() topic" payloadStr = "deadletterEach() payload" consumer = "deadletterEach() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -1469,15 +1582,20 @@ func test_deadletterEach() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) - allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) + allDead, allDeadClose, allDeadErr := allDeadStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -1631,6 +1749,7 @@ func test_allDeadStmt() { topic = "allDead() topic" payloadStr = "allDead() payload" consumer = "allDead() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -1643,16 +1762,21 @@ func test_allDeadStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) - replay, replayClose, replayErr := replayStmt(db, prefix, instanceID) - allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) + replay, replayClose, replayErr := replayStmt(cfg) + allDead, allDeadClose, allDeadErr := allDeadStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -1784,6 +1908,7 @@ func test_sizeStmt() { topic = "size() topic" payloadStr = "size() payload" consumer = "size() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -1796,17 +1921,22 @@ func test_sizeStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) - replay, replayClose, replayErr := replayStmt(db, prefix, instanceID) - oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID) - size, sizeClose, sizeErr := sizeStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) + replay, replayClose, replayErr := replayStmt(cfg) + oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg) + size, sizeClose, sizeErr := sizeStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -1892,6 +2022,7 @@ func test_countStmt() { topic = "count() topic" payloadStr = "count() payload" consumer = "count() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -1904,17 +2035,22 @@ func test_countStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - next, nextClose, nextErr := nextStmt(db, prefix, instanceID) - commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) - count, countClose, countErr := countStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + next, nextClose, nextErr := nextStmt(cfg) + commit, commitClose, commitErr := commitStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) + count, countClose, countErr := countStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -2020,6 +2156,7 @@ func test_hasDataStmt() { topic = "hasData() topic" payloadStr = "hasData() payload" consumer = "hasData() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -2032,17 +2169,22 @@ func test_hasDataStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - next, nextClose, nextErr := nextStmt(db, prefix, instanceID) - commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) - hasData, hasDataClose, hasDataErr := hasDataStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + next, nextClose, nextErr := nextStmt(cfg) + commit, commitClose, commitErr := commitStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) + hasData, hasDataClose, hasDataErr := hasDataStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -2148,6 +2290,8 @@ func test_initDB() { topic = "initDB() topic" payloadStr = "initDB() payload" consumer = "initDB() consumer" + dbpath = golite.InMemory + prefix = defaultPrefix ) var ( flowID = guuid.New() @@ -2159,17 +2303,12 @@ func test_initDB() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) - g.TErrorIf(err) - defer db.Close() - var messages []messageT notifyFn := func(message messageT) { messages = append(messages, message) } - instanceID := os.Getpid() - queries, err := initDB(db, defaultPrefix, notifyFn, instanceID) + queries, err := initDB(dbpath, prefix, notifyFn, instanceID) g.TErrorIf(err) defer queries.close() @@ -2256,31 +2395,16 @@ func test_initDB() { func test_queriesTclose() { g.TestStart("queriesT.close()") - db, err := sql.Open(golite.DriverName, golite.InMemory) - g.TErrorIf(err) - defer db.Close() + const ( + dbpath = golite.InMemory + prefix = defaultPrefix + ) - instanceID := os.Getpid() - queries, err := initDB(db, defaultPrefix, func(messageT) {}, instanceID) + notifyFn := func(messageT) {} + queries, err := initDB(dbpath, prefix, notifyFn, instanceID) g.TErrorIf(err) - g.Testing("after closing, we can't run queries", func() { - unsent := UnsentMessage{ Payload: []byte{}, } - _, err := queries.publish(unsent, guuid.New()) - g.TErrorIf(err) - g.TErrorIf(db.Close()) - - err = queries.close() - g.TErrorIf(err) - - _, err = queries.publish(unsent, guuid.New()) - g.TAssertEqual( - err.Error(), - "sql: database is closed", - ) - }) - g.Testing("closing mre than once does not error", func() { g.TErrorIf(g.SomeError( queries.close(), @@ -2289,7 +2413,6 @@ func test_queriesTclose() { }) } - func test_newPinger() { g.TestStart("newPinger()") @@ -3287,6 +3410,7 @@ func test_queueT_Publish() { const ( topic = "queueT.Publish() topic" payloadStr = "queueT.Publish() payload" + dbpath = golite.InMemory ) var ( flowID = guuid.New() @@ -3298,7 +3422,7 @@ func test_queueT_Publish() { } ) - queue, err := New(golite.InMemory) + queue, err := New(dbpath) g.TErrorIf(err) defer queue.Close() @@ -4350,11 +4474,7 @@ func test_queueT_Close() { queriesErr = errors.New("queriesT{} error") ) - db, err := sql.Open(golite.DriverName, golite.InMemory) - g.TErrorIf(err) - queue := queueT{ - db: db, queries: queriesT{ close: func() error{ queriesCount++ @@ -4376,7 +4496,7 @@ func test_queueT_Close() { }, } - err = queue.Close() + err := queue.Close() g.TAssertEqual(err, g.WrapErrors(subscriptionsErr, queriesErr)) g.TAssertEqual(pingerCount, 1) g.TAssertEqual(subscriptionsCount, 1) @@ -5674,6 +5794,7 @@ func dumpQueries() { { "take", takeSQL }, { "publish", publishSQL }, { "find", findSQL }, + { "next", nextSQL }, { "pending", pendingSQL }, { "commit", commitSQL }, { "toDead", toDeadSQL }, @@ -5703,6 +5824,8 @@ func MainTest() { g.Init() test_defaultPrefix() + test_serialized() + test_execSerialized() test_tryRollback() test_inTx() test_createTables() |