diff options
| author | EuAndreh <eu@euandre.org> | 2024-10-27 07:40:51 -0300 |
|---|---|---|
| committer | EuAndreh <eu@euandre.org> | 2024-10-27 07:40:51 -0300 |
| commit | f637c04d24bbb3a1f6bbd64180de55fa1aee44e2 (patch) | |
| tree | 5c578ddfdd5af4cfc9df5823a9d1faa93f207e35 | |
| parent | Makefile: Simplify output generation of "bench" target dependencies (diff) | |
| download | fiinha-f637c04d24bbb3a1f6bbd64180de55fa1aee44e2.tar.gz fiinha-f637c04d24bbb3a1f6bbd64180de55fa1aee44e2.tar.xz | |
src/q.go: New() - manage *sql.DB handle internally
| -rw-r--r-- | src/q.go | 32 | ||||
| -rw-r--r-- | tests/functional/consumer-with-deadletter/q.go | 8 | ||||
| -rw-r--r-- | tests/functional/new-instance-takeover/q.go | 21 | ||||
| -rw-r--r-- | tests/functional/wait-after-publish/q.go | 8 | ||||
| -rw-r--r-- | tests/q.go | 31 |
5 files changed, 39 insertions, 61 deletions
@@ -126,6 +126,7 @@ type subscriptionsT struct { } type queueT struct{ + db *sql.DB queries queriesT subscriptions subscriptionsT pinger pingerT[struct{}] @@ -174,6 +175,9 @@ func tryRollback(db *sql.DB, ctx context.Context, err error) error { return err } +// FIXME +// See: +// https://sqlite.org/forum/forumpost/2507664507 func inTx(db *sql.DB, fn func(context.Context) error) error { ctx := context.Background() @@ -1821,12 +1825,17 @@ func runReaper( })) } -func NewWithPrefix(db *sql.DB, prefix string) (IQueue, error) { +func NewWithPrefix(databasePath string, prefix string) (IQueue, error) { err := g.ValidateSQLTablePrefix(prefix) if err != nil { return queueT{}, err } + db, err := sql.Open(golite.DriverName, databasePath) + if err != nil { + return queueT{}, err + } + subscriptions := makeSubscriptionsFuncs() pinger := newPinger[struct{}]() notifyFn := makeNotifyFn(subscriptions.read, pinger) @@ -1838,14 +1847,15 @@ func NewWithPrefix(db *sql.DB, prefix string) (IQueue, error) { go runReaper(pinger.onPing, subscriptions.read, subscriptions.write) return queueT{ + db: db, queries: queries, subscriptions: subscriptions, pinger: pinger, }, nil } -func New(db *sql.DB) (IQueue, error) { - return NewWithPrefix(db, defaultPrefix) +func New(databasePath string) (IQueue, error) { + return NewWithPrefix(databasePath, defaultPrefix) } func asPublicMessage(message messageT) Message { @@ -1860,7 +1870,11 @@ func asPublicMessage(message messageT) Message { func (queue queueT) Publish(unsent UnsentMessage) (Message, error) { message, err := queue.queries.publish(unsent, guuid.New()) - return asPublicMessage(message), err + if err != nil { + return Message{}, err + } + + return asPublicMessage(message), nil } func registerConsumerFn(consumer consumerT) func(subscriptionsSetM) error { @@ -2106,6 +2120,7 @@ func cleanSubscriptions(set subscriptionsSetM) error { func (queue queueT) Close() error { queue.pinger.close() return g.WrapErrors( + queue.db.Close(), queue.subscriptions.write(cleanSubscriptions), queue.queries.close(), ) @@ -2413,14 +2428,7 @@ func runCommand( stdout io.Writer, stderr io.Writer, ) int { - db, err := sql.Open(golite.DriverName, args.databasePath) - if err != nil { - fmt.Fprintln(stderr, err) - return 1 - } - defer db.Close() - - iqueue, err := NewWithPrefix(db, args.prefix) + iqueue, err := NewWithPrefix(args.databasePath, args.prefix) if err != nil { fmt.Fprintln(stderr, err) return 1 diff --git a/tests/functional/consumer-with-deadletter/q.go b/tests/functional/consumer-with-deadletter/q.go index 44bc90b..25391c5 100644 --- a/tests/functional/consumer-with-deadletter/q.go +++ b/tests/functional/consumer-with-deadletter/q.go @@ -1,12 +1,10 @@ package q import ( - "database/sql" "errors" "os" "runtime" - "golite" "guuid" g "gobang" ) @@ -50,11 +48,7 @@ func MainTest() { os.Remove(databasePath + "-shm") os.Remove(databasePath + "-wal") - db, err := sql.Open(golite.DriverName, databasePath) - g.TErrorIf(err) - defer db.Close() - - queue, err := New(db) + queue, err := New(databasePath) g.TErrorIf(err) defer queue.Close() diff --git a/tests/functional/new-instance-takeover/q.go b/tests/functional/new-instance-takeover/q.go index 10c4744..6e04e5f 100644 --- a/tests/functional/new-instance-takeover/q.go +++ b/tests/functional/new-instance-takeover/q.go @@ -1,12 +1,10 @@ package q import ( -"fmt" - "database/sql" + "fmt" "runtime" "os" - "golite" "guuid" g "gobang" ) @@ -38,16 +36,13 @@ func startInstance( databasePath string, instanceID int, name string, -) (*sql.DB, IQueue, error) { - db, err := sql.Open(golite.DriverName, databasePath) - g.TErrorIf(err) - - iqueue, err := New(db) +) (IQueue, error) { + iqueue, err := New(databasePath) g.TErrorIf(err) queue := iqueue.(queueT) notifyFn := makeNotifyFn(queue.subscriptions.read, queue.pinger) - queries, err := initDB(db, defaultPrefix, notifyFn, instanceID) + queries, err := initDB(queue.db, defaultPrefix, notifyFn, instanceID) g.TErrorIf(err) err = queue.queries.close() @@ -67,7 +62,7 @@ func startInstance( queue.Subscribe(topic, individual, handlerFn(pub_(individual))) queue.Subscribe(topic, shared, handlerFn(pub_(shared + "-" + name))) - return db, queue, nil + return queue, nil } @@ -98,9 +93,8 @@ func MainTest() { fmt.Fprintf(os.Stderr, "(PID %d + 1) ", instanceID1) } - db, q1, err := startInstance(dbpath, instanceID1, "first") + q1, err := startInstance(dbpath, instanceID1, "first") g.TErrorIf(err) - defer db.Close() defer q1.Close() pub(q1, topic, guuid.New()) @@ -111,9 +105,8 @@ func MainTest() { <- q1.WaitFor( "shared-first", flowID1, "w").Channel // println("waited 1") - db, q2, err := startInstance(dbpath, instanceID2, "second") + q2, err := startInstance(dbpath, instanceID2, "second") g.TErrorIf(err) - defer db.Close() defer q2.Close() <- q2.WaitFor("individual-second", flowID1, "w").Channel diff --git a/tests/functional/wait-after-publish/q.go b/tests/functional/wait-after-publish/q.go index d3426ae..15a532d 100644 --- a/tests/functional/wait-after-publish/q.go +++ b/tests/functional/wait-after-publish/q.go @@ -1,11 +1,9 @@ package q import ( - "database/sql" "os" "runtime" - "golite" "guuid" g "gobang" ) @@ -25,11 +23,7 @@ func MainTest() { os.Remove(databasePath + "-shm") os.Remove(databasePath + "-wal") - db, err := sql.Open(golite.DriverName, databasePath) - g.TErrorIf(err) - defer db.Close() - - queue, err := New(db) + queue, err := New(databasePath) g.TErrorIf(err) defer queue.Close() @@ -3231,20 +3231,12 @@ func test_NewWithPrefix() { g.TestStart("NewWithPrefix()") g.Testing("we get an error with a bad prefix", func() { - db, err := sql.Open(golite.DriverName, ":memory:") - g.TErrorIf(err) - defer db.Close() - - _, err = NewWithPrefix(db, "a bad prefix") + _, err := NewWithPrefix(":memory:", "a bad prefix") g.TAssertEqual(err, g.ErrBadSQLTablePrefix) }) g.Testing("otherwise we have a queueT and no errors", func() { - db, err := sql.Open(golite.DriverName, ":memory:") - g.TErrorIf(err) - defer db.Close() - - queue, err := NewWithPrefix(db, "good") + queue, err := NewWithPrefix(":memory:", "good") g.TErrorIf(err) queue.Close() @@ -3256,11 +3248,7 @@ func test_New() { g.TestStart("New()") g.Testing("smoke test that we get a queueT", func() { - db, err := sql.Open(golite.DriverName, ":memory:") - g.TErrorIf(err) - defer db.Close() - - queue, err := New(db) + queue, err := New(":memory:") g.TErrorIf(err) queue.Close() @@ -3310,11 +3298,7 @@ func test_queueT_Publish() { } ) - db, err := sql.Open(golite.DriverName, ":memory:") - g.TErrorIf(err) - defer db.Close() - - queue, err := New(db) + queue, err := New(":memory:") g.TErrorIf(err) defer queue.Close() @@ -4365,7 +4349,12 @@ func test_queueT_Close() { subscriptionsErr = errors.New("subscriptionsT{} error") queriesErr = errors.New("queriesT{} error") ) + + db, err := sql.Open(golite.DriverName, ":memory:") + g.TErrorIf(err) + queue := queueT{ + db: db, queries: queriesT{ close: func() error{ queriesCount++ @@ -4387,7 +4376,7 @@ func test_queueT_Close() { }, } - err := queue.Close() + err = queue.Close() g.TAssertEqual(err, g.WrapErrors(subscriptionsErr, queriesErr)) g.TAssertEqual(pingerCount, 1) g.TAssertEqual(subscriptionsCount, 1) |
