diff options
Diffstat (limited to 'tests/functional/new-instance-takeover/q.go')
-rw-r--r-- | tests/functional/new-instance-takeover/q.go | 127 |
1 files changed, 127 insertions, 0 deletions
diff --git a/tests/functional/new-instance-takeover/q.go b/tests/functional/new-instance-takeover/q.go new file mode 100644 index 0000000..a678415 --- /dev/null +++ b/tests/functional/new-instance-takeover/q.go @@ -0,0 +1,127 @@ +package q + +import ( +"fmt" + "database/sql" + "runtime" + "os" + + g "gobang" + "guuid" +) + + + +const topic = "topic" + + + +func pub(queue IQueue, topic string, flowID guuid.UUID) { + unsent := UnsentMessage{ + Topic: topic, + FlowID: flowID, + Payload: []byte{}, + } + _, err := queue.Publish(unsent) + g.TErrorIf(err) +} + +func handlerFn(publish func(guuid.UUID)) func(Message) error { + return func(message Message) error { + publish(message.FlowID) + return nil + } +} + +func startInstance( + databasePath string, + instanceID int, + name string, +) (*sql.DB, IQueue, error) { + db, err := sql.Open("acude", databasePath) + g.TErrorIf(err) + + iqueue, err := New(db) + g.TErrorIf(err) + queue := iqueue.(queueT) + + notifyFn := makeNotifyFn(queue.subscriptions.read, queue.pinger) + queries, err := initDB(db, defaultPrefix, notifyFn, instanceID) + g.TErrorIf(err) + + err = queue.queries.close() + g.TErrorIf(err) + + queue.queries = queries + + pub_ := func(topic string) func(guuid.UUID) { + return func(flowID guuid.UUID) { + pub(queue, topic, flowID) + } + } + + individual := "individual-" + name + shared := "shared" + + queue.Subscribe(topic, individual, handlerFn(pub_(individual))) + queue.Subscribe(topic, shared, handlerFn(pub_(shared + "-" + name))) + + return db, queue, nil +} + + + +func MainTest() { + // https://sqlite.org/forum/forumpost/2507664507 + g.Init() + + _, file, _, ok := runtime.Caller(0) + g.TAssertEqualS(ok, true, "can't get filename") + + dbpath := file + ".db" + dbpath = "/mnt/dois/andreh/t.db" + os.Remove(dbpath) + os.Remove(dbpath + "-shm") + os.Remove(dbpath + "-wal") + + instanceID1 := os.Getpid() + instanceID2 := instanceID1 + 1 + + flowID1 := guuid.New() + flowID2 := guuid.New() + + g.Testing("new instances take ownership of topic+name combo", func() { + if false { + fmt.Fprintf(os.Stderr, "(PID %d + 1) ", instanceID1) + } + + db, q1, err := startInstance(dbpath, instanceID1, "first") + g.TErrorIf(err) + defer db.Close() + defer q1.Close() + + pub(q1, topic, guuid.New()) + pub(q1, topic, guuid.New()) + pub(q1, topic, flowID1) + + <- q1.WaitFor("individual-first", flowID1, "w").Channel + <- q1.WaitFor( "shared-first", flowID1, "w").Channel + // println("waited 1") + + db, q2, err := startInstance(dbpath, instanceID2, "second") + g.TErrorIf(err) + defer db.Close() + defer q2.Close() + + <- q2.WaitFor("individual-second", flowID1, "w").Channel + + pub(q2, topic, guuid.New()) + pub(q2, topic, guuid.New()) + pub(q2, topic, flowID2) + + // FIXME: notify multiple instances so we can add this: + // <- q2.WaitFor("individual-first", flowID2, "w").Channel + <- q2.WaitFor("individual-second", flowID2, "w").Channel + <- q2.WaitFor( "shared-second", flowID2, "w").Channel + }) +} |