package q

import (
	"fmt"
	"os"
	"runtime"

	g "gobang"
	"guuid"
)

// topic is the topic every instance in this test subscribes to and that the
// test body publishes on.
const topic = "topic"

// pub publishes a message with an empty payload on queue, using the given
// topic and flow ID. The error returned by Publish is handed to g.TErrorIf.
func pub(queue IQueue, topic string, flowID guuid.UUID) {
	_, err := queue.Publish(UnsentMessage{
		Topic:   topic,
		FlowID:  flowID,
		Payload: []byte{},
	})
	g.TErrorIf(err)
}

// handlerFn adapts publish into a message handler: the handler calls publish
// with the FlowID of the message it receives and always returns nil.
func handlerFn(publish func(guuid.UUID)) func(Message) error {
	return func(msg Message) error {
		publish(msg.FlowID)
		return nil
	}
}

// startInstance opens a queue at databasePath with New, swaps its queries for
// ones produced by initDB with the given instanceID (closing the ones New
// created), and registers two subscriptions on the package-level topic:
//
//   - "individual-<name>", whose handler republishes the flow ID on the
//     topic "individual-<name>";
//   - "shared", whose handler republishes the flow ID on the topic
//     "shared-<name>".
//
// Every intermediate error is passed to g.TErrorIf; the returned error is
// always nil.
func startInstance(
	databasePath string,
	instanceID int,
	name string,
) (IQueue, error) {
	opened, err := New(databasePath)
	g.TErrorIf(err)
	instance := opened.(queueT)

	// Build the replacement queries before closing the old ones, so the
	// order of operations against the database is unchanged.
	notify := makeNotifyFn(instance.subscriptions.read, instance.pinger)
	fresh, err := initDB(instance.db, defaultPrefix, notify, instanceID)
	g.TErrorIf(err)

	g.TErrorIf(instance.queries.close())
	instance.queries = fresh

	// republishOn returns a callback that publishes the flow ID it is given
	// on dest, through this instance.
	republishOn := func(dest string) func(guuid.UUID) {
		return func(id guuid.UUID) {
			pub(instance, dest, id)
		}
	}

	const sharedName = "shared"
	individualName := "individual-" + name

	instance.Subscribe(
		topic,
		individualName,
		handlerFn(republishOn(individualName)),
	)
	instance.Subscribe(
		topic,
		sharedName,
		handlerFn(republishOn(sharedName+"-"+name)),
	)

	return instance, nil
}

// MainTest is the entry point of this functional test. It prepares a clean
// database location and then, inside g.Testing, starts two instances against
// the same database and waits for the messages their handlers republish.
//
// As written, the function returns right after removing the database files,
// so the g.Testing scenario below the early return does not run.
func MainTest() {
	// https://sqlite.org/forum/forumpost/2507664507
	g.Init()

	_, file, _, ok := runtime.Caller(0)
	g.TAssertEqualS(ok, true, "can't get filename")

	dbpath := file + ".db"
	// NOTE(review): the path derived from this source file is immediately
	// overridden by a fixed location - presumably a debugging leftover tied
	// to the forum post above; confirm before re-enabling the test.
	dbpath = "/mnt/dois/andreh/t.db"

	// Remove the database file and its "-shm" and "-wal" companions.
	// Removal errors are not checked.
	for _, suffix := range []string{"", "-shm", "-wal"} {
		os.Remove(dbpath + suffix)
	}

	// FIXME
	return

	firstID := os.Getpid()
	secondID := firstID + 1

	firstFlow := guuid.New()
	secondFlow := guuid.New()

	// publishBatch publishes two messages with fresh flow IDs on the
	// package-level topic, followed by one carrying the flow ID `last`.
	publishBatch := func(queue IQueue, last guuid.UUID) {
		pub(queue, topic, guuid.New())
		pub(queue, topic, guuid.New())
		pub(queue, topic, last)
	}

	g.Testing("new instances take ownership of topic+name combo", func() {
		// Never executed; keeps a reference to fmt and to the first
		// instance ID in the file.
		if false {
			fmt.Fprintf(os.Stderr, "(PID %d + 1) ", firstID)
		}

		first, err := startInstance(dbpath, firstID, "first")
		g.TErrorIf(err)
		defer first.Close()

		publishBatch(first, firstFlow)

		<-first.WaitFor("individual-first", firstFlow, "w").Channel
		<-first.WaitFor("shared-first", firstFlow, "w").Channel
		// println("waited 1")

		second, err := startInstance(dbpath, secondID, "second")
		g.TErrorIf(err)
		defer second.Close()

		// The second instance is expected to republish the flow ID that
		// was published before it started.
		<-second.WaitFor("individual-second", firstFlow, "w").Channel

		publishBatch(second, secondFlow)

		// FIXME: notify multiple instances so we can add this:
		// <- q2.WaitFor("individual-first", flowID2, "w").Channel
		<-second.WaitFor("individual-second", secondFlow, "w").Channel
		<-second.WaitFor("shared-second", secondFlow, "w").Channel
	})
}