-- createTables.sql: -- write: CREATE TABLE IF NOT EXISTS "q_payloads" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')), topic TEXT NOT NULL, payload BLOB NOT NULL ) STRICT; CREATE INDEX IF NOT EXISTS "q_payloads_topic" ON "q_payloads"(topic); CREATE TABLE IF NOT EXISTS "q_messages" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')), uuid BLOB NOT NULL UNIQUE, flow_id BLOB NOT NULL, payload_id INTEGER NOT NULL REFERENCES "q_payloads"(id) ) STRICT; CREATE INDEX IF NOT EXISTS "q_messages_flow_id" ON "q_messages"(flow_id); CREATE TABLE IF NOT EXISTS "q_offsets" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')), consumer TEXT NOT NULL, message_id INTEGER NOT NULL REFERENCES "q_messages"(id), UNIQUE (consumer, message_id) ) STRICT; CREATE INDEX IF NOT EXISTS "q_offsets_consumer" ON "q_offsets"(consumer); CREATE TABLE IF NOT EXISTS "q_deadletters" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, uuid BLOB NOT NULL UNIQUE, consumer TEXT NOT NULL, message_id INTEGER NOT NULL REFERENCES "q_messages"(id), UNIQUE (consumer, message_id) ) STRICT; CREATE INDEX IF NOT EXISTS "q_deadletters_consumer" ON "q_deadletters"(consumer); CREATE TABLE IF NOT EXISTS "q_replays" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, deadletter_id INTEGER NOT NULL UNIQUE REFERENCES "q_deadletters"(id) , message_id INTEGER NOT NULL UNIQUE REFERENCES "q_messages"(id) ) STRICT; CREATE TABLE IF NOT EXISTS "q_owners" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, topic TEXT NOT NULL, consumer TEXT NOT NULL, owner_id INTEGER NOT NULL, UNIQUE (topic, consumer) ) STRICT; -- read: -- owner: -- take.sql: -- write: INSERT INTO "q_owners" (topic, consumer, owner_id) VALUES (?, ?, ?) ON CONFLICT (topic, consumer) DO UPDATE SET owner_id=excluded.owner_id; -- read: -- owner: -- publish.sql: -- write: INSERT INTO "q_payloads" (topic, payload) VALUES (?, ?); -- FIXME: must be inside a trnsaction INSERT INTO "q_messages" (uuid, flow_id, payload_id) VALUES (?, ?, last_insert_rowid()); -- read: SELECT id, timestamp FROM "q_messages" WHERE uuid = ?; -- owner: -- find.sql: -- write: -- read: SELECT "q_messages".id, "q_messages".timestamp, "q_messages".uuid, "q_payloads".payload FROM "q_messages" JOIN "q_payloads" ON "q_payloads".id = "q_messages".payload_id WHERE "q_payloads".topic = ? AND "q_messages".flow_id = ? ORDER BY "q_messages".id DESC LIMIT 1; -- owner: -- pending.sql: -- write: -- read: SELECT "q_messages".id, "q_messages".timestamp, "q_messages".uuid, "q_messages".flow_id, "q_payloads".topic, "q_payloads".payload FROM "q_messages" JOIN "q_payloads" ON "q_payloads".id = "q_messages".payload_id WHERE "q_payloads".topic = ? AND "q_messages".id NOT IN ( SELECT message_id FROM "q_offsets" WHERE consumer = ? ) ORDER BY "q_messages".id ASC; -- owner: SELECT owner_id FROM "q_owners" WHERE topic = ? AND consumer = ?; -- commit.sql: -- write: INSERT INTO "q_offsets" (consumer, message_id) VALUES (?, (SELECT id FROM "q_messages" WHERE uuid = ?)); -- read: SELECT "q_payloads".topic from "q_payloads" JOIN "q_messages" ON "q_payloads".id = "q_messages".payload_id WHERE "q_messages".uuid = ?; -- owner: SELECT owner_id FROM "q_owners" WHERE topic = ? AND consumer = ?; -- toDead.sql: -- write: INSERT INTO "q_offsets" ( consumer, message_id) VALUES ( ?, (SELECT id FROM "q_messages" WHERE uuid = ?)); INSERT INTO "q_deadletters" (uuid, consumer, message_id) VALUES (?, ?, (SELECT id FROM "q_messages" WHERE uuid = ?)); -- read: SELECT "q_payloads".topic FROM "q_payloads" JOIN "q_messages" ON "q_payloads".id = "q_messages".payload_id WHERE "q_messages".uuid = ?; -- owner: SELECT owner_id FROM "q_owners" WHERE topic = ? AND consumer = ?; -- replay.sql: -- write: INSERT INTO "q_messages" (uuid, flow_id, payload_id) SELECT ?, "q_messages".flow_id, "q_messages".payload_id FROM "q_messages" JOIN "q_deadletters" ON "q_messages".id = "q_deadletters".message_id WHERE "q_deadletters".uuid = ?; INSERT INTO "q_replays" (deadletter_id, message_id) VALUES ( (SELECT id FROM "q_deadletters" WHERE uuid = ?), last_insert_rowid() ); -- read: SELECT "q_messages".id, "q_messages".timestamp, "q_messages".flow_id, "q_payloads".topic, "q_payloads".payload FROM "q_messages" JOIN "q_payloads" ON "q_payloads".id = "q_messages".payload_id WHERE "q_messages".uuid = ?; -- owner: -- oneDead.sql: -- write: -- read: SELECT "q_deadletters".uuid, "q_offsets".timestamp, "q_messages".uuid FROM "q_deadletters" JOIN "q_offsets" ON "q_deadletters".message_id = "q_offsets".message_id JOIN "q_messages" ON "q_deadletters".message_id = "q_messages".id JOIN "q_payloads" ON "q_messages".payload_id = "q_payloads".id WHERE "q_payloads".topic = ? AND "q_deadletters".consumer = ? AND "q_offsets".consumer = ? AND "q_deadletters".id NOT IN ( SELECT deadletter_id FROM "q_replays" ) ORDER BY "q_deadletters".id ASC LIMIT 1; -- owner: -- allDead.sql: -- write: -- read: SELECT "q_deadletters".uuid, "q_deadletters".message_id, "q_offsets".timestamp, "q_offsets".consumer, "q_messages".timestamp, "q_messages".uuid, "q_messages".flow_id, "q_payloads".topic, "q_payloads".payload FROM "q_deadletters" JOIN "q_offsets" ON "q_deadletters".message_id = "q_offsets".message_id JOIN "q_messages" ON "q_deadletters".message_id = "q_messages".id JOIN "q_payloads" ON "q_messages".payload_id = "q_payloads".id WHERE "q_payloads".topic = ? AND "q_deadletters".consumer = ? AND "q_offsets".consumer = ? AND "q_deadletters".id NOT IN ( SELECT deadletter_id FROM "q_replays" ) ORDER BY "q_deadletters".id ASC; -- owner: -- size.sql: -- write: -- read: SELECT COUNT(1) as size FROM "q_messages" JOIN "q_payloads" ON "q_messages".payload_id = "q_payloads".id WHERE "q_payloads".topic = ?; -- owner: -- count.sql: -- write: -- read: SELECT COUNT(1) as count FROM "q_messages" JOIN "q_offsets" ON "q_messages".id = "q_offsets".message_id JOIN "q_payloads" ON "q_messages".payload_id = "q_payloads".id WHERE "q_payloads".topic = ? AND "q_offsets".consumer = ?; -- owner: -- hasData.sql: -- write: -- read: SELECT 1 as data FROM "q_messages" JOIN "q_payloads" ON "q_payloads".id = "q_messages".payload_id WHERE "q_payloads".topic = ? AND "q_messages".id NOT IN ( SELECT message_id FROM "q_offsets" WHERE consumer = ? ) LIMIT 1; -- owner: