summaryrefslogtreecommitdiff
path: root/tests/functional/wait-after-publish/q.go
diff options
context:
space:
mode:
Diffstat (limited to 'tests/functional/wait-after-publish/q.go')
-rw-r--r--tests/functional/wait-after-publish/q.go64
1 files changed, 64 insertions, 0 deletions
diff --git a/tests/functional/wait-after-publish/q.go b/tests/functional/wait-after-publish/q.go
new file mode 100644
index 0000000..701258a
--- /dev/null
+++ b/tests/functional/wait-after-publish/q.go
@@ -0,0 +1,64 @@
+package q
+
+import (
+ "database/sql"
+ "os"
+ "runtime"
+
+ g "gobang"
+ "guuid"
+)
+
+
+
+const topic = "topic"
+
+
+
+func MainTest() {
+ _, file, _, ok := runtime.Caller(0)
+ g.TAssertEqualS(ok, true, "can't get filename")
+
+ databasePath := file + ".db"
+ os.Remove(databasePath)
+ os.Remove(databasePath + "-shm")
+ os.Remove(databasePath + "-wal")
+
+ db, err := sql.Open("acude", databasePath)
+ g.TErrorIf(err)
+ defer db.Close()
+
+ queue, err := New(db)
+ g.TErrorIf(err)
+ defer queue.Close()
+
+ pub := func(flowID guuid.UUID, payload []byte) {
+ unsent := UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: payload,
+ }
+ _, err := queue.Publish(unsent)
+ g.TErrorIf(err)
+ }
+
+
+ g.Testing("we can WaitFor() a message before its publishing", func() {
+ flowID := guuid.New()
+ waiter := queue.WaitFor(topic, flowID, "waiter").Channel
+
+ pub(flowID, []byte("payload before"))
+
+ given := <- waiter
+ g.TAssertEqual(given, []byte("payload before"))
+ })
+
+ g.Testing("we can also do it after its publishing", func() {
+ flowID := guuid.New()
+
+ pub(flowID, []byte("payload after"))
+
+ given := <- queue.WaitFor(topic, flowID, "waiter").Channel
+ g.TAssertEqual(given, []byte("payload after"))
+ })
+}