summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorEuAndreh <eu@euandre.org>2024-10-30 20:26:41 -0300
committerEuAndreh <eu@euandre.org>2024-10-31 19:57:42 -0300
commit5d53704f450788452837b18b26f54d235d883490 (patch)
tree91bb61d6f823523675ae9f93bb8561858bf7be14 /tests
parenttests/q.go: Replace ":memory:" with golite.InMemory (diff)
downloadfiinha-5d53704f450788452837b18b26f54d235d883490.tar.gz
fiinha-5d53704f450788452837b18b26f54d235d883490.tar.xz
src/q.go: Fix SQLite ~broken~ transactions
As per expected by SQLite, create new connections so they have independent transaction, and serialize its use via a channel with a single consumer. == Other changes === Remove `db` attribute from `queueT` type Now that `initDB()` can create many database connections (A.K.A. opaque pointers to the `sqlite3*` object), it makes more sense to hand to it the responsability to create and destroy these databases. So now a `queries.close()` includes what previously was `db.Close()`, and needing to do it was the only reason that made us keep the `db` attribute. The arguments to `initDB()` were adjusted to reflect that, as it no longer is given an initialized database handle, but only the database path, and does the creation of the handles by itself. === Shoehorn multi-statement queries into single-statement ones In order to avoid creating one `execSerialized()` function per attribute of the `queryT` queries, we try to leverage SQLite's built-in transactionality per individual statement. So compound queries that were previously done in multiple statements wrapped with a `inTx()` that could be shoe-horned into a single `SELECT` were rewritten to avoid having more of this application-level serialization. The topic/consumer owner validation was also changed: now it is a trigger on the relevant tables that `ABORT` the implicit/invisible transaction and makes the `INSERT` fail. In order to make this possible, the `"%s_owners"` table now has an extra column: `instance_id`. Despite being extra data, this meta-information isn't duplicated from anywhere else, and it is an actual useful information for operators to leverage. So now the trigger is responsible for stopping the transaction from going forward without adding an explicit `inTx()` around it, and the writes can mostly be shrunk to single-statement queries. Fortunately, all this is enough to fix the `new-instance-takeover` functional test.
Diffstat (limited to 'tests')
-rw-r--r--tests/functional/consumer-with-deadletter/q.go6
-rw-r--r--tests/functional/new-instance-takeover/q.go20
-rw-r--r--tests/functional/wait-after-publish/q.go5
-rw-r--r--tests/q.go453
-rw-r--r--tests/queries.sql107
5 files changed, 371 insertions, 220 deletions
diff --git a/tests/functional/consumer-with-deadletter/q.go b/tests/functional/consumer-with-deadletter/q.go
index 25391c5..a79ad5b 100644
--- a/tests/functional/consumer-with-deadletter/q.go
+++ b/tests/functional/consumer-with-deadletter/q.go
@@ -2,7 +2,6 @@ package q
import (
"errors"
- "os"
"runtime"
"guuid"
@@ -44,15 +43,10 @@ func MainTest() {
g.TAssertEqualS(ok, true, "can't get filename")
databasePath := file + ".db"
- os.Remove(databasePath)
- os.Remove(databasePath + "-shm")
- os.Remove(databasePath + "-wal")
-
queue, err := New(databasePath)
g.TErrorIf(err)
defer queue.Close()
-
pub := func(payload []byte, flowID guuid.UUID) {
unsent := UnsentMessage{
Topic: topicX,
diff --git a/tests/functional/new-instance-takeover/q.go b/tests/functional/new-instance-takeover/q.go
index 6e04e5f..76ed59f 100644
--- a/tests/functional/new-instance-takeover/q.go
+++ b/tests/functional/new-instance-takeover/q.go
@@ -1,7 +1,6 @@
package q
import (
- "fmt"
"runtime"
"os"
@@ -33,16 +32,16 @@ func handlerFn(publish func(guuid.UUID)) func(Message) error {
}
func startInstance(
- databasePath string,
+ dbpath string,
instanceID int,
name string,
) (IQueue, error) {
- iqueue, err := New(databasePath)
+ iqueue, err := New(dbpath)
g.TErrorIf(err)
queue := iqueue.(queueT)
notifyFn := makeNotifyFn(queue.subscriptions.read, queue.pinger)
- queries, err := initDB(queue.db, defaultPrefix, notifyFn, instanceID)
+ queries, err := initDB(dbpath, defaultPrefix, notifyFn, instanceID)
g.TErrorIf(err)
err = queue.queries.close()
@@ -68,20 +67,12 @@ func startInstance(
func MainTest() {
- // https://sqlite.org/forum/forumpost/2507664507
g.Init()
_, file, _, ok := runtime.Caller(0)
g.TAssertEqualS(ok, true, "can't get filename")
dbpath := file + ".db"
- dbpath = "/mnt/dois/andreh/t.db"
- os.Remove(dbpath)
- os.Remove(dbpath + "-shm")
- os.Remove(dbpath + "-wal")
-
- // FIXME
- return
instanceID1 := os.Getpid()
instanceID2 := instanceID1 + 1
@@ -89,10 +80,6 @@ func MainTest() {
flowID2 := guuid.New()
g.Testing("new instances take ownership of topic+name combo", func() {
- if false {
- fmt.Fprintf(os.Stderr, "(PID %d + 1) ", instanceID1)
- }
-
q1, err := startInstance(dbpath, instanceID1, "first")
g.TErrorIf(err)
defer q1.Close()
@@ -103,7 +90,6 @@ func MainTest() {
<- q1.WaitFor("individual-first", flowID1, "w").Channel
<- q1.WaitFor( "shared-first", flowID1, "w").Channel
- // println("waited 1")
q2, err := startInstance(dbpath, instanceID2, "second")
g.TErrorIf(err)
diff --git a/tests/functional/wait-after-publish/q.go b/tests/functional/wait-after-publish/q.go
index 15a532d..b70d27e 100644
--- a/tests/functional/wait-after-publish/q.go
+++ b/tests/functional/wait-after-publish/q.go
@@ -1,7 +1,6 @@
package q
import (
- "os"
"runtime"
"guuid"
@@ -19,10 +18,6 @@ func MainTest() {
g.TAssertEqualS(ok, true, "can't get filename")
databasePath := file + ".db"
- os.Remove(databasePath)
- os.Remove(databasePath + "-shm")
- os.Remove(databasePath + "-wal")
-
queue, err := New(databasePath)
g.TErrorIf(err)
defer queue.Close()
diff --git a/tests/q.go b/tests/q.go
index 844e722..565ca25 100644
--- a/tests/q.go
+++ b/tests/q.go
@@ -21,6 +21,10 @@ import (
+var instanceID = os.Getpid()
+
+
+
func test_defaultPrefix() {
g.TestStart("defaultPrefix")
@@ -29,13 +33,46 @@ func test_defaultPrefix() {
})
}
-func test_tryRollback() {
+func test_serialized() {
// FIXME
}
-func test_inTx() {
- /*
+func test_execSerialized() {
// FIXME
+}
+
+func test_tryRollback() {
+ g.TestStart("tryRollback()")
+
+ myErr := errors.New("bottom error")
+
+ db, err := sql.Open(golite.DriverName, golite.InMemory)
+ g.TErrorIf(err)
+ defer db.Close()
+
+
+ g.Testing("the error is propagated if rollback doesn't fail", func() {
+ tx, err := db.Begin()
+ g.TErrorIf(err)
+
+ err = tryRollback(tx, myErr)
+ g.TAssertEqual(err, myErr)
+ })
+
+ g.Testing("a wrapped error when rollback fails", func() {
+ tx, err := db.Begin()
+ g.TErrorIf(err)
+
+ err = tx.Commit()
+ g.TErrorIf(err)
+
+ err = tryRollback(tx, myErr)
+ g.TAssertEqual(reflect.DeepEqual(err, myErr), false)
+ g.TAssertEqual(errors.Is(err, myErr), true)
+ })
+}
+
+func test_inTx() {
g.TestStart("inTx()")
db, err := sql.Open(golite.DriverName, golite.InMemory)
@@ -44,11 +81,11 @@ func test_inTx() {
g.Testing("when fn() errors, we propagate it", func() {
- myError := errors.New("to be propagated")
+ myErr := errors.New("to be propagated")
err := inTx(db, func(tx *sql.Tx) error {
- return myError
+ return myErr
})
- g.TAssertEqual(err, myError)
+ g.TAssertEqual(err, myErr)
})
g.Testing("on nil error we get nil", func() {
@@ -57,13 +94,17 @@ func test_inTx() {
})
g.TErrorIf(err)
})
- */
}
func test_createTables() {
g.TestStart("createTables()")
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ const (
+ dbpath = golite.InMemory
+ prefix = defaultPrefix
+ )
+
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
defer db.Close()
@@ -72,12 +113,12 @@ func test_createTables() {
const tmpl_read = `
SELECT id FROM "%s_messages" LIMIT 1;
`
- qRead := fmt.Sprintf(tmpl_read, defaultPrefix)
+ qRead := fmt.Sprintf(tmpl_read, prefix)
_, err := db.Exec(qRead)
g.TErrorNil(err)
- err = createTables(db, defaultPrefix)
+ err = createTables(db, prefix)
g.TErrorIf(err)
_, err = db.Exec(qRead)
@@ -86,9 +127,9 @@ func test_createTables() {
g.Testing("we can do it multiple times", func() {
g.TErrorIf(g.SomeError(
- createTables(db, defaultPrefix),
- createTables(db, defaultPrefix),
- createTables(db, defaultPrefix),
+ createTables(db, prefix),
+ createTables(db, prefix),
+ createTables(db, prefix),
))
})
}
@@ -99,15 +140,21 @@ func test_takeStmt() {
const (
topic = "take() topic"
consumer = "take() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
g.TErrorIf(takeErr)
defer g.SomeFnError(
takeClose,
@@ -137,9 +184,10 @@ func test_takeStmt() {
})
g.Testing("if there is already an owner, we overtake it", func() {
- otherID := instanceID + 1
+ otherCfg := cfg
+ otherCfg.instanceID = instanceID + 1
- take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ take, takeClose, takeErr := takeStmt(otherCfg)
g.TErrorIf(takeErr)
defer takeClose()
@@ -153,7 +201,7 @@ func test_takeStmt() {
err = db.QueryRow(sqlOwner, topic, consumer).Scan(&ownerID)
g.TErrorIf(err)
- g.TAssertEqual(ownerID, otherID)
+ g.TAssertEqual(ownerID, otherCfg.instanceID)
})
g.Testing("no error if closed more than once", func() {
g.TErrorIf(g.SomeError(
@@ -170,6 +218,7 @@ func test_publishStmt() {
const (
topic = "publish() topic"
payloadStr = "publish() payload"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -182,12 +231,17 @@ func test_publishStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ publish, publishClose, publishErr := publishStmt(cfg)
g.TErrorIf(publishErr)
defer g.SomeFnError(
publishClose,
@@ -255,6 +309,7 @@ func test_findStmt() {
const (
topic = "find() topic"
payloadStr = "find() payload"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -267,13 +322,18 @@ func test_findStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- find, findClose, findErr := findStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ publish, publishClose, publishErr := publishStmt(cfg)
+ find, findClose, findErr := findStmt(cfg)
g.TErrorIf(g.SomeError(
publishErr,
findErr,
@@ -354,6 +414,7 @@ func test_nextStmt() {
topic = "next() topic"
payloadStr = "next() payload"
consumer = "next() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -366,15 +427,24 @@ func test_nextStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- next, nextClose, nextErr := nextStmt(db, prefix, instanceID)
- commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ next, nextClose, nextErr := nextStmt(cfg)
+ commit, commitClose, commitErr := commitStmt(cfg)
+ g.TErrorIf(takeErr)
+ g.TErrorIf(publishErr)
+ g.TErrorIf(nextErr)
+ g.TErrorIf(commitErr)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -440,9 +510,10 @@ func test_nextStmt() {
})
g.Testing("error when we're not the owner", func() {
- otherID := instanceID + 1
+ otherCfg := cfg
+ otherCfg.instanceID = instanceID + 1
- take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ take, takeClose, takeErr := takeStmt(otherCfg)
g.TErrorIf(takeErr)
defer takeClose()
@@ -455,7 +526,7 @@ func test_nextStmt() {
_, err = next(topic, consumer)
g.TAssertEqual(err, fmt.Errorf(
notOwnerErrorFmt,
- otherID,
+ otherCfg.instanceID,
topic,
consumer,
instanceID,
@@ -478,6 +549,7 @@ func test_messageEach() {
topic = "messageEach() topic"
payloadStr = "messageEach() payload"
consumer = "messageEach() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -490,14 +562,19 @@ func test_messageEach() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ pending, pendingClose, pendingErr := pendingStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -640,6 +717,7 @@ func test_pendingStmt() {
topic = "pending() topic"
payloadStr = "pending() payload"
consumer = "pending() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -652,16 +730,21 @@ func test_pendingStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- pending, pendingClose, pendingErr := pendingStmt(db, prefix, instanceID)
- commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ pending, pendingClose, pendingErr := pendingStmt(cfg)
+ commit, commitClose, commitErr := commitStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -820,9 +903,10 @@ func test_pendingStmt() {
})
g.Testing("when we're not the owners we get nothing", func() {
- otherID := instanceID + 1
+ otherCfg := cfg
+ otherCfg.instanceID = instanceID + 1
- take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ take, takeClose, takeErr := takeStmt(otherCfg)
g.TErrorIf(takeErr)
defer takeClose()
@@ -871,6 +955,7 @@ func test_commitStmt() {
topic = "commit() topic"
payloadStr = "commit() payload"
consumer = "commit() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -883,15 +968,20 @@ func test_commitStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ commit, commitClose, commitErr := commitStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -937,7 +1027,10 @@ func test_commitStmt() {
g.Testing("we can't commit non-existent messages", func() {
err := cmt(consumer, guuid.New())
- g.TAssertEqual(err, sql.ErrNoRows)
+ g.TAssertEqual(
+ err.(golite.Error).ExtendedCode,
+ golite.ErrConstraintNotNull,
+ )
})
g.Testing("multiple consumers may commit a message", func() {
@@ -987,8 +1080,10 @@ func test_commitStmt() {
})
g.Testing("error if we don't own the topic/consumer", func() {
- otherID := instanceID + 1
- take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ otherCfg := cfg
+ otherCfg.instanceID = instanceID + 1
+
+ take, takeClose, takeErr := takeStmt(otherCfg)
g.TErrorIf(takeErr)
defer takeClose()
@@ -998,13 +1093,10 @@ func test_commitStmt() {
g.TErrorIf(err)
err = commit(consumer, messageID)
- g.TAssertEqual(err, fmt.Errorf(
- noLongerOwnerErrorFmt,
- instanceID,
- topic,
- consumer,
- otherID,
- ))
+ g.TAssertEqual(
+ err.(golite.Error).ExtendedCode,
+ golite.ErrConstraintTrigger,
+ )
})
g.Testing("no actual closing occurs", func() {
@@ -1023,6 +1115,7 @@ func test_toDeadStmt() {
topic = "toDead() topic"
payloadStr = "toDead() payload"
consumer = "toDead() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1035,15 +1128,20 @@ func test_toDeadStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ commit, commitClose, commitErr := commitStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -1107,7 +1205,10 @@ func test_toDeadStmt() {
g.Testing("we can't mark as dead non-existent messages", func() {
err := asDead(consumer, guuid.New(), guuid.New())
- g.TAssertEqual(err, sql.ErrNoRows)
+ g.TAssertEqual(
+ err.(golite.Error).ExtendedCode,
+ golite.ErrConstraintNotNull,
+ )
})
g.Testing("multiple consumers may mark a message as dead", func() {
@@ -1173,11 +1274,13 @@ func test_toDeadStmt() {
})
g.Testing("error if we don't own the message's consumer/topic", func() {
- otherID := instanceID + 1
+ otherCfg := cfg
+ otherCfg.instanceID = instanceID + 1
+
messageID1 := pub(topic)
messageID2 := pub(topic)
- take, takeClose, takeErr := takeStmt(db, prefix, otherID)
+ take, takeClose, takeErr := takeStmt(otherCfg)
g.TErrorIf(takeErr)
defer takeClose()
@@ -1188,13 +1291,10 @@ func test_toDeadStmt() {
g.TErrorIf(err)
err = toDead(consumer, messageID2, guuid.New())
- g.TAssertEqual(err, fmt.Errorf(
- noLongerOwnerErrorFmt,
- instanceID,
- topic,
- consumer,
- otherID,
- ))
+ g.TAssertEqual(
+ err.(golite.Error).ExtendedCode,
+ golite.ErrConstraintTrigger,
+ )
})
g.Testing("no actual closing occurs", func() {
@@ -1213,6 +1313,7 @@ func test_replayStmt() {
topic = "replay() topic"
payloadStr = "replay() payload"
consumer = "replay() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1225,15 +1326,20 @@ func test_replayStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ replay, replayClose, replayErr := replayStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -1343,6 +1449,7 @@ func test_oneDeadStmt() {
topic = "oneDead() topic"
payloadStr = "oneDead() payload"
consumer = "oneDead() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1355,16 +1462,21 @@ func test_oneDeadStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
- oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ replay, replayClose, replayErr := replayStmt(cfg)
+ oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -1457,6 +1569,7 @@ func test_deadletterEach() {
topic = "deadletterEach() topic"
payloadStr = "deadletterEach() payload"
consumer = "deadletterEach() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1469,15 +1582,20 @@ func test_deadletterEach() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ allDead, allDeadClose, allDeadErr := allDeadStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -1631,6 +1749,7 @@ func test_allDeadStmt() {
topic = "allDead() topic"
payloadStr = "allDead() payload"
consumer = "allDead() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1643,16 +1762,21 @@ func test_allDeadStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
- allDead, allDeadClose, allDeadErr := allDeadStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ replay, replayClose, replayErr := replayStmt(cfg)
+ allDead, allDeadClose, allDeadErr := allDeadStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -1784,6 +1908,7 @@ func test_sizeStmt() {
topic = "size() topic"
payloadStr = "size() payload"
consumer = "size() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1796,17 +1921,22 @@ func test_sizeStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- replay, replayClose, replayErr := replayStmt(db, prefix, instanceID)
- oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(db, prefix, instanceID)
- size, sizeClose, sizeErr := sizeStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ replay, replayClose, replayErr := replayStmt(cfg)
+ oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg)
+ size, sizeClose, sizeErr := sizeStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -1892,6 +2022,7 @@ func test_countStmt() {
topic = "count() topic"
payloadStr = "count() payload"
consumer = "count() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -1904,17 +2035,22 @@ func test_countStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- next, nextClose, nextErr := nextStmt(db, prefix, instanceID)
- commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- count, countClose, countErr := countStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ next, nextClose, nextErr := nextStmt(cfg)
+ commit, commitClose, commitErr := commitStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ count, countClose, countErr := countStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -2020,6 +2156,7 @@ func test_hasDataStmt() {
topic = "hasData() topic"
payloadStr = "hasData() payload"
consumer = "hasData() consumer"
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
var (
@@ -2032,17 +2169,22 @@ func test_hasDataStmt() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
+ db, err := sql.Open(golite.DriverName, dbpath)
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- instanceID := os.Getpid()
- take, takeClose, takeErr := takeStmt(db, prefix, instanceID)
- publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- next, nextClose, nextErr := nextStmt(db, prefix, instanceID)
- commit, commitClose, commitErr := commitStmt(db, prefix, instanceID)
- toDead, toDeadClose, toDeadErr := toDeadStmt(db, prefix, instanceID)
- hasData, hasDataClose, hasDataErr := hasDataStmt(db, prefix, instanceID)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ instanceID: instanceID,
+ }
+ take, takeClose, takeErr := takeStmt(cfg)
+ publish, publishClose, publishErr := publishStmt(cfg)
+ next, nextClose, nextErr := nextStmt(cfg)
+ commit, commitClose, commitErr := commitStmt(cfg)
+ toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
+ hasData, hasDataClose, hasDataErr := hasDataStmt(cfg)
g.TErrorIf(g.SomeError(
takeErr,
publishErr,
@@ -2148,6 +2290,8 @@ func test_initDB() {
topic = "initDB() topic"
payloadStr = "initDB() payload"
consumer = "initDB() consumer"
+ dbpath = golite.InMemory
+ prefix = defaultPrefix
)
var (
flowID = guuid.New()
@@ -2159,17 +2303,12 @@ func test_initDB() {
}
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
- g.TErrorIf(err)
- defer db.Close()
-
var messages []messageT
notifyFn := func(message messageT) {
messages = append(messages, message)
}
- instanceID := os.Getpid()
- queries, err := initDB(db, defaultPrefix, notifyFn, instanceID)
+ queries, err := initDB(dbpath, prefix, notifyFn, instanceID)
g.TErrorIf(err)
defer queries.close()
@@ -2256,31 +2395,16 @@ func test_initDB() {
func test_queriesTclose() {
g.TestStart("queriesT.close()")
- db, err := sql.Open(golite.DriverName, golite.InMemory)
- g.TErrorIf(err)
- defer db.Close()
+ const (
+ dbpath = golite.InMemory
+ prefix = defaultPrefix
+ )
- instanceID := os.Getpid()
- queries, err := initDB(db, defaultPrefix, func(messageT) {}, instanceID)
+ notifyFn := func(messageT) {}
+ queries, err := initDB(dbpath, prefix, notifyFn, instanceID)
g.TErrorIf(err)
- g.Testing("after closing, we can't run queries", func() {
- unsent := UnsentMessage{ Payload: []byte{}, }
- _, err := queries.publish(unsent, guuid.New())
- g.TErrorIf(err)
- g.TErrorIf(db.Close())
-
- err = queries.close()
- g.TErrorIf(err)
-
- _, err = queries.publish(unsent, guuid.New())
- g.TAssertEqual(
- err.Error(),
- "sql: database is closed",
- )
- })
-
g.Testing("closing mre than once does not error", func() {
g.TErrorIf(g.SomeError(
queries.close(),
@@ -2289,7 +2413,6 @@ func test_queriesTclose() {
})
}
-
func test_newPinger() {
g.TestStart("newPinger()")
@@ -3287,6 +3410,7 @@ func test_queueT_Publish() {
const (
topic = "queueT.Publish() topic"
payloadStr = "queueT.Publish() payload"
+ dbpath = golite.InMemory
)
var (
flowID = guuid.New()
@@ -3298,7 +3422,7 @@ func test_queueT_Publish() {
}
)
- queue, err := New(golite.InMemory)
+ queue, err := New(dbpath)
g.TErrorIf(err)
defer queue.Close()
@@ -4350,11 +4474,7 @@ func test_queueT_Close() {
queriesErr = errors.New("queriesT{} error")
)
- db, err := sql.Open(golite.DriverName, golite.InMemory)
- g.TErrorIf(err)
-
queue := queueT{
- db: db,
queries: queriesT{
close: func() error{
queriesCount++
@@ -4376,7 +4496,7 @@ func test_queueT_Close() {
},
}
- err = queue.Close()
+ err := queue.Close()
g.TAssertEqual(err, g.WrapErrors(subscriptionsErr, queriesErr))
g.TAssertEqual(pingerCount, 1)
g.TAssertEqual(subscriptionsCount, 1)
@@ -5674,6 +5794,7 @@ func dumpQueries() {
{ "take", takeSQL },
{ "publish", publishSQL },
{ "find", findSQL },
+ { "next", nextSQL },
{ "pending", pendingSQL },
{ "commit", commitSQL },
{ "toDead", toDeadSQL },
@@ -5703,6 +5824,8 @@ func MainTest() {
g.Init()
test_defaultPrefix()
+ test_serialized()
+ test_execSerialized()
test_tryRollback()
test_inTx()
test_createTables()
diff --git a/tests/queries.sql b/tests/queries.sql
index c821e25..e790d41 100644
--- a/tests/queries.sql
+++ b/tests/queries.sql
@@ -27,6 +27,7 @@
consumer TEXT NOT NULL,
message_id INTEGER NOT NULL
REFERENCES "q_messages"(id),
+ instance_id INTEGER NOT NULL,
UNIQUE (consumer, message_id)
) STRICT;
CREATE INDEX IF NOT EXISTS "q_offsets_consumer"
@@ -38,6 +39,7 @@
consumer TEXT NOT NULL,
message_id INTEGER NOT NULL
REFERENCES "q_messages"(id),
+ instance_id INTEGER NOT NULL,
UNIQUE (consumer, message_id)
) STRICT;
CREATE INDEX IF NOT EXISTS "q_deadletters_consumer"
@@ -58,6 +60,44 @@
owner_id INTEGER NOT NULL,
UNIQUE (topic, consumer)
) STRICT;
+
+ CREATE TRIGGER IF NOT EXISTS "q_check_instance_owns_topic"
+ BEFORE INSERT ON "q_offsets"
+ WHEN NEW.instance_id != (
+ SELECT owner_id FROM "q_owners"
+ WHERE topic = (
+ SELECT "q_payloads".topic
+ FROM "q_payloads"
+ JOIN "q_messages" ON "q_payloads".id =
+ "q_messages".payload_id
+ WHERE "q_messages".id = NEW.message_id
+ ) AND consumer = NEW.consumer
+ )
+ BEGIN
+ SELECT RAISE(
+ ABORT,
+ 'instance does not own topic/consumer combo'
+ );
+ END;
+
+ CREATE TRIGGER IF NOT EXISTS "q_check_can_publish_deadletter"
+ BEFORE INSERT ON "q_deadletters"
+ WHEN NEW.instance_id != (
+ SELECT owner_id FROM "q_owners"
+ WHERE topic = (
+ SELECT "q_payloads".topic
+ FROM "q_payloads"
+ JOIN "q_messages" ON "q_payloads".id =
+ "q_messages".payload_id
+ WHERE "q_messages".id = NEW.message_id
+ ) AND consumer = NEW.consumer
+ )
+ BEGIN
+ SELECT RAISE(
+ ABORT,
+ 'Instance does not own topic/consumer combo'
+ );
+ END;
-- read:
@@ -81,7 +121,6 @@
INSERT INTO "q_payloads" (topic, payload)
VALUES (?, ?);
- -- FIXME: must be inside a trnsaction
INSERT INTO "q_messages" (uuid, flow_id, payload_id)
VALUES (?, ?, last_insert_rowid());
@@ -114,6 +153,38 @@
-- owner:
+-- next.sql:
+-- write:
+
+-- read:
+ SELECT
+ (
+ SELECT owner_id FROM "q_owners"
+ WHERE
+ topic = ? AND
+ consumer = ?
+ LIMIT 1
+ ) AS owner_id,
+ "q_messages".id,
+ "q_messages".timestamp,
+ "q_messages".uuid,
+ "q_messages".flow_id,
+ "q_payloads".payload
+ FROM "q_messages"
+ JOIN "q_payloads" ON
+ "q_payloads".id = "q_messages".payload_id
+ WHERE
+ "q_payloads".topic = ? AND
+ "q_messages".id NOT IN (
+ SELECT message_id FROM "q_offsets"
+ WHERE consumer = ?
+ )
+ ORDER BY "q_messages".id ASC
+ LIMIT 1;
+
+
+-- owner:
+
-- pending.sql:
-- write:
@@ -146,46 +217,28 @@
-- commit.sql:
-- write:
- INSERT INTO "q_offsets" (consumer, message_id)
- VALUES (?, (SELECT id FROM "q_messages" WHERE uuid = ?));
+ INSERT INTO "q_offsets" (consumer, message_id, instance_id)
+ VALUES (?, (SELECT id FROM "q_messages" WHERE uuid = ?), ?);
-- read:
- SELECT "q_payloads".topic from "q_payloads"
- JOIN "q_messages" ON
- "q_payloads".id = "q_messages".payload_id
- WHERE "q_messages".uuid = ?;
-
-- owner:
- SELECT owner_id FROM "q_owners"
- WHERE
- topic = ? AND
- consumer = ?;
-
-- toDead.sql:
-- write:
- INSERT INTO "q_offsets" ( consumer, message_id)
- VALUES ( ?, (SELECT id FROM "q_messages" WHERE uuid = ?));
+ INSERT INTO "q_offsets"
+ ( consumer, message_id, instance_id)
+ VALUES ( ?, (SELECT id FROM "q_messages" WHERE uuid = ?), ?);
- INSERT INTO "q_deadletters" (uuid, consumer, message_id)
- VALUES (?, ?, (SELECT id FROM "q_messages" WHERE uuid = ?));
+ INSERT INTO "q_deadletters"
+ (uuid, consumer, message_id, instance_id)
+ VALUES (?, ?, (SELECT id FROM "q_messages" WHERE uuid = ?), ?);
-- read:
- SELECT "q_payloads".topic FROM "q_payloads"
- JOIN "q_messages" ON
- "q_payloads".id = "q_messages".payload_id
- WHERE "q_messages".uuid = ?;
-
-- owner:
- SELECT owner_id FROM "q_owners"
- WHERE
- topic = ? AND
- consumer = ?;
-
-- replay.sql:
-- write: