package papod import ( "bufio" "bytes" "database/sql" "errors" "fmt" "io" "log/slog" "net" "os" "regexp" "strings" "sync" "time" "cracha" "fiinha" "golite" "guuid" g "gobang" ) const ( defaultPrefix = "papod" rollbackErrorFmt = "rollback error: %w; while executing: %w" NEW_CHANNEL = "new-channel" pingFrequency = time.Duration(30) * time.Second pongMaxLatency = time.Duration(5) * time.Second ) type dbconfigT struct{ shared *sql.DB dbpath string prefix string } type queryT struct{ write string read string } type queriesT struct{ createUser func(newUserT) (userT, error) userByUUID func(guuid.UUID) (userT, error) updateUser func(userT) error deleteUser func(guuid.UUID) error addNetwork func(userT, newNetworkT) (networkT, error) getNetwork func(userT, guuid.UUID) (networkT, error) networks func(userT, func(networkT) error) error setNetwork func(userT, networkT) error nipNetwork func(userT, guuid.UUID) error addMember func(userT, networkT, newMemberT) (memberT, error) showMember func(userT, guuid.UUID) (memberT, error) members func(userT, guuid.UUID, func(memberT) error) error editMember func(userT, memberT) error dropMember func(userT, guuid.UUID) error addChannel func(guuid.UUID, newChannelT) (channelT, error) channels func(guuid.UUID, func(channelT) error) error topic func(channelT) error endChannel func(guuid.UUID) error join func(guuid.UUID, guuid.UUID) error part func(guuid.UUID, guuid.UUID) error names func(guuid.UUID, func(memberT) error) error addEvent func(newEventT) (eventT, error) allAfter func(guuid.UUID, func(eventT) error) error logMessage func(userT, messageT) error close func() error } type newUserT struct{ uuid guuid.UUID username string displayName string } type userT struct{ id int64 timestamp time.Time uuid guuid.UUID username string displayName string pictureID *guuid.UUID } type NetworkType string const ( NetworkType_Public NetworkType = "public" NetworkType_Private NetworkType = "private" NetworkType_Unlisted NetworkType = "unlisted" ) type newNetworkT struct{ uuid guuid.UUID name string description string type_ NetworkType } type networkT struct{ id int64 timestamp time.Time uuid guuid.UUID createdBy guuid.UUID name string description string type_ NetworkType } type newMemberT struct{ userID guuid.UUID } type memberT struct{ id int64 timestamp time.Time uuid guuid.UUID } type newChannelT struct{ id int64 timestamp time.Time uuid guuid.UUID // networkID guuid.UUID FIXME publicName string label string description string virtual bool } type channelT struct{ id int64 timestamp time.Time uuid guuid.UUID // networkID guuid.UUID FIXME publicName string label string description string virtual bool } type newEventT struct{ eventID guuid.UUID channelID guuid.UUID connectionID guuid.UUID type_ string payload string } type eventT struct{ id int64 timestamp time.Time uuid guuid.UUID channelID guuid.UUID connectionID guuid.UUID type_ string payload string previous *eventT isFist bool } type messageParamsT struct{ middle []string trailing string } type messageT struct{ prefix string command string params messageParamsT raw string } type replyT struct{ command string params messageParamsT } type listenersT struct{ daemon net.Listener commander net.Listener close func() error } type consumerT struct{ topic string name string // FIXME: use generic to avoid circular reference? handlerFn func(papodT) func(fiinha.Message) error } type connectionT struct{ conn net.Conn uuid guuid.UUID user *userT } type receiverT struct{ send func(messageT) close func() } type receiversT struct{ add func(receiverT) remove func(receiverT) get func(guuid.UUID) []receiverT close func() } type metricsT struct{ activeConnections g.Gauge nicksInChannel g.Gauge sendToClientError func(...any) receivedMessage func(...any) sentReply func(...any) } type papodT struct{ auth cracha.IAuth queue fiinha.IQueue queries queriesT listeners listenersT consumers []consumerT receivers receiversT metrics metricsT // logger g.Logger } type IPapod interface{ Start() error Close() error } func serialized[A any, B any](callback func(...A) B) (func(...A) B, func()) { in := make(chan []A) out := make(chan B) closed := false var ( closeWg sync.WaitGroup closeMutex sync.Mutex ) closeWg.Add(1) go func() { for input := range in { out <- callback(input...) } close(out) closeWg.Done() }() fn := func(input ...A) B { in <- input return (<- out) } closeFn := func() { closeMutex.Lock() defer closeMutex.Unlock() if closed { return } close(in) closed = true closeWg.Wait() } return fn, closeFn } func execSerialized(query string, db *sql.DB) (func(...any) error, func()) { return serialized(func(args ...any) error { return inTx(db, func(tx *sql.Tx) error { _, err := tx.Exec(query, args...) return err }) }) } func tryRollback(tx *sql.Tx, err error) error { rollbackErr := tx.Rollback() if rollbackErr != nil { return fmt.Errorf( rollbackErrorFmt, rollbackErr, err, ) } return err } func inTx(db *sql.DB, fn func(*sql.Tx) error) error { tx, err := db.Begin() if err != nil { return err } err = fn(tx) if err != nil { return tryRollback(tx, err) } err = tx.Commit() if err != nil { return tryRollback(tx, err) } return nil } /// "papod_users".uuid is the same as cracha_users.uuid. Not a foreign key to /// allow them to live in different physical locations. Adding it here feels /// less like an optimization related decision, and more of a coupling one. The /// way that New() works now uses the same databasePath for the fiinha.IQueue /// *and* cracha.IAuth, but cracha in no way exposes where it stores the user /// UUID or how the it is handled. This has similarities to how events here /// don't reference the fiinha.Message.ID via foreign keys either. They're /// treated only as opaque IDs. func createTablesSQL(prefix string) queryT { const tmpl_write = ` -- FIXME: unconfirmed premise: statements within a trigger are -- part of the transaction that caused it, and so are -- atomic. -- See also: -- https://stackoverflow.com/questions/77441888/ -- https://stackoverflow.com/questions/30511116/ CREATE TABLE IF NOT EXISTS "%s_users" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), -- provided by cracha uuid BLOB NOT NULL UNIQUE, username TEXT NOT NULL, display_name TEXT NOT NULL, picture_uuid BLOB UNIQUE, deleted INT NOT NULL CHECK(deleted IN (0, 1)) ) STRICT; -- CREATE TABLE IF NOT EXISTS "%s_user_changes" ( -- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, -- timestamp TEXT NOT NULL DEFAULT (%s), -- user_id INTEGER NOT NULL REFERENCES "%s_users"(id), -- attribute TEXT NOT NULL CHECK( -- attribute IN ( -- 'username', -- 'display_name', -- 'picture_uuid', -- 'deleted' -- ) -- ), -- value TEXT NOT NULL, -- op INT NOT NULL CHECK(op IN (0, 1)) -- ) STRICT; -- CREATE TRIGGER IF NOT EXISTS "%s_user_creation" -- AFTER INSERT ON "%s_users" -- BEGIN -- INSERT INTO "%s_user_changes" ( -- user_id, attribute, value, op -- ) VALUES -- (NEW.id, 'username', NEW.username, true), -- (NEW.id, 'display_name', NEW.display_name, true), -- (NEW.id, 'deleted', NEW.deleted, true) -- ; -- END; -- CREATE TRIGGER IF NOT EXISTS "%s_user_creation_picture_uuid" -- AFTER INSERT ON "%s_users" -- WHEN NEW.picture_uuid != NULL -- BEGIN -- INSERT INTO "%s_user_changes" ( -- user_id, attribute, value, op -- ) VALUES -- (NEW.id, 'picture_uuid', NEW.picture_uuid, true) -- ; -- END; -- CREATE TRIGGER IF NOT EXISTS "%s_user_update_username" -- AFTER UPDATE ON "%s_users" -- WHEN OLD.username != NEW.username -- BEGIN -- INSERT INTO "%s_user_changes" ( -- user_id, attribute, value, op -- ) VALUES -- (NEW.id, 'username', OLD.username, false), -- (NEW.id, 'username', NEW.username, true) -- ; -- END; -- CREATE TRIGGER IF NOT EXISTS "%s_user_update_display_name" -- AFTER UPDATE ON "%s_users" -- WHEN OLD.display_name != NEW.display_name -- BEGIN -- INSERT INTO "%s_user_changes" ( -- user_id, attribute, value, op -- ) VALUES -- (NEW.id, 'display_name', OLD.display_name, false), -- (NEW.id, 'display_name', NEW.display_name, true) -- ; -- END; -- CREATE TRIGGER IF NOT EXISTS "%s_user_update_picture_uuid" -- AFTER UPDATE ON "%s_users" -- WHEN OLD.picture_uuid != NEW.picture_uuid -- BEGIN -- INSERT INTO "%s_user_changes" ( -- user_id, attribute, value, op -- ) VALUES -- (NEW.id, 'picture_uuid', OLD.picture_uuid, false), -- (NEW.id, 'picture_uuid', NEW.picture_uuid, true) -- ; -- END; -- CREATE TRIGGER IF NOT EXISTS "%s_user_update_deleted" -- AFTER UPDATE ON "%s_users" -- WHEN OLD.deleted != NEW.deleted -- BEGIN -- INSERT INTO "%s_user_changes" ( -- user_id, attribute, value, op -- ) VALUES -- (NEW.id, 'deleted', OLD.deleted, false), -- (NEW.id, 'deleted', NEW.deleted, true) -- ; -- END; CREATE TABLE IF NOT EXISTS "%s_networks" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), uuid BLOB NOT NULL UNIQUE, creator_id INTEGER NOT NULL REFERENCES "%s_users"(id), name TEXT NOT NULL, description TEXT NOT NULL, type TEXT NOT NULL CHECK( type IN ('public', 'private', 'unlisted') ) ) STRICT; CREATE TABLE IF NOT EXISTS "%s_network_changes" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), network_id INTEGER NOT NULL REFERENCES "%s_networks"(id), attribute TEXT NOT NULL CHECK( attribute IN ( 'name', 'description', 'type' ) ), value TEXT NOT NULL, op INT NOT NULL CHECK(op IN (0, 1)) ) STRICT; CREATE TABLE IF NOT EXISTS "%s_members" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), network_id INTEGER NOT NULL REFERENCES "%s_networks"(id), user_id INTEGER NOT NULL, username TEXT NOT NULL, display_name TEXT NOT NULL, picture_uuid BLOB UNIQUE, status TEXT NOT NULL CHECK( status IN ('active', 'inactive', 'removed') ), active_uniq TEXT CHECK( active_uniq IN ('active', NULL) ), UNIQUE (network_id, username, active_uniq), UNIQUE (network_id, user_id) ) STRICT; CREATE TABLE IF NOT EXISTS "%s_member_roles" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, member_id INTEGER NOT NULL REFERENCES "%s_members"(id), role TEXT NOT NULL, UNIQUE (member_id, role) ) STRICT; -- FIXME: use a trigger CREATE TABLE IF NOT EXISTS "%s_member_changes" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), member_id INTEGER NOT NULL REFERENCES "%s_members"(id), attribute TEXT NOT NULL, value TEXT NOT NULL, op INT NOT NULL CHECK(op IN (0, 1)) ) STRICT; CREATE TABLE IF NOT EXISTS "%s_channels" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), uuid BLOB NOT NULL UNIQUE, network_id INTEGER -- FIXME NOT NULL REFERENCES "%s_networks"(id), public_name TEXT UNIQUE, label TEXT NOT NULL, description TEXT NOT NULL, virtual INT NOT NULL CHECK(virtual IN (0, 1)) ) STRICT; -- FIXME: use a trigger CREATE TABLE IF NOT EXISTS "%s_channel_changes" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), channel_id INTEGER NOT NULL REFERENCES "%s_channels"(id), attribute TEXT NOT NULL, value TEXT NOT NULL, op INT NOT NULL CHECK(op IN (0, 1)) ) STRICT; CREATE TABLE IF NOT EXISTS "%s_participants" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, channel_id INTEGER NOT NULL REFERENCES "%s_channels"(id), member_id INTEGER NOT NULL REFERENCES "%s_members"(id), UNIQUE (channel_id, member_id) ) STRICT; -- FIXME: create database table for connections? -- A user can have multiple sessions (different browsers, -- mobile, etc.), and each session has multiple connections, as -- the user connects and disconnections using the same session -- id, all while it is valid. -- FIXME: can a connection have multiple sessions? A long-lived -- connection that spans multiple sessions would fit into this. CREATE TABLE IF NOT EXISTS "%s_channel_events" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), uuid BLOB NOT NULL UNIQUE, channel_id INTEGER NOT NULL REFERENCES "%s_channels"(id), connection_uuid BLOB NOT NULL, -- FIXME: join type TEXT NOT NULL CHECK( type IN ( 'user-join', 'user-message' ) ), payload TEXT NOT NULL ) STRICT; ` return queryT{ write: fmt.Sprintf( tmpl_write, prefix, g.SQLiteNow, prefix, g.SQLiteNow, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, g.SQLiteNow, prefix, prefix, g.SQLiteNow, prefix, prefix, g.SQLiteNow, prefix, prefix, prefix, prefix, g.SQLiteNow, prefix, prefix, g.SQLiteNow, prefix, prefix, g.SQLiteNow, prefix, prefix, prefix, prefix, prefix, g.SQLiteNow, prefix, ), } } func createTables(db *sql.DB, prefix string) error { q := createTablesSQL(prefix) return inTx(db, func(tx *sql.Tx) error { _, err := tx.Exec(q.write) return err }) } func createUserSQL(prefix string) queryT { const tmpl_write = ` INSERT INTO "%s_users" ( uuid, username, display_name, picture_uuid, deleted ) VALUES ( ?, ?, ?, NULL, false ) RETURNING id, timestamp; ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), } } func createUserStmt( cfg dbconfigT, ) (func(newUserT) (userT, error), func() error, error) { q := createUserSQL(cfg.prefix) writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } fn := func(newUser newUserT) (userT, error) { user := userT{ uuid: newUser.uuid, username: newUser.username, displayName: newUser.displayName, } var timestr string err := writeStmt.QueryRow( newUser.uuid[:], newUser.username, newUser.displayName, ).Scan(&user.id, ×tr) if err != nil { return userT{}, err } user.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { return userT{}, err } return user, nil } return fn, writeStmt.Close, nil } func userByUUIDSQL(prefix string) queryT { const tmpl_read = ` SELECT id, timestamp, username, display_name, picture_uuid FROM "%s_users" WHERE uuid = ? AND deleted = false; ` return queryT{ read: fmt.Sprintf(tmpl_read, prefix), } } func userByUUIDStmt( cfg dbconfigT, ) (func(guuid.UUID) (userT, error), func() error, error) { q := userByUUIDSQL(cfg.prefix) readStmt, err := cfg.shared.Prepare(q.read) if err != nil { return nil, nil, err } fn := func(userID guuid.UUID) (userT, error) { user := userT{ uuid: userID, } var ( timestr string picture_id_bytes []byte ) err := readStmt.QueryRow(userID[:]).Scan( &user.id, ×tr, &user.username, &user.displayName, &picture_id_bytes, ) if err != nil { return userT{}, err } if picture_id_bytes != nil { pictureID := guuid.UUID(picture_id_bytes) user.pictureID = &pictureID } user.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { return userT{}, err } return user, err } return fn, readStmt.Close, nil } func updateUserSQL(prefix string) queryT { const tmpl_write = ` UPDATE "%s_users" SET username = ?, display_name = ?, picture_uuid = ? WHERE id = ? AND deleted = false RETURNING id; ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), } } func updateUserStmt( cfg dbconfigT, ) (func(userT) error, func() error, error) { q := updateUserSQL(cfg.prefix) writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } fn := func(user userT) error { var picture_id_bytes []byte = nil if user.pictureID != nil { picture_id_bytes = user.pictureID[:] } var _id int64 return writeStmt.QueryRow( user.username, user.displayName, picture_id_bytes, user.id, ).Scan(&_id) } return fn, writeStmt.Close, nil } func deleteUserSQL(prefix string) queryT { const tmpl_write = ` UPDATE "%s_users" SET deleted = true WHERE uuid = ? AND deleted = false RETURNING id; ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), } } func deleteUserStmt( cfg dbconfigT, ) (func(guuid.UUID) error, func() error, error) { q := deleteUserSQL(cfg.prefix) writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } fn := func(userID guuid.UUID) error { var _id int64 return writeStmt.QueryRow(userID[:]).Scan(&_id) } return fn, writeStmt.Close, nil } func addNetworkSQL(prefix string) queryT { const tmpl_write = ` INSERT INTO "%s_networks" ( uuid, name, description, type, creator_id ) VALUES ( ?, ?, ?, ?, ( SELECT id FROM "%s_users" WHERE id = ? AND deleted = false ) ) RETURNING id, timestamp; INSERT INTO "%s_members" ( network_id, user_id, username, display_name, picture_uuid, status, active_uniq ) VALUES ( last_insert_rowid(), ?, ( SELECT username, display_name, picture_uuid FROM "%s_users" WHERE id = ? AND deleted = false ), 'active', 'active' ) RETURNING id, timestamp; ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix, prefix, prefix, prefix), } } func addNetworkStmt( cfg dbconfigT, ) (func(userT, newNetworkT) (networkT, error), func() error, error) { q := addNetworkSQL(cfg.prefix) privateDB, err := sql.Open(golite.DriverName, cfg.dbpath) if err != nil { return nil, nil, err } writeFn, writeFnClose := execSerialized(q.write, privateDB) fn := func( user userT, newNetwork newNetworkT, ) (networkT, error) { network := networkT{ uuid: newNetwork.uuid, createdBy: user.uuid, name: newNetwork.name, description: newNetwork.description, type_: newNetwork.type_, } err := writeFn( newNetwork.uuid[:], newNetwork.name, newNetwork.description, newNetwork.type_, user.id, ) if err != nil { return networkT{}, err } return network, nil /* member := memberT{ } var timestr string { FIXME rows, err := writeStmt.Query( newNetwork.uuid[:], newNetwork.name, newNetwork.description, newNetwork.type_, user.id, ) if err != nil { return networkT{}, err } defer rows.Close() { if !rows.Next() { return networkT{}, sql.ErrNoRows } err := rows.Scan(&network.id, ×tr) if err != nil { return networkT{}, err } network.timestamp, err = time.Parse( time.RFC3339Nano, timestr, ) if err != nil { return networkT{}, err } } { if !rows.Next() { return networkT{}, sql.ErrNoRows } err := rows.Scan(&member.id, ×tr) if err != nil { return networkT{}, err } member.timestamp, err = time.Parse( time.RFC3339Nano, timestr, ) if err != nil { return networkT{}, err } } { if rows.Next() { return networkT{}, errors.New("FIXME") } err := rows.Err() if err != nil { return networkT{}, err } } } */ } closeFn := func() error { writeFnClose() return privateDB.Close() } return fn, closeFn, nil } func getNetworkSQL(prefix string) queryT { const tmpl_read = ` SELECT "%s_networks".id, "%s_networks".timestamp, "%s_users".uuid, "%s_networks".name, "%s_networks".description, "%s_networks".type FROM "%s_networks" JOIN "%s_users" ON "%s_users".id = "%s_networks".creator_id WHERE "%s_networks".uuid = $networkUUID AND $userID IN ( SELECT id FROM "%s_users" WHERE id = $userID AND deleted = false ) AND ( "%s_networks".type IN ('public', 'unlisted') OR $userID IN ( SELECT user_id FROM "%s_members" WHERE user_id = $userID AND network_id = "%s_networks".id ) ); ` return queryT{ read: fmt.Sprintf( tmpl_read, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, ), } } func getNetworkStmt( cfg dbconfigT, ) (func(userT, guuid.UUID) (networkT, error), func() error, error) { q := getNetworkSQL(cfg.prefix) readStmt, err := cfg.shared.Prepare(q.read) if err != nil { return nil, nil, err } fn := func(user userT, networkID guuid.UUID) (networkT, error) { network := networkT{ uuid: networkID, } var ( timestr string creator_id_bytes []byte ) err := readStmt.QueryRow(networkID[:], user.id).Scan( &network.id, ×tr, &creator_id_bytes, &network.name, &network.description, &network.type_, ) if err != nil { return networkT{}, err } network.createdBy = guuid.UUID(creator_id_bytes) network.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { return networkT{}, err } return network, nil } return fn, readStmt.Close, nil } func networkEach(rows *sql.Rows, callback func(networkT) error) error { if rows == nil { return nil } for rows.Next() { var ( network networkT timestr string network_id_bytes []byte ) err := rows.Scan( &network.id, ×tr, &network_id_bytes, ) if err != nil { return g.WrapErrors(rows.Close(), err) } network.uuid = guuid.UUID(network_id_bytes) network.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { return g.WrapErrors(rows.Close(), err) } err = callback(network) if err != nil { return g.WrapErrors(rows.Close(), err) } } return g.WrapErrors(rows.Err(), rows.Close()) } func networksSQL(prefix string) queryT { const tmpl_read = ` -- FIXME %s ` return queryT{ read: fmt.Sprintf(tmpl_read, prefix), } } func networksStmt( cfg dbconfigT, ) (func(userT) (*sql.Rows, error), func() error, error) { q := networksSQL(cfg.prefix) readStmt, err := cfg.shared.Prepare(q.read) if err != nil { return nil, nil, err } fn := func(user userT) (*sql.Rows, error) { return readStmt.Query(user.uuid[:]) } return fn, readStmt.Close, nil } func setNetworkSQL(prefix string) queryT { const tmpl_write = ` -- FIXME %s ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), } } func setNetworkStmt( cfg dbconfigT, ) (func(userT, networkT) error, func() error, error) { q := setNetworkSQL(cfg.prefix) writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } fn := func(user userT, network networkT) error { _, err := writeStmt.Exec(network) return err } return fn, writeStmt.Close, nil } func nipNetworkSQL(prefix string) queryT { const tmpl_write = ` -- FIXME %s ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), } } func nipNetworkStmt( cfg dbconfigT, ) (func(userT, guuid.UUID) error, func() error, error) { q := nipNetworkSQL(cfg.prefix) writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } fn := func(user userT, networkID guuid.UUID) error { _, err := writeStmt.Exec(networkID[:]) return err } return fn, writeStmt.Close, nil } func addMemberSQL(prefix string) queryT { const tmpl_write = ` -- FIXME %s ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), } } func addMemberStmt( cfg dbconfigT, ) (func(userT, networkT, newMemberT) (memberT, error), func() error, error) { q := addMemberSQL(cfg.prefix) writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } fn := func( user userT, network networkT, newMember newMemberT, ) (memberT, error) { member := memberT{ } var timestr string err := writeStmt.QueryRow(network.uuid[:], newMember).Scan( &member.id, ×tr, ) if err != nil { return memberT{}, err } member.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { return memberT{}, err } return member, nil } return fn, writeStmt.Close, nil } func showMemberSQL(prefix string) queryT { const tmpl_read = ` -- FIXME %s ` return queryT{ read: fmt.Sprintf(tmpl_read, prefix), } } func showMemberStmt( cfg dbconfigT, ) (func(userT, guuid.UUID) (memberT, error), func() error, error) { q := showMemberSQL(cfg.prefix) readStmt, err := cfg.shared.Prepare(q.read) if err != nil { return nil, nil, err } fn := func(user userT, memberID guuid.UUID) (memberT, error) { member := memberT{ uuid: memberID, } var timestr string err := readStmt.QueryRow(memberID[:]).Scan( &member.id, ×tr, ) if err != nil { return memberT{}, err } member.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { return memberT{}, err } return member, err } return fn, readStmt.Close, nil } func memberEach(rows *sql.Rows, callback func(memberT) error) error { if rows == nil { return nil } for rows.Next() { var ( member memberT timestr string member_id_bytes []byte ) err := rows.Scan( &member.id, ×tr, &member_id_bytes, ) if err != nil { return g.WrapErrors(rows.Close(), err) } // member.uuid = guuid.UUID(member_id_bytes) FIXME member.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { return g.WrapErrors(rows.Close(), err) } err = callback(member) if err != nil { return g.WrapErrors(rows.Close(), err) } } return g.WrapErrors(rows.Err(), rows.Close()) } func membersSQL(prefix string) queryT { const tmpl_read = ` -- FIXME %s ` return queryT{ read: fmt.Sprintf(tmpl_read, prefix), } } func membersStmt( cfg dbconfigT, ) (func(userT, guuid.UUID) (*sql.Rows, error), func() error, error) { q := membersSQL(cfg.prefix) readStmt, err := cfg.shared.Prepare(q.read) if err != nil { return nil, nil, err } fn := func(user userT, networkID guuid.UUID) (*sql.Rows, error) { return readStmt.Query(networkID[:]) } return fn, readStmt.Close, nil } func editMemberSQL(prefix string) queryT { const tmpl_write = ` -- FIXME %s ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), } } func editMemberStmt( cfg dbconfigT, ) (func(userT, memberT) error, func() error, error) { q := editMemberSQL(cfg.prefix) writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } fn := func(user userT, member memberT) error { _, err := writeStmt.Exec(member) return err } return fn, writeStmt.Close, nil } func dropMemberSQL(prefix string) queryT { const tmpl_write = ` -- FIXME ` return queryT{ write: fmt.Sprintf(tmpl_write), } } func dropMemberStmt( cfg dbconfigT, ) (func(userT, guuid.UUID) error, func() error, error) { q := dropMemberSQL(cfg.prefix) writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } fn := func(user userT, memberID guuid.UUID) error { _, err := writeStmt.Exec(memberID[:]) return err } return fn, writeStmt.Close, nil } func addChannelSQL(prefix string) queryT { const tmpl_write = ` INSERT INTO "%s_channels" ( uuid, public_name, label, description, virtual ) VALUES (?, ?, ?, ?, ?) RETURNING id, timestamp; ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), } } func addChannelStmt( cfg dbconfigT, ) (func (guuid.UUID, newChannelT) (channelT, error), func() error, error) { q := addChannelSQL(cfg.prefix) writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } fn := func( networkID guuid.UUID, newChannel newChannelT, ) (channelT, error) { channel := channelT{ uuid: newChannel.uuid, // networkID[:], publicName: newChannel.publicName, label: newChannel.label, description: newChannel.description, virtual: newChannel.virtual, } var timestr string err := writeStmt.QueryRow( newChannel.uuid[:], newChannel.publicName, newChannel.label, newChannel.description, newChannel.virtual, ).Scan(&channel.id, ×tr) if err != nil { return channelT{}, err } channel.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { return channelT{}, err } return channel, nil } return fn, writeStmt.Close, nil } func channelEach(rows *sql.Rows, callback func(channelT) error) error { if rows == nil { return nil } for rows.Next() { var ( channel channelT timestr string channel_id_bytes []byte ) err := rows.Scan( &channel.id, ×tr, &channel_id_bytes, ) if err != nil { return g.WrapErrors(rows.Close(), err) } channel.uuid = guuid.UUID(channel_id_bytes) channel.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { return g.WrapErrors(rows.Close(), err) } err = callback(channel) if err != nil { return g.WrapErrors(rows.Close(), err) } } return g.WrapErrors(rows.Err(), rows.Close()) } func channelsSQL(prefix string) queryT { const tmpl_read = ` -- FIXME %s ` return queryT{ read: fmt.Sprintf(tmpl_read, prefix), } } func channelsStmt( cfg dbconfigT, ) (func(guuid.UUID) (*sql.Rows, error), func() error, error) { q := channelsSQL(cfg.prefix) readStmt, err := cfg.shared.Prepare(q.read) if err != nil { return nil, nil, err } fn := func(networkID guuid.UUID) (*sql.Rows, error) { return readStmt.Query(networkID[:]) } return fn, readStmt.Close, nil } func topicSQL(prefix string) queryT { const tmpl_write = ` -- FIXME %s ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), } } func topicStmt( cfg dbconfigT, ) (func(channelT) error, func() error, error) { q := topicSQL(cfg.prefix) writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } fn := func(channel channelT) error { _, err := writeStmt.Exec(channel) return err } return fn, writeStmt.Close, nil } func endChannelSQL(prefix string) queryT { const tmpl_write = ` -- FIXME %s ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), } } func endChannelStmt( cfg dbconfigT, ) (func(guuid.UUID) error, func() error, error) { q := endChannelSQL(cfg.prefix) writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } fn := func(channelID guuid.UUID) error { _, err := writeStmt.Exec(channelID[:]) return err } return fn, writeStmt.Close, nil } func joinSQL(prefix string) queryT { const tmpl_write = ` -- FIXME %s ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), } } func joinStmt( cfg dbconfigT, ) (func(guuid.UUID, guuid.UUID) error, func() error, error) { q := joinSQL(cfg.prefix) writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } fn := func(memberID guuid.UUID, channelID guuid.UUID) error { _, err := writeStmt.Exec(memberID[:], channelID[:]) return err } return fn, writeStmt.Close, nil } func partSQL(prefix string) queryT { const tmpl_write = ` -- FIXME %s ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), } } func partStmt( cfg dbconfigT, ) (func(guuid.UUID, guuid.UUID) error, func() error, error) { q := partSQL(cfg.prefix) writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } fn := func(memberID guuid.UUID, channelID guuid.UUID) error { _, err := writeStmt.Exec(memberID[:], channelID[:]) return err } return fn, writeStmt.Close, nil } func nameEach(rows *sql.Rows, callback func(memberT) error) error { if rows == nil { return nil } for rows.Next() { var ( member memberT timestr string member_id_bytes []byte ) err := rows.Scan( &member.id, ×tr, &member_id_bytes, ) if err != nil { return g.WrapErrors(rows.Close(), err) } member.uuid = guuid.UUID(member_id_bytes) member.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { return g.WrapErrors(rows.Close(), err) } err = callback(member) if err != nil { return g.WrapErrors(rows.Close(), err) } } return g.WrapErrors(rows.Err(), rows.Close()) } func namesSQL(prefix string) queryT { const tmpl_read = ` -- FIXME %s ` return queryT{ read: fmt.Sprintf(tmpl_read, prefix), } } func namesStmt( cfg dbconfigT, ) (func(guuid.UUID) (*sql.Rows, error), func() error, error) { q := namesSQL(cfg.prefix) readStmt, err := cfg.shared.Prepare(q.read) if err != nil { return nil, nil, err } fn := func(channelID guuid.UUID) (*sql.Rows, error) { return readStmt.Query(channelID[:]) } return fn, readStmt.Close, nil } func addEventSQL(prefix string) queryT { const tmpl_write = ` INSERT INTO "%s_channel_events" ( uuid, channel_id, connection_uuid, type, payload ) VALUES ( ?, (SELECT id FROM "%s_channels" WHERE uuid = ?), ?, ?, ? ) RETURNING id, timestamp; ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix, prefix), } } func addEventStmt( cfg dbconfigT, ) (func (newEventT) (eventT, error), func() error, error) { q := addEventSQL(cfg.prefix) writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } fn := func(newEvent newEventT) (eventT, error) { event := eventT{ uuid: newEvent.eventID, channelID: newEvent.channelID, connectionID: newEvent.connectionID, type_: newEvent.type_, payload: newEvent.payload, } var timestr string err := writeStmt.QueryRow( newEvent.eventID[:], newEvent.channelID[:], newEvent.connectionID[:], newEvent.type_, newEvent.payload, ).Scan(&event.id, ×tr) if err != nil { return eventT{}, err } event.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { return eventT{}, err } return event, nil } return fn, writeStmt.Close, nil } func eventEach(rows *sql.Rows, callback func(eventT) error) error { if rows == nil { return nil } for rows.Next() { var ( event eventT timestr string event_id_bytes []byte channel_id_bytes []byte connection_id_bytes []byte ) err := rows.Scan( &event.id, ×tr, &event_id_bytes, &channel_id_bytes, &connection_id_bytes, &event.type_, &event.payload, ) if err != nil { rows.Close() return err } event.uuid = guuid.UUID(event_id_bytes) event.channelID = guuid.UUID(channel_id_bytes) event.connectionID = guuid.UUID(connection_id_bytes) event.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { rows.Close() return err } err = callback(event) if err != nil { rows.Close() return err } } return g.WrapErrors(rows.Err(), rows.Close()) } func allAfterSQL(prefix string) queryT { const tmpl_read = ` WITH landmark_event AS ( SELECT id, channel_id FROM "%s_channel_events" WHERE uuid = ? ) SELECT "%s_channel_events".id, "%s_channel_events".timestamp, "%s_channel_events".uuid, "%s_channels".uuid, "%s_channel_events".connection_uuid, "%s_channel_events".type, "%s_channel_events".payload FROM "%s_channel_events" JOIN "%s_channels" ON "%s_channel_events".channel_id = "%s_channels".id WHERE "%s_channel_events".id > ( SELECT id FROM landmark_event ) AND channel_id = ( SELECT channel_id FROM landmark_event ); ` return queryT{ read: fmt.Sprintf( tmpl_read, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, prefix, ), } } func allAfterStmt( cfg dbconfigT, ) (func (guuid.UUID) (*sql.Rows, error), func() error, error) { q := allAfterSQL(cfg.prefix) readStmt, err := cfg.shared.Prepare(q.read) if err != nil { return nil, nil, err } fn := func(eventID guuid.UUID) (*sql.Rows, error) { return readStmt.Query(eventID[:]) } return fn, readStmt.Close, nil } func logMessageSQL(prefix string) queryT{ const tmpl_write = ` -- FIXME %s ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), } } func logMessageStmt( cfg dbconfigT, ) (func(userT, messageT) error, func() error, error) { q := logMessageSQL(cfg.prefix) writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } fn := func(user userT, message messageT) error { return nil // FIXME _, err := writeStmt.Exec(user, message) return err } return fn, writeStmt.Close, nil } func initDB( dbpath string, prefix string, ) (queriesT, error) { err := g.ValidateSQLTablePrefix(prefix) if err != nil { return queriesT{}, err } shared, err := sql.Open(golite.DriverName, dbpath) if err != nil { return queriesT{}, err } cfg := dbconfigT{ shared: shared, dbpath: dbpath, prefix: prefix, } createTablesErr := createTables(shared, prefix) createUser, createUserClose, createUserErr := createUserStmt(cfg) userByUUID, userByUUIDClose, userByUUIDErr := userByUUIDStmt(cfg) updateUser, updateUserClose, updateUserErr := updateUserStmt(cfg) deleteUser, deleteUserClose, deleteUserErr := deleteUserStmt(cfg) addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(cfg) getNetwork, getNetworkClose, getNetworkErr := getNetworkStmt(cfg) networks, networksClose, networksErr := networksStmt(cfg) setNetwork, setNetworkClose, setNetworkErr := setNetworkStmt(cfg) nipNetwork, nipNetworkClose, nipNetworkErr := nipNetworkStmt(cfg) addMember, addMemberClose, addMemberErr := addMemberStmt(cfg) showMember, showMemberClose, showMemberErr := showMemberStmt(cfg) members, membersClose, membersErr := membersStmt(cfg) editMember, editMemberClose, editMemberErr := editMemberStmt(cfg) dropMember, dropMemberClose, dropMemberErr := dropMemberStmt(cfg) addChannel, addChannelClose, addChannelErr := addChannelStmt(cfg) channels, channelsClose, channelsErr := channelsStmt(cfg) topic, topicClose, topicErr := topicStmt(cfg) endChannel, endChannelClose, endChannelErr := endChannelStmt(cfg) join, joinClose, joinErr := joinStmt(cfg) part, partClose, partErr := partStmt(cfg) names, namesClose, namesErr := namesStmt(cfg) addEvent, addEventClose, addEventErr := addEventStmt(cfg) allAfter, allAfterClose, allAfterErr := allAfterStmt(cfg) logMessage, logMessageClose, logMessageErr := logMessageStmt(cfg) closeFn := func() error { return g.SomeFnError( createUserClose, userByUUIDClose, updateUserClose, deleteUserClose, addNetworkClose, getNetworkClose, networksClose, setNetworkClose, nipNetworkClose, addMemberClose, showMemberClose, membersClose, editMemberClose, dropMemberClose, addChannelClose, channelsClose, topicClose, endChannelClose, joinClose, partClose, namesClose, addEventClose, allAfterClose, logMessageClose, ) } err = g.SomeError( createTablesErr, createUserErr, userByUUIDErr, updateUserErr, deleteUserErr, addNetworkErr, getNetworkErr, networksErr, setNetworkErr, nipNetworkErr, addMemberErr, showMemberErr, membersErr, editMemberErr, dropMemberErr, addChannelErr, channelsErr, topicErr, endChannelErr, joinErr, partErr, namesErr, addEventErr, allAfterErr, logMessageErr, ) if err != nil { ferr := g.SomeError( createUserErr, ) fmt.Printf("ferr: %#v\n", ferr) closeFn() return queriesT{}, err } var connMutex sync.RWMutex return queriesT{ createUser: func(a newUserT) (userT, error) { connMutex.RLock() defer connMutex.RUnlock() return createUser(a) }, userByUUID: func(a guuid.UUID) (userT, error) { connMutex.RLock() defer connMutex.RUnlock() return userByUUID(a) }, updateUser: func(a userT) error { connMutex.RLock() defer connMutex.RUnlock() return updateUser(a) }, deleteUser: func(a guuid.UUID) error { connMutex.RLock() defer connMutex.RUnlock() return deleteUser(a) }, addNetwork: func(a userT, b newNetworkT) (networkT, error) { connMutex.RLock() defer connMutex.RUnlock() return addNetwork(a, b) }, getNetwork: func(a userT, b guuid.UUID) (networkT, error) { connMutex.RLock() defer connMutex.RUnlock() return getNetwork(a, b) }, networks: func( a userT, callback func(networkT) error, ) error { var ( err error rows *sql.Rows ) { connMutex.RLock() defer connMutex.RUnlock() rows, err = networks(a) } if err != nil { return err } return networkEach(rows, callback) }, setNetwork: func(a userT, b networkT) error { connMutex.RLock() defer connMutex.RUnlock() return setNetwork(a, b) }, nipNetwork: func(a userT, b guuid.UUID) error { connMutex.RLock() defer connMutex.RUnlock() return nipNetwork(a, b) }, addMember: func( a userT, b networkT, c newMemberT, ) (memberT, error) { connMutex.RLock() defer connMutex.RUnlock() return addMember(a, b, c) }, showMember: func(a userT, b guuid.UUID) (memberT, error) { connMutex.RLock() defer connMutex.RUnlock() return showMember(a, b) }, members: func( a userT, b guuid.UUID, callback func(memberT) error, ) error { var ( err error rows *sql.Rows ) { connMutex.RLock() defer connMutex.RUnlock() rows, err = members(a, b) } if err != nil { return err } return memberEach(rows, callback) }, editMember: func(a userT, b memberT) error { connMutex.RLock() defer connMutex.RUnlock() return editMember(a, b) }, dropMember: func(a userT, b guuid.UUID) error { connMutex.RLock() defer connMutex.RUnlock() return dropMember(a, b) }, addChannel: func( a guuid.UUID, b newChannelT, ) (channelT, error) { connMutex.RLock() defer connMutex.RUnlock() return addChannel(a, b) }, channels: func( a guuid.UUID, callback func(channelT) error, ) error { var ( err error rows *sql.Rows ) { connMutex.RLock() defer connMutex.RUnlock() rows, err = channels(a) } if err != nil { return err } return channelEach(rows, callback) }, topic: func(a channelT) error { connMutex.RLock() defer connMutex.RUnlock() return topic(a) }, endChannel: func(a guuid.UUID) error { connMutex.RLock() defer connMutex.RUnlock() return endChannel(a) }, join: func(a guuid.UUID, b guuid.UUID) error { connMutex.RLock() defer connMutex.RUnlock() return join(a, b) }, part: func(a guuid.UUID, b guuid.UUID) error { connMutex.RLock() defer connMutex.RUnlock() return part(a, b) }, names: func(a guuid.UUID, callback func(memberT) error) error { var ( err error rows *sql.Rows ) { connMutex.RLock() defer connMutex.RUnlock() rows, err = names(a) } if err != nil { return err } return nameEach(rows, callback) }, addEvent: func(a newEventT) (eventT, error) { connMutex.RLock() defer connMutex.RUnlock() return addEvent(a) }, allAfter: func( a guuid.UUID, callback func(eventT) error, ) error { var ( err error rows *sql.Rows ) { connMutex.RLock() defer connMutex.RUnlock() rows, err = allAfter(a) } if err != nil { return err } return eventEach(rows, callback) }, logMessage: func(a userT, b messageT) error { connMutex.Lock() defer connMutex.Unlock() return logMessage(a, b) }, close: func() error { connMutex.Lock() defer connMutex.Unlock() return closeFn() }, }, nil } func newChannelHandler(papod papodT) func(fiinha.Message) error { return func(message fiinha.Message) error { return nil } } func buildConsumers(prefix string) []consumerT { return []consumerT{ consumerT{ topic: NEW_CHANNEL, name: prefix + NEW_CHANNEL, handlerFn: newChannelHandler, }, } } func unregisterConsumers( unsubscribeFn func(string, string), consumers []consumerT, ) { for _, c := range consumers { unsubscribeFn(c.topic, c.name) } } func registerConsumers( subscribeFn func(string, string, func(fiinha.Message) error) error, unsubscribeFn func(string, string), papod papodT, consumers []consumerT, ) error { for _, c := range consumers { err := subscribeFn(c.topic, c.name, c.handlerFn(papod)) if err != nil { unregisterConsumers(unsubscribeFn, consumers) return err } } return nil } func initListeners( daemonSocketPath string, commanderSocketPath string, ) (listenersT, error) { daemon, err := net.Listen("unix", daemonSocketPath) if err != nil { return listenersT{}, err } commander, err := net.Listen("unix", commanderSocketPath) if err != nil { daemon.Close() return listenersT{}, err } return listenersT{ daemon: daemon, commander: commander, close: func() error { return g.SomeError( daemon.Close(), commander.Close(), ) }, }, nil } func makeReceivers() receiversT { var rwmutex sync.Mutex return receiversT{ add: func(receiver receiverT) { }, remove: func(receiver receiverT) { }, get: func(guuid.UUID) []receiverT{ return nil }, close: func() { rwmutex.Lock() defer rwmutex.Unlock() }, } } func buildMetrics(prefix string) metricsT { return metricsT{ activeConnections: g.MakeGauge( "active-connection", "prefix", prefix, ), nicksInChannel: g.MakeGauge( "nicks-in-channel", "prefix", prefix, ), sendToClientError: g.MakeCounter( "send-to-client-error", "prefix", prefix, ), receivedMessage: g.MakeCounter( "received-message", "prefix", prefix, ), sentReply: g.MakeCounter( "sent-reply", "prefix", prefix, ), } } func NewWithPrefix( databasePath string, prefix string, daemonSocketPath string, commanderSocketPath string, ) (IPapod, error) { queue, err := fiinha.New(databasePath) if err != nil { return papodT{}, err } auth, err := cracha.New(databasePath) if err != nil { queue.Close() return papodT{}, err } listeners, err := initListeners(daemonSocketPath, commanderSocketPath) if err != nil { queue.Close() auth.Close() return papodT{}, err } queries, err := initDB(databasePath, prefix) if err != nil { queue.Close() auth.Close() listeners.close() return papodT{}, err } consumers := buildConsumers(prefix) receivers := makeReceivers() metrics := buildMetrics(prefix) // logger := g.NewLogger("prefix", prefix, "program", "papod") return papodT{ queue: queue, auth: auth, queries: queries, listeners: listeners, consumers: consumers, receivers: receivers, metrics: metrics, // logger: logger, }, nil } func New(basePath string) (IPapod, error) { return NewWithPrefix( basePath + "/papod.db", defaultPrefix, basePath + "/papod.daemon.sock", basePath + "/papod.commander.sock", ) } func splitOnCRLF(data []byte, _atEOF bool) (int, []byte, error) { idx := bytes.Index(data, []byte { '\r', '\n' }) if idx == -1 { return 0, nil, nil } return idx + 2, data[0:idx], nil } func splitOnRawMessage(data []byte, atEOF bool) (int, []byte, error) { advance, token, error := splitOnCRLF(data, atEOF) if len(token) == 0 { return advance, nil, error } return advance, token, error } func splitSpaces(r rune) bool { return r == ' ' } func parseMessageParams(params string) messageParamsT { const sep = " :" var middle string var trailing string idx := strings.Index(params, sep) if idx == -1 { middle = params trailing = "" } else { middle = params[:idx] trailing = params[idx + len(sep):] } return messageParamsT{ middle: strings.FieldsFunc(middle, splitSpaces), trailing: trailing, } } var messageRegex = regexp.MustCompilePOSIX( // //1 2 3 4 `^(:([^ ]+) +)?([a-zA-Z]+) *( .*)$`, ) func parseMessage(rawMessage string) (messageT, error) { var msg messageT components := messageRegex.FindStringSubmatch(rawMessage) if components == nil { return msg, errors.New("Can't parse message") } msg = messageT{ prefix: components[2], command: components[3], params: parseMessageParams(components[4]), raw: rawMessage, } return msg, nil } func asNewEvent(msg messageT) newEventT { return newEventT{} } func joinEvent(member memberT) eventT { return eventT{} } func asMessage(event eventT) messageT { return messageT{} } func asReply(event eventT) replyT { return replyT{} } func broadcastEvent(event eventT, receiversFn func(guuid.UUID) []receiverT) { message := asMessage(event) for _, receiver := range receiversFn(event.channelID) { // FIXME: // is this death by a thousand goroutines? Is the runtime // able to handle the creation and destruction of hundreds of // thousands of goroutines per second? go receiver.send(message) } } var ( replyErrUnknown = replyT{ command: "421", params: messageParamsT{ middle: []string{}, trailing: "Unknown command", }, } replyErrNotRegistered = replyT{ command: "451", params: messageParamsT{ middle: []string{}, trailing: "You have not registered", }, } replyErrFileError = replyT{ command: "424", params: messageParamsT{ middle: []string{}, trailing: "File error doing query on database", }, } RPL_WELCOME = replyT{ command: "001", params: messageParamsT{ middle: []string{}, trailing: "", }, } ) func handleUnknown( papod papodT, connection *connectionT, msg messageT, ) ([]replyT, error) { // FIXME: user doesn't exist when unauthenticated err := papod.queries.logMessage(userT{ }, msg) if err != nil { g.Warning( "Failed to log message", fmt.Sprintf("%#v", msg), "group-as", "db-write", "handler-action", "log-and-ignore", "connection", connection.uuid.String(), "err", err, ) } return []replyT{ replyErrUnknown }, nil } func handleUSER( papod papodT, connection *connectionT, msg messageT, ) ([]replyT, error) { u := connection.user.username m := []string{ u } return []replyT{ replyT{ command: "001", params: messageParamsT{ middle: m, trailing: "Welcome to the Internet Relay Network " + u, }, }, replyT{ command: "002", params: messageParamsT{ middle: m, trailing: "Your host is FIXME, running version " + Version, }, }, replyT{ command: "003", params: messageParamsT{ middle: m, trailing: "This server was create FIXME", }, }, replyT{ command: "004", params: messageParamsT{ middle: m, trailing: "FIXME " + Version + " i x", }, }, }, nil } func handleNICK( papod papodT, connection *connectionT, msg messageT, ) ([]replyT, error) { connection.user.username = msg.params.middle[0] return []replyT{}, nil } func handlePRIVMSG( papod papodT, connection *connectionT, msg messageT, ) ([]replyT, error) { // FIXME: check missing params // FIXME: check if user is member of channel, and is authorized to post // FIXME: adapt to handle multiple targets return []replyT{}, nil event, err := papod.queries.addEvent(asNewEvent(msg)) if err != nil { // FIXME: not allowed reply per RFC 1459, check other specs return []replyT{ replyErrFileError }, nil } go broadcastEvent(event, papod.receivers.get) reply := asReply(event) return []replyT{ reply }, nil } func handleTOPIC( papod papodT, connection *connectionT, msg messageT, ) ([]replyT, error) { return []replyT{ replyT{ command: "JOIN", params: messageParamsT{ middle: []string{ msg.params.middle[0] }, trailing: "", }, } }, nil } func handleJOIN( papod papodT, connection *connectionT, msg messageT, ) ([]replyT, error) { u := connection.user.username channel := msg.params.middle[0] return []replyT{ replyT{ command: "JOIN", params: messageParamsT{ middle: []string{ channel }, trailing: "", }, }, replyT{ command: "331", params: messageParamsT{ middle: []string{ u, channel }, trailing: "No topic is set", }, }, replyT{ command: "353", params: messageParamsT{ middle: []string{ u, "=", channel }, trailing: u + " virtualuser", }, }, replyT{ command: "366", params: messageParamsT{ middle: []string{ u, channel }, trailing: "End of NAMES list", }, } }, nil member, err := papod.queries.addMember( *connection.user, networkT{}, newMemberT{}, ) if err != nil { // FIXME: not allowed per RFC 1459 return []replyT{ replyErrFileError }, nil } event := joinEvent(member) papod.metrics.nicksInChannel.Inc() go broadcastEvent(event, papod.receivers.get) reply := asReply(event) return []replyT{ reply }, nil } func handleMODE( papod papodT, connection *connectionT, msg messageT, ) ([]replyT, error) { u := connection.user.username channel := msg.params.middle[0] return []replyT{ replyT{ command: "324", params: messageParamsT{ middle: []string{ u, channel, "+Cnst" }, trailing: "", }, } }, nil } func handleWHOIS( papod papodT, connection *connectionT, msg messageT, ) ([]replyT, error) { u := connection.user.username user := msg.params.middle[0] return []replyT{ replyT{ command: "311", params: messageParamsT{ middle: []string{ u, user, user, "samehost", "*" }, trailing: "my real name is: " + user, }, }, replyT{ command: "312", params: messageParamsT{ middle: []string{ u, user, "stillsamehost" }, trailing: "some server info", }, }, replyT{ command: "319", params: messageParamsT{ middle: []string{ u, user }, trailing: "#default", }, }, replyT{ command: "318", params: messageParamsT{ middle: []string{ u, user }, trailing: "End of WHOIS list", }, } }, nil } func handleAWAY( papod papodT, connection *connectionT, msg messageT, ) ([]replyT, error) { u := connection.user.username if msg.params.trailing == "" { return []replyT{ replyT{ command: "305", params: messageParamsT{ middle: []string{ u }, trailing: "You are no longer marked as away", }, } }, nil } else { return []replyT{ replyT{ command: "306", params: messageParamsT{ middle: []string{ u }, trailing: "You have been marked as away", }, } }, nil } } func handlePING( papod papodT, connection *connectionT, msg messageT, ) ([]replyT, error) { return []replyT{ { command: "PONG", params: messageParamsT{ middle: []string{}, trailing: msg.params.middle[0], }, } }, nil } func handleQUIT( papod papodT, connection *connectionT, msg messageT, ) ([]replyT, error) { connection.conn.Close() return []replyT{}, nil } func handleCAP( papod papodT, connection *connectionT, msg messageT, ) ([]replyT, error) { if msg.params.middle[0] == "END" { return nil, nil } return []replyT{ replyT{ command: "CAP", params: messageParamsT{ middle: []string { "*", "LS" }, trailing: "", }, } }, nil } func authRequired( fn func(papodT, *connectionT, messageT) ([]replyT, error), ) func(papodT, *connectionT, messageT) ([]replyT, error) { return func( papod papodT, connection *connectionT, message messageT, ) ([]replyT, error) { if connection.user == nil { return []replyT{ replyErrNotRegistered }, nil } return fn(papod, connection, message) } } var commands = map[string]func( papodT, *connectionT, messageT, ) ([]replyT, error) { "USER": handleUSER, "NICK": handleNICK, "QUIT": handleQUIT, "CAP": handleCAP, "AWAY": authRequired(handleAWAY), "PRIVMSG": authRequired(handlePRIVMSG), "PING": authRequired(handlePING), "JOIN": authRequired(handleJOIN), "MODE": authRequired(handleMODE), "TOPIC": authRequired(handleTOPIC), "WHOIS": authRequired(handleWHOIS), } func actionFnFor( command string, ) func(papodT, *connectionT, messageT) ([]replyT, error) { fn := commands[command] if fn != nil { return fn } return handleUnknown } func replyString(reply replyT) string { if reply.params.trailing == "" { return fmt.Sprintf( "%s %s\r\n", reply.command, strings.Join(reply.params.middle, " "), ) } else { return fmt.Sprintf( "%s %s :%s\r\n", reply.command, strings.Join(reply.params.middle, " "), reply.params.trailing, ) } } func processMessage(papod papodT, connection *connectionT, rawMessage string) { msg, err := parseMessage(rawMessage) if err != nil { g.Info( "Error processing message", "process-message", "text", rawMessage, "err", err, ) return } papod.metrics.receivedMessage( "message", fmt.Sprintf("%#v", msg), "text", rawMessage, ) var replyErrors []error replies, actionErr := actionFnFor(msg.command)(papod, connection, msg) for _, reply := range replies { text := replyString(reply) _, err = io.WriteString(connection.conn, text) if err != nil { replyErrors = append(replyErrors, err) } papod.metrics.sentReply( "message", rawMessage, "reply", fmt.Sprintf("%#v", reply), "text", text, ) } if actionErr != nil || len(replyErrors) != 0 { if actionErr != nil { g.Info( "Handler returned error", "handler-error", "from", "daemon", "err", err, ) } if len(replyErrors) != 0 { g.Info( "Failed to send reply", "send-reply-error", "from", "daemon", "err", replyErrors, ) } // FIXME: Close the connection } } func handleConnection(papod papodT, conn net.Conn) { connection := connectionT{ conn: conn, uuid: guuid.New(), // user: nil, // FIXME: SASL shenanigan probably goes here user: &userT{}, } scanner := bufio.NewScanner(conn) scanner.Split(splitOnRawMessage) for scanner.Scan() { processMessage(papod, &connection, scanner.Text()) } } func handleCommand(papod papodT, conn net.Conn) { // FIXME } func daemonLoop(papod papodT) { for { conn, err := papod.listeners.daemon.Accept() if err != nil { if errors.Is(err, net.ErrClosed) { break } g.Warning( "Error accepting daemon connection", "accept-connection", "from", "daemon", "err", err, ) continue } // FIXME: where does it get closed go handleConnection(papod, conn) } } func commanderLoop(papod papodT) { for { conn, err := papod.listeners.commander.Accept() if err != nil { if errors.Is(err, net.ErrClosed) { break } g.Warning( "Error accepting commander connection", "accept-connection", "from", "commander", "err", err, ) continue } go handleCommand(papod, conn) } } func mkbgrun() (func(func()), func()) { var wg sync.WaitGroup bgrun := func(f func()) { wg.Add(1) go func() { f() wg.Done() }() } return bgrun, wg.Wait } func (papod papodT) Start() error { err := registerConsumers( papod.queue.Subscribe, papod.queue.Unsubscribe, papod, papod.consumers, ) if err != nil { return err } // FIXME: papod.logger.Info g.Info("Starting service", "lifecycle-event", "event", "starting-server", slog.Group( "versions", "gobang", g.Version, "cracha", cracha.Version, "fiinha", fiinha.Version, "golite", golite.Version, "guuid", guuid.Version, "papod", Version, "this", Version, ), ) run, wait := mkbgrun() run(func() { daemonLoop(papod) }) run(func() { commanderLoop(papod) }) wait() return nil } func (papod papodT) Close() error { // FIXME: does this wait for current handlers to wait? Well, it should. unregisterConsumers(papod.queue.Unsubscribe, papod.consumers) return g.WrapErrors( papod.listeners.close(), // papod.connCloser.closeAll(), papod.auth.Close(), papod.queue.Close(), papod.queries.close(), ) } func basePathFrom(args []string) (string, error) { if len(args) < 2 { return os.Getwd() } else { return args[1], nil } } func Main() { g.Init("program", "papod") basePath, err := basePathFrom(os.Args) if err != nil { fmt.Printf("%v\n", err) os.Exit(1) } ipapod, err := New(basePath) if err != nil { fmt.Printf("Failed to create papod: %v\n", err) os.Exit(1) } err = ipapod.Start() if err != nil { fmt.Printf("Failed to start: %v\n", err) os.Exit(1) } }