From ab1795aeb8f00b61c331ac77fdc1011ec14c5253 Mon Sep 17 00:00:00 2001 From: EuAndreh Date: Tue, 17 Sep 2024 08:01:05 -0300 Subject: Initial version: first implementation --- tests/functional/wait-after-publish/q.go | 64 ++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 tests/functional/wait-after-publish/q.go (limited to 'tests/functional/wait-after-publish/q.go') diff --git a/tests/functional/wait-after-publish/q.go b/tests/functional/wait-after-publish/q.go new file mode 100644 index 0000000..701258a --- /dev/null +++ b/tests/functional/wait-after-publish/q.go @@ -0,0 +1,64 @@ +package q + +import ( + "database/sql" + "os" + "runtime" + + g "gobang" + "guuid" +) + + + +const topic = "topic" + + + +func MainTest() { + _, file, _, ok := runtime.Caller(0) + g.TAssertEqualS(ok, true, "can't get filename") + + databasePath := file + ".db" + os.Remove(databasePath) + os.Remove(databasePath + "-shm") + os.Remove(databasePath + "-wal") + + db, err := sql.Open("acude", databasePath) + g.TErrorIf(err) + defer db.Close() + + queue, err := New(db) + g.TErrorIf(err) + defer queue.Close() + + pub := func(flowID guuid.UUID, payload []byte) { + unsent := UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: payload, + } + _, err := queue.Publish(unsent) + g.TErrorIf(err) + } + + + g.Testing("we can WaitFor() a message before its publishing", func() { + flowID := guuid.New() + waiter := queue.WaitFor(topic, flowID, "waiter").Channel + + pub(flowID, []byte("payload before")) + + given := <- waiter + g.TAssertEqual(given, []byte("payload before")) + }) + + g.Testing("we can also do it after its publishing", func() { + flowID := guuid.New() + + pub(flowID, []byte("payload after")) + + given := <- queue.WaitFor(topic, flowID, "waiter").Channel + g.TAssertEqual(given, []byte("payload after")) + }) +} -- cgit v1.2.3