diff options
-rw-r--r-- | src/papod.go | 67 |
1 files changed, 49 insertions, 18 deletions
diff --git a/src/papod.go b/src/papod.go index 77721d6..fc2d10d 100644 --- a/src/papod.go +++ b/src/papod.go @@ -19,6 +19,8 @@ import ( "fiinha" "golite" "guuid" + "pds" + "stm" g "gobang" ) @@ -250,13 +252,14 @@ type metricsT struct{ } type stateMutableT struct{ - connected func(*connectionT) - disconnect func(*connectionT) - authenticated func(*connectionT) + // disconnect func(*connectionT) subscribe func(string, []string) - members func(string) []string - connections func(string) []guuid.UUID - connection func(guuid.UUID) *connectionT +} + +type stateT struct{ + members *pds.Map[string, []string] + users *pds.Map[string, []guuid.UUID] + connections *pds.Map[guuid.UUID, connectionT] } // TODO: key for members should be the channelID, not its name @@ -273,6 +276,7 @@ type papodT struct{ listeners listenersT consumers []consumerT stateMutable stateMutableT + state *stm.Var[*stateT] metrics metricsT // logger g.Logger } @@ -3537,6 +3541,7 @@ func newStateMutable() stateMutableT { members: map[string]map[string][]guuid.UUID{}, } return stateMutableT{ + /* connected: func(connection *connectionT) { rwmutex.Lock() defer rwmutex.Unlock() @@ -3572,6 +3577,7 @@ func newStateMutable() stateMutableT { connection.uuid, ) }, + */ subscribe: func( username string, channelNames []string, @@ -3587,6 +3593,7 @@ func newStateMutable() stateMutableT { state.users[username] } }, + /* members: func(channelName string) []string { rwmutex.RLock() defer rwmutex.RUnlock() @@ -3616,6 +3623,15 @@ func newStateMutable() stateMutableT { defer rwmutex.RUnlock() return state.connections[connectionID] }, + */ + } +} + +func newState() *stateT { + return &stateT{ + members: pds.NewMap[string, []string](nil), + users: pds.NewMap[string, []guuid.UUID](nil), + connections: pds.NewMap[guuid.UUID, connectionT](nil), } } @@ -3678,6 +3694,7 @@ func NewWithPrefix( consumers := buildConsumers(prefix) stateMutable := newStateMutable() + state := newState() // receivers := makeReceivers() metrics := buildMetrics(prefix) // logger := g.NewLogger("prefix", prefix, "program", "papod") @@ -3689,6 +3706,7 @@ func NewWithPrefix( listeners: listeners, consumers: consumers, stateMutable: stateMutable, + state: stm.NewVar[*stateT](state), // receivers: receivers, metrics: metrics, // logger: logger, @@ -3793,6 +3811,9 @@ func asReply(event eventT) replyT { return replyT{} } +func removeConnection(state *stateT, connection *connectionT) *stateT { + return state +} /// Is this death by a thousand goroutines? Is the runtime able to handle the /// creation and destruction of hundreds of thousands of goroutines per second? @@ -3802,14 +3823,14 @@ func asReply(event eventT) replyT { func broadcastMessage( message messageT, channelName string, - usersFn func(string) []string, - connectionIDsFn func(string) []guuid.UUID, - connectionFn func(guuid.UUID) *connectionT, + state *stateT, ) { - for _, username := range usersFn(channelName) { - for _, connectionID := range connectionIDsFn(username) { - connection := connectionFn(connectionID) - if connection == nil { + usernames, _ := state.members.Get(channelName) + for _, username := range usernames { + connectionIDs, _ := state.users.Get(username) + for _, connectionID := range connectionIDs { + connection, ok := state.connections.Get(connectionID) + if !ok { continue } @@ -4102,9 +4123,7 @@ func handlePRIVMSG( go broadcastMessage( msg, msg.params[0], - papod.stateMutable.members, - papod.stateMutable.connections, - papod.stateMutable.connection, + stm.Deref(papod.state), ) return []replyT{}, false, nil } @@ -4402,12 +4421,24 @@ func processMessage( ) } - papod.stateMutable.disconnect(connection) + stm.Swap(papod.state, func(state *stateT) *stateT { + return removeConnection(state, connection) + }) + err := connection.conn.Close() + if err != nil { + g.Warning( + "Failed to close the connection", + "close-error", + "from", "daemon", + "err", err, + ) + } + return } if shouldClose { - papod.stateMutable.disconnect(connection) + // papod.stateMutable.disconnect(connection) } } |