summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEuAndreh <eu@euandre.org>2024-10-20 20:42:10 -0300
committerEuAndreh <eu@euandre.org>2024-10-20 20:42:10 -0300
commit7c8ccdaebd767b311e583ca7f92356e561138457 (patch)
treecc8a51b2a6b45837dc0952a7059d429206c2f127
parentWIP: Address new-instance-takeover failure (diff)
downloadfiinha-7c8ccdaebd767b311e583ca7f92356e561138457.tar.gz
fiinha-7c8ccdaebd767b311e583ca7f92356e561138457.tar.xz
Address renaming of acudego -> golite
-rw-r--r--src/q.go30
-rw-r--r--tests/functional/consumer-with-deadletter/q.go6
-rw-r--r--tests/functional/new-instance-takeover/q.go5
-rw-r--r--tests/functional/wait-after-publish/q.go5
-rw-r--r--tests/q.go103
-rw-r--r--tests/queries.sql79
6 files changed, 117 insertions, 111 deletions
diff --git a/src/q.go b/src/q.go
index 6eeefe6..de06383 100644
--- a/src/q.go
+++ b/src/q.go
@@ -10,7 +10,7 @@ import (
"sync"
"time"
- _ "acudego"
+ "golite"
"guuid"
g "gobang"
)
@@ -27,11 +27,6 @@ const (
-type dbI interface{
- findOne(q string, args []any, bindings []any) error
- exec(q string)
-}
-
type queryT struct{
write string
read string
@@ -338,6 +333,7 @@ func publishSQL(prefix string) queryT {
INSERT INTO "%s_payloads" (topic, payload)
VALUES (?, ?);
+ -- FIXME: must be inside a trnsaction
INSERT INTO "%s_messages" (uuid, flow_id, payload_id)
VALUES (?, ?, last_insert_rowid());
`
@@ -376,13 +372,17 @@ func publishStmt(
message_id_bytes := messageID[:]
flow_id_bytes := unsentMessage.FlowID[:]
- _, err := db.Exec(
- q.write,
- unsentMessage.Topic,
- unsentMessage.Payload,
- message_id_bytes,
- flow_id_bytes,
- )
+ err := inTx(db, func(ctx context.Context) error {
+ _, err := db.ExecContext(
+ ctx,
+ q.write,
+ unsentMessage.Topic,
+ unsentMessage.Payload,
+ message_id_bytes,
+ flow_id_bytes,
+ )
+ return err
+ })
if err != nil {
return messageT{}, err
}
@@ -700,7 +700,7 @@ func pendingStmt(
ownerStmt, err := db.Prepare(q.owner)
if err != nil {
- return nil, nil, err
+ return nil, nil, g.WrapErrors(readStmt.Close(), err)
}
fn := func(topic string, consumer string) (*sql.Rows, error) {
@@ -2413,7 +2413,7 @@ func runCommand(
stdout io.Writer,
stderr io.Writer,
) int {
- db, err := sql.Open("acude", args.databasePath)
+ db, err := sql.Open(golite.DriverName, args.databasePath)
if err != nil {
fmt.Fprintln(stderr, err)
return 1
diff --git a/tests/functional/consumer-with-deadletter/q.go b/tests/functional/consumer-with-deadletter/q.go
index e1462d7..44bc90b 100644
--- a/tests/functional/consumer-with-deadletter/q.go
+++ b/tests/functional/consumer-with-deadletter/q.go
@@ -6,9 +6,9 @@ import (
"os"
"runtime"
- _ "acudego"
- g "gobang"
+ "golite"
"guuid"
+ g "gobang"
)
@@ -50,7 +50,7 @@ func MainTest() {
os.Remove(databasePath + "-shm")
os.Remove(databasePath + "-wal")
- db, err := sql.Open("acude", databasePath)
+ db, err := sql.Open(golite.DriverName, databasePath)
g.TErrorIf(err)
defer db.Close()
diff --git a/tests/functional/new-instance-takeover/q.go b/tests/functional/new-instance-takeover/q.go
index 350d583..10c4744 100644
--- a/tests/functional/new-instance-takeover/q.go
+++ b/tests/functional/new-instance-takeover/q.go
@@ -6,8 +6,9 @@ import (
"runtime"
"os"
- g "gobang"
+ "golite"
"guuid"
+ g "gobang"
)
@@ -38,7 +39,7 @@ func startInstance(
instanceID int,
name string,
) (*sql.DB, IQueue, error) {
- db, err := sql.Open("acude", databasePath)
+ db, err := sql.Open(golite.DriverName, databasePath)
g.TErrorIf(err)
iqueue, err := New(db)
diff --git a/tests/functional/wait-after-publish/q.go b/tests/functional/wait-after-publish/q.go
index 701258a..d3426ae 100644
--- a/tests/functional/wait-after-publish/q.go
+++ b/tests/functional/wait-after-publish/q.go
@@ -5,8 +5,9 @@ import (
"os"
"runtime"
- g "gobang"
+ "golite"
"guuid"
+ g "gobang"
)
@@ -24,7 +25,7 @@ func MainTest() {
os.Remove(databasePath + "-shm")
os.Remove(databasePath + "-wal")
- db, err := sql.Open("acude", databasePath)
+ db, err := sql.Open(golite.DriverName, databasePath)
g.TErrorIf(err)
defer db.Close()
diff --git a/tests/q.go b/tests/q.go
index 6b9e422..3657a69 100644
--- a/tests/q.go
+++ b/tests/q.go
@@ -14,7 +14,7 @@ import (
"sync"
"time"
- "acudego"
+ "golite"
"guuid"
g "gobang"
)
@@ -29,12 +29,16 @@ func test_defaultPrefix() {
})
}
+func test_tryRollback() {
+ // FIXME
+}
+
func test_inTx() {
/*
// FIXME
g.TestStart("inTx()")
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
defer db.Close()
@@ -59,24 +63,24 @@ func test_inTx() {
func test_createTables() {
g.TestStart("createTables()")
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
defer db.Close()
g.Testing("tables exist afterwards", func() {
- tmpl := `
+ const tmpl_read = `
SELECT id FROM "%s_messages" LIMIT 1;
`
- q := fmt.Sprintf(tmpl, defaultPrefix)
+ qRead := fmt.Sprintf(tmpl_read, defaultPrefix)
- _, err := db.Exec(q)
+ _, err := db.Exec(qRead)
g.TErrorNil(err)
err = createTables(db, defaultPrefix)
g.TErrorIf(err)
- _, err = db.Exec(q)
+ _, err = db.Exec(qRead)
g.TErrorIf(err)
})
@@ -98,7 +102,7 @@ func test_takeStmt() {
prefix = defaultPrefix
)
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
@@ -178,15 +182,13 @@ func test_publishStmt() {
}
)
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
instanceID := os.Getpid()
publish, publishClose, publishErr := publishStmt(db, prefix, instanceID)
- g.TErrorIf(g.SomeError(
- publishErr,
- ))
+ g.TErrorIf(publishErr)
defer g.SomeFnError(
publishClose,
db.Close,
@@ -233,8 +235,8 @@ func test_publishStmt() {
g.TAssertEqual(message1.payload, payload)
g.TAssertEqual(
- err2.(acudego.Error).ExtendedCode,
- acudego.ErrConstraintUnique,
+ err2.(golite.Error).ExtendedCode,
+ golite.ErrConstraintUnique,
)
})
@@ -265,7 +267,7 @@ func test_findStmt() {
}
)
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
@@ -364,7 +366,7 @@ func test_nextStmt() {
}
)
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
@@ -488,7 +490,7 @@ func test_messageEach() {
}
)
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
@@ -650,7 +652,7 @@ func test_pendingStmt() {
}
)
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
@@ -881,7 +883,7 @@ func test_commitStmt() {
}
)
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
@@ -928,8 +930,8 @@ func test_commitStmt() {
err2 := cmt(consumer, messageID)
g.TErrorIf(err1)
g.TAssertEqual(
- err2.(acudego.Error).ExtendedCode,
- acudego.ErrConstraintUnique,
+ err2.(golite.Error).ExtendedCode,
+ golite.ErrConstraintUnique,
)
})
@@ -979,8 +981,8 @@ func test_commitStmt() {
err2 := cmt(consumer, messageID)
g.TErrorIf(err1)
g.TAssertEqual(
- err2.(acudego.Error).ExtendedCode,
- acudego.ErrConstraintUnique,
+ err2.(golite.Error).ExtendedCode,
+ golite.ErrConstraintUnique,
)
})
@@ -1033,7 +1035,7 @@ func test_toDeadStmt() {
}
)
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
@@ -1083,8 +1085,8 @@ func test_toDeadStmt() {
err2 := asDead(consumer, messageID, guuid.New())
g.TErrorIf(err1)
g.TAssertEqual(
- err2.(acudego.Error).ExtendedCode,
- acudego.ErrConstraintUnique,
+ err2.(golite.Error).ExtendedCode,
+ golite.ErrConstraintUnique,
)
})
@@ -1097,8 +1099,8 @@ func test_toDeadStmt() {
err2 := asDead(consumer, messageID2, deadletterID)
g.TErrorIf(err1)
g.TAssertEqual(
- err2.(acudego.Error).ExtendedCode,
- acudego.ErrConstraintUnique,
+ err2.(golite.Error).ExtendedCode,
+ golite.ErrConstraintUnique,
)
})
@@ -1165,8 +1167,8 @@ func test_toDeadStmt() {
err2 := asDead(consumer, messageID, guuid.New())
g.TErrorIf(err1)
g.TAssertEqual(
- err2.(acudego.Error).ExtendedCode,
- acudego.ErrConstraintUnique,
+ err2.(golite.Error).ExtendedCode,
+ golite.ErrConstraintUnique,
)
})
@@ -1223,7 +1225,7 @@ func test_replayStmt() {
}
)
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
@@ -1291,16 +1293,16 @@ func test_replayStmt() {
_, err2 := replay(deadletterID, guuid.New())
g.TErrorIf(err1)
g.TAssertEqual(
- err2.(acudego.Error).ExtendedCode,
- acudego.ErrConstraintUnique,
+ err2.(golite.Error).ExtendedCode,
+ golite.ErrConstraintUnique,
)
})
g.Testing("we cant replay non-existent messages", func() {
_, err := replay(guuid.New(), guuid.New())
g.TAssertEqual(
- err.(acudego.Error).ExtendedCode,
- acudego.ErrConstraintNotNull,
+ err.(golite.Error).ExtendedCode,
+ golite.ErrConstraintNotNull,
)
})
@@ -1353,7 +1355,7 @@ func test_oneDeadStmt() {
}
)
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
@@ -1467,7 +1469,7 @@ func test_deadletterEach() {
}
)
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
@@ -1641,7 +1643,7 @@ func test_allDeadStmt() {
}
)
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
@@ -1794,7 +1796,7 @@ func test_sizeStmt() {
}
)
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
@@ -1902,7 +1904,7 @@ func test_countStmt() {
}
)
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
@@ -2030,7 +2032,7 @@ func test_hasDataStmt() {
}
)
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
@@ -2157,7 +2159,7 @@ func test_initDB() {
}
)
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
defer db.Close()
@@ -2254,7 +2256,7 @@ func test_initDB() {
func test_queriesTclose() {
g.TestStart("queriesT.close()")
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
defer db.Close()
@@ -3229,7 +3231,7 @@ func test_NewWithPrefix() {
g.TestStart("NewWithPrefix()")
g.Testing("we get an error with a bad prefix", func() {
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
defer db.Close()
@@ -3238,7 +3240,7 @@ func test_NewWithPrefix() {
})
g.Testing("otherwise we have a queueT and no errors", func() {
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
defer db.Close()
@@ -3254,7 +3256,7 @@ func test_New() {
g.TestStart("New()")
g.Testing("smoke test that we get a queueT", func() {
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
defer db.Close()
@@ -3308,7 +3310,7 @@ func test_queueT_Publish() {
}
)
- db, err := sql.Open("acude", ":memory:")
+ db, err := sql.Open(golite.DriverName, ":memory:")
g.TErrorIf(err)
defer db.Close()
@@ -5696,9 +5698,9 @@ func dumpQueries() {
for _, query := range queries {
q := query.fn(defaultPrefix)
fmt.Printf("\n-- %s.sql:", query.name)
- fmt.Printf("\n-- write: %s\n", q.write)
- fmt.Printf("\n-- read: %s\n", q.read)
- fmt.Printf("\n-- owner: %s\n", q.owner)
+ fmt.Printf("\n-- write:%s\n", q.write)
+ fmt.Printf("\n-- read:%s\n", q.read)
+ fmt.Printf("\n-- owner:%s\n", q.owner)
}
}
@@ -5712,6 +5714,7 @@ func MainTest() {
g.Init()
test_defaultPrefix()
+ test_tryRollback()
test_inTx()
test_createTables()
test_takeStmt()
diff --git a/tests/queries.sql b/tests/queries.sql
index 2515778..c821e25 100644
--- a/tests/queries.sql
+++ b/tests/queries.sql
@@ -1,6 +1,6 @@
-- createTables.sql:
--- write:
+-- write:
CREATE TABLE IF NOT EXISTS "q_payloads" (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')),
@@ -60,42 +60,43 @@
) STRICT;
--- read:
+-- read:
--- owner:
+-- owner:
-- take.sql:
--- write:
+-- write:
INSERT INTO "q_owners" (topic, consumer, owner_id)
VALUES (?, ?, ?)
ON CONFLICT (topic, consumer) DO
UPDATE SET owner_id=excluded.owner_id;
--- read:
+-- read:
--- owner:
+-- owner:
-- publish.sql:
--- write:
+-- write:
INSERT INTO "q_payloads" (topic, payload)
VALUES (?, ?);
+ -- FIXME: must be inside a trnsaction
INSERT INTO "q_messages" (uuid, flow_id, payload_id)
VALUES (?, ?, last_insert_rowid());
--- read:
+-- read:
SELECT id, timestamp FROM "q_messages"
WHERE uuid = ?;
--- owner:
+-- owner:
-- find.sql:
--- write:
+-- write:
--- read:
+-- read:
SELECT
"q_messages".id,
"q_messages".timestamp,
@@ -111,12 +112,12 @@
LIMIT 1;
--- owner:
+-- owner:
-- pending.sql:
--- write:
+-- write:
--- read:
+-- read:
SELECT
"q_messages".id,
"q_messages".timestamp,
@@ -136,7 +137,7 @@
ORDER BY "q_messages".id ASC;
--- owner:
+-- owner:
SELECT owner_id FROM "q_owners"
WHERE
topic = ? AND
@@ -144,19 +145,19 @@
-- commit.sql:
--- write:
+-- write:
INSERT INTO "q_offsets" (consumer, message_id)
VALUES (?, (SELECT id FROM "q_messages" WHERE uuid = ?));
--- read:
+-- read:
SELECT "q_payloads".topic from "q_payloads"
JOIN "q_messages" ON
"q_payloads".id = "q_messages".payload_id
WHERE "q_messages".uuid = ?;
--- owner:
+-- owner:
SELECT owner_id FROM "q_owners"
WHERE
topic = ? AND
@@ -164,7 +165,7 @@
-- toDead.sql:
--- write:
+-- write:
INSERT INTO "q_offsets" ( consumer, message_id)
VALUES ( ?, (SELECT id FROM "q_messages" WHERE uuid = ?));
@@ -172,14 +173,14 @@
VALUES (?, ?, (SELECT id FROM "q_messages" WHERE uuid = ?));
--- read:
+-- read:
SELECT "q_payloads".topic FROM "q_payloads"
JOIN "q_messages" ON
"q_payloads".id = "q_messages".payload_id
WHERE "q_messages".uuid = ?;
--- owner:
+-- owner:
SELECT owner_id FROM "q_owners"
WHERE
topic = ? AND
@@ -187,7 +188,7 @@
-- replay.sql:
--- write:
+-- write:
INSERT INTO "q_messages" (uuid, flow_id, payload_id)
SELECT
?,
@@ -205,7 +206,7 @@
);
--- read:
+-- read:
SELECT
"q_messages".id,
"q_messages".timestamp,
@@ -218,12 +219,12 @@
WHERE "q_messages".uuid = ?;
--- owner:
+-- owner:
-- oneDead.sql:
--- write:
+-- write:
--- read:
+-- read:
SELECT
"q_deadletters".uuid,
"q_offsets".timestamp,
@@ -246,12 +247,12 @@
LIMIT 1;
--- owner:
+-- owner:
-- allDead.sql:
--- write:
+-- write:
--- read:
+-- read:
SELECT
"q_deadletters".uuid,
"q_deadletters".message_id,
@@ -279,12 +280,12 @@
ORDER BY "q_deadletters".id ASC;
--- owner:
+-- owner:
-- size.sql:
--- write:
+-- write:
--- read:
+-- read:
SELECT
COUNT(1) as size
FROM "q_messages"
@@ -293,12 +294,12 @@
WHERE "q_payloads".topic = ?;
--- owner:
+-- owner:
-- count.sql:
--- write:
+-- write:
--- read:
+-- read:
SELECT
COUNT(1) as count
FROM "q_messages"
@@ -311,12 +312,12 @@
"q_offsets".consumer = ?;
--- owner:
+-- owner:
-- hasData.sql:
--- write:
+-- write:
--- read:
+-- read:
SELECT 1 as data
FROM "q_messages"
JOIN "q_payloads" ON
@@ -330,4 +331,4 @@
LIMIT 1;
--- owner:
+-- owner: