diff options
author | EuAndreh <eu@euandre.org> | 2024-10-20 20:42:10 -0300 |
---|---|---|
committer | EuAndreh <eu@euandre.org> | 2024-10-20 20:42:10 -0300 |
commit | 7c8ccdaebd767b311e583ca7f92356e561138457 (patch) | |
tree | cc8a51b2a6b45837dc0952a7059d429206c2f127 | |
parent | WIP: Address new-instance-takeover failure (diff) | |
download | fiinha-7c8ccdaebd767b311e583ca7f92356e561138457.tar.gz fiinha-7c8ccdaebd767b311e583ca7f92356e561138457.tar.xz |
Address renaming of acudego -> golite
-rw-r--r-- | src/q.go | 30 | ||||
-rw-r--r-- | tests/functional/consumer-with-deadletter/q.go | 6 | ||||
-rw-r--r-- | tests/functional/new-instance-takeover/q.go | 5 | ||||
-rw-r--r-- | tests/functional/wait-after-publish/q.go | 5 | ||||
-rw-r--r-- | tests/q.go | 103 | ||||
-rw-r--r-- | tests/queries.sql | 79 |
6 files changed, 117 insertions, 111 deletions
@@ -10,7 +10,7 @@ import ( "sync" "time" - _ "acudego" + "golite" "guuid" g "gobang" ) @@ -27,11 +27,6 @@ const ( -type dbI interface{ - findOne(q string, args []any, bindings []any) error - exec(q string) -} - type queryT struct{ write string read string @@ -338,6 +333,7 @@ func publishSQL(prefix string) queryT { INSERT INTO "%s_payloads" (topic, payload) VALUES (?, ?); + -- FIXME: must be inside a trnsaction INSERT INTO "%s_messages" (uuid, flow_id, payload_id) VALUES (?, ?, last_insert_rowid()); ` @@ -376,13 +372,17 @@ func publishStmt( message_id_bytes := messageID[:] flow_id_bytes := unsentMessage.FlowID[:] - _, err := db.Exec( - q.write, - unsentMessage.Topic, - unsentMessage.Payload, - message_id_bytes, - flow_id_bytes, - ) + err := inTx(db, func(ctx context.Context) error { + _, err := db.ExecContext( + ctx, + q.write, + unsentMessage.Topic, + unsentMessage.Payload, + message_id_bytes, + flow_id_bytes, + ) + return err + }) if err != nil { return messageT{}, err } @@ -700,7 +700,7 @@ func pendingStmt( ownerStmt, err := db.Prepare(q.owner) if err != nil { - return nil, nil, err + return nil, nil, g.WrapErrors(readStmt.Close(), err) } fn := func(topic string, consumer string) (*sql.Rows, error) { @@ -2413,7 +2413,7 @@ func runCommand( stdout io.Writer, stderr io.Writer, ) int { - db, err := sql.Open("acude", args.databasePath) + db, err := sql.Open(golite.DriverName, args.databasePath) if err != nil { fmt.Fprintln(stderr, err) return 1 diff --git a/tests/functional/consumer-with-deadletter/q.go b/tests/functional/consumer-with-deadletter/q.go index e1462d7..44bc90b 100644 --- a/tests/functional/consumer-with-deadletter/q.go +++ b/tests/functional/consumer-with-deadletter/q.go @@ -6,9 +6,9 @@ import ( "os" "runtime" - _ "acudego" - g "gobang" + "golite" "guuid" + g "gobang" ) @@ -50,7 +50,7 @@ func MainTest() { os.Remove(databasePath + "-shm") os.Remove(databasePath + "-wal") - db, err := sql.Open("acude", databasePath) + db, err := sql.Open(golite.DriverName, databasePath) g.TErrorIf(err) defer db.Close() diff --git a/tests/functional/new-instance-takeover/q.go b/tests/functional/new-instance-takeover/q.go index 350d583..10c4744 100644 --- a/tests/functional/new-instance-takeover/q.go +++ b/tests/functional/new-instance-takeover/q.go @@ -6,8 +6,9 @@ import ( "runtime" "os" - g "gobang" + "golite" "guuid" + g "gobang" ) @@ -38,7 +39,7 @@ func startInstance( instanceID int, name string, ) (*sql.DB, IQueue, error) { - db, err := sql.Open("acude", databasePath) + db, err := sql.Open(golite.DriverName, databasePath) g.TErrorIf(err) iqueue, err := New(db) diff --git a/tests/functional/wait-after-publish/q.go b/tests/functional/wait-after-publish/q.go index 701258a..d3426ae 100644 --- a/tests/functional/wait-after-publish/q.go +++ b/tests/functional/wait-after-publish/q.go @@ -5,8 +5,9 @@ import ( "os" "runtime" - g "gobang" + "golite" "guuid" + g "gobang" ) @@ -24,7 +25,7 @@ func MainTest() { os.Remove(databasePath + "-shm") os.Remove(databasePath + "-wal") - db, err := sql.Open("acude", databasePath) + db, err := sql.Open(golite.DriverName, databasePath) g.TErrorIf(err) defer db.Close() @@ -14,7 +14,7 @@ import ( "sync" "time" - "acudego" + "golite" "guuid" g "gobang" ) @@ -29,12 +29,16 @@ func test_defaultPrefix() { }) } +func test_tryRollback() { + // FIXME +} + func test_inTx() { /* // FIXME g.TestStart("inTx()") - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) defer db.Close() @@ -59,24 +63,24 @@ func test_inTx() { func test_createTables() { g.TestStart("createTables()") - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) defer db.Close() g.Testing("tables exist afterwards", func() { - tmpl := ` + const tmpl_read = ` SELECT id FROM "%s_messages" LIMIT 1; ` - q := fmt.Sprintf(tmpl, defaultPrefix) + qRead := fmt.Sprintf(tmpl_read, defaultPrefix) - _, err := db.Exec(q) + _, err := db.Exec(qRead) g.TErrorNil(err) err = createTables(db, defaultPrefix) g.TErrorIf(err) - _, err = db.Exec(q) + _, err = db.Exec(qRead) g.TErrorIf(err) }) @@ -98,7 +102,7 @@ func test_takeStmt() { prefix = defaultPrefix ) - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) @@ -178,15 +182,13 @@ func test_publishStmt() { } ) - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) instanceID := os.Getpid() publish, publishClose, publishErr := publishStmt(db, prefix, instanceID) - g.TErrorIf(g.SomeError( - publishErr, - )) + g.TErrorIf(publishErr) defer g.SomeFnError( publishClose, db.Close, @@ -233,8 +235,8 @@ func test_publishStmt() { g.TAssertEqual(message1.payload, payload) g.TAssertEqual( - err2.(acudego.Error).ExtendedCode, - acudego.ErrConstraintUnique, + err2.(golite.Error).ExtendedCode, + golite.ErrConstraintUnique, ) }) @@ -265,7 +267,7 @@ func test_findStmt() { } ) - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) @@ -364,7 +366,7 @@ func test_nextStmt() { } ) - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) @@ -488,7 +490,7 @@ func test_messageEach() { } ) - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) @@ -650,7 +652,7 @@ func test_pendingStmt() { } ) - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) @@ -881,7 +883,7 @@ func test_commitStmt() { } ) - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) @@ -928,8 +930,8 @@ func test_commitStmt() { err2 := cmt(consumer, messageID) g.TErrorIf(err1) g.TAssertEqual( - err2.(acudego.Error).ExtendedCode, - acudego.ErrConstraintUnique, + err2.(golite.Error).ExtendedCode, + golite.ErrConstraintUnique, ) }) @@ -979,8 +981,8 @@ func test_commitStmt() { err2 := cmt(consumer, messageID) g.TErrorIf(err1) g.TAssertEqual( - err2.(acudego.Error).ExtendedCode, - acudego.ErrConstraintUnique, + err2.(golite.Error).ExtendedCode, + golite.ErrConstraintUnique, ) }) @@ -1033,7 +1035,7 @@ func test_toDeadStmt() { } ) - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) @@ -1083,8 +1085,8 @@ func test_toDeadStmt() { err2 := asDead(consumer, messageID, guuid.New()) g.TErrorIf(err1) g.TAssertEqual( - err2.(acudego.Error).ExtendedCode, - acudego.ErrConstraintUnique, + err2.(golite.Error).ExtendedCode, + golite.ErrConstraintUnique, ) }) @@ -1097,8 +1099,8 @@ func test_toDeadStmt() { err2 := asDead(consumer, messageID2, deadletterID) g.TErrorIf(err1) g.TAssertEqual( - err2.(acudego.Error).ExtendedCode, - acudego.ErrConstraintUnique, + err2.(golite.Error).ExtendedCode, + golite.ErrConstraintUnique, ) }) @@ -1165,8 +1167,8 @@ func test_toDeadStmt() { err2 := asDead(consumer, messageID, guuid.New()) g.TErrorIf(err1) g.TAssertEqual( - err2.(acudego.Error).ExtendedCode, - acudego.ErrConstraintUnique, + err2.(golite.Error).ExtendedCode, + golite.ErrConstraintUnique, ) }) @@ -1223,7 +1225,7 @@ func test_replayStmt() { } ) - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) @@ -1291,16 +1293,16 @@ func test_replayStmt() { _, err2 := replay(deadletterID, guuid.New()) g.TErrorIf(err1) g.TAssertEqual( - err2.(acudego.Error).ExtendedCode, - acudego.ErrConstraintUnique, + err2.(golite.Error).ExtendedCode, + golite.ErrConstraintUnique, ) }) g.Testing("we cant replay non-existent messages", func() { _, err := replay(guuid.New(), guuid.New()) g.TAssertEqual( - err.(acudego.Error).ExtendedCode, - acudego.ErrConstraintNotNull, + err.(golite.Error).ExtendedCode, + golite.ErrConstraintNotNull, ) }) @@ -1353,7 +1355,7 @@ func test_oneDeadStmt() { } ) - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) @@ -1467,7 +1469,7 @@ func test_deadletterEach() { } ) - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) @@ -1641,7 +1643,7 @@ func test_allDeadStmt() { } ) - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) @@ -1794,7 +1796,7 @@ func test_sizeStmt() { } ) - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) @@ -1902,7 +1904,7 @@ func test_countStmt() { } ) - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) @@ -2030,7 +2032,7 @@ func test_hasDataStmt() { } ) - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) g.TErrorIf(createTables(db, prefix)) @@ -2157,7 +2159,7 @@ func test_initDB() { } ) - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) defer db.Close() @@ -2254,7 +2256,7 @@ func test_initDB() { func test_queriesTclose() { g.TestStart("queriesT.close()") - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) defer db.Close() @@ -3229,7 +3231,7 @@ func test_NewWithPrefix() { g.TestStart("NewWithPrefix()") g.Testing("we get an error with a bad prefix", func() { - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) defer db.Close() @@ -3238,7 +3240,7 @@ func test_NewWithPrefix() { }) g.Testing("otherwise we have a queueT and no errors", func() { - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) defer db.Close() @@ -3254,7 +3256,7 @@ func test_New() { g.TestStart("New()") g.Testing("smoke test that we get a queueT", func() { - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) defer db.Close() @@ -3308,7 +3310,7 @@ func test_queueT_Publish() { } ) - db, err := sql.Open("acude", ":memory:") + db, err := sql.Open(golite.DriverName, ":memory:") g.TErrorIf(err) defer db.Close() @@ -5696,9 +5698,9 @@ func dumpQueries() { for _, query := range queries { q := query.fn(defaultPrefix) fmt.Printf("\n-- %s.sql:", query.name) - fmt.Printf("\n-- write: %s\n", q.write) - fmt.Printf("\n-- read: %s\n", q.read) - fmt.Printf("\n-- owner: %s\n", q.owner) + fmt.Printf("\n-- write:%s\n", q.write) + fmt.Printf("\n-- read:%s\n", q.read) + fmt.Printf("\n-- owner:%s\n", q.owner) } } @@ -5712,6 +5714,7 @@ func MainTest() { g.Init() test_defaultPrefix() + test_tryRollback() test_inTx() test_createTables() test_takeStmt() diff --git a/tests/queries.sql b/tests/queries.sql index 2515778..c821e25 100644 --- a/tests/queries.sql +++ b/tests/queries.sql @@ -1,6 +1,6 @@ -- createTables.sql: --- write: +-- write: CREATE TABLE IF NOT EXISTS "q_payloads" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')), @@ -60,42 +60,43 @@ ) STRICT; --- read: +-- read: --- owner: +-- owner: -- take.sql: --- write: +-- write: INSERT INTO "q_owners" (topic, consumer, owner_id) VALUES (?, ?, ?) ON CONFLICT (topic, consumer) DO UPDATE SET owner_id=excluded.owner_id; --- read: +-- read: --- owner: +-- owner: -- publish.sql: --- write: +-- write: INSERT INTO "q_payloads" (topic, payload) VALUES (?, ?); + -- FIXME: must be inside a trnsaction INSERT INTO "q_messages" (uuid, flow_id, payload_id) VALUES (?, ?, last_insert_rowid()); --- read: +-- read: SELECT id, timestamp FROM "q_messages" WHERE uuid = ?; --- owner: +-- owner: -- find.sql: --- write: +-- write: --- read: +-- read: SELECT "q_messages".id, "q_messages".timestamp, @@ -111,12 +112,12 @@ LIMIT 1; --- owner: +-- owner: -- pending.sql: --- write: +-- write: --- read: +-- read: SELECT "q_messages".id, "q_messages".timestamp, @@ -136,7 +137,7 @@ ORDER BY "q_messages".id ASC; --- owner: +-- owner: SELECT owner_id FROM "q_owners" WHERE topic = ? AND @@ -144,19 +145,19 @@ -- commit.sql: --- write: +-- write: INSERT INTO "q_offsets" (consumer, message_id) VALUES (?, (SELECT id FROM "q_messages" WHERE uuid = ?)); --- read: +-- read: SELECT "q_payloads".topic from "q_payloads" JOIN "q_messages" ON "q_payloads".id = "q_messages".payload_id WHERE "q_messages".uuid = ?; --- owner: +-- owner: SELECT owner_id FROM "q_owners" WHERE topic = ? AND @@ -164,7 +165,7 @@ -- toDead.sql: --- write: +-- write: INSERT INTO "q_offsets" ( consumer, message_id) VALUES ( ?, (SELECT id FROM "q_messages" WHERE uuid = ?)); @@ -172,14 +173,14 @@ VALUES (?, ?, (SELECT id FROM "q_messages" WHERE uuid = ?)); --- read: +-- read: SELECT "q_payloads".topic FROM "q_payloads" JOIN "q_messages" ON "q_payloads".id = "q_messages".payload_id WHERE "q_messages".uuid = ?; --- owner: +-- owner: SELECT owner_id FROM "q_owners" WHERE topic = ? AND @@ -187,7 +188,7 @@ -- replay.sql: --- write: +-- write: INSERT INTO "q_messages" (uuid, flow_id, payload_id) SELECT ?, @@ -205,7 +206,7 @@ ); --- read: +-- read: SELECT "q_messages".id, "q_messages".timestamp, @@ -218,12 +219,12 @@ WHERE "q_messages".uuid = ?; --- owner: +-- owner: -- oneDead.sql: --- write: +-- write: --- read: +-- read: SELECT "q_deadletters".uuid, "q_offsets".timestamp, @@ -246,12 +247,12 @@ LIMIT 1; --- owner: +-- owner: -- allDead.sql: --- write: +-- write: --- read: +-- read: SELECT "q_deadletters".uuid, "q_deadletters".message_id, @@ -279,12 +280,12 @@ ORDER BY "q_deadletters".id ASC; --- owner: +-- owner: -- size.sql: --- write: +-- write: --- read: +-- read: SELECT COUNT(1) as size FROM "q_messages" @@ -293,12 +294,12 @@ WHERE "q_payloads".topic = ?; --- owner: +-- owner: -- count.sql: --- write: +-- write: --- read: +-- read: SELECT COUNT(1) as count FROM "q_messages" @@ -311,12 +312,12 @@ "q_offsets".consumer = ?; --- owner: +-- owner: -- hasData.sql: --- write: +-- write: --- read: +-- read: SELECT 1 as data FROM "q_messages" JOIN "q_payloads" ON @@ -330,4 +331,4 @@ LIMIT 1; --- owner: +-- owner: |