summaryrefslogtreecommitdiff
path: root/tests/queries.sql
diff options
context:
space:
mode:
Diffstat (limited to 'tests/queries.sql')
-rw-r--r--tests/queries.sql107
1 files changed, 80 insertions, 27 deletions
diff --git a/tests/queries.sql b/tests/queries.sql
index c821e25..e790d41 100644
--- a/tests/queries.sql
+++ b/tests/queries.sql
@@ -27,6 +27,7 @@
consumer TEXT NOT NULL,
message_id INTEGER NOT NULL
REFERENCES "q_messages"(id),
+ instance_id INTEGER NOT NULL,
UNIQUE (consumer, message_id)
) STRICT;
CREATE INDEX IF NOT EXISTS "q_offsets_consumer"
@@ -38,6 +39,7 @@
consumer TEXT NOT NULL,
message_id INTEGER NOT NULL
REFERENCES "q_messages"(id),
+ instance_id INTEGER NOT NULL,
UNIQUE (consumer, message_id)
) STRICT;
CREATE INDEX IF NOT EXISTS "q_deadletters_consumer"
@@ -58,6 +60,44 @@
owner_id INTEGER NOT NULL,
UNIQUE (topic, consumer)
) STRICT;
+
+ CREATE TRIGGER IF NOT EXISTS "q_check_instance_owns_topic"
+ BEFORE INSERT ON "q_offsets"
+ WHEN NEW.instance_id != (
+ SELECT owner_id FROM "q_owners"
+ WHERE topic = (
+ SELECT "q_payloads".topic
+ FROM "q_payloads"
+ JOIN "q_messages" ON "q_payloads".id =
+ "q_messages".payload_id
+ WHERE "q_messages".id = NEW.message_id
+ ) AND consumer = NEW.consumer
+ )
+ BEGIN
+ SELECT RAISE(
+ ABORT,
+ 'instance does not own topic/consumer combo'
+ );
+ END;
+
+ CREATE TRIGGER IF NOT EXISTS "q_check_can_publish_deadletter"
+ BEFORE INSERT ON "q_deadletters"
+ WHEN NEW.instance_id != (
+ SELECT owner_id FROM "q_owners"
+ WHERE topic = (
+ SELECT "q_payloads".topic
+ FROM "q_payloads"
+ JOIN "q_messages" ON "q_payloads".id =
+ "q_messages".payload_id
+ WHERE "q_messages".id = NEW.message_id
+ ) AND consumer = NEW.consumer
+ )
+ BEGIN
+ SELECT RAISE(
+ ABORT,
+ 'Instance does not own topic/consumer combo'
+ );
+ END;
-- read:
@@ -81,7 +121,6 @@
INSERT INTO "q_payloads" (topic, payload)
VALUES (?, ?);
- -- FIXME: must be inside a trnsaction
INSERT INTO "q_messages" (uuid, flow_id, payload_id)
VALUES (?, ?, last_insert_rowid());
@@ -114,6 +153,38 @@
-- owner:
+-- next.sql:
+-- write:
+
+-- read:
+ SELECT
+ (
+ SELECT owner_id FROM "q_owners"
+ WHERE
+ topic = ? AND
+ consumer = ?
+ LIMIT 1
+ ) AS owner_id,
+ "q_messages".id,
+ "q_messages".timestamp,
+ "q_messages".uuid,
+ "q_messages".flow_id,
+ "q_payloads".payload
+ FROM "q_messages"
+ JOIN "q_payloads" ON
+ "q_payloads".id = "q_messages".payload_id
+ WHERE
+ "q_payloads".topic = ? AND
+ "q_messages".id NOT IN (
+ SELECT message_id FROM "q_offsets"
+ WHERE consumer = ?
+ )
+ ORDER BY "q_messages".id ASC
+ LIMIT 1;
+
+
+-- owner:
+
-- pending.sql:
-- write:
@@ -146,46 +217,28 @@
-- commit.sql:
-- write:
- INSERT INTO "q_offsets" (consumer, message_id)
- VALUES (?, (SELECT id FROM "q_messages" WHERE uuid = ?));
+ INSERT INTO "q_offsets" (consumer, message_id, instance_id)
+ VALUES (?, (SELECT id FROM "q_messages" WHERE uuid = ?), ?);
-- read:
- SELECT "q_payloads".topic from "q_payloads"
- JOIN "q_messages" ON
- "q_payloads".id = "q_messages".payload_id
- WHERE "q_messages".uuid = ?;
-
-- owner:
- SELECT owner_id FROM "q_owners"
- WHERE
- topic = ? AND
- consumer = ?;
-
-- toDead.sql:
-- write:
- INSERT INTO "q_offsets" ( consumer, message_id)
- VALUES ( ?, (SELECT id FROM "q_messages" WHERE uuid = ?));
+ INSERT INTO "q_offsets"
+ ( consumer, message_id, instance_id)
+ VALUES ( ?, (SELECT id FROM "q_messages" WHERE uuid = ?), ?);
- INSERT INTO "q_deadletters" (uuid, consumer, message_id)
- VALUES (?, ?, (SELECT id FROM "q_messages" WHERE uuid = ?));
+ INSERT INTO "q_deadletters"
+ (uuid, consumer, message_id, instance_id)
+ VALUES (?, ?, (SELECT id FROM "q_messages" WHERE uuid = ?), ?);
-- read:
- SELECT "q_payloads".topic FROM "q_payloads"
- JOIN "q_messages" ON
- "q_payloads".id = "q_messages".payload_id
- WHERE "q_messages".uuid = ?;
-
-- owner:
- SELECT owner_id FROM "q_owners"
- WHERE
- topic = ? AND
- consumer = ?;
-
-- replay.sql:
-- write: