summaryrefslogtreecommitdiff
path: root/src/q.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/q.go')
-rw-r--r--src/q.go30
1 files changed, 15 insertions, 15 deletions
diff --git a/src/q.go b/src/q.go
index 6eeefe6..de06383 100644
--- a/src/q.go
+++ b/src/q.go
@@ -10,7 +10,7 @@ import (
"sync"
"time"
- _ "acudego"
+ "golite"
"guuid"
g "gobang"
)
@@ -27,11 +27,6 @@ const (
-type dbI interface{
- findOne(q string, args []any, bindings []any) error
- exec(q string)
-}
-
type queryT struct{
write string
read string
@@ -338,6 +333,7 @@ func publishSQL(prefix string) queryT {
INSERT INTO "%s_payloads" (topic, payload)
VALUES (?, ?);
+ -- FIXME: must be inside a trnsaction
INSERT INTO "%s_messages" (uuid, flow_id, payload_id)
VALUES (?, ?, last_insert_rowid());
`
@@ -376,13 +372,17 @@ func publishStmt(
message_id_bytes := messageID[:]
flow_id_bytes := unsentMessage.FlowID[:]
- _, err := db.Exec(
- q.write,
- unsentMessage.Topic,
- unsentMessage.Payload,
- message_id_bytes,
- flow_id_bytes,
- )
+ err := inTx(db, func(ctx context.Context) error {
+ _, err := db.ExecContext(
+ ctx,
+ q.write,
+ unsentMessage.Topic,
+ unsentMessage.Payload,
+ message_id_bytes,
+ flow_id_bytes,
+ )
+ return err
+ })
if err != nil {
return messageT{}, err
}
@@ -700,7 +700,7 @@ func pendingStmt(
ownerStmt, err := db.Prepare(q.owner)
if err != nil {
- return nil, nil, err
+ return nil, nil, g.WrapErrors(readStmt.Close(), err)
}
fn := func(topic string, consumer string) (*sql.Rows, error) {
@@ -2413,7 +2413,7 @@ func runCommand(
stdout io.Writer,
stderr io.Writer,
) int {
- db, err := sql.Open("acude", args.databasePath)
+ db, err := sql.Open(golite.DriverName, args.databasePath)
if err != nil {
fmt.Fprintln(stderr, err)
return 1