diff options
author | EuAndreh <eu@euandre.org> | 2024-10-30 20:26:41 -0300 |
---|---|---|
committer | EuAndreh <eu@euandre.org> | 2024-10-31 19:57:42 -0300 |
commit | 5d53704f450788452837b18b26f54d235d883490 (patch) | |
tree | 91bb61d6f823523675ae9f93bb8561858bf7be14 /tests/q.go | |
parent | tests/q.go: Replace ":memory:" with golite.InMemory (diff) | |
download | fiinha-5d53704f450788452837b18b26f54d235d883490.tar.gz fiinha-5d53704f450788452837b18b26f54d235d883490.tar.xz |
src/q.go: Fix SQLite ~broken~ transactions
As per expected by SQLite, create new connections so they have
independent transaction, and serialize its use via a channel with a
single consumer.
== Other changes
=== Remove `db` attribute from `queueT` type
Now that `initDB()` can create many database connections (A.K.A. opaque
pointers to the `sqlite3*` object), it makes more sense to hand to it
the responsability to create and destroy these databases. So now a
`queries.close()` includes what previously was `db.Close()`, and needing
to do it was the only reason that made us keep the `db` attribute.
The arguments to `initDB()` were adjusted to reflect that, as it no
longer is given an initialized database handle, but only the database
path, and does the creation of the handles by itself.
=== Shoehorn multi-statement queries into single-statement ones
In order to avoid creating one `execSerialized()` function per attribute
of the `queryT` queries, we try to leverage SQLite's built-in
transactionality per individual statement. So compound queries that were
previously done in multiple statements wrapped with a `inTx()` that
could be shoe-horned into a single `SELECT` were rewritten to avoid
having more of this application-level serialization.
The topic/consumer owner validation was also changed: now it is a
trigger on the relevant tables that `ABORT` the implicit/invisible
transaction and makes the `INSERT` fail.
In order to make this possible, the `"%s_owners"` table now has an extra
column: `instance_id`. Despite being extra data, this meta-information
isn't duplicated from anywhere else, and it is an actual useful
information for operators to leverage.
So now the trigger is responsible for stopping the transaction from
going forward without adding an explicit `inTx()` around it, and the
writes can mostly be shrunk to single-statement queries.
Fortunately, all this is enough to fix the `new-instance-takeover`
functional test.
Diffstat (limited to 'tests/q.go')
-rw-r--r-- | tests/q.go | 453 |
1 files changed, 288 insertions, 165 deletions
@@ -21,6 +21,10 @@ import ( +var instanceID = os.Getpid() + + + func test_defaultPrefix() { g.TestStart("defaultPrefix") @@ -29,13 +33,46 @@ func test_defaultPrefix() { }) } -func test_tryRollback() { +func test_serialized() { // FIXME } -func test_inTx() { - /* +func test_execSerialized() { // FIXME +} + +func test_tryRollback() { + g.TestStart("tryRollback()") + + myErr := errors.New("bottom error") + + db, err := sql.Open(golite.DriverName, golite.InMemory) + g.TErrorIf(err) + defer db.Close() + + + g.Testing("the error is propagated if rollback doesn't fail", func() { + tx, err := db.Begin() + g.TErrorIf(err) + + err = tryRollback(tx, myErr) + g.TAssertEqual(err, myErr) + }) + + g.Testing("a wrapped error when rollback fails", func() { + tx, err := db.Begin() + g.TErrorIf(err) + + err = tx.Commit() + g.TErrorIf(err) + + err = tryRollback(tx, myErr) + g.TAssertEqual(reflect.DeepEqual(err, myErr), false) + g.TAssertEqual(errors.Is(err, myErr), true) + }) +} + +func test_inTx() { g.TestStart("inTx()") db, err := sql.Open(golite.DriverName, golite.InMemory) @@ -44,11 +81,11 @@ func test_inTx() { g.Testing("when fn() errors, we propagate it", func() { - myError := errors.New("to be propagated") + myErr := errors.New("to be propagated") err := inTx(db, func(tx *sql.Tx) error { - return myError + return myErr }) - g.TAssertEqual(err, myError) + g.TAssertEqual(err, myErr) }) g.Testing("on nil error we get nil", func() { @@ -57,13 +94,17 @@ func test_inTx() { }) g.TErrorIf(err) }) - */ } func test_createTables() { g.TestStart("createTables()") - db, err := sql.Open(golite.DriverName, golite.InMemory) + const ( + dbpath = golite.InMemory + prefix = defaultPrefix + ) + + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) defer db.Close() @@ -72,12 +113,12 @@ func test_createTables() { const tmpl_read = ` SELECT id FROM "%s_messages" LIMIT 1; ` - qRead := fmt.Sprintf(tmpl_read, defaultPrefix) + qRead := fmt.Sprintf(tmpl_read, prefix) _, err := db.Exec(qRead) g.TErrorNil(err) - err = createTables(db, defaultPrefix) + err = createTables(db, prefix) g.TErrorIf(err) _, err = db.Exec(qRead) @@ -86,9 +127,9 @@ func test_createTables() { g.Testing("we can do it multiple times", func() { g.TErrorIf(g.SomeError( - createTables(db, defaultPrefix), - createTables(db, defaultPrefix), - createTables(db, defaultPrefix), + createTables(db, prefix), + createTables(db, prefix), + createTables(db, prefix), )) }) } @@ -99,15 +140,21 @@ func test_takeStmt() { const ( topic = "take() topic" consumer = "take() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) g.TErrorIf(takeErr) defer g.SomeFnError( takeClose, @@ -137,9 +184,10 @@ func test_takeStmt() { }) g.Testing("if there is already an owner, we overtake it", func() { - otherID := instanceID + 1 + otherCfg := cfg + otherCfg.instanceID = instanceID + 1 - take, takeClose, takeErr := takeStmt(db, prefix, otherID) + take, takeClose, takeErr := takeStmt(otherCfg) g.TErrorIf(takeErr) defer takeClose() @@ -153,7 +201,7 @@ func test_takeStmt() { err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID) g.TErrorIf(err) - g.TAssertEqual(ownerID, otherID) + g.TAssertEqual(ownerID, otherCfg.instanceID) }) g.Testing("no error if closed more than once", func() { g.TErrorIf(g.SomeError( @@ -170,6 +218,7 @@ func test_publishStmt() { const ( topic = "publish() topic" payloadStr = "publish() payload" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -182,12 +231,17 @@ func test_publishStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + publish, publishClose, publishErr := publishStmt(cfg) g.TErrorIf(publishErr) defer g.SomeFnError( publishClose, @@ -255,6 +309,7 @@ func test_findStmt() { const ( topic = "find() topic" payloadStr = "find() payload" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -267,13 +322,18 @@ func test_findStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - find, findClose, findErr := findStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + publish, publishClose, publishErr := publishStmt(cfg) + find, findClose, findErr := findStmt(cfg) g.TErrorIf(g.SomeError( publishErr, findErr, @@ -354,6 +414,7 @@ func test_nextStmt() { topic = "next() topic" payloadStr = "next() payload" consumer = "next() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -366,15 +427,24 @@ func test_nextStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - next, nextClose, nextErr := nextStmt(db, prefix, instanceID) - commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + next, nextClose, nextErr := nextStmt(cfg) + commit, commitClose, commitErr := commitStmt(cfg) + g.TErrorIf(takeErr) + g.TErrorIf(publishErr) + g.TErrorIf(nextErr) + g.TErrorIf(commitErr) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -440,9 +510,10 @@ func test_nextStmt() { }) g.Testing("error when we're not the owner", func() { - otherID := instanceID + 1 + otherCfg := cfg + otherCfg.instanceID = instanceID + 1 - take, takeClose, takeErr := takeStmt(db, prefix, otherID) + take, takeClose, takeErr := takeStmt(otherCfg) g.TErrorIf(takeErr) defer takeClose() @@ -455,7 +526,7 @@ func test_nextStmt() { _, err = next(topic, consumer) g.TAssertEqual(err, fmt.Errorf( notOwnerErrorFmt, - otherID, + otherCfg.instanceID, topic, consumer, instanceID, @@ -478,6 +549,7 @@ func test_messageEach() { topic = "messageEach() topic" payloadStr = "messageEach() payload" consumer = "messageEach() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -490,14 +562,19 @@ func test_messageEach() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + pending, pendingClose, pendingErr := pendingStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -640,6 +717,7 @@ func test_pendingStmt() { topic = "pending() topic" payloadStr = "pending() payload" consumer = "pending() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -652,16 +730,21 @@ func test_pendingStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID) - commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + pending, pendingClose, pendingErr := pendingStmt(cfg) + commit, commitClose, commitErr := commitStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -820,9 +903,10 @@ func test_pendingStmt() { }) g.Testing("when we're not the owners we get nothing", func() { - otherID := instanceID + 1 + otherCfg := cfg + otherCfg.instanceID = instanceID + 1 - take, takeClose, takeErr := takeStmt(db, prefix, otherID) + take, takeClose, takeErr := takeStmt(otherCfg) g.TErrorIf(takeErr) defer takeClose() @@ -871,6 +955,7 @@ func test_commitStmt() { topic = "commit() topic" payloadStr = "commit() payload" consumer = "commit() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -883,15 +968,20 @@ func test_commitStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + commit, commitClose, commitErr := commitStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -937,7 +1027,10 @@ func test_commitStmt() { g.Testing("we can't commit non-existent messages", func() { err := cmt(consumer, guuid.New()) - g.TAssertEqual(err, sql.ErrNoRows) + g.TAssertEqual( + err.(golite.Error).ExtendedCode, + golite.ErrConstraintNotNull, + ) }) g.Testing("multiple consumers may commit a message", func() { @@ -987,8 +1080,10 @@ func test_commitStmt() { }) g.Testing("error if we don't own the topic/consumer", func() { - otherID := instanceID + 1 - take, takeClose, takeErr := takeStmt(db, prefix, otherID) + otherCfg := cfg + otherCfg.instanceID = instanceID + 1 + + take, takeClose, takeErr := takeStmt(otherCfg) g.TErrorIf(takeErr) defer takeClose() @@ -998,13 +1093,10 @@ func test_commitStmt() { g.TErrorIf(err) err = commit(consumer, messageID) - g.TAssertEqual(err, fmt.Errorf( - noLongerOwnerErrorFmt, - instanceID, - topic, - consumer, - otherID, - )) + g.TAssertEqual( + err.(golite.Error).ExtendedCode, + golite.ErrConstraintTrigger, + ) }) g.Testing("no actual closing occurs", func() { @@ -1023,6 +1115,7 @@ func test_toDeadStmt() { topic = "toDead() topic" payloadStr = "toDead() payload" consumer = "toDead() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -1035,15 +1128,20 @@ func test_toDeadStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + commit, commitClose, commitErr := commitStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -1107,7 +1205,10 @@ func test_toDeadStmt() { g.Testing("we can't mark as dead non-existent messages", func() { err := asDead(consumer, guuid.New(), guuid.New()) - g.TAssertEqual(err, sql.ErrNoRows) + g.TAssertEqual( + err.(golite.Error).ExtendedCode, + golite.ErrConstraintNotNull, + ) }) g.Testing("multiple consumers may mark a message as dead", func() { @@ -1173,11 +1274,13 @@ func test_toDeadStmt() { }) g.Testing("error if we don't own the message's consumer/topic", func() { - otherID := instanceID + 1 + otherCfg := cfg + otherCfg.instanceID = instanceID + 1 + messageID1 := pub(topic) messageID2 := pub(topic) - take, takeClose, takeErr := takeStmt(db, prefix, otherID) + take, takeClose, takeErr := takeStmt(otherCfg) g.TErrorIf(takeErr) defer takeClose() @@ -1188,13 +1291,10 @@ func test_toDeadStmt() { g.TErrorIf(err) err = toDead(consumer, messageID2, guuid.New()) - g.TAssertEqual(err, fmt.Errorf( - noLongerOwnerErrorFmt, - instanceID, - topic, - consumer, - otherID, - )) + g.TAssertEqual( + err.(golite.Error).ExtendedCode, + golite.ErrConstraintTrigger, + ) }) g.Testing("no actual closing occurs", func() { @@ -1213,6 +1313,7 @@ func test_replayStmt() { topic = "replay() topic" payloadStr = "replay() payload" consumer = "replay() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -1225,15 +1326,20 @@ func test_replayStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) - replay, replayClose, replayErr := replayStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) + replay, replayClose, replayErr := replayStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -1343,6 +1449,7 @@ func test_oneDeadStmt() { topic = "oneDead() topic" payloadStr = "oneDead() payload" consumer = "oneDead() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -1355,16 +1462,21 @@ func test_oneDeadStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) - replay, replayClose, replayErr := replayStmt(db, prefix, instanceID) - oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) + replay, replayClose, replayErr := replayStmt(cfg) + oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -1457,6 +1569,7 @@ func test_deadletterEach() { topic = "deadletterEach() topic" payloadStr = "deadletterEach() payload" consumer = "deadletterEach() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -1469,15 +1582,20 @@ func test_deadletterEach() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) - allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) + allDead, allDeadClose, allDeadErr := allDeadStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -1631,6 +1749,7 @@ func test_allDeadStmt() { topic = "allDead() topic" payloadStr = "allDead() payload" consumer = "allDead() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -1643,16 +1762,21 @@ func test_allDeadStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) - replay, replayClose, replayErr := replayStmt(db, prefix, instanceID) - allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) + replay, replayClose, replayErr := replayStmt(cfg) + allDead, allDeadClose, allDeadErr := allDeadStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -1784,6 +1908,7 @@ func test_sizeStmt() { topic = "size() topic" payloadStr = "size() payload" consumer = "size() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -1796,17 +1921,22 @@ func test_sizeStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) - replay, replayClose, replayErr := replayStmt(db, prefix, instanceID) - oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID) - size, sizeClose, sizeErr := sizeStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) + replay, replayClose, replayErr := replayStmt(cfg) + oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg) + size, sizeClose, sizeErr := sizeStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -1892,6 +2022,7 @@ func test_countStmt() { topic = "count() topic" payloadStr = "count() payload" consumer = "count() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -1904,17 +2035,22 @@ func test_countStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - next, nextClose, nextErr := nextStmt(db, prefix, instanceID) - commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) - count, countClose, countErr := countStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + next, nextClose, nextErr := nextStmt(cfg) + commit, commitClose, commitErr := commitStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) + count, countClose, countErr := countStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -2020,6 +2156,7 @@ func test_hasDataStmt() { topic = "hasData() topic" payloadStr = "hasData() payload" consumer = "hasData() consumer" + dbpath = golite.InMemory prefix = defaultPrefix ) var ( @@ -2032,17 +2169,22 @@ func test_hasDataStmt() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) + db, err := sql.Open(golite.DriverName, dbpath) g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) - instanceID := os.Getpid() - take, takeClose, takeErr := takeStmt(db, prefix, instanceID) - publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - next, nextClose, nextErr := nextStmt(db, prefix, instanceID) - commit, commitClose, commitErr := commitStmt(db, prefix, instanceID) - toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID) - hasData, hasDataClose, hasDataErr := hasDataStmt(db, prefix, instanceID) + cfg := dbconfigT{ + shared: db, + dbpath: dbpath, + prefix: prefix, + instanceID: instanceID, + } + take, takeClose, takeErr := takeStmt(cfg) + publish, publishClose, publishErr := publishStmt(cfg) + next, nextClose, nextErr := nextStmt(cfg) + commit, commitClose, commitErr := commitStmt(cfg) + toDead, toDeadClose, toDeadErr := toDeadStmt(cfg) + hasData, hasDataClose, hasDataErr := hasDataStmt(cfg) g.TErrorIf(g.SomeError( takeErr, publishErr, @@ -2148,6 +2290,8 @@ func test_initDB() { topic = "initDB() topic" payloadStr = "initDB() payload" consumer = "initDB() consumer" + dbpath = golite.InMemory + prefix = defaultPrefix ) var ( flowID = guuid.New() @@ -2159,17 +2303,12 @@ func test_initDB() { } ) - db, err := sql.Open(golite.DriverName, golite.InMemory) - g.TErrorIf(err) - defer db.Close() - var messages []messageT notifyFn := func(message messageT) { messages = append(messages, message) } - instanceID := os.Getpid() - queries, err := initDB(db, defaultPrefix, notifyFn, instanceID) + queries, err := initDB(dbpath, prefix, notifyFn, instanceID) g.TErrorIf(err) defer queries.close() @@ -2256,31 +2395,16 @@ func test_initDB() { func test_queriesTclose() { g.TestStart("queriesT.close()") - db, err := sql.Open(golite.DriverName, golite.InMemory) - g.TErrorIf(err) - defer db.Close() + const ( + dbpath = golite.InMemory + prefix = defaultPrefix + ) - instanceID := os.Getpid() - queries, err := initDB(db, defaultPrefix, func(messageT) {}, instanceID) + notifyFn := func(messageT) {} + queries, err := initDB(dbpath, prefix, notifyFn, instanceID) g.TErrorIf(err) - g.Testing("after closing, we can't run queries", func() { - unsent := UnsentMessage{ Payload: []byte{}, } - _, err := queries.publish(unsent, guuid.New()) - g.TErrorIf(err) - g.TErrorIf(db.Close()) - - err = queries.close() - g.TErrorIf(err) - - _, err = queries.publish(unsent, guuid.New()) - g.TAssertEqual( - err.Error(), - "sql: database is closed", - ) - }) - g.Testing("closing mre than once does not error", func() { g.TErrorIf(g.SomeError( queries.close(), @@ -2289,7 +2413,6 @@ func test_queriesTclose() { }) } - func test_newPinger() { g.TestStart("newPinger()") @@ -3287,6 +3410,7 @@ func test_queueT_Publish() { const ( topic = "queueT.Publish() topic" payloadStr = "queueT.Publish() payload" + dbpath = golite.InMemory ) var ( flowID = guuid.New() @@ -3298,7 +3422,7 @@ func test_queueT_Publish() { } ) - queue, err := New(golite.InMemory) + queue, err := New(dbpath) g.TErrorIf(err) defer queue.Close() @@ -4350,11 +4474,7 @@ func test_queueT_Close() { queriesErr = errors.New("queriesT{} error") ) - db, err := sql.Open(golite.DriverName, golite.InMemory) - g.TErrorIf(err) - queue := queueT{ - db: db, queries: queriesT{ close: func() error{ queriesCount++ @@ -4376,7 +4496,7 @@ func test_queueT_Close() { }, } - err = queue.Close() + err := queue.Close() g.TAssertEqual(err, g.WrapErrors(subscriptionsErr, queriesErr)) g.TAssertEqual(pingerCount, 1) g.TAssertEqual(subscriptionsCount, 1) @@ -5674,6 +5794,7 @@ func dumpQueries() { { "take", takeSQL }, { "publish", publishSQL }, { "find", findSQL }, + { "next", nextSQL }, { "pending", pendingSQL }, { "commit", commitSQL }, { "toDead", toDeadSQL }, @@ -5703,6 +5824,8 @@ func MainTest() { g.Init() test_defaultPrefix() + test_serialized() + test_execSerialized() test_tryRollback() test_inTx() test_createTables() |