summaryrefslogtreecommitdiff
path: root/tests/functional/new-instance-takeover/q.go
blob: 6e04e5f97cde0b422479bbd53a47ca7ba1848ae7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package q

import (
	"fmt"
	"runtime"
	"os"

	"guuid"
	g "gobang"
)



// topic is the topic that MainTest publishes its messages on and that every
// instance started by startInstance subscribes its handlers to.
const topic = "topic"



// pub publishes a message with an empty payload on topic, tagged with flowID,
// through queue. The value returned by Publish is discarded; its error is
// handed to g.TErrorIf.
func pub(queue IQueue, topic string, flowID guuid.UUID) {
	_, err := queue.Publish(UnsentMessage{
		Topic:   topic,
		FlowID:  flowID,
		Payload: []byte{},
	})
	g.TErrorIf(err)
}

// handlerFn wraps publish in a message handler: the handler passes the FlowID
// of each message it is given to publish and always returns a nil error.
func handlerFn(publish func(guuid.UUID)) func(Message) error {
	forward := func(msg Message) error {
		publish(msg.FlowID)
		return nil
	}
	return forward
}

// startInstance opens the queue stored at databasePath and replaces its
// queries with a set initialised for instanceID, so that the test can choose
// the instance ID itself.
//
// Two handlers are then subscribed on the package-level topic:
//   - under the name "individual-<name>", republishing each message's FlowID
//     on the topic "individual-<name>";
//   - under the name "shared", republishing each message's FlowID on the
//     topic "shared-<name>".
//
// Failures are reported through g.TErrorIf; the returned error is always nil.
func startInstance(
	databasePath string,
	instanceID int,
	name string,
) (IQueue, error) {
	opened, err := New(databasePath)
	g.TErrorIf(err)
	queue := opened.(queueT)

	// Build the replacement queries first, then close the ones created by
	// New, and only then swap them in.
	notify := makeNotifyFn(queue.subscriptions.read, queue.pinger)
	replacement, err := initDB(queue.db, defaultPrefix, notify, instanceID)
	g.TErrorIf(err)

	g.TErrorIf(queue.queries.close())
	queue.queries = replacement

	// republishTo returns a callback that publishes the given flow ID on
	// dest through this instance's queue.
	republishTo := func(dest string) func(guuid.UUID) {
		return func(flowID guuid.UUID) {
			pub(queue, dest, flowID)
		}
	}

	individualName := "individual-" + name
	sharedName := "shared"

	queue.Subscribe(topic, individualName, handlerFn(republishTo(individualName)))
	queue.Subscribe(topic, sharedName, handlerFn(republishTo(sharedName+"-"+name)))

	return queue, nil
}



// MainTest is the entry point of the "new instance takeover" functional test.
//
// It removes any database left over from a previous run and then returns: the
// scenario itself is currently disabled by the early return marked FIXME, so
// everything after that return is unreachable until the FIXME is resolved.
//
// When enabled, the scenario starts two queue instances with different
// instance IDs on the same database file and checks, via WaitFor, that the
// flow IDs published on topic show up on the topics that each instance's
// handlers republish to.
func MainTest() {
	// https://sqlite.org/forum/forumpost/2507664507
	g.Init()

	_, file, _, ok := runtime.Caller(0)
	g.TAssertEqualS(ok, true, "can't get filename")

	// Keep the database next to this source file. A developer-local absolute
	// path used to overwrite this value, which tied the test to one machine.
	dbpath := file + ".db"

	// Best-effort cleanup of a previous run; errors (e.g. the file not
	// existing) are deliberately ignored. The "-shm" and "-wal" files are
	// presumably SQLite's WAL-mode sidecar files -- TODO confirm.
	os.Remove(dbpath)
	os.Remove(dbpath + "-shm")
	os.Remove(dbpath + "-wal")

	// FIXME
	return
	instanceID1 := os.Getpid()
	instanceID2 := instanceID1 + 1

	flowID1 := guuid.New()
	flowID2 := guuid.New()

	g.Testing("new instances take ownership of topic+name combo", func() {
		// Disabled debug output.
		if false {
			fmt.Fprintf(os.Stderr, "(PID %d + 1) ", instanceID1)
		}

		q1, err := startInstance(dbpath, instanceID1, "first")
		g.TErrorIf(err)
		defer q1.Close()

		// Only the last of the three messages carries a flow ID we wait on.
		pub(q1, topic, guuid.New())
		pub(q1, topic, guuid.New())
		pub(q1, topic, flowID1)

		// Block until both handlers of the first instance have republished
		// flowID1 on their respective topics.
		<-q1.WaitFor("individual-first", flowID1, "w").Channel
		<-q1.WaitFor("shared-first", flowID1, "w").Channel
		// println("waited 1")

		q2, err := startInstance(dbpath, instanceID2, "second")
		g.TErrorIf(err)
		defer q2.Close()

		// The second instance's individual handler is expected to republish
		// flowID1, which was published before that instance was started.
		<-q2.WaitFor("individual-second", flowID1, "w").Channel

		pub(q2, topic, guuid.New())
		pub(q2, topic, guuid.New())
		pub(q2, topic, flowID2)

		// FIXME: notify multiple instances so we can add this:
		// <- q2.WaitFor("individual-first",  flowID2, "w").Channel
		<-q2.WaitFor("individual-second", flowID2, "w").Channel
		<-q2.WaitFor("shared-second", flowID2, "w").Channel
	})
}