summaryrefslogtreecommitdiff
path: root/tests/functional/new-instance-takeover/fiinha.go
blob: 5e6ad4b8e6bc33ee34a360e8aa86e93358d42dfb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package fiinha

import (
	"runtime"
	"os"

	"uuid"
	g "gobang"
)



const topic = "topic"



func pub(queue IQueue, topic string, flowID uuid.UUID) {
	unsent := UnsentMessage{
		Topic:   topic,
		FlowID:  flowID,
		Payload: []byte{},
	}
	_, err := queue.Publish(unsent)
	g.TErrorIf(err)
}

func handlerFn(publish func(uuid.UUID)) func(Message) error {
	return func(message Message) error {
		publish(message.FlowID)
		return nil
	}
}

func startInstance(
	dbpath string,
	instanceID int,
	name string,
) (IQueue, error) {
	iqueue, err := New(dbpath)
	g.TErrorIf(err)
	queue := iqueue.(queueT)

	notifyFn := makeNotifyFn(queue.subscriptions.read, queue.pinger)
	queries, err := initDB(dbpath, defaultPrefix, notifyFn, instanceID)
	g.TErrorIf(err)

	err = queue.queries.close()
	g.TErrorIf(err)

	queue.queries = queries

	pub_ := func(topic string) func(uuid.UUID) {
		return func(flowID uuid.UUID) {
			pub(queue, topic, flowID)
		}
	}

	individual := "individual-" + name
	shared     := "shared"

	queue.Subscribe(topic, individual, handlerFn(pub_(individual)))
	queue.Subscribe(topic, shared,     handlerFn(pub_(shared + "-" + name)))

	return queue, nil
}



func MainTest() {
	g.Init()

	_, file, _, ok := runtime.Caller(0)
	g.TAssertEqualS(ok, true, "can't get filename")

	dbpath := file + ".db"
	instanceID1 := os.Getpid()
	instanceID2 := instanceID1 + 1

	flowID1 := uuid.New()
	flowID2 := uuid.New()

	g.Testing("new instances take ownership of topic+name combo", func() {
		q1, err := startInstance(dbpath, instanceID1, "first")
		g.TErrorIf(err)
		defer q1.Close()

		pub(q1, topic, uuid.New())
		pub(q1, topic, uuid.New())
		pub(q1, topic, flowID1)

		<- q1.WaitFor("individual-first", flowID1, "w").Channel
		<- q1.WaitFor(    "shared-first", flowID1, "w").Channel

		q2, err := startInstance(dbpath, instanceID2, "second")
		g.TErrorIf(err)
		defer q2.Close()

		<- q2.WaitFor("individual-second", flowID1, "w").Channel

		pub(q2, topic, uuid.New())
		pub(q2, topic, uuid.New())
		pub(q2, topic, flowID2)

		// FIXME: notify multiple instances so we can add this:
		// <- q2.WaitFor("individual-first",  flowID2, "w").Channel
		<- q2.WaitFor("individual-second", flowID2, "w").Channel
		<- q2.WaitFor(    "shared-second", flowID2, "w").Channel
	})
}