aboutsummaryrefslogtreecommitdiff
path: root/src/q.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/q.go')
-rw-r--r--src/q.go32
1 files changed, 20 insertions, 12 deletions
diff --git a/src/q.go b/src/q.go
index de06383..79570df 100644
--- a/src/q.go
+++ b/src/q.go
@@ -126,6 +126,7 @@ type subscriptionsT struct {
}
type queueT struct{
+ db *sql.DB
queries queriesT
subscriptions subscriptionsT
pinger pingerT[struct{}]
@@ -174,6 +175,9 @@ func tryRollback(db *sql.DB, ctx context.Context, err error) error {
return err
}
+// FIXME
+// See:
+// https://sqlite.org/forum/forumpost/2507664507
func inTx(db *sql.DB, fn func(context.Context) error) error {
ctx := context.Background()
@@ -1821,12 +1825,17 @@ func runReaper(
}))
}
-func NewWithPrefix(db *sql.DB, prefix string) (IQueue, error) {
+func NewWithPrefix(databasePath string, prefix string) (IQueue, error) {
err := g.ValidateSQLTablePrefix(prefix)
if err != nil {
return queueT{}, err
}
+ db, err := sql.Open(golite.DriverName, databasePath)
+ if err != nil {
+ return queueT{}, err
+ }
+
subscriptions := makeSubscriptionsFuncs()
pinger := newPinger[struct{}]()
notifyFn := makeNotifyFn(subscriptions.read, pinger)
@@ -1838,14 +1847,15 @@ func NewWithPrefix(db *sql.DB, prefix string) (IQueue, error) {
go runReaper(pinger.onPing, subscriptions.read, subscriptions.write)
return queueT{
+ db: db,
queries: queries,
subscriptions: subscriptions,
pinger: pinger,
}, nil
}
-func New(db *sql.DB) (IQueue, error) {
- return NewWithPrefix(db, defaultPrefix)
+func New(databasePath string) (IQueue, error) {
+ return NewWithPrefix(databasePath, defaultPrefix)
}
func asPublicMessage(message messageT) Message {
@@ -1860,7 +1870,11 @@ func asPublicMessage(message messageT) Message {
func (queue queueT) Publish(unsent UnsentMessage) (Message, error) {
message, err := queue.queries.publish(unsent, guuid.New())
- return asPublicMessage(message), err
+ if err != nil {
+ return Message{}, err
+ }
+
+ return asPublicMessage(message), nil
}
func registerConsumerFn(consumer consumerT) func(subscriptionsSetM) error {
@@ -2106,6 +2120,7 @@ func cleanSubscriptions(set subscriptionsSetM) error {
func (queue queueT) Close() error {
queue.pinger.close()
return g.WrapErrors(
+ queue.db.Close(),
queue.subscriptions.write(cleanSubscriptions),
queue.queries.close(),
)
@@ -2413,14 +2428,7 @@ func runCommand(
stdout io.Writer,
stderr io.Writer,
) int {
- db, err := sql.Open(golite.DriverName, args.databasePath)
- if err != nil {
- fmt.Fprintln(stderr, err)
- return 1
- }
- defer db.Close()
-
- iqueue, err := NewWithPrefix(db, args.prefix)
+ iqueue, err := NewWithPrefix(args.databasePath, args.prefix)
if err != nil {
fmt.Fprintln(stderr, err)
return 1