aboutsummaryrefslogtreecommitdiff
path: root/src/fiinha.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/fiinha.go')
-rw-r--r--src/fiinha.go2517
1 files changed, 0 insertions, 2517 deletions
diff --git a/src/fiinha.go b/src/fiinha.go
deleted file mode 100644
index a819557..0000000
--- a/src/fiinha.go
+++ /dev/null
@@ -1,2517 +0,0 @@
-package fiinha
-
-import (
- "database/sql"
- "flag"
- "fmt"
- "io"
- "log/slog"
- "os"
- "sync"
- "time"
-
- "golite"
- "uuid"
- g "gobang"
-)
-
-
-
-const (
- defaultPrefix = "fiinha"
- reaperSkipCount = 1000
- notOwnerErrorFmt = "%v owns %#v as %#v, not us (%v)"
- rollbackErrorFmt = "rollback error: %w; while executing: %w"
-)
-
-
-
-type dbconfigT struct{
- shared *sql.DB
- dbpath string
- prefix string
- instanceID int
-}
-
-type queryT struct{
- write string
- read string
- owner string
-}
-
-type queriesT struct{
- take func(string, string) error
- publish func(UnsentMessage, uuid.UUID) (messageT, error)
- find func(string, uuid.UUID) (messageT, error)
- next func(string, string) (messageT, error)
- pending func(string, string, func(messageT) error) error
- commit func(string, uuid.UUID) error
- toDead func(string, uuid.UUID, uuid.UUID) error
- replay func(uuid.UUID, uuid.UUID) (messageT, error)
- oneDead func(string, string) (deadletterT, error)
- allDead func(string, string, func(deadletterT, messageT) error) error
- size func(string) (int, error)
- count func(string, string) (int, error)
- hasData func(string, string) (bool, error)
- close func() error
-}
-
-type messageT struct{
- id int64
- timestamp time.Time
- uuid uuid.UUID
- topic string
- flowID uuid.UUID
- payload []byte
-}
-
-type UnsentMessage struct{
- Topic string
- FlowID uuid.UUID
- Payload []byte
-}
-
-type Message struct{
- ID uuid.UUID
- Timestamp time.Time
- Topic string
- FlowID uuid.UUID
- Payload []byte
-}
-
-type deadletterT struct{
- uuid uuid.UUID
- timestamp time.Time
- consumer string
- messageID uuid.UUID
-}
-
-type pingerT[T any] struct{
- tryPing func(T)
- onPing func(func(T))
- closed func() bool
- close func()
-}
-
-type consumerDataT struct{
- topic string
- name string
-}
-
-type waiterDataT struct{
- topic string
- flowID uuid.UUID
- name string
-
-}
-
-type consumerT struct{
- data consumerDataT
- callback func(Message) error
- pinger pingerT[struct{}]
- close *func()
-}
-
-type waiterT struct{
- data waiterDataT
- pinger pingerT[[]byte]
- closed *func() bool
- close *func()
-}
-
-type topicSubscriptionT struct{
- consumers map[string]consumerT
- waiters map[uuid.UUID]map[string]waiterT
-}
-
-type subscriptionsSetM map[string]topicSubscriptionT
-
-type subscriptionsT struct {
- read func(func(subscriptionsSetM) error) error
- write func(func(subscriptionsSetM) error) error
-}
-
-type queueT struct{
- queries queriesT
- subscriptions subscriptionsT
- pinger pingerT[struct{}]
-}
-
-type argsT struct{
- databasePath string
- prefix string
- command string
- allArgs []string
- args []string
- topic string
- consumer string
-}
-
-type commandT struct{
- name string
- getopt func(argsT, io.Writer) (argsT, bool)
- exec func(argsT, queriesT, io.Reader, io.Writer) (int, error)
-}
-
-type IQueue interface{
- Publish(UnsentMessage) (Message, error)
- Subscribe( string, string, func(Message) error) error
- Unsubscribe(string, string)
- WaitFor(string, uuid.UUID, string) Waiter
- Close() error
-}
-
-
-
-func tryRollback(tx *sql.Tx, err error) error {
- rollbackErr := tx.Rollback()
- if rollbackErr != nil {
- return fmt.Errorf(
- rollbackErrorFmt,
- rollbackErr,
- err,
- )
- }
-
- return err
-}
-
-func inTx(db *sql.DB, fn func(*sql.Tx) error) error {
- tx, err := db.Begin()
- if err != nil {
- return err
- }
-
- err = fn(tx)
- if err != nil {
- return tryRollback(tx, err)
- }
-
- err = tx.Commit()
- if err != nil {
- return tryRollback(tx, err)
- }
-
- return nil
-}
-
-func serialized[A any, B any](callback func(...A) B) (func(...A) B, func()) {
- in := make(chan []A)
- out := make(chan B)
-
- closed := false
- var (
- closeWg sync.WaitGroup
- closeMutex sync.Mutex
- )
- closeWg.Add(1)
-
- go func() {
- for input := range in {
- out <- callback(input...)
- }
- close(out)
- closeWg.Done()
- }()
-
- fn := func(input ...A) B {
- in <- input
- return (<- out)
- }
-
- closeFn := func() {
- closeMutex.Lock()
- defer closeMutex.Unlock()
- if closed {
- return
- }
- close(in)
- closed = true
- closeWg.Wait()
- }
-
- return fn, closeFn
-}
-
-func execSerialized(query string, db *sql.DB) (func(...any) error, func()) {
- return serialized(func(args ...any) error {
- return inTx(db, func(tx *sql.Tx) error {
- _, err := tx.Exec(query, args...)
- return err
- })
- })
-}
-
-func createTablesSQL(prefix string) queryT {
- const tmpl_write = `
- CREATE TABLE IF NOT EXISTS "%s_payloads" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (%s),
- topic TEXT NOT NULL,
- payload BLOB NOT NULL
- ) STRICT;
- CREATE INDEX IF NOT EXISTS "%s_payloads_topic"
- ON "%s_payloads"(topic);
-
- CREATE TABLE IF NOT EXISTS "%s_messages" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (%s),
- uuid BLOB NOT NULL UNIQUE,
- flow_id BLOB NOT NULL,
- payload_id INTEGER NOT NULL
- REFERENCES "%s_payloads"(id)
- ) STRICT;
- CREATE INDEX IF NOT EXISTS "%s_messages_flow_id"
- ON "%s_messages"(flow_id);
-
- CREATE TABLE IF NOT EXISTS "%s_offsets" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (%s),
- consumer TEXT NOT NULL,
- message_id INTEGER NOT NULL
- REFERENCES "%s_messages"(id),
- instance_id INTEGER NOT NULL,
- UNIQUE (consumer, message_id)
- ) STRICT;
- CREATE INDEX IF NOT EXISTS "%s_offsets_consumer"
- ON "%s_offsets"(consumer);
-
- CREATE TABLE IF NOT EXISTS "%s_deadletters" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- uuid BLOB NOT NULL UNIQUE,
- consumer TEXT NOT NULL,
- message_id INTEGER NOT NULL
- REFERENCES "%s_messages"(id),
- instance_id INTEGER NOT NULL,
- UNIQUE (consumer, message_id)
- ) STRICT;
- CREATE INDEX IF NOT EXISTS "%s_deadletters_consumer"
- ON "%s_deadletters"(consumer);
-
- CREATE TABLE IF NOT EXISTS "%s_replays" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- deadletter_id INTEGER NOT NULL UNIQUE
- REFERENCES "%s_deadletters"(id) ,
- message_id INTEGER NOT NULL UNIQUE
- REFERENCES "%s_messages"(id)
- ) STRICT;
-
- CREATE TABLE IF NOT EXISTS "%s_owners" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- topic TEXT NOT NULL,
- consumer TEXT NOT NULL,
- owner_id INTEGER NOT NULL,
- UNIQUE (topic, consumer)
- ) STRICT;
-
- CREATE TRIGGER IF NOT EXISTS "%s_check_instance_owns_topic"
- BEFORE INSERT ON "%s_offsets"
- WHEN NEW.instance_id != (
- SELECT owner_id FROM "%s_owners"
- WHERE topic = (
- SELECT "%s_payloads".topic
- FROM "%s_payloads"
- JOIN "%s_messages" ON "%s_payloads".id =
- "%s_messages".payload_id
- WHERE "%s_messages".id = NEW.message_id
- ) AND consumer = NEW.consumer
- )
- BEGIN
- SELECT RAISE(
- ABORT,
- 'instance does not own topic/consumer combo'
- );
- END;
-
- CREATE TRIGGER IF NOT EXISTS "%s_check_can_publish_deadletter"
- BEFORE INSERT ON "%s_deadletters"
- WHEN NEW.instance_id != (
- SELECT owner_id FROM "%s_owners"
- WHERE topic = (
- SELECT "%s_payloads".topic
- FROM "%s_payloads"
- JOIN "%s_messages" ON "%s_payloads".id =
- "%s_messages".payload_id
- WHERE "%s_messages".id = NEW.message_id
- ) AND consumer = NEW.consumer
- )
- BEGIN
- SELECT RAISE(
- ABORT,
- 'Instance does not own topic/consumer combo'
- );
- END;
- `
- return queryT{
- write: fmt.Sprintf(
- tmpl_write,
- prefix,
- g.SQLiteNow,
- prefix,
- prefix,
- prefix,
- g.SQLiteNow,
- prefix,
- prefix,
- prefix,
- prefix,
- g.SQLiteNow,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- }
-}
-
-func createTables(db *sql.DB, prefix string) error {
- q := createTablesSQL(prefix)
-
- return inTx(db, func(tx *sql.Tx) error {
- _, err := tx.Exec(q.write)
- return err
- })
-}
-
-func takeSQL(prefix string) queryT {
- const tmpl_write = `
- INSERT INTO "%s_owners" (topic, consumer, owner_id)
- VALUES (?, ?, ?)
- ON CONFLICT (topic, consumer) DO
- UPDATE SET owner_id=excluded.owner_id;
- `
- return queryT{
- write: fmt.Sprintf(tmpl_write, prefix),
- }
-}
-
-func takeStmt(
- cfg dbconfigT,
-) (func(string, string) error, func() error, error) {
- q := takeSQL(cfg.prefix)
-
- writeStmt, err := cfg.shared.Prepare(q.write)
- if err != nil {
- return nil, nil, err
- }
-
- fn := func(topic string, consumer string) error {
- _, err := writeStmt.Exec(
- topic,
- consumer,
- cfg.instanceID,
- )
- return err
- }
-
- return fn, writeStmt.Close, nil
-}
-
-func publishSQL(prefix string) queryT {
- const tmpl_write = `
- INSERT INTO "%s_payloads" (topic, payload)
- VALUES (?, ?);
-
- INSERT INTO "%s_messages" (uuid, flow_id, payload_id)
- VALUES (?, ?, last_insert_rowid());
- `
- const tmpl_read = `
- SELECT id, timestamp FROM "%s_messages"
- WHERE uuid = ?;
- `
- return queryT{
- write: fmt.Sprintf(tmpl_write, prefix, prefix),
- read: fmt.Sprintf(tmpl_read, prefix),
- }
-}
-
-func publishStmt(
- cfg dbconfigT,
-) (func(UnsentMessage, uuid.UUID) (messageT, error), func() error, error) {
- q := publishSQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- privateDB, err := sql.Open(golite.DriverName, cfg.dbpath)
- if err != nil {
- readStmt.Close()
- return nil, nil, err
- }
-
- writeFn, writeFnClose := execSerialized(q.write, privateDB)
-
- fn := func(
- unsentMessage UnsentMessage,
- messageID uuid.UUID,
- ) (messageT, error) {
- message := messageT{
- uuid: messageID,
- topic: unsentMessage.Topic,
- flowID: unsentMessage.FlowID,
- payload: unsentMessage.Payload,
- }
-
- message_id_bytes := messageID[:]
- flow_id_bytes := unsentMessage.FlowID[:]
- err := writeFn(
- unsentMessage.Topic,
- unsentMessage.Payload,
- message_id_bytes,
- flow_id_bytes,
- )
- if err != nil {
- return messageT{}, err
- }
-
- var timestr string
- err = readStmt.QueryRow(message_id_bytes).Scan(
- &message.id,
- &timestr,
- )
- if err != nil {
- return messageT{}, err
- }
-
- message.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
- if err != nil {
- return messageT{}, err
- }
-
- return message, nil
- }
-
- closeFn := func() error {
- writeFnClose()
- return g.SomeError(privateDB.Close(), readStmt.Close())
- }
-
- return fn, closeFn, nil
-}
-
-func findSQL(prefix string) queryT {
- const tmpl_read = `
- SELECT
- "%s_messages".id,
- "%s_messages".timestamp,
- "%s_messages".uuid,
- "%s_payloads".payload
- FROM "%s_messages"
- JOIN "%s_payloads" ON
- "%s_payloads".id = "%s_messages".payload_id
- WHERE
- "%s_payloads".topic = ? AND
- "%s_messages".flow_id = ?
- ORDER BY "%s_messages".id DESC
- LIMIT 1;
- `
- return queryT{
- read: fmt.Sprintf(
- tmpl_read,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- }
-}
-
-func findStmt(
- cfg dbconfigT,
-) (func(string, uuid.UUID) (messageT, error), func() error, error) {
- q := findSQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- fn := func(topic string, flowID uuid.UUID) (messageT, error) {
- message := messageT{
- topic: topic,
- flowID: flowID,
- }
-
- var (
- timestr string
- message_id_bytes []byte
- )
- flow_id_bytes := flowID[:]
- err = readStmt.QueryRow(topic, flow_id_bytes).Scan(
- &message.id,
- &timestr,
- &message_id_bytes,
- &message.payload,
- )
- if err != nil {
- return messageT{}, err
- }
- message.uuid = uuid.UUID(message_id_bytes)
-
- message.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
- if err != nil {
- return messageT{}, err
- }
-
- return message, nil
- }
-
- return fn, readStmt.Close, nil
-}
-
-func nextSQL(prefix string) queryT {
- const tmpl_read = `
- SELECT
- (
- SELECT owner_id FROM "%s_owners"
- WHERE
- topic = ? AND
- consumer = ?
- LIMIT 1
- ) AS owner_id,
- "%s_messages".id,
- "%s_messages".timestamp,
- "%s_messages".uuid,
- "%s_messages".flow_id,
- "%s_payloads".payload
- FROM "%s_messages"
- JOIN "%s_payloads" ON
- "%s_payloads".id = "%s_messages".payload_id
- WHERE
- "%s_payloads".topic = ? AND
- "%s_messages".id NOT IN (
- SELECT message_id FROM "%s_offsets"
- WHERE consumer = ?
- )
- ORDER BY "%s_messages".id ASC
- LIMIT 1;
- `
- return queryT{
- read: fmt.Sprintf(
- tmpl_read,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- }
-}
-
-func nextStmt(
- cfg dbconfigT,
-) (func(string, string) (messageT, error), func() error, error) {
- q := nextSQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- fn := func(topic string, consumer string) (messageT, error) {
- message := messageT{
- topic: topic,
- }
-
- var (
- ownerID int
- timestr string
- message_id_bytes []byte
- flow_id_bytes []byte
- )
-
- err = readStmt.QueryRow(topic, consumer, topic, consumer).Scan(
- &ownerID,
- &message.id,
- &timestr,
- &message_id_bytes,
- &flow_id_bytes,
- &message.payload,
- )
- if err != nil {
- return messageT{}, err
- }
-
- if ownerID != cfg.instanceID {
- err := fmt.Errorf(
- notOwnerErrorFmt,
- ownerID,
- topic,
- consumer,
- cfg.instanceID,
- )
- return messageT{}, err
- }
- message.uuid = uuid.UUID(message_id_bytes)
- message.flowID = uuid.UUID(flow_id_bytes)
-
- message.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
- if err != nil {
- return messageT{}, err
- }
-
- return message, nil
- }
-
- return fn, readStmt.Close, nil
-}
-
-func messageEach(rows *sql.Rows, callback func(messageT) error) error {
- if rows == nil {
- return nil
- }
-
- for rows.Next() {
- var (
- message messageT
- timestr string
- message_id_bytes []byte
- flow_id_bytes []byte
- )
- err := rows.Scan(
- &message.id,
- &timestr,
- &message_id_bytes,
- &flow_id_bytes,
- &message.topic,
- &message.payload,
- )
- if err != nil {
- rows.Close()
- return err
- }
- message.uuid = uuid.UUID(message_id_bytes)
- message.flowID = uuid.UUID(flow_id_bytes)
-
- message.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
- if err != nil {
- rows.Close()
- return err
- }
-
- err = callback(message)
- if err != nil {
- rows.Close()
- return err
- }
- }
-
- return g.WrapErrors(rows.Err(), rows.Close())
-}
-
-func pendingSQL(prefix string) queryT {
- const tmpl_read = `
- SELECT
- "%s_messages".id,
- "%s_messages".timestamp,
- "%s_messages".uuid,
- "%s_messages".flow_id,
- "%s_payloads".topic,
- "%s_payloads".payload
- FROM "%s_messages"
- JOIN "%s_payloads" ON
- "%s_payloads".id = "%s_messages".payload_id
- WHERE
- "%s_payloads".topic = ? AND
- "%s_messages".id NOT IN (
- SELECT message_id FROM "%s_offsets"
- WHERE consumer = ?
- )
- ORDER BY "%s_messages".id ASC;
- `
- const tmpl_owner = `
- SELECT owner_id FROM "%s_owners"
- WHERE
- topic = ? AND
- consumer = ?;
- `
- return queryT{
- read: fmt.Sprintf(
- tmpl_read,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- owner: fmt.Sprintf(tmpl_owner, prefix),
- }
-}
-
-func pendingStmt(
- cfg dbconfigT,
-) (func(string, string) (*sql.Rows, error), func() error, error) {
- q := pendingSQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- ownerStmt, err := cfg.shared.Prepare(q.owner)
- if err != nil {
- readStmt.Close()
- return nil, nil, err
- }
-
- fn := func(topic string, consumer string) (*sql.Rows, error) {
- var ownerID int
- err := ownerStmt.QueryRow(topic, consumer).Scan(&ownerID)
- if err != nil {
- return nil, err
- }
-
- // best effort check, the final one is done during
- // commit within a transaction
- if ownerID != cfg.instanceID {
- return nil, nil
- }
-
- return readStmt.Query(topic, consumer)
- }
-
- closeFn := func() error {
- return g.SomeFnError(readStmt.Close, ownerStmt.Close)
- }
-
- return fn, closeFn, nil
-}
-
-func commitSQL(prefix string) queryT {
- const tmpl_write = `
- INSERT INTO "%s_offsets" (consumer, message_id, instance_id)
- VALUES (?, (SELECT id FROM "%s_messages" WHERE uuid = ?), ?);
- `
- return queryT{
- write: fmt.Sprintf(tmpl_write, prefix, prefix),
- }
-}
-
-func commitStmt(
- cfg dbconfigT,
-) (func(string, uuid.UUID) error, func() error, error) {
- q := commitSQL(cfg.prefix)
-
- writeStmt, err := cfg.shared.Prepare(q.write)
- if err != nil {
- return nil, nil, err
- }
-
- fn := func(consumer string, messageID uuid.UUID) error {
- message_id_bytes := messageID[:]
- _, err = writeStmt.Exec(
- consumer,
- message_id_bytes,
- cfg.instanceID,
- )
- return err
- }
-
- return fn, writeStmt.Close, nil
-}
-
-func toDeadSQL(prefix string) queryT {
- const tmpl_write = `
- INSERT INTO "%s_offsets"
- ( consumer, message_id, instance_id)
- VALUES ( ?, (SELECT id FROM "%s_messages" WHERE uuid = ?), ?);
-
- INSERT INTO "%s_deadletters"
- (uuid, consumer, message_id, instance_id)
- VALUES (?, ?, (SELECT id FROM "%s_messages" WHERE uuid = ?), ?);
- `
- return queryT{
- write: fmt.Sprintf(tmpl_write, prefix, prefix, prefix, prefix),
- }
-}
-
-func toDeadStmt(
- cfg dbconfigT,
-) (
- func(string, uuid.UUID, uuid.UUID) error,
- func() error,
- error,
-) {
- q := toDeadSQL(cfg.prefix)
-
- privateDB, err := sql.Open(golite.DriverName, cfg.dbpath)
- if err != nil {
- return nil, nil, err
- }
-
- writeFn, writeFnClose := execSerialized(q.write, privateDB)
-
- fn := func(
- consumer string,
- messageID uuid.UUID,
- deadletterID uuid.UUID,
- ) error {
- message_id_bytes := messageID[:]
- deadletter_id_bytes := deadletterID[:]
- return writeFn(
- consumer,
- message_id_bytes,
- cfg.instanceID,
- deadletter_id_bytes,
- consumer,
- message_id_bytes,
- cfg.instanceID,
- )
- }
-
- closeFn := func() error {
- writeFnClose()
- return privateDB.Close()
- }
-
-
- return fn, closeFn, nil
-}
-
-func replaySQL(prefix string) queryT {
- const tmpl_write = `
- INSERT INTO "%s_messages" (uuid, flow_id, payload_id)
- SELECT
- ?,
- "%s_messages".flow_id,
- "%s_messages".payload_id
- FROM "%s_messages"
- JOIN "%s_deadletters" ON
- "%s_messages".id = "%s_deadletters".message_id
- WHERE "%s_deadletters".uuid = ?;
-
- INSERT INTO "%s_replays" (deadletter_id, message_id)
- VALUES (
- (SELECT id FROM "%s_deadletters" WHERE uuid = ?),
- last_insert_rowid()
- );
- `
- const tmpl_read = `
- SELECT
- "%s_messages".id,
- "%s_messages".timestamp,
- "%s_messages".flow_id,
- "%s_payloads".topic,
- "%s_payloads".payload
- FROM "%s_messages"
- JOIN "%s_payloads" ON
- "%s_payloads".id = "%s_messages".payload_id
- WHERE "%s_messages".uuid = ?;
- `
- return queryT{
- write: fmt.Sprintf(
- tmpl_write,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- read: fmt.Sprintf(
- tmpl_read,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- }
-}
-
-func replayStmt(
- cfg dbconfigT,
-) (func(uuid.UUID, uuid.UUID) (messageT, error), func() error, error) {
- q := replaySQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- privateDB, err := sql.Open(golite.DriverName, cfg.dbpath)
- if err != nil {
- readStmt.Close()
- return nil, nil, err
- }
-
- writeFn, writeFnClose := execSerialized(q.write, privateDB)
-
- fn := func(
- deadletterID uuid.UUID,
- messageID uuid.UUID,
- ) (messageT, error) {
- deadletter_id_bytes := deadletterID[:]
- message_id_bytes := messageID[:]
- err := writeFn(
- message_id_bytes,
- deadletter_id_bytes,
- deadletter_id_bytes,
- )
- if err != nil {
- return messageT{}, err
- }
-
- message := messageT{
- uuid: messageID,
- }
-
- var (
- timestr string
- flow_id_bytes []byte
- )
- err = readStmt.QueryRow(message_id_bytes).Scan(
- &message.id,
- &timestr,
- &flow_id_bytes,
- &message.topic,
- &message.payload,
- )
- if err != nil {
- return messageT{}, err
- }
- message.flowID = uuid.UUID(flow_id_bytes)
-
- message.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
- if err != nil {
- return messageT{}, err
- }
-
- return message, nil
- }
-
- closeFn := func() error {
- writeFnClose()
- return g.SomeError(privateDB.Close(), readStmt.Close())
- }
-
- return fn, closeFn, nil
-}
-
-func oneDeadSQL(prefix string) queryT {
- const tmpl_read = `
- SELECT
- "%s_deadletters".uuid,
- "%s_offsets".timestamp,
- "%s_messages".uuid
- FROM "%s_deadletters"
- JOIN "%s_offsets" ON
- "%s_deadletters".message_id = "%s_offsets".message_id
- JOIN "%s_messages" ON
- "%s_deadletters".message_id = "%s_messages".id
- JOIN "%s_payloads" ON
- "%s_messages".payload_id = "%s_payloads".id
- WHERE
- "%s_payloads".topic = ? AND
- "%s_deadletters".consumer = ? AND
- "%s_offsets".consumer = ? AND
- "%s_deadletters".id NOT IN (
- SELECT deadletter_id FROM "%s_replays"
- )
- ORDER BY "%s_deadletters".id ASC
- LIMIT 1;
- `
- return queryT{
- read: fmt.Sprintf(
- tmpl_read,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- }
-}
-
-func oneDeadStmt(
- cfg dbconfigT,
-) (func(string, string) (deadletterT, error), func() error, error) {
- q := oneDeadSQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- fn := func(topic string, consumer string) (deadletterT, error) {
- deadletter := deadletterT{
- consumer: consumer,
- }
-
- var (
- deadletter_id_bytes []byte
- timestr string
- message_id_bytes []byte
- )
- err := readStmt.QueryRow(topic, consumer, consumer).Scan(
- &deadletter_id_bytes,
- &timestr,
- &message_id_bytes,
- )
- if err != nil {
- return deadletterT{}, err
- }
- deadletter.uuid = uuid.UUID(deadletter_id_bytes)
- deadletter.messageID = uuid.UUID(message_id_bytes)
-
- deadletter.timestamp, err = time.Parse(
- time.RFC3339Nano,
- timestr,
- )
- if err != nil {
- return deadletterT{}, err
- }
-
- return deadletter, nil
- }
-
- return fn, readStmt.Close, nil
-}
-
-func deadletterEach(
- rows *sql.Rows,
- callback func(deadletterT, messageT) error,
-) error {
- for rows.Next() {
- var (
- deadletter deadletterT
- deadletter_id_bytes []byte
- deadletterTimestr string
- message messageT
- messageTimestr string
- message_id_bytes []byte
- flow_id_bytes []byte
- )
- err := rows.Scan(
- &deadletter_id_bytes,
- &message.id,
- &deadletterTimestr,
- &deadletter.consumer,
- &messageTimestr,
- &message_id_bytes,
- &flow_id_bytes,
- &message.topic,
- &message.payload,
- )
- if err != nil {
- rows.Close()
- return err
- }
-
- deadletter.uuid = uuid.UUID(deadletter_id_bytes)
- deadletter.messageID = uuid.UUID(message_id_bytes)
- message.uuid = uuid.UUID(message_id_bytes)
- message.flowID = uuid.UUID(flow_id_bytes)
-
- message.timestamp, err = time.Parse(
- time.RFC3339Nano,
- messageTimestr,
- )
- if err != nil {
- rows.Close()
- return err
- }
-
- deadletter.timestamp, err = time.Parse(
- time.RFC3339Nano,
- deadletterTimestr,
- )
- if err != nil {
- rows.Close()
- return err
- }
-
- err = callback(deadletter, message)
- if err != nil {
- rows.Close()
- return err
- }
- }
-
- return g.WrapErrors(rows.Err(), rows.Close())
-}
-
-func allDeadSQL(prefix string) queryT {
- const tmpl_read = `
- SELECT
- "%s_deadletters".uuid,
- "%s_deadletters".message_id,
- "%s_offsets".timestamp,
- "%s_offsets".consumer,
- "%s_messages".timestamp,
- "%s_messages".uuid,
- "%s_messages".flow_id,
- "%s_payloads".topic,
- "%s_payloads".payload
- FROM "%s_deadletters"
- JOIN "%s_offsets" ON
- "%s_deadletters".message_id = "%s_offsets".message_id
- JOIN "%s_messages" ON
- "%s_deadletters".message_id = "%s_messages".id
- JOIN "%s_payloads" ON
- "%s_messages".payload_id = "%s_payloads".id
- WHERE
- "%s_payloads".topic = ? AND
- "%s_deadletters".consumer = ? AND
- "%s_offsets".consumer = ? AND
- "%s_deadletters".id NOT IN (
- SELECT deadletter_id FROM "%s_replays"
- )
- ORDER BY "%s_deadletters".id ASC;
- `
- return queryT{
- read: fmt.Sprintf(
- tmpl_read,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- }
-}
-
-func allDeadStmt(
- cfg dbconfigT,
-) (func(string, string) (*sql.Rows, error), func() error, error) {
- q := allDeadSQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- fn := func(topic string, consumer string) (*sql.Rows, error) {
- return readStmt.Query(topic, consumer, consumer)
- }
-
- return fn, readStmt.Close, nil
-}
-
-func sizeSQL(prefix string) queryT {
- const tmpl_read = `
- SELECT
- COUNT(1) as size
- FROM "%s_messages"
- JOIN "%s_payloads" ON
- "%s_messages".payload_id = "%s_payloads".id
- WHERE "%s_payloads".topic = ?;
- `
- return queryT{
- read: fmt.Sprintf(
- tmpl_read,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- }
-}
-
-
-func sizeStmt(
- cfg dbconfigT,
-) (func(string) (int, error), func() error, error) {
- q := sizeSQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- fn := func(topic string) (int, error) {
- var size int
- err := readStmt.QueryRow(topic).Scan(&size)
- if err != nil {
- return -1, err
- }
-
- return size, nil
- }
-
- return fn, readStmt.Close, nil
-}
-
-func countSQL(prefix string) queryT {
- const tmpl_read = `
- SELECT
- COUNT(1) as count
- FROM "%s_messages"
- JOIN "%s_offsets" ON
- "%s_messages".id = "%s_offsets".message_id
- JOIN "%s_payloads" ON
- "%s_messages".payload_id = "%s_payloads".id
- WHERE
- "%s_payloads".topic = ? AND
- "%s_offsets".consumer = ?;
- `
- return queryT{
- read: fmt.Sprintf(
- tmpl_read,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- }
-}
-
-func countStmt(
- cfg dbconfigT,
-) (func(string, string) (int, error), func() error, error) {
- q := countSQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- fn := func(topic string, consumer string) (int, error) {
- var count int
- err := readStmt.QueryRow(topic, consumer).Scan(&count)
- if err != nil {
- return -1, err
- }
-
- return count, nil
- }
-
- return fn, readStmt.Close, nil
-}
-
-func hasDataSQL(prefix string) queryT {
- const tmpl_read = `
- SELECT 1 as data
- FROM "%s_messages"
- JOIN "%s_payloads" ON
- "%s_payloads".id = "%s_messages".payload_id
- WHERE
- "%s_payloads".topic = ? AND
- "%s_messages".id NOT IN (
- SELECT message_id FROM "%s_offsets"
- WHERE consumer = ?
- )
- LIMIT 1;
- `
- return queryT{
- read: fmt.Sprintf(
- tmpl_read,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- prefix,
- ),
- }
-}
-
-func hasDataStmt(
- cfg dbconfigT,
-) (func(string, string) (bool, error), func() error, error) {
- q := hasDataSQL(cfg.prefix)
-
- readStmt, err := cfg.shared.Prepare(q.read)
- if err != nil {
- return nil, nil, err
- }
-
- fn := func(topic string, consumer string) (bool, error) {
- var _x int
- err := readStmt.QueryRow(topic, consumer).Scan(&_x)
- if err == sql.ErrNoRows {
- return false, nil
- }
-
- if err != nil {
- return false, err
- }
-
- return true, nil
- }
-
- return fn, readStmt.Close, nil
-}
-
-func initDB(
- dbpath string,
- prefix string,
- notifyFn func(messageT),
- instanceID int,
-) (queriesT, error) {
- err := g.ValidateSQLTablePrefix(prefix)
- if err != nil {
- return queriesT{}, err
- }
-
- shared, err := sql.Open(golite.DriverName, dbpath)
- if err != nil {
- return queriesT{}, err
- }
-
- cfg := dbconfigT{
- shared: shared,
- dbpath: dbpath,
- prefix: prefix,
- instanceID: instanceID,
- }
-
- createTablesErr := createTables(shared, prefix)
- take, takeClose, takeErr := takeStmt(cfg)
- publish, publishClose, publishErr := publishStmt(cfg)
- find, findClose, findErr := findStmt(cfg)
- next, nextClose, nextErr := nextStmt(cfg)
- pending, pendingClose, pendingErr := pendingStmt(cfg)
- commit, commitClose, commitErr := commitStmt(cfg)
- toDead, toDeadClose, toDeadErr := toDeadStmt(cfg)
- replay, replayClose, replayErr := replayStmt(cfg)
- oneDead, oneDeadClose, oneDeadErr := oneDeadStmt(cfg)
- allDead, allDeadClose, allDeadErr := allDeadStmt(cfg)
- size, sizeClose, sizeErr := sizeStmt(cfg)
- count, countClose, countErr := countStmt(cfg)
- hasData, hasDataClose, hasDataErr := hasDataStmt(cfg)
-
- err = g.SomeError(
- createTablesErr,
- takeErr,
- publishErr,
- findErr,
- nextErr,
- pendingErr,
- commitErr,
- toDeadErr,
- replayErr,
- oneDeadErr,
- allDeadErr,
- sizeErr,
- countErr,
- hasDataErr,
- )
- if err != nil {
- return queriesT{}, err
- }
-
- closeFn := func() error {
- return g.SomeFnError(
- takeClose,
- publishClose,
- findClose,
- nextClose,
- pendingClose,
- commitClose,
- toDeadClose,
- replayClose,
- oneDeadClose,
- allDeadClose,
- sizeClose,
- countClose,
- hasDataClose,
- shared.Close,
- )
- }
-
- var connMutex sync.RWMutex
- return queriesT{
- take: func(a string, b string) error {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return take(a, b)
- },
- publish: func(a UnsentMessage, b uuid.UUID) (messageT, error) {
- var (
- err error
- message messageT
- )
- {
- connMutex.RLock()
- defer connMutex.RUnlock()
- message, err = publish(a, b)
- }
- if err != nil {
- return messageT{}, err
- }
-
- go notifyFn(message)
- return message, nil
- },
- find: func(a string, b uuid.UUID) (messageT, error) {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return find(a, b)
- },
- next: func(a string, b string) (messageT, error) {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return next(a, b)
- },
- pending: func(
- a string,
- b string,
- callback func(messageT) error,
- ) error {
- var (
- err error
- rows *sql.Rows
- )
- {
- connMutex.RLock()
- defer connMutex.RUnlock()
- rows, err = pending(a, b)
- }
- if err != nil {
- return err
- }
-
- return messageEach(rows, callback)
- },
- commit: func(a string, b uuid.UUID) error {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return commit(a, b)
- },
- toDead: func(
- a string,
- b uuid.UUID,
- c uuid.UUID,
- ) error {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return toDead(a, b, c)
- },
- replay: func(a uuid.UUID, b uuid.UUID) (messageT, error) {
- var (
- err error
- message messageT
- )
- {
- connMutex.RLock()
- defer connMutex.RUnlock()
- message, err = replay(a, b)
- }
- if err != nil {
- return messageT{}, err
- }
-
- go notifyFn(message)
- return message, nil
- },
- oneDead: func(a string, b string) (deadletterT, error) {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return oneDead(a, b)
- },
- allDead: func(
- a string,
- b string,
- callback func(deadletterT, messageT) error,
- ) error {
- var (
- err error
- rows *sql.Rows
- )
- {
- connMutex.RLock()
- defer connMutex.RUnlock()
- rows, err = allDead(a, b)
- }
- if err != nil {
- return err
- }
-
- return deadletterEach(rows, callback)
- },
- size: func(a string) (int, error) {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return size(a)
- },
- count: func(a string, b string) (int, error) {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return count(a, b)
- },
- hasData: func(a string, b string) (bool, error) {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return hasData(a, b)
- },
- close: func() error {
- connMutex.Lock()
- defer connMutex.Unlock()
- return closeFn()
- },
- }, nil
-}
-
-
-func newPinger[T any]() pingerT[T] {
- channel := make(chan T, 1)
- closed := false
- var rwmutex sync.RWMutex
- return pingerT[T]{
- tryPing: func(x T) {
- rwmutex.RLock()
- defer rwmutex.RUnlock()
- if closed {
- return
- }
- select {
- case channel <- x:
- default:
- }
- },
- onPing: func(cb func(T)) {
- for x := range channel {
- cb(x)
- }
- },
- closed: func() bool {
- rwmutex.RLock()
- defer rwmutex.RUnlock()
- return closed
- },
- close: func() {
- rwmutex.Lock()
- defer rwmutex.Unlock()
- if closed {
- return
- }
- close(channel)
- closed = true
- },
- }
-}
-
-func makeSubscriptionsFuncs() subscriptionsT {
- var rwmutex sync.RWMutex
- subscriptions := subscriptionsSetM{}
- return subscriptionsT{
- read: func(callback func(subscriptionsSetM) error) error {
- rwmutex.RLock()
- defer rwmutex.RUnlock()
- return callback(subscriptions)
- },
- write: func(callback func(subscriptionsSetM) error) error {
- rwmutex.Lock()
- defer rwmutex.Unlock()
- return callback(subscriptions)
- },
- }
-}
-
-// Try notifying the consumer that they have data to work with. If they're
-// already full, we simply drop the notification, as on each they'll look for
-// all pending messages and process them all. So dropping the event here
-// doesn't mean not notifying the consumer, but simply acknoledging that the
-// existing notifications are enough for them to work with, without letting any
-// message slip through.
-func makeNotifyFn(
- readFn func(func(subscriptionsSetM) error) error,
- pinger pingerT[struct{}],
-) func(messageT) {
- return func(message messageT) {
- readFn(func(set subscriptionsSetM) error {
- topicSub, ok := set[message.topic]
- if !ok {
- return nil
- }
-
- for _, consumer := range topicSub.consumers {
- consumer.pinger.tryPing(struct{}{})
- }
- waiters := topicSub.waiters[message.flowID]
- for _, waiter := range waiters {
- waiter.pinger.tryPing(message.payload)
- }
- return nil
- })
- pinger.tryPing(struct{}{})
- }
-}
-
-func collectClosedWaiters(
- set subscriptionsSetM,
-) map[string]map[uuid.UUID][]string {
- waiters := map[string]map[uuid.UUID][]string{}
- for topic, topicSub := range set {
- waiters[topic] = map[uuid.UUID][]string{}
- for flowID, waitersByName := range topicSub.waiters {
- names := []string{}
- for name, waiter := range waitersByName {
- if (*waiter.closed)() {
- names = append(names, name)
- }
- }
- waiters[topic][flowID] = names
- }
- }
-
- return waiters
-}
-
-func trimEmptyLeaves(closedWaiters map[string]map[uuid.UUID][]string) {
- for topic, waiters := range closedWaiters {
- for flowID, names := range waiters {
- if len(names) == 0 {
- delete(closedWaiters[topic], flowID)
- }
- }
- if len(waiters) == 0 {
- delete(closedWaiters, topic)
- }
- }
-}
-
-func deleteIfEmpty(set subscriptionsSetM, topic string) {
- topicSub, ok := set[topic]
- if !ok {
- return
- }
-
- emptyConsumers := len(topicSub.consumers) == 0
- emptyWaiters := len(topicSub.waiters) == 0
- if emptyConsumers && emptyWaiters {
- delete(set, topic)
- }
-}
-
-func deleteEmptyTopics(set subscriptionsSetM) {
- for topic, _ := range set {
- deleteIfEmpty(set, topic)
- }
-}
-
-func removeClosedWaiters(
- set subscriptionsSetM,
- closedWaiters map[string]map[uuid.UUID][]string,
-) {
- for topic, waiters := range closedWaiters {
- _, ok := set[topic]
- if !ok {
- continue
- }
-
- for flowID, names := range waiters {
- if set[topic].waiters[flowID] == nil {
- continue
- }
- for _, name := range names {
- delete(set[topic].waiters[flowID], name)
- }
- if len(set[topic].waiters[flowID]) == 0 {
- delete(set[topic].waiters, flowID)
- }
- }
- }
-
- deleteEmptyTopics(set)
-}
-
-func reapClosedWaiters(
- readFn func(func(subscriptionsSetM) error) error,
- writeFn func(func(subscriptionsSetM) error) error,
-) {
- var closedWaiters map[string]map[uuid.UUID][]string
- readFn(func(set subscriptionsSetM) error {
- closedWaiters = collectClosedWaiters(set)
- return nil
- })
-
- trimEmptyLeaves(closedWaiters)
- if len(closedWaiters) == 0 {
- return
- }
-
- writeFn(func(set subscriptionsSetM) error {
- removeClosedWaiters(set, closedWaiters)
- return nil
- })
-}
-
-func everyNthCall[T any](n int, fn func(T)) func(T) {
- i := 0
- return func(x T) {
- i++
- if i == n {
- i = 0
- fn(x)
- }
- }
-}
-
-func runReaper(
- onPing func(func(struct{})),
- readFn func(func(subscriptionsSetM) error) error,
- writeFn func(func(subscriptionsSetM) error) error,
-) {
- onPing(everyNthCall(reaperSkipCount, func(struct{}) {
- reapClosedWaiters(readFn, writeFn)
- }))
-}
-
-func NewWithPrefix(databasePath string, prefix string) (IQueue, error) {
- subscriptions := makeSubscriptionsFuncs()
- pinger := newPinger[struct{}]()
- notifyFn := makeNotifyFn(subscriptions.read, pinger)
- queries, err := initDB(databasePath, prefix, notifyFn, os.Getpid())
- if err != nil {
- return queueT{}, err
- }
-
- go runReaper(pinger.onPing, subscriptions.read, subscriptions.write)
-
- return queueT{
- queries: queries,
- subscriptions: subscriptions,
- pinger: pinger,
- }, nil
-}
-
-func New(databasePath string) (IQueue, error) {
- return NewWithPrefix(databasePath, defaultPrefix)
-}
-
-func asPublicMessage(message messageT) Message {
- return Message{
- ID: message.uuid,
- Timestamp: message.timestamp,
- Topic: message.topic,
- FlowID: message.flowID,
- Payload: message.payload,
- }
-}
-
-func (queue queueT) Publish(unsent UnsentMessage) (Message, error) {
- message, err := queue.queries.publish(unsent, uuid.New())
- if err != nil {
- return Message{}, err
- }
-
- return asPublicMessage(message), nil
-}
-
-func registerConsumerFn(consumer consumerT) func(subscriptionsSetM) error {
- topicSub := topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{},
- }
-
- return func(set subscriptionsSetM) error {
- topic := consumer.data.topic
- _, ok := set[topic]
- if !ok {
- set[topic] = topicSub
- }
- set[topic].consumers[consumer.data.name] = consumer
-
- return nil
- }
-}
-
-func registerWaiterFn(waiter waiterT) func(subscriptionsSetM) error {
- topicSub := topicSubscriptionT{
- consumers: map[string]consumerT{},
- waiters: map[uuid.UUID]map[string]waiterT{},
- }
- waiters := map[string]waiterT{}
-
- return func(set subscriptionsSetM) error {
- var (
- topic = waiter.data.topic
- flowID = waiter.data.flowID
- )
- _, ok := set[topic]
- if !ok {
- set[topic] = topicSub
- }
- if set[topic].waiters[flowID] == nil {
- set[topic].waiters[flowID] = waiters
- }
- set[topic].waiters[flowID][waiter.data.name] = waiter
- return nil
- }
-}
-
-func makeConsumeOneFn(
- data consumerDataT,
- callback func(Message) error,
- successFn func(string, uuid.UUID) error,
- errorFn func(string, uuid.UUID, uuid.UUID) error,
-) func(messageT) error {
- return func(message messageT) error {
- err := callback(asPublicMessage(message))
- if err != nil {
- g.Info(
- "consumer failed", "fiinha-consumer",
- "topic", data.topic,
- "consumer", data.name,
- "error", err,
- slog.Group(
- "message",
- "id", message.id,
- "flow-id", message.flowID.String(),
- ),
- )
-
- return errorFn(data.name, message.uuid, uuid.New())
- }
-
- return successFn(data.name, message.uuid)
- }
-}
-
-func makeConsumeAllFn(
- data consumerDataT,
- consumeOneFn func(messageT) error,
- eachFn func(string, string, func(messageT) error) error,
-) func(struct{}) {
- return func(struct{}) {
- err := eachFn(data.topic, data.name, consumeOneFn)
- if err != nil {
- g.Warning(
- "eachFn failed", "fiinha-consume-all",
- "topic", data.topic,
- "consumer", data.name,
- "error", err,
- "circuit-breaker-enabled?", false,
- )
- }
- }
-}
-
-func makeWaitFn(channel chan []byte, closeFn func()) func([]byte) {
- closed := false
- var mutex sync.Mutex
- return func(payload []byte) {
- mutex.Lock()
- defer mutex.Unlock()
- if closed {
- return
- }
-
- closeFn()
- channel <- payload
- close(channel)
- closed = true
- }
-}
-
-func runConsumer(onPing func(func(struct{})), consumeAllFn func(struct{})) {
- consumeAllFn(struct{}{})
- onPing(consumeAllFn)
-}
-
-func tryFinding(
- findFn func(string, uuid.UUID) (messageT, error),
- topic string,
- flowID uuid.UUID,
- waitFn func([]byte),
-) {
- message, err := findFn(topic, flowID)
- if err != nil {
- return
- }
-
- waitFn(message.payload)
-}
-
-func (queue queueT) Subscribe(
- topic string,
- name string,
- callback func(Message) error,
-) error {
- data := consumerDataT{
- topic: topic,
- name: name,
- }
- pinger := newPinger[struct{}]()
- consumer := consumerT{
- data: data,
- callback: callback,
- pinger: pinger,
- close: &pinger.close,
- }
- consumeOneFn := makeConsumeOneFn(
- consumer.data,
- consumer.callback,
- queue.queries.commit,
- queue.queries.toDead,
- )
- consumeAllFn := makeConsumeAllFn(
- consumer.data,
- consumeOneFn,
- queue.queries.pending,
- )
-
- err := queue.queries.take(topic, name)
- if err != nil {
- return err
- }
-
- queue.subscriptions.write(registerConsumerFn(consumer))
- go runConsumer(pinger.onPing, consumeAllFn)
- return nil
-}
-
-type Waiter struct{
- Channel <-chan []byte
- Close func()
-}
-
-func (queue queueT) WaitFor(
- topic string,
- flowID uuid.UUID,
- name string,
-) Waiter {
- data := waiterDataT{
- topic: topic,
- flowID: flowID,
- name: name,
- }
- pinger := newPinger[[]byte]()
- waiter := waiterT{
- data: data,
- pinger: pinger,
- closed: &pinger.closed,
- close: &pinger.close,
- }
- channel := make(chan []byte, 1)
- waitFn := makeWaitFn(channel, (*waiter.close))
- closeFn := func() {
- queue.subscriptions.read(func(set subscriptionsSetM) error {
- (*set[topic].waiters[flowID][name].close)()
- return nil
- })
- }
-
- queue.subscriptions.write(registerWaiterFn(waiter))
- tryFinding(queue.queries.find, topic, flowID, waitFn)
- go pinger.onPing(waitFn)
- return Waiter{channel, closeFn}
-}
-
-func unsubscribeIfExistsFn(
- topic string,
- name string,
-) func(subscriptionsSetM) error {
- return func(set subscriptionsSetM) error {
- topicSub, ok := set[topic]
- if !ok {
- return nil
- }
-
- consumer, ok := topicSub.consumers[name]
- if !ok {
- return nil
- }
-
- (*consumer.close)()
- delete(set[topic].consumers, name)
- deleteIfEmpty(set, topic)
- return nil
- }
-}
-
-func (queue queueT) Unsubscribe(topic string, name string) {
- queue.subscriptions.write(unsubscribeIfExistsFn(topic, name))
-}
-
-func cleanSubscriptions(set subscriptionsSetM) error {
- for _, topicSub := range set {
- for _, consumer := range topicSub.consumers {
- (*consumer.close)()
- }
- for _, waiters := range topicSub.waiters {
- for _, waiter := range waiters {
- (*waiter.close)()
- }
- }
- }
- return nil
-}
-
-func (queue queueT) Close() error {
- queue.pinger.close()
- return g.WrapErrors(
- queue.subscriptions.write(cleanSubscriptions),
- queue.queries.close(),
- )
-}
-
-
-func topicGetopt(args argsT, w io.Writer) (argsT, bool) {
- if len(args.args) == 0 {
- fmt.Fprintf(w, "Missing TOPIC.\n")
- return args, false
- }
-
- args.topic = args.args[0]
- return args, true
-}
-
-func topicConsumerGetopt(args argsT, w io.Writer) (argsT, bool) {
- fs := flag.NewFlagSet("", flag.ContinueOnError)
- fs.Usage = func() {}
- fs.SetOutput(w)
-
- consumer := fs.String(
- "C",
- "default-consumer",
- "The name of the consumer to be used",
- )
-
- if fs.Parse(args.args) != nil {
- return args, false
- }
-
- subArgs := fs.Args()
- if len(subArgs) == 0 {
- fmt.Fprintf(w, "Missing TOPIC.\n")
- return args, false
- }
-
- args.consumer = *consumer
- args.topic = subArgs[0]
- return args, true
-}
-
-func inExec(
- args argsT,
- queries queriesT,
- r io.Reader,
- w io.Writer,
-) (int, error) {
- payload, err := io.ReadAll(r)
- if err != nil {
- return 1, err
- }
-
- unsent := UnsentMessage{
- Topic: args.topic,
- FlowID: uuid.New(),
- Payload: payload,
- }
- message, err := queries.publish(unsent, uuid.New())
- if err != nil {
- return 1, err
- }
-
- fmt.Fprintf(w, "%s\n", message.uuid.String())
-
- return 0, nil
-}
-
-func outExec(
- args argsT,
- queries queriesT,
- _ io.Reader,
- w io.Writer,
-) (int, error) {
- err := queries.take(args.topic, args.consumer)
- if err != nil {
- return 1, err
- }
-
- message, err := queries.next(args.topic, args.consumer)
-
- if err == sql.ErrNoRows {
- return 3, nil
- }
-
- if err != nil {
- return 1, err
- }
-
- fmt.Fprintln(w, string(message.payload))
-
- return 0, nil
-}
-
-func commitExec(
- args argsT,
- queries queriesT,
- r io.Reader,
- w io.Writer,
-) (int, error) {
- err := queries.take(args.topic, args.consumer)
- if err != nil {
- return 1, err
- }
-
- message, err := queries.next(args.topic, args.consumer)
- if err != nil {
- return 1, err
- }
-
- err = queries.commit(args.consumer, message.uuid)
- if err != nil {
- return 1, err
- }
-
- return 0, nil
-}
-
-func deadExec(
- args argsT,
- queries queriesT,
- r io.Reader,
- w io.Writer,
-) (int, error) {
- err := queries.take(args.topic, args.consumer)
- if err != nil {
- return 1, err
- }
-
- message, err := queries.next(args.topic, args.consumer)
- if err != nil {
- return 1, err
- }
-
- err = queries.toDead(args.consumer, message.uuid, uuid.New())
- if err != nil {
- return 1, err
- }
-
- return 0, nil
-}
-
-func listDeadExec(
- args argsT,
- queries queriesT,
- r io.Reader,
- w io.Writer,
-) (int, error) {
- eachFn := func(deadletter deadletterT, _ messageT) error {
- fmt.Fprintf(
- w,
- "%s\t%s\t%s\n",
- deadletter.uuid.String(),
- deadletter.timestamp.Format(time.RFC3339),
- deadletter.consumer,
- )
- return nil
- }
-
- err := queries.allDead(args.topic, args.consumer, eachFn)
- if err != nil {
- return 1, err
- }
-
- return 0, nil
-}
-
-func replayExec(
- args argsT,
- queries queriesT,
- r io.Reader,
- w io.Writer,
-) (int, error) {
- deadletter, err := queries.oneDead(args.topic, args.consumer)
- if err != nil {
- return 1, err
- }
-
- _, err = queries.replay(deadletter.uuid, uuid.New())
- if err != nil {
- return 1, err
- }
-
- return 0, nil
-}
-
-func sizeExec(
- args argsT,
- queries queriesT,
- r io.Reader,
- w io.Writer,
-) (int, error) {
- size, err := queries.size(args.topic)
- if err != nil {
- return 1, err
- }
-
- fmt.Fprintln(w, size)
-
- return 0, nil
-}
-
-func countExec(
- args argsT,
- queries queriesT,
- r io.Reader,
- w io.Writer,
-) (int, error) {
- count, err := queries.count(args.topic, args.consumer)
- if err != nil {
- return 1, err
- }
-
- fmt.Fprintln(w, count)
-
- return 0, nil
-}
-
-func hasDataExec(
- args argsT,
- queries queriesT,
- r io.Reader,
- w io.Writer,
-) (int, error) {
- hasData, err := queries.hasData(args.topic, args.consumer)
- if err != nil {
- return 1, err
- }
-
- if hasData {
- return 0, nil
- } else {
- return 1, nil
- }
-}
-
-func usage(argv0 string, w io.Writer) {
- fmt.Fprintf(
- w,
- "Usage: %s [-f FILE] [-p PREFIX] COMMAND [OPTIONS]\n",
- argv0,
- )
-}
-
-func getopt(
- allArgs []string,
- commandsMap map[string]commandT,
- w io.Writer,
-) (argsT, commandT, int) {
- argv0 := allArgs[0]
- argv := allArgs[1:]
- fs := flag.NewFlagSet("", flag.ContinueOnError)
- fs.Usage = func() {}
- fs.SetOutput(w)
- databasePath := fs.String(
- "f",
- "fiinha.db",
- "The path to the file where the queue is kept",
- )
- prefix := fs.String(
- "p",
- defaultPrefix,
- "The fiinha prefix of the table names",
- )
- if fs.Parse(argv) != nil {
- usage(argv0, w)
- return argsT{}, commandT{}, 2
- }
-
- subArgs := fs.Args()
- if len(subArgs) == 0 {
- fmt.Fprintf(w, "Missing COMMAND.\n")
- usage(argv0, w)
- return argsT{}, commandT{}, 2
- }
-
- args := argsT{
- databasePath: *databasePath,
- prefix: *prefix,
- command: subArgs[0],
- allArgs: allArgs,
- args: subArgs[1:],
- }
-
- command := commandsMap[args.command]
- if command.name == "" {
- fmt.Fprintf(w, "Bad COMMAND: \"%s\".\n", args.command)
- usage(argv0, w)
- return argsT{}, commandT{}, 2
- }
-
- args, ok := command.getopt(args, w)
- if !ok {
- usage(argv0, w)
- return argsT{}, commandT{}, 2
- }
-
- return args, command, 0
-}
-
-func runCommand(
- args argsT,
- command commandT,
- stdin io.Reader,
- stdout io.Writer,
- stderr io.Writer,
-) int {
- iqueue, err := NewWithPrefix(args.databasePath, args.prefix)
- if err != nil {
- fmt.Fprintln(stderr, err)
- return 1
- }
- defer iqueue.Close()
-
- rc, err := command.exec(args, iqueue.(queueT).queries, stdin, stdout)
- if err != nil {
- fmt.Fprintln(stderr, err)
- }
-
- return rc
-}
-
-var commands = map[string]commandT{
- "in": commandT{
- name: "in",
- getopt: topicGetopt,
- exec: inExec,
- },
- "out": commandT{
- name: "out",
- getopt: topicConsumerGetopt,
- exec: outExec,
- },
- "commit": commandT{
- name: "commit",
- getopt: topicConsumerGetopt,
- exec: commitExec,
- },
- "dead": commandT{
- name: "dead",
- getopt: topicConsumerGetopt,
- exec: deadExec,
- },
- "ls-dead": commandT{
- name: "ls-dead",
- getopt: topicConsumerGetopt,
- exec: listDeadExec,
- },
- "replay": commandT{
- name: "replay",
- getopt: topicConsumerGetopt,
- exec: replayExec,
- },
- "size": commandT{
- name: "size",
- getopt: topicGetopt,
- exec: sizeExec,
- },
- "count": commandT{
- name: "count",
- getopt: topicConsumerGetopt,
- exec: countExec,
- },
- "has-data": commandT{
- name: "has-data",
- getopt: topicConsumerGetopt,
- exec: hasDataExec,
- },
-}
-
-
-
-func Main() {
- g.Init()
- args, command, rc := getopt(os.Args, commands, os.Stderr)
- if rc != 0 {
- os.Exit(rc)
- }
- os.Exit(runCommand(args, command, os.Stdin, os.Stdout, os.Stderr))
-}