diff options
Diffstat (limited to 'tests/queries.sql')
-rw-r--r-- | tests/queries.sql | 107 |
1 files changed, 80 insertions, 27 deletions
diff --git a/tests/queries.sql b/tests/queries.sql index c821e25..e790d41 100644 --- a/tests/queries.sql +++ b/tests/queries.sql @@ -27,6 +27,7 @@ consumer TEXT NOT NULL, message_id INTEGER NOT NULL REFERENCES "q_messages"(id), + instance_id INTEGER NOT NULL, UNIQUE (consumer, message_id) ) STRICT; CREATE INDEX IF NOT EXISTS "q_offsets_consumer" @@ -38,6 +39,7 @@ consumer TEXT NOT NULL, message_id INTEGER NOT NULL REFERENCES "q_messages"(id), + instance_id INTEGER NOT NULL, UNIQUE (consumer, message_id) ) STRICT; CREATE INDEX IF NOT EXISTS "q_deadletters_consumer" @@ -58,6 +60,44 @@ owner_id INTEGER NOT NULL, UNIQUE (topic, consumer) ) STRICT; + + CREATE TRIGGER IF NOT EXISTS "q_check_instance_owns_topic" + BEFORE INSERT ON "q_offsets" + WHEN NEW.instance_id != ( + SELECT owner_id FROM "q_owners" + WHERE topic = ( + SELECT "q_payloads".topic + FROM "q_payloads" + JOIN "q_messages" ON "q_payloads".id = + "q_messages".payload_id + WHERE "q_messages".id = NEW.message_id + ) AND consumer = NEW.consumer + ) + BEGIN + SELECT RAISE( + ABORT, + 'instance does not own topic/consumer combo' + ); + END; + + CREATE TRIGGER IF NOT EXISTS "q_check_can_publish_deadletter" + BEFORE INSERT ON "q_deadletters" + WHEN NEW.instance_id != ( + SELECT owner_id FROM "q_owners" + WHERE topic = ( + SELECT "q_payloads".topic + FROM "q_payloads" + JOIN "q_messages" ON "q_payloads".id = + "q_messages".payload_id + WHERE "q_messages".id = NEW.message_id + ) AND consumer = NEW.consumer + ) + BEGIN + SELECT RAISE( + ABORT, + 'Instance does not own topic/consumer combo' + ); + END; -- read: @@ -81,7 +121,6 @@ INSERT INTO "q_payloads" (topic, payload) VALUES (?, ?); - -- FIXME: must be inside a trnsaction INSERT INTO "q_messages" (uuid, flow_id, payload_id) VALUES (?, ?, last_insert_rowid()); @@ -114,6 +153,38 @@ -- owner: +-- next.sql: +-- write: + +-- read: + SELECT + ( + SELECT owner_id FROM "q_owners" + WHERE + topic = ? AND + consumer = ? + LIMIT 1 + ) AS owner_id, + "q_messages".id, + "q_messages".timestamp, + "q_messages".uuid, + "q_messages".flow_id, + "q_payloads".payload + FROM "q_messages" + JOIN "q_payloads" ON + "q_payloads".id = "q_messages".payload_id + WHERE + "q_payloads".topic = ? AND + "q_messages".id NOT IN ( + SELECT message_id FROM "q_offsets" + WHERE consumer = ? + ) + ORDER BY "q_messages".id ASC + LIMIT 1; + + +-- owner: + -- pending.sql: -- write: @@ -146,46 +217,28 @@ -- commit.sql: -- write: - INSERT INTO "q_offsets" (consumer, message_id) - VALUES (?, (SELECT id FROM "q_messages" WHERE uuid = ?)); + INSERT INTO "q_offsets" (consumer, message_id, instance_id) + VALUES (?, (SELECT id FROM "q_messages" WHERE uuid = ?), ?); -- read: - SELECT "q_payloads".topic from "q_payloads" - JOIN "q_messages" ON - "q_payloads".id = "q_messages".payload_id - WHERE "q_messages".uuid = ?; - -- owner: - SELECT owner_id FROM "q_owners" - WHERE - topic = ? AND - consumer = ?; - -- toDead.sql: -- write: - INSERT INTO "q_offsets" ( consumer, message_id) - VALUES ( ?, (SELECT id FROM "q_messages" WHERE uuid = ?)); + INSERT INTO "q_offsets" + ( consumer, message_id, instance_id) + VALUES ( ?, (SELECT id FROM "q_messages" WHERE uuid = ?), ?); - INSERT INTO "q_deadletters" (uuid, consumer, message_id) - VALUES (?, ?, (SELECT id FROM "q_messages" WHERE uuid = ?)); + INSERT INTO "q_deadletters" + (uuid, consumer, message_id, instance_id) + VALUES (?, ?, (SELECT id FROM "q_messages" WHERE uuid = ?), ?); -- read: - SELECT "q_payloads".topic FROM "q_payloads" - JOIN "q_messages" ON - "q_payloads".id = "q_messages".payload_id - WHERE "q_messages".uuid = ?; - -- owner: - SELECT owner_id FROM "q_owners" - WHERE - topic = ? AND - consumer = ?; - -- replay.sql: -- write: |