summaryrefslogtreecommitdiff
path: root/tests/functional/new-instance-takeover/q.go
diff options
context:
space:
mode:
Diffstat (limited to 'tests/functional/new-instance-takeover/q.go')
-rw-r--r--tests/functional/new-instance-takeover/q.go127
1 files changed, 127 insertions, 0 deletions
diff --git a/tests/functional/new-instance-takeover/q.go b/tests/functional/new-instance-takeover/q.go
new file mode 100644
index 0000000..a678415
--- /dev/null
+++ b/tests/functional/new-instance-takeover/q.go
@@ -0,0 +1,127 @@
+package q
+
+import (
+"fmt"
+ "database/sql"
+ "runtime"
+ "os"
+
+ g "gobang"
+ "guuid"
+)
+
+
+
+const topic = "topic"
+
+
+
+func pub(queue IQueue, topic string, flowID guuid.UUID) {
+ unsent := UnsentMessage{
+ Topic: topic,
+ FlowID: flowID,
+ Payload: []byte{},
+ }
+ _, err := queue.Publish(unsent)
+ g.TErrorIf(err)
+}
+
+func handlerFn(publish func(guuid.UUID)) func(Message) error {
+ return func(message Message) error {
+ publish(message.FlowID)
+ return nil
+ }
+}
+
+func startInstance(
+ databasePath string,
+ instanceID int,
+ name string,
+) (*sql.DB, IQueue, error) {
+ db, err := sql.Open("acude", databasePath)
+ g.TErrorIf(err)
+
+ iqueue, err := New(db)
+ g.TErrorIf(err)
+ queue := iqueue.(queueT)
+
+ notifyFn := makeNotifyFn(queue.subscriptions.read, queue.pinger)
+ queries, err := initDB(db, defaultPrefix, notifyFn, instanceID)
+ g.TErrorIf(err)
+
+ err = queue.queries.close()
+ g.TErrorIf(err)
+
+ queue.queries = queries
+
+ pub_ := func(topic string) func(guuid.UUID) {
+ return func(flowID guuid.UUID) {
+ pub(queue, topic, flowID)
+ }
+ }
+
+ individual := "individual-" + name
+ shared := "shared"
+
+ queue.Subscribe(topic, individual, handlerFn(pub_(individual)))
+ queue.Subscribe(topic, shared, handlerFn(pub_(shared + "-" + name)))
+
+ return db, queue, nil
+}
+
+
+
+func MainTest() {
+ // https://sqlite.org/forum/forumpost/2507664507
+ g.Init()
+
+ _, file, _, ok := runtime.Caller(0)
+ g.TAssertEqualS(ok, true, "can't get filename")
+
+ dbpath := file + ".db"
+ dbpath = "/mnt/dois/andreh/t.db"
+ os.Remove(dbpath)
+ os.Remove(dbpath + "-shm")
+ os.Remove(dbpath + "-wal")
+
+ instanceID1 := os.Getpid()
+ instanceID2 := instanceID1 + 1
+
+ flowID1 := guuid.New()
+ flowID2 := guuid.New()
+
+ g.Testing("new instances take ownership of topic+name combo", func() {
+ if false {
+ fmt.Fprintf(os.Stderr, "(PID %d + 1) ", instanceID1)
+ }
+
+ db, q1, err := startInstance(dbpath, instanceID1, "first")
+ g.TErrorIf(err)
+ defer db.Close()
+ defer q1.Close()
+
+ pub(q1, topic, guuid.New())
+ pub(q1, topic, guuid.New())
+ pub(q1, topic, flowID1)
+
+ <- q1.WaitFor("individual-first", flowID1, "w").Channel
+ <- q1.WaitFor( "shared-first", flowID1, "w").Channel
+ // println("waited 1")
+
+ db, q2, err := startInstance(dbpath, instanceID2, "second")
+ g.TErrorIf(err)
+ defer db.Close()
+ defer q2.Close()
+
+ <- q2.WaitFor("individual-second", flowID1, "w").Channel
+
+ pub(q2, topic, guuid.New())
+ pub(q2, topic, guuid.New())
+ pub(q2, topic, flowID2)
+
+ // FIXME: notify multiple instances so we can add this:
+ // <- q2.WaitFor("individual-first", flowID2, "w").Channel
+ <- q2.WaitFor("individual-second", flowID2, "w").Channel
+ <- q2.WaitFor( "shared-second", flowID2, "w").Channel
+ })
+}