summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEuAndreh <eu@euandre.org>2025-02-19 16:48:07 -0300
committerEuAndreh <eu@euandre.org>2025-02-19 16:48:07 -0300
commit2e320aabfdf091753b9768be728b741f5e698a48 (patch)
tree9d6a10a6e25fa09f69b98091da89d585224de3a6 /src
parentsrc/papod.go: Rename stateT to stateMutableT (diff)
downloadpapod-2e320aabfdf091753b9768be728b741f5e698a48.tar.gz
papod-2e320aabfdf091753b9768be728b741f5e698a48.tar.xz
src/papod.go: WIP stm around stateT
Diffstat (limited to 'src')
-rw-r--r--src/papod.go67
1 files changed, 49 insertions, 18 deletions
diff --git a/src/papod.go b/src/papod.go
index 77721d6..fc2d10d 100644
--- a/src/papod.go
+++ b/src/papod.go
@@ -19,6 +19,8 @@ import (
"fiinha"
"golite"
"guuid"
+ "pds"
+ "stm"
g "gobang"
)
@@ -250,13 +252,14 @@ type metricsT struct{
}
type stateMutableT struct{
- connected func(*connectionT)
- disconnect func(*connectionT)
- authenticated func(*connectionT)
+ // disconnect func(*connectionT)
subscribe func(string, []string)
- members func(string) []string
- connections func(string) []guuid.UUID
- connection func(guuid.UUID) *connectionT
+}
+
+type stateT struct{
+ members *pds.Map[string, []string]
+ users *pds.Map[string, []guuid.UUID]
+ connections *pds.Map[guuid.UUID, connectionT]
}
// TODO: key for members should be the channelID, not its name
@@ -273,6 +276,7 @@ type papodT struct{
listeners listenersT
consumers []consumerT
stateMutable stateMutableT
+ state *stm.Var[*stateT]
metrics metricsT
// logger g.Logger
}
@@ -3537,6 +3541,7 @@ func newStateMutable() stateMutableT {
members: map[string]map[string][]guuid.UUID{},
}
return stateMutableT{
+ /*
connected: func(connection *connectionT) {
rwmutex.Lock()
defer rwmutex.Unlock()
@@ -3572,6 +3577,7 @@ func newStateMutable() stateMutableT {
connection.uuid,
)
},
+ */
subscribe: func(
username string,
channelNames []string,
@@ -3587,6 +3593,7 @@ func newStateMutable() stateMutableT {
state.users[username]
}
},
+ /*
members: func(channelName string) []string {
rwmutex.RLock()
defer rwmutex.RUnlock()
@@ -3616,6 +3623,15 @@ func newStateMutable() stateMutableT {
defer rwmutex.RUnlock()
return state.connections[connectionID]
},
+ */
+ }
+}
+
+func newState() *stateT {
+ return &stateT{
+ members: pds.NewMap[string, []string](nil),
+ users: pds.NewMap[string, []guuid.UUID](nil),
+ connections: pds.NewMap[guuid.UUID, connectionT](nil),
}
}
@@ -3678,6 +3694,7 @@ func NewWithPrefix(
consumers := buildConsumers(prefix)
stateMutable := newStateMutable()
+ state := newState()
// receivers := makeReceivers()
metrics := buildMetrics(prefix)
// logger := g.NewLogger("prefix", prefix, "program", "papod")
@@ -3689,6 +3706,7 @@ func NewWithPrefix(
listeners: listeners,
consumers: consumers,
stateMutable: stateMutable,
+ state: stm.NewVar[*stateT](state),
// receivers: receivers,
metrics: metrics,
// logger: logger,
@@ -3793,6 +3811,9 @@ func asReply(event eventT) replyT {
return replyT{}
}
+func removeConnection(state *stateT, connection *connectionT) *stateT {
+ return state
+}
/// Is this death by a thousand goroutines? Is the runtime able to handle the
/// creation and destruction of hundreds of thousands of goroutines per second?
@@ -3802,14 +3823,14 @@ func asReply(event eventT) replyT {
func broadcastMessage(
message messageT,
channelName string,
- usersFn func(string) []string,
- connectionIDsFn func(string) []guuid.UUID,
- connectionFn func(guuid.UUID) *connectionT,
+ state *stateT,
) {
- for _, username := range usersFn(channelName) {
- for _, connectionID := range connectionIDsFn(username) {
- connection := connectionFn(connectionID)
- if connection == nil {
+ usernames, _ := state.members.Get(channelName)
+ for _, username := range usernames {
+ connectionIDs, _ := state.users.Get(username)
+ for _, connectionID := range connectionIDs {
+ connection, ok := state.connections.Get(connectionID)
+ if !ok {
continue
}
@@ -4102,9 +4123,7 @@ func handlePRIVMSG(
go broadcastMessage(
msg,
msg.params[0],
- papod.stateMutable.members,
- papod.stateMutable.connections,
- papod.stateMutable.connection,
+ stm.Deref(papod.state),
)
return []replyT{}, false, nil
}
@@ -4402,12 +4421,24 @@ func processMessage(
)
}
- papod.stateMutable.disconnect(connection)
+ stm.Swap(papod.state, func(state *stateT) *stateT {
+ return removeConnection(state, connection)
+ })
+ err := connection.conn.Close()
+ if err != nil {
+ g.Warning(
+ "Failed to close the connection",
+ "close-error",
+ "from", "daemon",
+ "err", err,
+ )
+ }
+
return
}
if shouldClose {
- papod.stateMutable.disconnect(connection)
+ // papod.stateMutable.disconnect(connection)
}
}