1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
package q
import (
"fmt"
"runtime"
"os"
"guuid"
g "gobang"
)
const topic = "topic"
func pub(queue IQueue, topic string, flowID guuid.UUID) {
unsent := UnsentMessage{
Topic: topic,
FlowID: flowID,
Payload: []byte{},
}
_, err := queue.Publish(unsent)
g.TErrorIf(err)
}
func handlerFn(publish func(guuid.UUID)) func(Message) error {
return func(message Message) error {
publish(message.FlowID)
return nil
}
}
func startInstance(
databasePath string,
instanceID int,
name string,
) (IQueue, error) {
iqueue, err := New(databasePath)
g.TErrorIf(err)
queue := iqueue.(queueT)
notifyFn := makeNotifyFn(queue.subscriptions.read, queue.pinger)
queries, err := initDB(queue.db, defaultPrefix, notifyFn, instanceID)
g.TErrorIf(err)
err = queue.queries.close()
g.TErrorIf(err)
queue.queries = queries
pub_ := func(topic string) func(guuid.UUID) {
return func(flowID guuid.UUID) {
pub(queue, topic, flowID)
}
}
individual := "individual-" + name
shared := "shared"
queue.Subscribe(topic, individual, handlerFn(pub_(individual)))
queue.Subscribe(topic, shared, handlerFn(pub_(shared + "-" + name)))
return queue, nil
}
func MainTest() {
// https://sqlite.org/forum/forumpost/2507664507
g.Init()
_, file, _, ok := runtime.Caller(0)
g.TAssertEqualS(ok, true, "can't get filename")
dbpath := file + ".db"
dbpath = "/mnt/dois/andreh/t.db"
os.Remove(dbpath)
os.Remove(dbpath + "-shm")
os.Remove(dbpath + "-wal")
// FIXME
return
instanceID1 := os.Getpid()
instanceID2 := instanceID1 + 1
flowID1 := guuid.New()
flowID2 := guuid.New()
g.Testing("new instances take ownership of topic+name combo", func() {
if false {
fmt.Fprintf(os.Stderr, "(PID %d + 1) ", instanceID1)
}
q1, err := startInstance(dbpath, instanceID1, "first")
g.TErrorIf(err)
defer q1.Close()
pub(q1, topic, guuid.New())
pub(q1, topic, guuid.New())
pub(q1, topic, flowID1)
<- q1.WaitFor("individual-first", flowID1, "w").Channel
<- q1.WaitFor( "shared-first", flowID1, "w").Channel
// println("waited 1")
q2, err := startInstance(dbpath, instanceID2, "second")
g.TErrorIf(err)
defer q2.Close()
<- q2.WaitFor("individual-second", flowID1, "w").Channel
pub(q2, topic, guuid.New())
pub(q2, topic, guuid.New())
pub(q2, topic, flowID2)
// FIXME: notify multiple instances so we can add this:
// <- q2.WaitFor("individual-first", flowID2, "w").Channel
<- q2.WaitFor("individual-second", flowID2, "w").Channel
<- q2.WaitFor( "shared-second", flowID2, "w").Channel
})
}
|