1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
package q
import (
"database/sql"
"os"
"runtime"
"golite"
"guuid"
g "gobang"
)
const topic = "topic"
func MainTest() {
_, file, _, ok := runtime.Caller(0)
g.TAssertEqualS(ok, true, "can't get filename")
databasePath := file + ".db"
os.Remove(databasePath)
os.Remove(databasePath + "-shm")
os.Remove(databasePath + "-wal")
db, err := sql.Open(golite.DriverName, databasePath)
g.TErrorIf(err)
defer db.Close()
queue, err := New(db)
g.TErrorIf(err)
defer queue.Close()
pub := func(flowID guuid.UUID, payload []byte) {
unsent := UnsentMessage{
Topic: topic,
FlowID: flowID,
Payload: payload,
}
_, err := queue.Publish(unsent)
g.TErrorIf(err)
}
g.Testing("we can WaitFor() a message before its publishing", func() {
flowID := guuid.New()
waiter := queue.WaitFor(topic, flowID, "waiter").Channel
pub(flowID, []byte("payload before"))
given := <- waiter
g.TAssertEqual(given, []byte("payload before"))
})
g.Testing("we can also do it after its publishing", func() {
flowID := guuid.New()
pub(flowID, []byte("payload after"))
given := <- queue.WaitFor(topic, flowID, "waiter").Channel
g.TAssertEqual(given, []byte("payload after"))
})
}
|