summaryrefslogtreecommitdiff
path: root/tests/q.go
diff options
context:
space:
mode:
Diffstat (limited to 'tests/q.go')
-rw-r--r--tests/q.go453
1 files changed, 288 insertions, 165 deletions
diff --git a/tests/q.go b/tests/q.go
index 844e722..565ca25 100644
--- a/tests/q.go
+++ b/tests/q.go
@@ -21,6 +21,10 @@ import (
+var instanceID = os.Getpid()
+
+
+
func test_defaultPrefix() {
g.TestStart("defaultPrefix")
@@ -29,13 +33,46 @@ func test_defaultPrefix() {
})
}
-func test_tryRollback() {
+func test_serialized() {
// FIXME
}
-func test_inTx() {
- /*
+func test_execSerialized() {
// FIXME
+}
+
+func test_tryRollback() {
+ g.TestStart("tryRollback()")
+
+ myErr := errors.New("bottom error")
+
+ db, err := sql.Open(golite.DriverName, golite.InMemory)
+ g.TErrorIf(err)
+ defer db.Close()
+
+
+ g.Testing("the error is propagated if rollback doesn't fail", func() {
+ tx, err := db.Begin()
+ g.TErrorIf(err)
+
+ err = tryRollback(tx, myErr)
+ g.TAssertEqual(err, myErr)
+ })
+
+ g.Testing("a wrapped error when rollback fails", func() {
+ tx, err := db.Begin()
+ g.TErrorIf(err)
+
+ err = tx.Commit()
+ g.TErrorIf(err)
+
+ err = tryRollback(tx, myErr)
+ g.TAssertEqual(reflect.DeepEqual(err, myErr), false)
+ g.TAssertEqual(errors.Is(err, myErr), true)
+ })
+}
+
+func test_inTx() {
g.TestStart("inTx()")
db, err := sql.Open(golite.DriverName, golite.InMemory)
@@ -44,11 +81,11 @@ func test_inTx() {
g.Testing("when fn() errors, we propagate it", func() {
- myError := errors.New("to be propagated")
+ myErr := errors.New("to be propagated")
err := inTx(db, func(tx *sql.Tx) error {
- return myError
+ return myErr
})
- g.TAssertEqual(err, myError)
+ g.TAssertEqual(err, myErr)
})
g.Testing("on nil error we get nil", func() {
@@ -57,13 +94,17 @@ func test_inTx() {
})
g.TErrorIf(err)
})
- */
}
func test_createTables() {
g.TestStart("createTables()")
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ const (
+ dbpath = golite.InMemory
+ prefix = defaultPrefix
+ )
+
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
defer db.Close()
@@ -72,12 +113,12 @@ func test_createTables() {
const tmpl_read = `
SELECT id FROM "%s_messages" LIMIT 1;
`
- qRead := fmt.Sprintf(tmpl_read, defaultPrefix)
+ qRead := fmt.Sprintf(tmpl_read, prefix)
_, err := db.Exec(qRead)
g.TErrorNil(err)
- err = createTables(db, defaultPrefix)
+ err = createTables(db, prefix)
g.TErrorIf(err)
_, err = db.Exec(qRead)
@@ -86,9 +127,9 @@ func test_createTables() {
g.Testing("we can do it multiple times", func() {
g.TErrorIf(g.SomeError(
- createTables(db, defaultPrefix),
- createTables(db, defaultPrefix),
- createTables(db, defaultPrefix),
+ createTables(db, prefix),
+ createTables(db, prefix),
+ createTables(db, prefix),
))
})
}
@@ -99,15 +140,21 @@ func test_takeStmt() {
const (
topic = "take() topic"
consumer = "take() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
g.TErrorIf(takeErr)
defer g.SomeFnError(
takeClose,
@@ -137,9 +184,10 @@ func test_takeStmt() {
})
g.Testing("if there is already an owner, we overtake it", func() {
- otherID := instanceID + 1
+ otherCfg := cfg
+ otherCfg.instanceID = instanceID + 1
- take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ take, takeClose, takeErr := takeStmt(otherCfg)
g.TErrorIf(takeErr)
defer takeClose()
@@ -153,7 +201,7 @@ func test_takeStmt() {
err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
g.TErrorIf(err)
- g.TAssertEqual(ownerID, otherID)
+ g.TAssertEqual(ownerID, otherCfg.instanceID)
})
g.Testing("no error if closed more than once", func() {
g.TErrorIf(g.SomeError(
@@ -170,6 +218,7 @@ func test_publishStmt() {
const (
topic = "publish() topic"
payloadStr = "publish() payload"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -182,12 +231,17 @@ func test_publishStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ publish, publishClose, publishErr := publishStmt(cfg)
g.TErrorIf(publishErr)
defer g.SomeFnError(
publishClose,
@@ -255,6 +309,7 @@ func test_findStmt() {
const (
topic = "find() topic"
payloadStr = "find() payload"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -267,13 +322,18 @@ func test_findStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- find, findClose, findErr := findStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ publish, publishClose, publishErr := publishStmt(cfg)
+ find, findClose, findErr := findStmt(cfg)
g.TErrorIf(g.SomeError(
publishErr,
findErr,
@@ -354,6 +414,7 @@ func test_nextStmt() {
topic = "next() topic"
payloadStr = "next() payload"
consumer = "next() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -366,15 +427,24 @@ func test_nextStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- next, nextClose, nextErr := nextStmt(db, prefix, instanceID)
- commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ next, nextClose, nextErr := nextStmt(cfg)
+ commit, commitClose, commitErr := commitStmt(cfg)
+ g.TErrorIf(takeErr)
+ g.TErrorIf(publishErr)
+ g.TErrorIf(nextErr)
+ g.TErrorIf(commitErr)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -440,9 +510,10 @@ func test_nextStmt() {
})
g.Testing("error when we're not the owner", func() {
- otherID := instanceID + 1
+ otherCfg := cfg
+ otherCfg.instanceID = instanceID + 1
- take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ take, takeClose, takeErr := takeStmt(otherCfg)
g.TErrorIf(takeErr)
defer takeClose()
@@ -455,7 +526,7 @@ func test_nextStmt() {
_, err = next(topic, consumer)
g.TAssertEqual(err, fmt.Errorf(
notOwnerErrorFmt,
- otherID,
+ otherCfg.instanceID,
topic,
consumer,
instanceID,
@@ -478,6 +549,7 @@ func test_messageEach() {
topic = "messageEach() topic"
payloadStr = "messageEach() payload"
consumer = "messageEach() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -490,14 +562,19 @@ func test_messageEach() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ pending, pendingClose, pendingErr := pendingStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -640,6 +717,7 @@ func test_pendingStmt() {
topic = "pending() topic"
payloadStr = "pending() payload"
consumer = "pending() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -652,16 +730,21 @@ func test_pendingStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID)
- commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ pending, pendingClose, pendingErr := pendingStmt(cfg)
+ commit, commitClose, commitErr := commitStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -820,9 +903,10 @@ func test_pendingStmt() {
})
g.Testing("when we're not the owners we get nothing", func() {
- otherID := instanceID + 1
+ otherCfg := cfg
+ otherCfg.instanceID = instanceID + 1
- take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ take, takeClose, takeErr := takeStmt(otherCfg)
g.TErrorIf(takeErr)
defer takeClose()
@@ -871,6 +955,7 @@ func test_commitStmt() {
topic = "commit() topic"
payloadStr = "commit() payload"
consumer = "commit() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -883,15 +968,20 @@ func test_commitStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ commit, commitClose, commitErr := commitStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -937,7 +1027,10 @@ func test_commitStmt() {
g.Testing("we can't commit non-existent messages", func() {
err := cmt(consumer, guuid.New())
- g.TAssertEqual(err, sql.ErrNoRows)
+ g.TAssertEqual(
+ err.(golite.Error).ExtendedCode,
+ golite.ErrConstraintNotNull,
+ )
})
g.Testing("multiple consumers may commit a message", func() {
@@ -987,8 +1080,10 @@ func test_commitStmt() {
})
g.Testing("error if we don't own the topic/consumer", func() {
- otherID := instanceID + 1
- take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ otherCfg := cfg
+ otherCfg.instanceID = instanceID + 1
+
+ take, takeClose, takeErr := takeStmt(otherCfg)
g.TErrorIf(takeErr)
defer takeClose()
@@ -998,13 +1093,10 @@ func test_commitStmt() {
g.TErrorIf(err)
err = commit(consumer, messageID)
- g.TAssertEqual(err, fmt.Errorf(
- noLongerOwnerErrorFmt,
- instanceID,
- topic,
- consumer,
- otherID,
- ))
+ g.TAssertEqual(
+ err.(golite.Error).ExtendedCode,
+ golite.ErrConstraintTrigger,
+ )
})
g.Testing("no actual closing occurs", func() {
@@ -1023,6 +1115,7 @@ func test_toDeadStmt() {
topic = "toDead() topic"
payloadStr = "toDead() payload"
consumer = "toDead() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1035,15 +1128,20 @@ func test_toDeadStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ commit, commitClose, commitErr := commitStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -1107,7 +1205,10 @@ func test_toDeadStmt() {
g.Testing("we can't mark as dead non-existent messages", func() {
err := asDead(consumer, guuid.New(), guuid.New())
- g.TAssertEqual(err, sql.ErrNoRows)
+ g.TAssertEqual(
+ err.(golite.Error).ExtendedCode,
+ golite.ErrConstraintNotNull,
+ )
})
g.Testing("multiple consumers may mark a message as dead", func() {
@@ -1173,11 +1274,13 @@ func test_toDeadStmt() {
})
g.Testing("error if we don't own the message's consumer/topic", func() {
- otherID := instanceID + 1
+ otherCfg := cfg
+ otherCfg.instanceID = instanceID + 1
+
messageID1 := pub(topic)
messageID2 := pub(topic)
- take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ take, takeClose, takeErr := takeStmt(otherCfg)
g.TErrorIf(takeErr)
defer takeClose()
@@ -1188,13 +1291,10 @@ func test_toDeadStmt() {
g.TErrorIf(err)
err = toDead(consumer, messageID2, guuid.New())
- g.TAssertEqual(err, fmt.Errorf(
- noLongerOwnerErrorFmt,
- instanceID,
- topic,
- consumer,
- otherID,
- ))
+ g.TAssertEqual(
+ err.(golite.Error).ExtendedCode,
+ golite.ErrConstraintTrigger,
+ )
})
g.Testing("no actual closing occurs", func() {
@@ -1213,6 +1313,7 @@ func test_replayStmt() {
topic = "replay() topic"
payloadStr = "replay() payload"
consumer = "replay() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1225,15 +1326,20 @@ func test_replayStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ replay, replayClose, replayErr := replayStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -1343,6 +1449,7 @@ func test_oneDeadStmt() {
topic = "oneDead() topic"
payloadStr = "oneDead() payload"
consumer = "oneDead() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1355,16 +1462,21 @@ func test_oneDeadStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
- oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ replay, replayClose, replayErr := replayStmt(cfg)
+ oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -1457,6 +1569,7 @@ func test_deadletterEach() {
topic = "deadletterEach() topic"
payloadStr = "deadletterEach() payload"
consumer = "deadletterEach() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1469,15 +1582,20 @@ func test_deadletterEach() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ allDead, allDeadClose, allDeadErr := allDeadStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -1631,6 +1749,7 @@ func test_allDeadStmt() {
topic = "allDead() topic"
payloadStr = "allDead() payload"
consumer = "allDead() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1643,16 +1762,21 @@ func test_allDeadStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
- allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ replay, replayClose, replayErr := replayStmt(cfg)
+ allDead, allDeadClose, allDeadErr := allDeadStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -1784,6 +1908,7 @@ func test_sizeStmt() {
topic = "size() topic"
payloadStr = "size() payload"
consumer = "size() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1796,17 +1921,22 @@ func test_sizeStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
- oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID)
- size, sizeClose, sizeErr := sizeStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ replay, replayClose, replayErr := replayStmt(cfg)
+ oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg)
+ size, sizeClose, sizeErr := sizeStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -1892,6 +2022,7 @@ func test_countStmt() {
topic = "count() topic"
payloadStr = "count() payload"
consumer = "count() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1904,17 +2035,22 @@ func test_countStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- next, nextClose, nextErr := nextStmt(db, prefix, instanceID)
- commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- count, countClose, countErr := countStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ next, nextClose, nextErr := nextStmt(cfg)
+ commit, commitClose, commitErr := commitStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ count, countClose, countErr := countStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -2020,6 +2156,7 @@ func test_hasDataStmt() {
topic = "hasData() topic"
payloadStr = "hasData() payload"
consumer = "hasData() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -2032,17 +2169,22 @@ func test_hasDataStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- next, nextClose, nextErr := nextStmt(db, prefix, instanceID)
- commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- hasData, hasDataClose, hasDataErr := hasDataStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ next, nextClose, nextErr := nextStmt(cfg)
+ commit, commitClose, commitErr := commitStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ hasData, hasDataClose, hasDataErr := hasDataStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -2148,6 +2290,8 @@ func test_initDB() {
topic = "initDB() topic"
payloadStr = "initDB() payload"
consumer = "initDB() consumer"
+ dbpath = golite.InMemory
+ prefix = defaultPrefix
)
var (
flowID = guuid.New()
@@ -2159,17 +2303,12 @@ func test_initDB() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
- g.TErrorIf(err)
- defer db.Close()
-
var messages []messageT
notifyFn := func(message messageT) {
messages = append(messages, message)
}
- instanceID := os.Getpid()
- queries, err := initDB(db, defaultPrefix, notifyFn, instanceID)
+ queries, err := initDB(dbpath, prefix, notifyFn, instanceID)
g.TErrorIf(err)
defer queries.close()
@@ -2256,31 +2395,16 @@ func test_initDB() {
func test_queriesTclose() {
g.TestStart("queriesT.close()")
- db, err := sql.Open(golite.DriverName, golite.InMemory)
- g.TErrorIf(err)
- defer db.Close()
+ const (
+ dbpath = golite.InMemory
+ prefix = defaultPrefix
+ )
- instanceID := os.Getpid()
- queries, err := initDB(db, defaultPrefix, func(messageT) {}, instanceID)
+ notifyFn := func(messageT) {}
+ queries, err := initDB(dbpath, prefix, notifyFn, instanceID)
g.TErrorIf(err)
- g.Testing("after closing, we can't run queries", func() {
- unsent := UnsentMessage{ Payload: []byte{}, }
- _, err := queries.publish(unsent, guuid.New())
- g.TErrorIf(err)
- g.TErrorIf(db.Close())
-
- err = queries.close()
- g.TErrorIf(err)
-
- _, err = queries.publish(unsent, guuid.New())
- g.TAssertEqual(
- err.Error(),
- "sql: database is closed",
- )
- })
-
g.Testing("closing mre than once does not error", func() {
g.TErrorIf(g.SomeError(
queries.close(),
@@ -2289,7 +2413,6 @@ func test_queriesTclose() {
})
}
-
func test_newPinger() {
g.TestStart("newPinger()")
@@ -3287,6 +3410,7 @@ func test_queueT_Publish() {
const (
topic = "queueT.Publish() topic"
payloadStr = "queueT.Publish() payload"
+ dbpath = golite.InMemory
)
var (
flowID = guuid.New()
@@ -3298,7 +3422,7 @@ func test_queueT_Publish() {
}
)
- queue, err := New(golite.InMemory)
+ queue, err := New(dbpath)
g.TErrorIf(err)
defer queue.Close()
@@ -4350,11 +4474,7 @@ func test_queueT_Close() {
queriesErr = errors.New("queriesT{} error")
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
- g.TErrorIf(err)
-
queue := queueT{
- db: db,
queries: queriesT{
close: func() error{
queriesCount++
@@ -4376,7 +4496,7 @@ func test_queueT_Close() {
},
}
- err = queue.Close()
+ err := queue.Close()
g.TAssertEqual(err, g.WrapErrors(subscriptionsErr, queriesErr))
g.TAssertEqual(pingerCount, 1)
g.TAssertEqual(subscriptionsCount, 1)
@@ -5674,6 +5794,7 @@ func dumpQueries() {
{ "take", takeSQL },
{ "publish", publishSQL },
{ "find", findSQL },
+ { "next", nextSQL },
{ "pending", pendingSQL },
{ "commit", commitSQL },
{ "toDead", toDeadSQL },
@@ -5703,6 +5824,8 @@ func MainTest() {
g.Init()
test_defaultPrefix()
+ test_serialized()
+ test_execSerialized()
test_tryRollback()
test_inTx()
test_createTables()