summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/functional/consumer-with-deadletter/q.go6
-rw-r--r--tests/functional/new-instance-takeover/q.go20
-rw-r--r--tests/functional/wait-after-publish/q.go5
-rw-r--r--tests/q.go453
-rw-r--r--tests/queries.sql107
5 files changed, 371 insertions, 220 deletions
diff --git a/tests/functional/consumer-with-deadletter/q.go b/tests/functional/consumer-with-deadletter/q.go
index 25391c5..a79ad5b 100644
--- a/tests/functional/consumer-with-deadletter/q.go
+++ b/tests/functional/consumer-with-deadletter/q.go
@@ -2,7 +2,6 @@ package q
import (
"errors"
- "os"
"runtime"
"guuid"
@@ -44,15 +43,10 @@ func MainTest() {
g.TAssertEqualS(ok, true, "can't get filename")
databasePath := file + ".db"
- os.Remove(databasePath)
- os.Remove(databasePath + "-shm")
- os.Remove(databasePath + "-wal")
-
queue, err := New(databasePath)
g.TErrorIf(err)
defer queue.Close()
-
pub := func(payload []byte, flowID guuid.UUID) {
unsent := UnsentMessage{
Topic: topicX,
diff --git a/tests/functional/new-instance-takeover/q.go b/tests/functional/new-instance-takeover/q.go
index 6e04e5f..76ed59f 100644
--- a/tests/functional/new-instance-takeover/q.go
+++ b/tests/functional/new-instance-takeover/q.go
@@ -1,7 +1,6 @@
package q
import (
- "fmt"
"runtime"
"os"
@@ -33,16 +32,16 @@ func handlerFn(publish func(guuid.UUID)) func(Message) error {
}
func startInstance(
- databasePath string,
+ dbpath string,
instanceID int,
name string,
) (IQueue, error) {
- iqueue, err := New(databasePath)
+ iqueue, err := New(dbpath)
g.TErrorIf(err)
queue := iqueue.(queueT)
notifyFn := makeNotifyFn(queue.subscriptions.read, queue.pinger)
- queries, err := initDB(queue.db, defaultPrefix, notifyFn, instanceID)
+ queries, err := initDB(dbpath, defaultPrefix, notifyFn, instanceID)
g.TErrorIf(err)
err = queue.queries.close()
@@ -68,20 +67,12 @@ func startInstance(
func MainTest() {
- // https://sqlite.org/forum/forumpost/2507664507
g.Init()
_, file, _, ok := runtime.Caller(0)
g.TAssertEqualS(ok, true, "can't get filename")
dbpath := file + ".db"
- dbpath = "/mnt/dois/andreh/t.db"
- os.Remove(dbpath)
- os.Remove(dbpath + "-shm")
- os.Remove(dbpath + "-wal")
-
- // FIXME
- return
instanceID1 := os.Getpid()
instanceID2 := instanceID1 + 1
@@ -89,10 +80,6 @@ func MainTest() {
flowID2 := guuid.New()
g.Testing("new instances take ownership of topic+name combo", func() {
- if false {
- fmt.Fprintf(os.Stderr, "(PID %d + 1) ", instanceID1)
- }
-
q1, err := startInstance(dbpath, instanceID1, "first")
g.TErrorIf(err)
defer q1.Close()
@@ -103,7 +90,6 @@ func MainTest() {
<- q1.WaitFor("individual-first", flowID1, "w").Channel
<- q1.WaitFor( "shared-first", flowID1, "w").Channel
- // println("waited 1")
q2, err := startInstance(dbpath, instanceID2, "second")
g.TErrorIf(err)
diff --git a/tests/functional/wait-after-publish/q.go b/tests/functional/wait-after-publish/q.go
index 15a532d..b70d27e 100644
--- a/tests/functional/wait-after-publish/q.go
+++ b/tests/functional/wait-after-publish/q.go
@@ -1,7 +1,6 @@
package q
import (
- "os"
"runtime"
"guuid"
@@ -19,10 +18,6 @@ func MainTest() {
g.TAssertEqualS(ok, true, "can't get filename")
databasePath := file + ".db"
- os.Remove(databasePath)
- os.Remove(databasePath + "-shm")
- os.Remove(databasePath + "-wal")
-
queue, err := New(databasePath)
g.TErrorIf(err)
defer queue.Close()
diff --git a/tests/q.go b/tests/q.go
index 844e722..565ca25 100644
--- a/tests/q.go
+++ b/tests/q.go
@@ -21,6 +21,10 @@ import (
+var instanceID = os.Getpid()
+
+
+
func test_defaultPrefix() {
g.TestStart("defaultPrefix")
@@ -29,13 +33,46 @@ func test_defaultPrefix() {
})
}
-func test_tryRollback() {
+func test_serialized() {
// FIXME
}
-func test_inTx() {
- /*
+func test_execSerialized() {
// FIXME
+}
+
+func test_tryRollback() {
+ g.TestStart("tryRollback()")
+
+ myErr := errors.New("bottom error")
+
+ db, err := sql.Open(golite.DriverName, golite.InMemory)
+ g.TErrorIf(err)
+ defer db.Close()
+
+
+ g.Testing("the error is propagated if rollback doesn't fail", func() {
+ tx, err := db.Begin()
+ g.TErrorIf(err)
+
+ err = tryRollback(tx, myErr)
+ g.TAssertEqual(err, myErr)
+ })
+
+ g.Testing("a wrapped error when rollback fails", func() {
+ tx, err := db.Begin()
+ g.TErrorIf(err)
+
+ err = tx.Commit()
+ g.TErrorIf(err)
+
+ err = tryRollback(tx, myErr)
+ g.TAssertEqual(reflect.DeepEqual(err, myErr), false)
+ g.TAssertEqual(errors.Is(err, myErr), true)
+ })
+}
+
+func test_inTx() {
g.TestStart("inTx()")
db, err := sql.Open(golite.DriverName, golite.InMemory)
@@ -44,11 +81,11 @@ func test_inTx() {
g.Testing("when fn() errors, we propagate it", func() {
- myError := errors.New("to be propagated")
+ myErr := errors.New("to be propagated")
err := inTx(db, func(tx *sql.Tx) error {
- return myError
+ return myErr
})
- g.TAssertEqual(err, myError)
+ g.TAssertEqual(err, myErr)
})
g.Testing("on nil error we get nil", func() {
@@ -57,13 +94,17 @@ func test_inTx() {
})
g.TErrorIf(err)
})
- */
}
func test_createTables() {
g.TestStart("createTables()")
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ const (
+ dbpath = golite.InMemory
+ prefix = defaultPrefix
+ )
+
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
defer db.Close()
@@ -72,12 +113,12 @@ func test_createTables() {
const tmpl_read = `
SELECT id FROM "%s_messages" LIMIT 1;
`
- qRead := fmt.Sprintf(tmpl_read, defaultPrefix)
+ qRead := fmt.Sprintf(tmpl_read, prefix)
_, err := db.Exec(qRead)
g.TErrorNil(err)
- err = createTables(db, defaultPrefix)
+ err = createTables(db, prefix)
g.TErrorIf(err)
_, err = db.Exec(qRead)
@@ -86,9 +127,9 @@ func test_createTables() {
g.Testing("we can do it multiple times", func() {
g.TErrorIf(g.SomeError(
- createTables(db, defaultPrefix),
- createTables(db, defaultPrefix),
- createTables(db, defaultPrefix),
+ createTables(db, prefix),
+ createTables(db, prefix),
+ createTables(db, prefix),
))
})
}
@@ -99,15 +140,21 @@ func test_takeStmt() {
const (
topic = "take() topic"
consumer = "take() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
g.TErrorIf(takeErr)
defer g.SomeFnError(
takeClose,
@@ -137,9 +184,10 @@ func test_takeStmt() {
})
g.Testing("if there is already an owner, we overtake it", func() {
- otherID := instanceID + 1
+ otherCfg := cfg
+ otherCfg.instanceID = instanceID + 1
- take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ take, takeClose, takeErr := takeStmt(otherCfg)
g.TErrorIf(takeErr)
defer takeClose()
@@ -153,7 +201,7 @@ func test_takeStmt() {
err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
g.TErrorIf(err)
- g.TAssertEqual(ownerID, otherID)
+ g.TAssertEqual(ownerID, otherCfg.instanceID)
})
g.Testing("no error if closed more than once", func() {
g.TErrorIf(g.SomeError(
@@ -170,6 +218,7 @@ func test_publishStmt() {
const (
topic = "publish() topic"
payloadStr = "publish() payload"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -182,12 +231,17 @@ func test_publishStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ publish, publishClose, publishErr := publishStmt(cfg)
g.TErrorIf(publishErr)
defer g.SomeFnError(
publishClose,
@@ -255,6 +309,7 @@ func test_findStmt() {
const (
topic = "find() topic"
payloadStr = "find() payload"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -267,13 +322,18 @@ func test_findStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- find, findClose, findErr := findStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ publish, publishClose, publishErr := publishStmt(cfg)
+ find, findClose, findErr := findStmt(cfg)
g.TErrorIf(g.SomeError(
publishErr,
findErr,
@@ -354,6 +414,7 @@ func test_nextStmt() {
topic = "next() topic"
payloadStr = "next() payload"
consumer = "next() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -366,15 +427,24 @@ func test_nextStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- next, nextClose, nextErr := nextStmt(db, prefix, instanceID)
- commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ next, nextClose, nextErr := nextStmt(cfg)
+ commit, commitClose, commitErr := commitStmt(cfg)
+ g.TErrorIf(takeErr)
+ g.TErrorIf(publishErr)
+ g.TErrorIf(nextErr)
+ g.TErrorIf(commitErr)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -440,9 +510,10 @@ func test_nextStmt() {
})
g.Testing("error when we're not the owner", func() {
- otherID := instanceID + 1
+ otherCfg := cfg
+ otherCfg.instanceID = instanceID + 1
- take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ take, takeClose, takeErr := takeStmt(otherCfg)
g.TErrorIf(takeErr)
defer takeClose()
@@ -455,7 +526,7 @@ func test_nextStmt() {
_, err = next(topic, consumer)
g.TAssertEqual(err, fmt.Errorf(
notOwnerErrorFmt,
- otherID,
+ otherCfg.instanceID,
topic,
consumer,
instanceID,
@@ -478,6 +549,7 @@ func test_messageEach() {
topic = "messageEach() topic"
payloadStr = "messageEach() payload"
consumer = "messageEach() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -490,14 +562,19 @@ func test_messageEach() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ pending, pendingClose, pendingErr := pendingStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -640,6 +717,7 @@ func test_pendingStmt() {
topic = "pending() topic"
payloadStr = "pending() payload"
consumer = "pending() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -652,16 +730,21 @@ func test_pendingStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID)
- commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ pending, pendingClose, pendingErr := pendingStmt(cfg)
+ commit, commitClose, commitErr := commitStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -820,9 +903,10 @@ func test_pendingStmt() {
})
g.Testing("when we're not the owners we get nothing", func() {
- otherID := instanceID + 1
+ otherCfg := cfg
+ otherCfg.instanceID = instanceID + 1
- take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ take, takeClose, takeErr := takeStmt(otherCfg)
g.TErrorIf(takeErr)
defer takeClose()
@@ -871,6 +955,7 @@ func test_commitStmt() {
topic = "commit() topic"
payloadStr = "commit() payload"
consumer = "commit() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -883,15 +968,20 @@ func test_commitStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ commit, commitClose, commitErr := commitStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -937,7 +1027,10 @@ func test_commitStmt() {
g.Testing("we can't commit non-existent messages", func() {
err := cmt(consumer, guuid.New())
- g.TAssertEqual(err, sql.ErrNoRows)
+ g.TAssertEqual(
+ err.(golite.Error).ExtendedCode,
+ golite.ErrConstraintNotNull,
+ )
})
g.Testing("multiple consumers may commit a message", func() {
@@ -987,8 +1080,10 @@ func test_commitStmt() {
})
g.Testing("error if we don't own the topic/consumer", func() {
- otherID := instanceID + 1
- take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ otherCfg := cfg
+ otherCfg.instanceID = instanceID + 1
+
+ take, takeClose, takeErr := takeStmt(otherCfg)
g.TErrorIf(takeErr)
defer takeClose()
@@ -998,13 +1093,10 @@ func test_commitStmt() {
g.TErrorIf(err)
err = commit(consumer, messageID)
- g.TAssertEqual(err, fmt.Errorf(
- noLongerOwnerErrorFmt,
- instanceID,
- topic,
- consumer,
- otherID,
- ))
+ g.TAssertEqual(
+ err.(golite.Error).ExtendedCode,
+ golite.ErrConstraintTrigger,
+ )
})
g.Testing("no actual closing occurs", func() {
@@ -1023,6 +1115,7 @@ func test_toDeadStmt() {
topic = "toDead() topic"
payloadStr = "toDead() payload"
consumer = "toDead() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1035,15 +1128,20 @@ func test_toDeadStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ commit, commitClose, commitErr := commitStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -1107,7 +1205,10 @@ func test_toDeadStmt() {
g.Testing("we can't mark as dead non-existent messages", func() {
err := asDead(consumer, guuid.New(), guuid.New())
- g.TAssertEqual(err, sql.ErrNoRows)
+ g.TAssertEqual(
+ err.(golite.Error).ExtendedCode,
+ golite.ErrConstraintNotNull,
+ )
})
g.Testing("multiple consumers may mark a message as dead", func() {
@@ -1173,11 +1274,13 @@ func test_toDeadStmt() {
})
g.Testing("error if we don't own the message's consumer/topic", func() {
- otherID := instanceID + 1
+ otherCfg := cfg
+ otherCfg.instanceID = instanceID + 1
+
messageID1 := pub(topic)
messageID2 := pub(topic)
- take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ take, takeClose, takeErr := takeStmt(otherCfg)
g.TErrorIf(takeErr)
defer takeClose()
@@ -1188,13 +1291,10 @@ func test_toDeadStmt() {
g.TErrorIf(err)
err = toDead(consumer, messageID2, guuid.New())
- g.TAssertEqual(err, fmt.Errorf(
- noLongerOwnerErrorFmt,
- instanceID,
- topic,
- consumer,
- otherID,
- ))
+ g.TAssertEqual(
+ err.(golite.Error).ExtendedCode,
+ golite.ErrConstraintTrigger,
+ )
})
g.Testing("no actual closing occurs", func() {
@@ -1213,6 +1313,7 @@ func test_replayStmt() {
topic = "replay() topic"
payloadStr = "replay() payload"
consumer = "replay() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1225,15 +1326,20 @@ func test_replayStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ replay, replayClose, replayErr := replayStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -1343,6 +1449,7 @@ func test_oneDeadStmt() {
topic = "oneDead() topic"
payloadStr = "oneDead() payload"
consumer = "oneDead() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1355,16 +1462,21 @@ func test_oneDeadStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
- oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ replay, replayClose, replayErr := replayStmt(cfg)
+ oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -1457,6 +1569,7 @@ func test_deadletterEach() {
topic = "deadletterEach() topic"
payloadStr = "deadletterEach() payload"
consumer = "deadletterEach() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1469,15 +1582,20 @@ func test_deadletterEach() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ allDead, allDeadClose, allDeadErr := allDeadStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -1631,6 +1749,7 @@ func test_allDeadStmt() {
topic = "allDead() topic"
payloadStr = "allDead() payload"
consumer = "allDead() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1643,16 +1762,21 @@ func test_allDeadStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
- allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ replay, replayClose, replayErr := replayStmt(cfg)
+ allDead, allDeadClose, allDeadErr := allDeadStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -1784,6 +1908,7 @@ func test_sizeStmt() {
topic = "size() topic"
payloadStr = "size() payload"
consumer = "size() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1796,17 +1921,22 @@ func test_sizeStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
- oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID)
- size, sizeClose, sizeErr := sizeStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ replay, replayClose, replayErr := replayStmt(cfg)
+ oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg)
+ size, sizeClose, sizeErr := sizeStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -1892,6 +2022,7 @@ func test_countStmt() {
topic = "count() topic"
payloadStr = "count() payload"
consumer = "count() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1904,17 +2035,22 @@ func test_countStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- next, nextClose, nextErr := nextStmt(db, prefix, instanceID)
- commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- count, countClose, countErr := countStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ next, nextClose, nextErr := nextStmt(cfg)
+ commit, commitClose, commitErr := commitStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ count, countClose, countErr := countStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -2020,6 +2156,7 @@ func test_hasDataStmt() {
topic = "hasData() topic"
payloadStr = "hasData() payload"
consumer = "hasData() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -2032,17 +2169,22 @@ func test_hasDataStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- next, nextClose, nextErr := nextStmt(db, prefix, instanceID)
- commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- hasData, hasDataClose, hasDataErr := hasDataStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ next, nextClose, nextErr := nextStmt(cfg)
+ commit, commitClose, commitErr := commitStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ hasData, hasDataClose, hasDataErr := hasDataStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -2148,6 +2290,8 @@ func test_initDB() {
topic = "initDB() topic"
payloadStr = "initDB() payload"
consumer = "initDB() consumer"
+ dbpath = golite.InMemory
+ prefix = defaultPrefix
)
var (
flowID = guuid.New()
@@ -2159,17 +2303,12 @@ func test_initDB() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
- g.TErrorIf(err)
- defer db.Close()
-
var messages []messageT
notifyFn := func(message messageT) {
messages = append(messages, message)
}
- instanceID := os.Getpid()
- queries, err := initDB(db, defaultPrefix, notifyFn, instanceID)
+ queries, err := initDB(dbpath, prefix, notifyFn, instanceID)
g.TErrorIf(err)
defer queries.close()
@@ -2256,31 +2395,16 @@ func test_initDB() {
func test_queriesTclose() {
g.TestStart("queriesT.close()")
- db, err := sql.Open(golite.DriverName, golite.InMemory)
- g.TErrorIf(err)
- defer db.Close()
+ const (
+ dbpath = golite.InMemory
+ prefix = defaultPrefix
+ )
- instanceID := os.Getpid()
- queries, err := initDB(db, defaultPrefix, func(messageT) {}, instanceID)
+ notifyFn := func(messageT) {}
+ queries, err := initDB(dbpath, prefix, notifyFn, instanceID)
g.TErrorIf(err)
- g.Testing("after closing, we can't run queries", func() {
- unsent := UnsentMessage{ Payload: []byte{}, }
- _, err := queries.publish(unsent, guuid.New())
- g.TErrorIf(err)
- g.TErrorIf(db.Close())
-
- err = queries.close()
- g.TErrorIf(err)
-
- _, err = queries.publish(unsent, guuid.New())
- g.TAssertEqual(
- err.Error(),
- "sql: database is closed",
- )
- })
-
g.Testing("closing mre than once does not error", func() {
g.TErrorIf(g.SomeError(
queries.close(),
@@ -2289,7 +2413,6 @@ func test_queriesTclose() {
})
}
-
func test_newPinger() {
g.TestStart("newPinger()")
@@ -3287,6 +3410,7 @@ func test_queueT_Publish() {
const (
topic = "queueT.Publish() topic"
payloadStr = "queueT.Publish() payload"
+ dbpath = golite.InMemory
)
var (
flowID = guuid.New()
@@ -3298,7 +3422,7 @@ func test_queueT_Publish() {
}
)
- queue, err := New(golite.InMemory)
+ queue, err := New(dbpath)
g.TErrorIf(err)
defer queue.Close()
@@ -4350,11 +4474,7 @@ func test_queueT_Close() {
queriesErr = errors.New("queriesT{} error")
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
- g.TErrorIf(err)
-
queue := queueT{
- db: db,
queries: queriesT{
close: func() error{
queriesCount++
@@ -4376,7 +4496,7 @@ func test_queueT_Close() {
},
}
- err = queue.Close()
+ err := queue.Close()
g.TAssertEqual(err, g.WrapErrors(subscriptionsErr, queriesErr))
g.TAssertEqual(pingerCount, 1)
g.TAssertEqual(subscriptionsCount, 1)
@@ -5674,6 +5794,7 @@ func dumpQueries() {
{ "take", takeSQL },
{ "publish", publishSQL },
{ "find", findSQL },
+ { "next", nextSQL },
{ "pending", pendingSQL },
{ "commit", commitSQL },
{ "toDead", toDeadSQL },
@@ -5703,6 +5824,8 @@ func MainTest() {
g.Init()
test_defaultPrefix()
+ test_serialized()
+ test_execSerialized()
test_tryRollback()
test_inTx()
test_createTables()
diff --git a/tests/queries.sql b/tests/queries.sql
index c821e25..e790d41 100644
--- a/tests/queries.sql
+++ b/tests/queries.sql
@@ -27,6 +27,7 @@
consumer TEXT NOT NULL,
message_id INTEGER NOT NULL
REFERENCES "q_messages"(id),
+ instance_id INTEGER NOT NULL,
UNIQUE (consumer, message_id)
) STRICT;
CREATE INDEX IF NOT EXISTS "q_offsets_consumer"
@@ -38,6 +39,7 @@
consumer TEXT NOT NULL,
message_id INTEGER NOT NULL
REFERENCES "q_messages"(id),
+ instance_id INTEGER NOT NULL,
UNIQUE (consumer, message_id)
) STRICT;
CREATE INDEX IF NOT EXISTS "q_deadletters_consumer"
@@ -58,6 +60,44 @@
owner_id INTEGER NOT NULL,
UNIQUE (topic, consumer)
) STRICT;
+
+ CREATE TRIGGER IF NOT EXISTS "q_check_instance_owns_topic"
+ BEFORE INSERT ON "q_offsets"
+ WHEN NEW.instance_id != (
+ SELECT owner_id FROM "q_owners"
+ WHERE topic = (
+ SELECT "q_payloads".topic
+ FROM "q_payloads"
+ JOIN "q_messages" ON "q_payloads".id =
+ "q_messages".payload_id
+ WHERE "q_messages".id = NEW.message_id
+ ) AND consumer = NEW.consumer
+ )
+ BEGIN
+ SELECT RAISE(
+ ABORT,
+ 'instance does not own topic/consumer combo'
+ );
+ END;
+
+ CREATE TRIGGER IF NOT EXISTS "q_check_can_publish_deadletter"
+ BEFORE INSERT ON "q_deadletters"
+ WHEN NEW.instance_id != (
+ SELECT owner_id FROM "q_owners"
+ WHERE topic = (
+ SELECT "q_payloads".topic
+ FROM "q_payloads"
+ JOIN "q_messages" ON "q_payloads".id =
+ "q_messages".payload_id
+ WHERE "q_messages".id = NEW.message_id
+ ) AND consumer = NEW.consumer
+ )
+ BEGIN
+ SELECT RAISE(
+ ABORT,
+ 'Instance does not own topic/consumer combo'
+ );
+ END;
-- read:
@@ -81,7 +121,6 @@
INSERT INTO "q_payloads" (topic, payload)
VALUES (?, ?);
- -- FIXME: must be inside a trnsaction
INSERT INTO "q_messages" (uuid, flow_id, payload_id)
VALUES (?, ?, last_insert_rowid());
@@ -114,6 +153,38 @@
-- owner:
+-- next.sql:
+-- write:
+
+-- read:
+ SELECT
+ (
+ SELECT owner_id FROM "q_owners"
+ WHERE
+ topic = ? AND
+ consumer = ?
+ LIMIT 1
+ ) AS owner_id,
+ "q_messages".id,
+ "q_messages".timestamp,
+ "q_messages".uuid,
+ "q_messages".flow_id,
+ "q_payloads".payload
+ FROM "q_messages"
+ JOIN "q_payloads" ON
+ "q_payloads".id = "q_messages".payload_id
+ WHERE
+ "q_payloads".topic = ? AND
+ "q_messages".id NOT IN (
+ SELECT message_id FROM "q_offsets"
+ WHERE consumer = ?
+ )
+ ORDER BY "q_messages".id ASC
+ LIMIT 1;
+
+
+-- owner:
+
-- pending.sql:
-- write:
@@ -146,46 +217,28 @@
-- commit.sql:
-- write:
- INSERT INTO "q_offsets" (consumer, message_id)
- VALUES (?, (SELECT id FROM "q_messages" WHERE uuid = ?));
+ INSERT INTO "q_offsets" (consumer, message_id, instance_id)
+ VALUES (?, (SELECT id FROM "q_messages" WHERE uuid = ?), ?);
-- read:
- SELECT "q_payloads".topic from "q_payloads"
- JOIN "q_messages" ON
- "q_payloads".id = "q_messages".payload_id
- WHERE "q_messages".uuid = ?;
-
-- owner:
- SELECT owner_id FROM "q_owners"
- WHERE
- topic = ? AND
- consumer = ?;
-
-- toDead.sql:
-- write:
- INSERT INTO "q_offsets" ( consumer, message_id)
- VALUES ( ?, (SELECT id FROM "q_messages" WHERE uuid = ?));
+ INSERT INTO "q_offsets"
+ ( consumer, message_id, instance_id)
+ VALUES ( ?, (SELECT id FROM "q_messages" WHERE uuid = ?), ?);
- INSERT INTO "q_deadletters" (uuid, consumer, message_id)
- VALUES (?, ?, (SELECT id FROM "q_messages" WHERE uuid = ?));
+ INSERT INTO "q_deadletters"
+ (uuid, consumer, message_id, instance_id)
+ VALUES (?, ?, (SELECT id FROM "q_messages" WHERE uuid = ?), ?);
-- read:
- SELECT "q_payloads".topic FROM "q_payloads"
- JOIN "q_messages" ON
- "q_payloads".id = "q_messages".payload_id
- WHERE "q_messages".uuid = ?;
-
-- owner:
- SELECT owner_id FROM "q_owners"
- WHERE
- topic = ? AND
- consumer = ?;
-
-- replay.sql:
-- write: