diff options
Diffstat (limited to 'src/q.go')
| -rw-r--r-- | src/q.go | 32 |
1 files changed, 20 insertions, 12 deletions
@@ -126,6 +126,7 @@ type subscriptionsT struct { } type queueT struct{ + db *sql.DB queries queriesT subscriptions subscriptionsT pinger pingerT[struct{}] @@ -174,6 +175,9 @@ func tryRollback(db *sql.DB, ctx context.Context, err error) error { return err } +// FIXME +// See: +// https://sqlite.org/forum/forumpost/2507664507 func inTx(db *sql.DB, fn func(context.Context) error) error { ctx := context.Background() @@ -1821,12 +1825,17 @@ func runReaper( })) } -func NewWithPrefix(db *sql.DB, prefix string) (IQueue, error) { +func NewWithPrefix(databasePath string, prefix string) (IQueue, error) { err := g.ValidateSQLTablePrefix(prefix) if err != nil { return queueT{}, err } + db, err := sql.Open(golite.DriverName, databasePath) + if err != nil { + return queueT{}, err + } + subscriptions := makeSubscriptionsFuncs() pinger := newPinger[struct{}]() notifyFn := makeNotifyFn(subscriptions.read, pinger) @@ -1838,14 +1847,15 @@ func NewWithPrefix(db *sql.DB, prefix string) (IQueue, error) { go runReaper(pinger.onPing, subscriptions.read, subscriptions.write) return queueT{ + db: db, queries: queries, subscriptions: subscriptions, pinger: pinger, }, nil } -func New(db *sql.DB) (IQueue, error) { - return NewWithPrefix(db, defaultPrefix) +func New(databasePath string) (IQueue, error) { + return NewWithPrefix(databasePath, defaultPrefix) } func asPublicMessage(message messageT) Message { @@ -1860,7 +1870,11 @@ func asPublicMessage(message messageT) Message { func (queue queueT) Publish(unsent UnsentMessage) (Message, error) { message, err := queue.queries.publish(unsent, guuid.New()) - return asPublicMessage(message), err + if err != nil { + return Message{}, err + } + + return asPublicMessage(message), nil } func registerConsumerFn(consumer consumerT) func(subscriptionsSetM) error { @@ -2106,6 +2120,7 @@ func cleanSubscriptions(set subscriptionsSetM) error { func (queue queueT) Close() error { queue.pinger.close() return g.WrapErrors( + queue.db.Close(), queue.subscriptions.write(cleanSubscriptions), queue.queries.close(), ) @@ -2413,14 +2428,7 @@ func runCommand( stdout io.Writer, stderr io.Writer, ) int { - db, err := sql.Open(golite.DriverName, args.databasePath) - if err != nil { - fmt.Fprintln(stderr, err) - return 1 - } - defer db.Close() - - iqueue, err := NewWithPrefix(db, args.prefix) + iqueue, err := NewWithPrefix(args.databasePath, args.prefix) if err != nil { fmt.Fprintln(stderr, err) return 1 |
