-- createTables.sql: -- write: CREATE TABLE IF NOT EXISTS "fiinha_payloads" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')), topic TEXT NOT NULL, payload BLOB NOT NULL ) STRICT; CREATE INDEX IF NOT EXISTS "fiinha_payloads_topic" ON "fiinha_payloads"(topic); CREATE TABLE IF NOT EXISTS "fiinha_messages" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')), uuid BLOB NOT NULL UNIQUE, flow_id BLOB NOT NULL, payload_id INTEGER NOT NULL REFERENCES "fiinha_payloads"(id) ) STRICT; CREATE INDEX IF NOT EXISTS "fiinha_messages_flow_id" ON "fiinha_messages"(flow_id); CREATE TABLE IF NOT EXISTS "fiinha_offsets" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')), consumer TEXT NOT NULL, message_id INTEGER NOT NULL REFERENCES "fiinha_messages"(id), instance_id INTEGER NOT NULL, UNIQUE (consumer, message_id) ) STRICT; CREATE INDEX IF NOT EXISTS "fiinha_offsets_consumer" ON "fiinha_offsets"(consumer); CREATE TABLE IF NOT EXISTS "fiinha_deadletters" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, uuid BLOB NOT NULL UNIQUE, consumer TEXT NOT NULL, message_id INTEGER NOT NULL REFERENCES "fiinha_messages"(id), instance_id INTEGER NOT NULL, UNIQUE (consumer, message_id) ) STRICT; CREATE INDEX IF NOT EXISTS "fiinha_deadletters_consumer" ON "fiinha_deadletters"(consumer); CREATE TABLE IF NOT EXISTS "fiinha_replays" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, deadletter_id INTEGER NOT NULL UNIQUE REFERENCES "fiinha_deadletters"(id) , message_id INTEGER NOT NULL UNIQUE REFERENCES "fiinha_messages"(id) ) STRICT; CREATE TABLE IF NOT EXISTS "fiinha_owners" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, topic TEXT NOT NULL, consumer TEXT NOT NULL, owner_id INTEGER NOT NULL, UNIQUE (topic, consumer) ) STRICT; CREATE TRIGGER IF NOT EXISTS "fiinha_check_instance_owns_topic" BEFORE INSERT ON "fiinha_offsets" WHEN NEW.instance_id != ( SELECT owner_id FROM "fiinha_owners" WHERE topic = ( SELECT "fiinha_payloads".topic FROM "fiinha_payloads" JOIN "fiinha_messages" ON "fiinha_payloads".id = "fiinha_messages".payload_id WHERE "fiinha_messages".id = NEW.message_id ) AND consumer = NEW.consumer ) BEGIN SELECT RAISE( ABORT, 'instance does not own topic/consumer combo' ); END; CREATE TRIGGER IF NOT EXISTS "fiinha_check_can_publish_deadletter" BEFORE INSERT ON "fiinha_deadletters" WHEN NEW.instance_id != ( SELECT owner_id FROM "fiinha_owners" WHERE topic = ( SELECT "fiinha_payloads".topic FROM "fiinha_payloads" JOIN "fiinha_messages" ON "fiinha_payloads".id = "fiinha_messages".payload_id WHERE "fiinha_messages".id = NEW.message_id ) AND consumer = NEW.consumer ) BEGIN SELECT RAISE( ABORT, 'Instance does not own topic/consumer combo' ); END; -- read: -- owner: -- take.sql: -- write: INSERT INTO "fiinha_owners" (topic, consumer, owner_id) VALUES (?, ?, ?) ON CONFLICT (topic, consumer) DO UPDATE SET owner_id=excluded.owner_id; -- read: -- owner: -- publish.sql: -- write: INSERT INTO "fiinha_payloads" (topic, payload) VALUES (?, ?); INSERT INTO "fiinha_messages" (uuid, flow_id, payload_id) VALUES (?, ?, last_insert_rowid()); -- read: SELECT id, timestamp FROM "fiinha_messages" WHERE uuid = ?; -- owner: -- find.sql: -- write: -- read: SELECT "fiinha_messages".id, "fiinha_messages".timestamp, "fiinha_messages".uuid, "fiinha_payloads".payload FROM "fiinha_messages" JOIN "fiinha_payloads" ON "fiinha_payloads".id = "fiinha_messages".payload_id WHERE "fiinha_payloads".topic = ? AND "fiinha_messages".flow_id = ? ORDER BY "fiinha_messages".id DESC LIMIT 1; -- owner: -- next.sql: -- write: -- read: SELECT ( SELECT owner_id FROM "fiinha_owners" WHERE topic = ? AND consumer = ? LIMIT 1 ) AS owner_id, "fiinha_messages".id, "fiinha_messages".timestamp, "fiinha_messages".uuid, "fiinha_messages".flow_id, "fiinha_payloads".payload FROM "fiinha_messages" JOIN "fiinha_payloads" ON "fiinha_payloads".id = "fiinha_messages".payload_id WHERE "fiinha_payloads".topic = ? AND "fiinha_messages".id NOT IN ( SELECT message_id FROM "fiinha_offsets" WHERE consumer = ? ) ORDER BY "fiinha_messages".id ASC LIMIT 1; -- owner: -- pending.sql: -- write: -- read: SELECT "fiinha_messages".id, "fiinha_messages".timestamp, "fiinha_messages".uuid, "fiinha_messages".flow_id, "fiinha_payloads".topic, "fiinha_payloads".payload FROM "fiinha_messages" JOIN "fiinha_payloads" ON "fiinha_payloads".id = "fiinha_messages".payload_id WHERE "fiinha_payloads".topic = ? AND "fiinha_messages".id NOT IN ( SELECT message_id FROM "fiinha_offsets" WHERE consumer = ? ) ORDER BY "fiinha_messages".id ASC; -- owner: SELECT owner_id FROM "fiinha_owners" WHERE topic = ? AND consumer = ?; -- commit.sql: -- write: INSERT INTO "fiinha_offsets" (consumer, message_id, instance_id) VALUES (?, (SELECT id FROM "fiinha_messages" WHERE uuid = ?), ?); -- read: -- owner: -- toDead.sql: -- write: INSERT INTO "fiinha_offsets" ( consumer, message_id, instance_id) VALUES ( ?, (SELECT id FROM "fiinha_messages" WHERE uuid = ?), ?); INSERT INTO "fiinha_deadletters" (uuid, consumer, message_id, instance_id) VALUES (?, ?, (SELECT id FROM "fiinha_messages" WHERE uuid = ?), ?); -- read: -- owner: -- replay.sql: -- write: INSERT INTO "fiinha_messages" (uuid, flow_id, payload_id) SELECT ?, "fiinha_messages".flow_id, "fiinha_messages".payload_id FROM "fiinha_messages" JOIN "fiinha_deadletters" ON "fiinha_messages".id = "fiinha_deadletters".message_id WHERE "fiinha_deadletters".uuid = ?; INSERT INTO "fiinha_replays" (deadletter_id, message_id) VALUES ( (SELECT id FROM "fiinha_deadletters" WHERE uuid = ?), last_insert_rowid() ); -- read: SELECT "fiinha_messages".id, "fiinha_messages".timestamp, "fiinha_messages".flow_id, "fiinha_payloads".topic, "fiinha_payloads".payload FROM "fiinha_messages" JOIN "fiinha_payloads" ON "fiinha_payloads".id = "fiinha_messages".payload_id WHERE "fiinha_messages".uuid = ?; -- owner: -- oneDead.sql: -- write: -- read: SELECT "fiinha_deadletters".uuid, "fiinha_offsets".timestamp, "fiinha_messages".uuid FROM "fiinha_deadletters" JOIN "fiinha_offsets" ON "fiinha_deadletters".message_id = "fiinha_offsets".message_id JOIN "fiinha_messages" ON "fiinha_deadletters".message_id = "fiinha_messages".id JOIN "fiinha_payloads" ON "fiinha_messages".payload_id = "fiinha_payloads".id WHERE "fiinha_payloads".topic = ? AND "fiinha_deadletters".consumer = ? AND "fiinha_offsets".consumer = ? AND "fiinha_deadletters".id NOT IN ( SELECT deadletter_id FROM "fiinha_replays" ) ORDER BY "fiinha_deadletters".id ASC LIMIT 1; -- owner: -- allDead.sql: -- write: -- read: SELECT "fiinha_deadletters".uuid, "fiinha_deadletters".message_id, "fiinha_offsets".timestamp, "fiinha_offsets".consumer, "fiinha_messages".timestamp, "fiinha_messages".uuid, "fiinha_messages".flow_id, "fiinha_payloads".topic, "fiinha_payloads".payload FROM "fiinha_deadletters" JOIN "fiinha_offsets" ON "fiinha_deadletters".message_id = "fiinha_offsets".message_id JOIN "fiinha_messages" ON "fiinha_deadletters".message_id = "fiinha_messages".id JOIN "fiinha_payloads" ON "fiinha_messages".payload_id = "fiinha_payloads".id WHERE "fiinha_payloads".topic = ? AND "fiinha_deadletters".consumer = ? AND "fiinha_offsets".consumer = ? AND "fiinha_deadletters".id NOT IN ( SELECT deadletter_id FROM "fiinha_replays" ) ORDER BY "fiinha_deadletters".id ASC; -- owner: -- size.sql: -- write: -- read: SELECT COUNT(1) as size FROM "fiinha_messages" JOIN "fiinha_payloads" ON "fiinha_messages".payload_id = "fiinha_payloads".id WHERE "fiinha_payloads".topic = ?; -- owner: -- count.sql: -- write: -- read: SELECT COUNT(1) as count FROM "fiinha_messages" JOIN "fiinha_offsets" ON "fiinha_messages".id = "fiinha_offsets".message_id JOIN "fiinha_payloads" ON "fiinha_messages".payload_id = "fiinha_payloads".id WHERE "fiinha_payloads".topic = ? AND "fiinha_offsets".consumer = ?; -- owner: -- hasData.sql: -- write: -- read: SELECT 1 as data FROM "fiinha_messages" JOIN "fiinha_payloads" ON "fiinha_payloads".id = "fiinha_messages".payload_id WHERE "fiinha_payloads".topic = ? AND "fiinha_messages".id NOT IN ( SELECT message_id FROM "fiinha_offsets" WHERE consumer = ? ) LIMIT 1; -- owner: