summaryrefslogtreecommitdiff
path: root/tests/functional/wait-after-publish/q.go
blob: d3426aef4b7890b80d6e62b83850e2b9d56b44e9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package q

import (
	"database/sql"
	"os"
	"runtime"

	"golite"
	"guuid"
	g "gobang"
)



const topic = "topic"



func MainTest() {
	_, file, _, ok := runtime.Caller(0)
	g.TAssertEqualS(ok, true, "can't get filename")

	databasePath := file + ".db"
	os.Remove(databasePath)
	os.Remove(databasePath + "-shm")
	os.Remove(databasePath + "-wal")

	db, err := sql.Open(golite.DriverName, databasePath)
	g.TErrorIf(err)
	defer db.Close()

	queue, err := New(db)
	g.TErrorIf(err)
	defer queue.Close()

	pub := func(flowID guuid.UUID, payload []byte) {
		unsent := UnsentMessage{
			Topic:   topic,
			FlowID:  flowID,
			Payload: payload,
		}
		_, err := queue.Publish(unsent)
		g.TErrorIf(err)
	}


	g.Testing("we can WaitFor() a message before its publishing", func() {
		flowID := guuid.New()
		waiter := queue.WaitFor(topic, flowID, "waiter").Channel

		pub(flowID, []byte("payload before"))

		given := <- waiter
		g.TAssertEqual(given, []byte("payload before"))
	})

	g.Testing("we can also do it after its publishing", func() {
		flowID := guuid.New()

		pub(flowID, []byte("payload after"))

		given := <- queue.WaitFor(topic, flowID, "waiter").Channel
		g.TAssertEqual(given, []byte("payload after"))
	})
}