diff options
Diffstat (limited to 'src/papod.go')
-rw-r--r-- | src/papod.go | 1577 |
1 files changed, 1000 insertions, 577 deletions
diff --git a/src/papod.go b/src/papod.go index 00086c3..4bc5b4d 100644 --- a/src/papod.go +++ b/src/papod.go @@ -5,18 +5,18 @@ import ( "bytes" "database/sql" "errors" - "flag" "fmt" "io" "log/slog" "net" + "os" "regexp" "strings" "sync" "time" "cracha" - "q" + "fiinha" "golite" "guuid" g "gobang" @@ -29,10 +29,19 @@ const ( rollbackErrorFmt = "rollback error: %w; while executing: %w" NEW_CHANNEL = "new-channel" + + pingFrequency = time.Duration(30) * time.Second + pongMaxLatency = time.Duration(5) * time.Second ) +type dbconfigT struct{ + shared *sql.DB + dbpath string + prefix string +} + type queryT struct{ write string read string @@ -62,6 +71,7 @@ type queriesT struct{ names func(guuid.UUID, func(memberT) error) error addEvent func(newEventT) (eventT, error) allAfter func(guuid.UUID, func(eventT) error) error + logMessage func(userT, messageT) error close func() error } @@ -125,7 +135,7 @@ type newChannelT struct{ virtual bool } -type channelT struct { +type channelT struct{ id int64 timestamp time.Time uuid guuid.UUID @@ -152,72 +162,80 @@ type eventT struct{ connectionID guuid.UUID type_ string payload string + previous *eventT + isFist bool } -type papodT struct{ - auth cracha.IAuth - queue q.IQueue - db *sql.DB - queries queriesT +type messageParamsT struct{ + middle []string + trailing string } -type consumerT struct{ - topic string - handlerFn func(papodT) func(q.Message) error +type messageT struct{ + prefix string + command string + params messageParamsT + raw string } -type connectionT struct { - conn net.Conn - replyChan chan string - lastReadFrom time.Time - lastWrittenTo time.Time - // id *UUID - id string - isAuthenticated bool +type replyT struct{ + command string + params messageParamsT } -type userT2 struct { - connections []connectionT +type listenersT struct{ + daemon net.Listener + commander net.Listener + close func() error } -type stateT struct { - users map[string]*userT +type consumerT struct{ + topic string + name string + // FIXME: use generic to avoid circular reference? + handlerFn func(papodT) func(fiinha.Message) error } -type contextT struct { - db *sql.DB - state stateT - tx chan int +type connectionT struct{ + conn net.Conn + uuid guuid.UUID + user *userT } -type messageParamsT struct { - middle []string - trailing string +type receiverT struct{ + send func(messageT) + close func() } -type messageT struct { - prefix string - command string - params messageParamsT - raw string +type receiversT struct{ + add func(receiverT) + remove func(receiverT) + get func(guuid.UUID) []receiverT + close func() } -type actionType int -const ( - actionReply actionType = iota -) - -type action interface { - typeOf() actionType +type metricsT struct{ + activeConnections g.Gauge + nicksInChannel g.Gauge + sendToClientError func(...any) + receivedMessage func(...any) + sentReply func(...any) } -type replyT struct { - prefix string - command int - params messageParamsT +type papodT struct{ + auth cracha.IAuth + queue fiinha.IQueue + queries queriesT + listeners listenersT + consumers []consumerT + receivers receiversT + metrics metricsT + // logger g.Logger } type IPapod interface{ + Start() error + Close() error } @@ -304,11 +322,11 @@ func inTx(db *sql.DB, fn func(*sql.Tx) error) error { /// "papod_users".uuid is the same as cracha_users.uuid. Not a foreign key to /// allow them to live in different physical locations. Adding it here feels /// less like an optimization related decision, and more of a coupling one. The -/// way that New() works now uses the same databasePath for the q.IQueue *and* -/// cracha.IAuth, but cracha in no way exposes where it stores the user UUID or -/// how the it is handled. This has similarities to how events here don't -/// reference the q.Message.ID via foreign keys either. They're treated only as -/// opaque IDs. +/// way that New() works now uses the same databasePath for the fiinha.IQueue +/// *and* cracha.IAuth, but cracha in no way exposes where it stores the user +/// UUID or how the it is handled. This has similarities to how events here +/// don't reference the fiinha.Message.ID via foreign keys either. They're +/// treated only as opaque IDs. func createTablesSQL(prefix string) queryT { const tmpl_write = ` -- FIXME: unconfirmed premise: statements within a trigger are @@ -326,88 +344,88 @@ func createTablesSQL(prefix string) queryT { username TEXT NOT NULL, display_name TEXT NOT NULL, picture_uuid BLOB UNIQUE, - deleted INT NOT NULL + deleted INT NOT NULL CHECK(deleted IN (0, 1)) ) STRICT; - CREATE TABLE IF NOT EXISTS "%s_user_changes" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL DEFAULT (%s), - user_id INTEGER NOT NULL REFERENCES "%s_users"(id), - attribute TEXT NOT NULL CHECK( - attribute IN ( - 'username', - 'display_name', - 'picture_uuid', - 'deleted' - ) - ), - value TEXT NOT NULL, - op INT NOT NULL CHECK(op IN (0, 1)) - ) STRICT; - CREATE TRIGGER IF NOT EXISTS "%s_user_creation" - AFTER INSERT ON "%s_users" - BEGIN - INSERT INTO "%s_user_changes" ( - user_id, attribute, value, op - ) VALUES - (NEW.id, 'username', NEW.username, true), - (NEW.id, 'display_name', NEW.display_name, true), - (NEW.id, 'deleted', NEW.deleted, true) - ; - END; - CREATE TRIGGER IF NOT EXISTS "%s_user_creation_picture_uuid" - AFTER INSERT ON "%s_users" - WHEN NEW.picture_uuid != NULL - BEGIN - INSERT INTO "%s_user_changes" ( - user_id, attribute, value, op - ) VALUES - (NEW.id, 'picture_uuid', NEW.picture_uuid, true) - ; - END; - CREATE TRIGGER IF NOT EXISTS "%s_user_update_username" - AFTER UPDATE ON "%s_users" - WHEN OLD.username != NEW.username - BEGIN - INSERT INTO "%s_user_changes" ( - user_id, attribute, value, op - ) VALUES - (NEW.id, 'username', OLD.username, false), - (NEW.id, 'username', NEW.username, true) - ; - END; - CREATE TRIGGER IF NOT EXISTS "%s_user_update_display_name" - AFTER UPDATE ON "%s_users" - WHEN OLD.display_name != NEW.display_name - BEGIN - INSERT INTO "%s_user_changes" ( - user_id, attribute, value, op - ) VALUES - (NEW.id, 'display_name', OLD.display_name, false), - (NEW.id, 'display_name', NEW.display_name, true) - ; - END; - CREATE TRIGGER IF NOT EXISTS "%s_user_update_picture_uuid" - AFTER UPDATE ON "%s_users" - WHEN OLD.picture_uuid != NEW.picture_uuid - BEGIN - INSERT INTO "%s_user_changes" ( - user_id, attribute, value, op - ) VALUES - (NEW.id, 'picture_uuid', OLD.picture_uuid, false), - (NEW.id, 'picture_uuid', NEW.picture_uuid, true) - ; - END; - CREATE TRIGGER IF NOT EXISTS "%s_user_update_deleted" - AFTER UPDATE ON "%s_users" - WHEN OLD.deleted != NEW.deleted - BEGIN - INSERT INTO "%s_user_changes" ( - user_id, attribute, value, op - ) VALUES - (NEW.id, 'deleted', OLD.deleted, false), - (NEW.id, 'deleted', NEW.deleted, true) - ; - END; +-- CREATE TABLE IF NOT EXISTS "%s_user_changes" ( +-- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, +-- timestamp TEXT NOT NULL DEFAULT (%s), +-- user_id INTEGER NOT NULL REFERENCES "%s_users"(id), +-- attribute TEXT NOT NULL CHECK( +-- attribute IN ( +-- 'username', +-- 'display_name', +-- 'picture_uuid', +-- 'deleted' +-- ) +-- ), +-- value TEXT NOT NULL, +-- op INT NOT NULL CHECK(op IN (0, 1)) +-- ) STRICT; +-- CREATE TRIGGER IF NOT EXISTS "%s_user_creation" +-- AFTER INSERT ON "%s_users" +-- BEGIN +-- INSERT INTO "%s_user_changes" ( +-- user_id, attribute, value, op +-- ) VALUES +-- (NEW.id, 'username', NEW.username, true), +-- (NEW.id, 'display_name', NEW.display_name, true), +-- (NEW.id, 'deleted', NEW.deleted, true) +-- ; +-- END; +-- CREATE TRIGGER IF NOT EXISTS "%s_user_creation_picture_uuid" +-- AFTER INSERT ON "%s_users" +-- WHEN NEW.picture_uuid != NULL +-- BEGIN +-- INSERT INTO "%s_user_changes" ( +-- user_id, attribute, value, op +-- ) VALUES +-- (NEW.id, 'picture_uuid', NEW.picture_uuid, true) +-- ; +-- END; +-- CREATE TRIGGER IF NOT EXISTS "%s_user_update_username" +-- AFTER UPDATE ON "%s_users" +-- WHEN OLD.username != NEW.username +-- BEGIN +-- INSERT INTO "%s_user_changes" ( +-- user_id, attribute, value, op +-- ) VALUES +-- (NEW.id, 'username', OLD.username, false), +-- (NEW.id, 'username', NEW.username, true) +-- ; +-- END; +-- CREATE TRIGGER IF NOT EXISTS "%s_user_update_display_name" +-- AFTER UPDATE ON "%s_users" +-- WHEN OLD.display_name != NEW.display_name +-- BEGIN +-- INSERT INTO "%s_user_changes" ( +-- user_id, attribute, value, op +-- ) VALUES +-- (NEW.id, 'display_name', OLD.display_name, false), +-- (NEW.id, 'display_name', NEW.display_name, true) +-- ; +-- END; +-- CREATE TRIGGER IF NOT EXISTS "%s_user_update_picture_uuid" +-- AFTER UPDATE ON "%s_users" +-- WHEN OLD.picture_uuid != NEW.picture_uuid +-- BEGIN +-- INSERT INTO "%s_user_changes" ( +-- user_id, attribute, value, op +-- ) VALUES +-- (NEW.id, 'picture_uuid', OLD.picture_uuid, false), +-- (NEW.id, 'picture_uuid', NEW.picture_uuid, true) +-- ; +-- END; +-- CREATE TRIGGER IF NOT EXISTS "%s_user_update_deleted" +-- AFTER UPDATE ON "%s_users" +-- WHEN OLD.deleted != NEW.deleted +-- BEGIN +-- INSERT INTO "%s_user_changes" ( +-- user_id, attribute, value, op +-- ) VALUES +-- (NEW.id, 'deleted', OLD.deleted, false), +-- (NEW.id, 'deleted', NEW.deleted, true) +-- ; +-- END; CREATE TABLE IF NOT EXISTS "%s_networks" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, @@ -506,7 +524,7 @@ func createTablesSQL(prefix string) queryT { UNIQUE (channel_id, member_id) ) STRICT; - -- FIXME: create table for connections? + -- FIXME: create database table for connections? -- A user can have multiple sessions (different browsers, -- mobile, etc.), and each session has multiple connections, as -- the user connects and disconnections using the same session @@ -608,12 +626,11 @@ func createUserSQL(prefix string) queryT { } func createUserStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(newUserT) (userT, error), func() error, error) { - q := createUserSQL(prefix) + q := createUserSQL(cfg.prefix) - writeStmt, err := db.Prepare(q.write) + writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } @@ -665,12 +682,11 @@ func userByUUIDSQL(prefix string) queryT { } func userByUUIDStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(guuid.UUID) (userT, error), func() error, error) { - q := userByUUIDSQL(prefix) + q := userByUUIDSQL(cfg.prefix) - readStmt, err := db.Prepare(q.read) + readStmt, err := cfg.shared.Prepare(q.read) if err != nil { return nil, nil, err } @@ -728,12 +744,11 @@ func updateUserSQL(prefix string) queryT { } func updateUserStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(userT) error, func() error, error) { - q := updateUserSQL(prefix) + q := updateUserSQL(cfg.prefix) - writeStmt, err := db.Prepare(q.write) + writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } @@ -770,12 +785,11 @@ func deleteUserSQL(prefix string) queryT { } func deleteUserStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(guuid.UUID) error, func() error, error) { - q := deleteUserSQL(prefix) + q := deleteUserSQL(cfg.prefix) - writeStmt, err := db.Prepare(q.write) + writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } @@ -820,21 +834,22 @@ func addNetworkSQL(prefix string) queryT { ) RETURNING id, timestamp; ` return queryT{ - write: fmt.Sprintf(tmpl_write, prefix, prefix), + write: fmt.Sprintf(tmpl_write, prefix, prefix, prefix, prefix), } } func addNetworkStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(userT, newNetworkT) (networkT, error), func() error, error) { - q := addNetworkSQL(prefix) + q := addNetworkSQL(cfg.prefix) - writeStmt, err := db.Prepare(q.write) + privateDB, err := sql.Open(golite.DriverName, cfg.dbpath) if err != nil { return nil, nil, err } + writeFn, writeFnClose := execSerialized(q.write, privateDB) + fn := func( user userT, newNetwork newNetworkT, @@ -847,11 +862,26 @@ func addNetworkStmt( type_: newNetwork.type_, } + err := writeFn( + newNetwork.uuid[:], + newNetwork.name, + newNetwork.description, + newNetwork.type_, + user.id, + ) + if err != nil { + return networkT{}, err + } + + return network, nil + + /* member := memberT{ } var timestr string { + FIXME rows, err := writeStmt.Query( newNetwork.uuid[:], newNetwork.name, @@ -913,11 +943,15 @@ func addNetworkStmt( } } } + */ + } - return network, nil + closeFn := func() error { + writeFnClose() + return privateDB.Close() } - return fn, writeStmt.Close, nil + return fn, closeFn, nil } func getNetworkSQL(prefix string) queryT { @@ -971,12 +1005,11 @@ func getNetworkSQL(prefix string) queryT { } func getNetworkStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(userT, guuid.UUID) (networkT, error), func() error, error) { - q := getNetworkSQL(prefix) + q := getNetworkSQL(cfg.prefix) - readStmt, err := db.Prepare(q.read) + readStmt, err := cfg.shared.Prepare(q.read) if err != nil { return nil, nil, err } @@ -1051,7 +1084,7 @@ func networkEach(rows *sql.Rows, callback func(networkT) error) error { func networksSQL(prefix string) queryT { const tmpl_read = ` - -- FIXME + -- FIXME %s ` return queryT{ read: fmt.Sprintf(tmpl_read, prefix), @@ -1059,12 +1092,11 @@ func networksSQL(prefix string) queryT { } func networksStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(userT) (*sql.Rows, error), func() error, error) { - q := networksSQL(prefix) + q := networksSQL(cfg.prefix) - readStmt, err := db.Prepare(q.read) + readStmt, err := cfg.shared.Prepare(q.read) if err != nil { return nil, nil, err } @@ -1078,6 +1110,7 @@ func networksStmt( func setNetworkSQL(prefix string) queryT { const tmpl_write = ` + -- FIXME %s ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), @@ -1085,12 +1118,11 @@ func setNetworkSQL(prefix string) queryT { } func setNetworkStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(userT, networkT) error, func() error, error) { - q := setNetworkSQL(prefix) + q := setNetworkSQL(cfg.prefix) - writeStmt, err := db.Prepare(q.write) + writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } @@ -1105,6 +1137,7 @@ func setNetworkStmt( func nipNetworkSQL(prefix string) queryT { const tmpl_write = ` + -- FIXME %s ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), @@ -1112,12 +1145,11 @@ func nipNetworkSQL(prefix string) queryT { } func nipNetworkStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(userT, guuid.UUID) error, func() error, error) { - q := nipNetworkSQL(prefix) + q := nipNetworkSQL(cfg.prefix) - writeStmt, err := db.Prepare(q.write) + writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } @@ -1133,20 +1165,19 @@ func nipNetworkStmt( func addMemberSQL(prefix string) queryT { const tmpl_write = ` - -- FIXME + -- FIXME %s ` return queryT{ - write: fmt.Sprintf(tmpl_write), + write: fmt.Sprintf(tmpl_write, prefix), } } func addMemberStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(userT, networkT, newMemberT) (memberT, error), func() error, error) { - q := addMemberSQL(prefix) + q := addMemberSQL(cfg.prefix) - writeStmt, err := db.Prepare(q.write) + writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } @@ -1181,6 +1212,7 @@ func addMemberStmt( func showMemberSQL(prefix string) queryT { const tmpl_read = ` + -- FIXME %s ` return queryT{ read: fmt.Sprintf(tmpl_read, prefix), @@ -1188,12 +1220,11 @@ func showMemberSQL(prefix string) queryT { } func showMemberStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(userT, guuid.UUID) (memberT, error), func() error, error) { - q := showMemberSQL(prefix) + q := showMemberSQL(cfg.prefix) - readStmt, err := db.Prepare(q.read) + readStmt, err := cfg.shared.Prepare(q.read) if err != nil { return nil, nil, err } @@ -1260,7 +1291,7 @@ func memberEach(rows *sql.Rows, callback func(memberT) error) error { func membersSQL(prefix string) queryT { const tmpl_read = ` - -- FIXME + -- FIXME %s ` return queryT{ read: fmt.Sprintf(tmpl_read, prefix), @@ -1268,12 +1299,11 @@ func membersSQL(prefix string) queryT { } func membersStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(userT, guuid.UUID) (*sql.Rows, error), func() error, error) { - q := membersSQL(prefix) + q := membersSQL(cfg.prefix) - readStmt, err := db.Prepare(q.read) + readStmt, err := cfg.shared.Prepare(q.read) if err != nil { return nil, nil, err } @@ -1287,6 +1317,7 @@ func membersStmt( func editMemberSQL(prefix string) queryT { const tmpl_write = ` + -- FIXME %s ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), @@ -1294,12 +1325,11 @@ func editMemberSQL(prefix string) queryT { } func editMemberStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(userT, memberT) error, func() error, error) { - q := editMemberSQL(prefix) + q := editMemberSQL(cfg.prefix) - writeStmt, err := db.Prepare(q.write) + writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } @@ -1314,6 +1344,7 @@ func editMemberStmt( func dropMemberSQL(prefix string) queryT { const tmpl_write = ` + -- FIXME ` return queryT{ write: fmt.Sprintf(tmpl_write), @@ -1321,12 +1352,11 @@ func dropMemberSQL(prefix string) queryT { } func dropMemberStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(userT, guuid.UUID) error, func() error, error) { - q := dropMemberSQL(prefix) + q := dropMemberSQL(cfg.prefix) - writeStmt, err := db.Prepare(q.write) + writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } @@ -1351,12 +1381,11 @@ func addChannelSQL(prefix string) queryT { } func addChannelStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func (guuid.UUID, newChannelT) (channelT, error), func() error, error) { - q := addChannelSQL(prefix) + q := addChannelSQL(cfg.prefix) - writeStmt, err := db.Prepare(q.write) + writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } @@ -1434,7 +1463,7 @@ func channelEach(rows *sql.Rows, callback func(channelT) error) error { func channelsSQL(prefix string) queryT { const tmpl_read = ` - -- FIXME + -- FIXME %s ` return queryT{ read: fmt.Sprintf(tmpl_read, prefix), @@ -1442,12 +1471,11 @@ func channelsSQL(prefix string) queryT { } func channelsStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(guuid.UUID) (*sql.Rows, error), func() error, error) { - q := channelsSQL(prefix) + q := channelsSQL(cfg.prefix) - readStmt, err := db.Prepare(q.read) + readStmt, err := cfg.shared.Prepare(q.read) if err != nil { return nil, nil, err } @@ -1461,6 +1489,7 @@ func channelsStmt( func topicSQL(prefix string) queryT { const tmpl_write = ` + -- FIXME %s ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), @@ -1468,12 +1497,11 @@ func topicSQL(prefix string) queryT { } func topicStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(channelT) error, func() error, error) { - q := topicSQL(prefix) + q := topicSQL(cfg.prefix) - writeStmt, err := db.Prepare(q.write) + writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } @@ -1488,6 +1516,7 @@ func topicStmt( func endChannelSQL(prefix string) queryT { const tmpl_write = ` + -- FIXME %s ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), @@ -1495,12 +1524,11 @@ func endChannelSQL(prefix string) queryT { } func endChannelStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(guuid.UUID) error, func() error, error) { - q := endChannelSQL(prefix) + q := endChannelSQL(cfg.prefix) - writeStmt, err := db.Prepare(q.write) + writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } @@ -1515,7 +1543,7 @@ func endChannelStmt( func joinSQL(prefix string) queryT { const tmpl_write = ` - -- FIXME + -- FIXME %s ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), @@ -1523,12 +1551,11 @@ func joinSQL(prefix string) queryT { } func joinStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(guuid.UUID, guuid.UUID) error, func() error, error) { - q := joinSQL(prefix) + q := joinSQL(cfg.prefix) - writeStmt, err := db.Prepare(q.write) + writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } @@ -1543,7 +1570,7 @@ func joinStmt( func partSQL(prefix string) queryT { const tmpl_write = ` - -- FIXME + -- FIXME %s ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), @@ -1551,12 +1578,11 @@ func partSQL(prefix string) queryT { } func partStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(guuid.UUID, guuid.UUID) error, func() error, error) { - q := partSQL(prefix) + q := partSQL(cfg.prefix) - writeStmt, err := db.Prepare(q.write) + writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } @@ -1606,7 +1632,7 @@ func nameEach(rows *sql.Rows, callback func(memberT) error) error { func namesSQL(prefix string) queryT { const tmpl_read = ` - -- FIXME + -- FIXME %s ` return queryT{ read: fmt.Sprintf(tmpl_read, prefix), @@ -1614,12 +1640,11 @@ func namesSQL(prefix string) queryT { } func namesStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func(guuid.UUID) (*sql.Rows, error), func() error, error) { - q := namesSQL(prefix) + q := namesSQL(cfg.prefix) - readStmt, err := db.Prepare(q.read) + readStmt, err := cfg.shared.Prepare(q.read) if err != nil { return nil, nil, err } @@ -1649,12 +1674,11 @@ func addEventSQL(prefix string) queryT { } func addEventStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func (newEventT) (eventT, error), func() error, error) { - q := addEventSQL(prefix) + q := addEventSQL(cfg.prefix) - writeStmt, err := db.Prepare(q.write) + writeStmt, err := cfg.shared.Prepare(q.write) if err != nil { return nil, nil, err } @@ -1783,12 +1807,11 @@ func allAfterSQL(prefix string) queryT { } func allAfterStmt( - db *sql.DB, - prefix string, + cfg dbconfigT, ) (func (guuid.UUID) (*sql.Rows, error), func() error, error) { - q := allAfterSQL(prefix) + q := allAfterSQL(cfg.prefix) - readStmt, err := db.Prepare(q.read) + readStmt, err := cfg.shared.Prepare(q.read) if err != nil { return nil, nil, err } @@ -1800,66 +1823,81 @@ func allAfterStmt( return fn, readStmt.Close, nil } +func logMessageSQL(prefix string) queryT{ + const tmpl_write = ` + -- FIXME %s + ` + return queryT{ + write: fmt.Sprintf(tmpl_write, prefix), + } +} + +func logMessageStmt( + cfg dbconfigT, +) (func(userT, messageT) error, func() error, error) { + q := logMessageSQL(cfg.prefix) + + writeStmt, err := cfg.shared.Prepare(q.write) + if err != nil { + return nil, nil, err + } + + fn := func(user userT, message messageT) error { + return nil // FIXME + _, err := writeStmt.Exec(user, message) + return err + } + + return fn, writeStmt.Close, nil +} + func initDB( - db *sql.DB, + dbpath string, prefix string, ) (queriesT, error) { - createTablesErr := createTables(db, prefix) - createUser, createUserClose, createUserErr := createUserStmt(db, prefix) - userByUUID, userByUUIDClose, userByUUIDErr := userByUUIDStmt(db, prefix) - updateUser, updateUserClose, updateUserErr := updateUserStmt(db, prefix) - deleteUser, deleteUserClose, deleteUserErr := deleteUserStmt(db, prefix) - addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(db, prefix) - getNetwork, getNetworkClose, getNetworkErr := getNetworkStmt(db, prefix) - networks, networksClose, networksErr := networksStmt(db, prefix) - setNetwork, setNetworkClose, setNetworkErr := setNetworkStmt(db, prefix) - nipNetwork, nipNetworkClose, nipNetworkErr := nipNetworkStmt(db, prefix) - addMember, addMemberClose, addMemberErr := addMemberStmt(db, prefix) - showMember, showMemberClose, showMemberErr := showMemberStmt(db, prefix) - members, membersClose, membersErr := membersStmt(db, prefix) - editMember, editMemberClose, editMemberErr := editMemberStmt(db, prefix) - dropMember, dropMemberClose, dropMemberErr := dropMemberStmt(db, prefix) - addChannel, addChannelClose, addChannelErr := addChannelStmt(db, prefix) - channels, channelsClose, channelsErr := channelsStmt(db, prefix) - topic, topicClose, topicErr := topicStmt(db, prefix) - endChannel, endChannelClose, endChannelErr := endChannelStmt(db, prefix) - join, joinClose, joinErr := joinStmt(db, prefix) - part, partClose, partErr := partStmt(db, prefix) - names, namesClose, namesErr := namesStmt(db, prefix) - addEvent, addEventClose, addEventErr := addEventStmt(db, prefix) - allAfter, allAfterClose, allAfterErr := allAfterStmt(db, prefix) - - err := g.SomeError( - createTablesErr, - createUserErr, - userByUUIDErr, - updateUserErr, - deleteUserErr, - addNetworkErr, - getNetworkErr, - networksErr, - setNetworkErr, - nipNetworkErr, - addMemberErr, - showMemberErr, - membersErr, - editMemberErr, - dropMemberErr, - addChannelErr, - channelsErr, - topicErr, - endChannelErr, - joinErr, - partErr, - namesErr, - addEventErr, - allAfterErr, - ) + err := g.ValidateSQLTablePrefix(prefix) if err != nil { return queriesT{}, err } - close := func() error { + shared, err := sql.Open(golite.DriverName, dbpath) + if err != nil { + return queriesT{}, err + } + + cfg := dbconfigT{ + shared: shared, + dbpath: dbpath, + prefix: prefix, + } + + createTablesErr := createTables(shared, prefix) + createUser, createUserClose, createUserErr := createUserStmt(cfg) + userByUUID, userByUUIDClose, userByUUIDErr := userByUUIDStmt(cfg) + updateUser, updateUserClose, updateUserErr := updateUserStmt(cfg) + deleteUser, deleteUserClose, deleteUserErr := deleteUserStmt(cfg) + addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(cfg) + getNetwork, getNetworkClose, getNetworkErr := getNetworkStmt(cfg) + networks, networksClose, networksErr := networksStmt(cfg) + setNetwork, setNetworkClose, setNetworkErr := setNetworkStmt(cfg) + nipNetwork, nipNetworkClose, nipNetworkErr := nipNetworkStmt(cfg) + addMember, addMemberClose, addMemberErr := addMemberStmt(cfg) + showMember, showMemberClose, showMemberErr := showMemberStmt(cfg) + members, membersClose, membersErr := membersStmt(cfg) + editMember, editMemberClose, editMemberErr := editMemberStmt(cfg) + dropMember, dropMemberClose, dropMemberErr := dropMemberStmt(cfg) + addChannel, addChannelClose, addChannelErr := addChannelStmt(cfg) + channels, channelsClose, channelsErr := channelsStmt(cfg) + topic, topicClose, topicErr := topicStmt(cfg) + endChannel, endChannelClose, endChannelErr := endChannelStmt(cfg) + join, joinClose, joinErr := joinStmt(cfg) + part, partClose, partErr := partStmt(cfg) + names, namesClose, namesErr := namesStmt(cfg) + addEvent, addEventClose, addEventErr := addEventStmt(cfg) + allAfter, allAfterClose, allAfterErr := allAfterStmt(cfg) + logMessage, logMessageClose, logMessageErr := logMessageStmt(cfg) + + closeFn := func() error { return g.SomeFnError( createUserClose, userByUUIDClose, @@ -1884,9 +1922,48 @@ func initDB( namesClose, addEventClose, allAfterClose, + logMessageClose, ) } + err = g.SomeError( + createTablesErr, + createUserErr, + userByUUIDErr, + updateUserErr, + deleteUserErr, + addNetworkErr, + getNetworkErr, + networksErr, + setNetworkErr, + nipNetworkErr, + addMemberErr, + showMemberErr, + membersErr, + editMemberErr, + dropMemberErr, + addChannelErr, + channelsErr, + topicErr, + endChannelErr, + joinErr, + partErr, + namesErr, + addEventErr, + allAfterErr, + logMessageErr, + ) + if err != nil { + ferr := g.SomeError( + + createUserErr, + + ) + fmt.Printf("ferr: %#v\n", ferr) + closeFn() + return queriesT{}, err + } + var connMutex sync.RWMutex return queriesT{ createUser: func(a newUserT) (userT, error) { @@ -2078,98 +2155,187 @@ func initDB( return eventEach(rows, callback) }, + logMessage: func(a userT, b messageT) error { + connMutex.Lock() + defer connMutex.Unlock() + return logMessage(a, b) + }, close: func() error { connMutex.Lock() defer connMutex.Unlock() - return close() + return closeFn() }, }, nil } -var consumers = []consumerT{ +func newChannelHandler(papod papodT) func(fiinha.Message) error { + return func(message fiinha.Message) error { + return nil + } } -func registerConsumers(papod papodT, consumers []consumerT) { - for _, consumer := range consumers { - papod.queue.Subscribe( - consumer.topic, - defaultPrefix + "-" + consumer.topic, - consumer.handlerFn(papod), - ) + +func buildConsumers(prefix string) []consumerT { + return []consumerT{ + consumerT{ + topic: NEW_CHANNEL, + name: prefix + NEW_CHANNEL, + handlerFn: newChannelHandler, + }, + } +} + +func unregisterConsumers( + unsubscribeFn func(string, string), + consumers []consumerT, +) { + for _, c := range consumers { + unsubscribeFn(c.topic, c.name) + } +} + +func registerConsumers( + subscribeFn func(string, string, func(fiinha.Message) error) error, + unsubscribeFn func(string, string), + papod papodT, + consumers []consumerT, +) error { + for _, c := range consumers { + err := subscribeFn(c.topic, c.name, c.handlerFn(papod)) + if err != nil { + unregisterConsumers(unsubscribeFn, consumers) + return err + } } + return nil } -func NewWithPrefix(databasePath string, prefix string) (IPapod, error) { - queue, err := q.NewWithPrefix(databasePath, prefix) +func initListeners( + daemonSocketPath string, + commanderSocketPath string, +) (listenersT, error) { + daemon, err := net.Listen("unix", daemonSocketPath) if err != nil { - return papodT{}, err + return listenersT{}, err + } + + commander, err := net.Listen("unix", commanderSocketPath) + if err != nil { + daemon.Close() + return listenersT{}, err + } + + return listenersT{ + daemon: daemon, + commander: commander, + close: func() error { + return g.SomeError( + daemon.Close(), + commander.Close(), + ) + }, + }, nil +} + +func makeReceivers() receiversT { + var rwmutex sync.Mutex + return receiversT{ + add: func(receiver receiverT) { + }, + remove: func(receiver receiverT) { + }, + get: func(guuid.UUID) []receiverT{ + return nil + }, + close: func() { + rwmutex.Lock() + defer rwmutex.Unlock() + }, + } +} + +func buildMetrics(prefix string) metricsT { + return metricsT{ + activeConnections: g.MakeGauge( + "active-connection", + "prefix", prefix, + ), + nicksInChannel: g.MakeGauge( + "nicks-in-channel", + "prefix", prefix, + ), + sendToClientError: g.MakeCounter( + "send-to-client-error", + "prefix", prefix, + ), + receivedMessage: g.MakeCounter( + "received-message", + "prefix", prefix, + ), + sentReply: g.MakeCounter( + "sent-reply", + "prefix", prefix, + ), } +} - auth, err := cracha.NewWithPrefix(databasePath, prefix) +func NewWithPrefix( + databasePath string, + prefix string, + daemonSocketPath string, + commanderSocketPath string, +) (IPapod, error) { + queue, err := fiinha.New(databasePath) if err != nil { return papodT{}, err } - db, err := sql.Open(golite.DriverName, databasePath) + auth, err := cracha.New(databasePath) if err != nil { + queue.Close() return papodT{}, err } - err = g.ValidateSQLTablePrefix(prefix) + listeners, err := initListeners(daemonSocketPath, commanderSocketPath) if err != nil { + queue.Close() + auth.Close() return papodT{}, err } - queries, err := initDB(db, prefix) + queries, err := initDB(databasePath, prefix) if err != nil { + queue.Close() + auth.Close() + listeners.close() return papodT{}, err } + consumers := buildConsumers(prefix) + receivers := makeReceivers() + metrics := buildMetrics(prefix) + // logger := g.NewLogger("prefix", prefix, "program", "papod") + return papodT{ - queue: queue, - auth: auth, - db: db, - queries: queries, + queue: queue, + auth: auth, + queries: queries, + listeners: listeners, + consumers: consumers, + receivers: receivers, + metrics: metrics, + // logger: logger, }, nil } -func New(databasePath string) (IPapod, error) { - return NewWithPrefix(databasePath, defaultPrefix) -} - -func (papod papodT) Close() error { - return g.WrapErrors( - papod.queries.close(), - papod.db.Close(), - papod.auth.Close(), - papod.queue.Close(), +func New(basePath string) (IPapod, error) { + return NewWithPrefix( + basePath + "/papod.db", + defaultPrefix, + basePath + "/papod.daemon.sock", + basePath + "/papod.commander.sock", ) } - - - - - - - - - - -// FIXME: reorder -var emitActiveConnection = g.MakeGauge("active-connections") -var emitNicksInChannel = g.MakeGauge("nicks-in-channel") -var emitReceivedMessage = g.MakeCounter("received-message") -var emitWriteToClientError = g.MakeCounter("write-to-client") - -const pingFrequency = time.Duration(30) * time.Second -const pongMaxLatency = time.Duration(5) * time.Second - -var ( - cmdUSER = messageT{ command: "USER" } - cmdPRIVMSG = messageT{ command: "PRIVMSG" } - cmdJOIN = messageT{ command: "JOIN" } -) - func splitOnCRLF(data []byte, _atEOF bool) (int, []byte, error) { idx := bytes.Index(data, []byte { '\r', '\n' }) if idx == -1 { @@ -2236,92 +2402,372 @@ func parseMessage(rawMessage string) (messageT, error) { return msg, nil } -func handleUnknown(ctx *contextT, msg messageT) { - g.Warning( - "Unsupported command", "unsupported-command", - "command", msg.command, - ) - var r replyT = replyUnknown - r.prefix = "dunno" - // return []Action { r } +func asNewEvent(msg messageT) newEventT { + return newEventT{} } -func handleUSER(ctx *contextT, msg messageT) { - fmt.Printf("USER: %#v\n", msg) +func joinEvent(member memberT) eventT { + return eventT{} } -func handlePRIVMSG(ctx *contextT, msg messageT) { - // . assert no missing params - // . write to DB: (after auth) - // . channel timeline: message from $USER - // . reply to $USER - // . broadcast new timeline event to members of the channel +func asMessage(event eventT) messageT { + return messageT{} +} - stmt, err := ctx.db.Prepare(` - INSERT INTO messages - (id, sender_id, body, timestamp) - VALUES - (?, ?, ?, ? ); - `) - if err != nil { - // FIXME: reply error - fmt.Println("can't prepare: ", err) - return +func asReply(event eventT) replyT { + return replyT{} +} + +func broadcastEvent(event eventT, receiversFn func(guuid.UUID) []receiverT) { + message := asMessage(event) + for _, receiver := range receiversFn(event.channelID) { + // FIXME: + // is this death by a thousand goroutines? Is the runtime + // able to handle the creation and destruction of hundreds of + // thousands of goroutines per second? + go receiver.send(message) } - defer stmt.Close() +} - ret, err := stmt.Exec( - guuid.New().String(), - "FIXME", - "FIXME", - time.Now(), - ) +var ( + replyErrUnknown = replyT{ + command: "421", + params: messageParamsT{ + middle: []string{}, + trailing: "Unknown command", + }, + } + replyErrNotRegistered = replyT{ + command: "451", + params: messageParamsT{ + middle: []string{}, + trailing: "You have not registered", + }, + } + replyErrFileError = replyT{ + command: "424", + params: messageParamsT{ + middle: []string{}, + trailing: "File error doing query on database", + }, + } + RPL_WELCOME = replyT{ + command: "001", + params: messageParamsT{ + middle: []string{}, + trailing: "", + }, + } +) + +func handleUnknown( + papod papodT, + connection *connectionT, + msg messageT, +) ([]replyT, error) { + // FIXME: user doesn't exist when unauthenticated + err := papod.queries.logMessage(userT{ }, msg) if err != nil { - // FIXME: reply error - fmt.Println("xablau can't prepare: ", err) - return + g.Warning( + "Failed to log message", fmt.Sprintf("%#v", msg), + "group-as", "db-write", + "handler-action", "log-and-ignore", + "connection", connection.uuid.String(), + "err", err, + ) } - fmt.Println("ret: ", ret) + return []replyT{ replyErrUnknown }, nil } -func handleJOIN(ctx *contextT, msg messageT) { - fmt.Printf("JOIN: %#v\n", msg) - - // . write to DB: (after auth) - // . $USER now in channel - // . channel timeline: $USER joined - // . reply to $USER - // . broadcast new timeline event to members of the channel +func handleUSER( + papod papodT, + connection *connectionT, + msg messageT, +) ([]replyT, error) { + u := connection.user.username + m := []string{ u } + return []replyT{ replyT{ + command: "001", + params: messageParamsT{ + middle: m, + trailing: "Welcome to the Internet Relay Network " + u, + }, + }, replyT{ + command: "002", + params: messageParamsT{ + middle: m, + trailing: "Your host is FIXME, running version " + + Version, + }, + }, replyT{ + command: "003", + params: messageParamsT{ + middle: m, + trailing: "This server was create FIXME", + }, + }, replyT{ + command: "004", + params: messageParamsT{ + middle: m, + trailing: "FIXME " + Version + " i x", + }, + }, }, nil } -func replyAnonymous() { +func handleNICK( + papod papodT, + connection *connectionT, + msg messageT, +) ([]replyT, error) { + connection.user.username = msg.params.middle[0] + return []replyT{}, nil } -func persistMessage(msg messageT) { +func handlePRIVMSG( + papod papodT, + connection *connectionT, + msg messageT, +) ([]replyT, error) { + // FIXME: check missing params + // FIXME: check if user is member of channel, and is authorized to post + // FIXME: adapt to handle multiple targets + + return []replyT{}, nil + + event, err := papod.queries.addEvent(asNewEvent(msg)) + if err != nil { + // FIXME: not allowed reply per RFC 1459, check other specs + return []replyT{ replyErrFileError }, nil + } + + go broadcastEvent(event, papod.receivers.get) + + reply := asReply(event) + return []replyT{ reply }, nil } -func (reply replyT) typeOf() actionType { - return actionReply +func handleTOPIC( + papod papodT, + connection *connectionT, + msg messageT, +) ([]replyT, error) { + return []replyT{ replyT{ + command: "JOIN", + params: messageParamsT{ + middle: []string{ msg.params.middle[0] }, + trailing: "", + }, + } }, nil +} + +func handleJOIN( + papod papodT, + connection *connectionT, + msg messageT, +) ([]replyT, error) { + u := connection.user.username + channel := msg.params.middle[0] + return []replyT{ replyT{ + command: "JOIN", + params: messageParamsT{ + middle: []string{ channel }, + trailing: "", + }, + }, replyT{ + command: "331", + params: messageParamsT{ + middle: []string{ u, channel }, + trailing: "No topic is set", + }, + }, replyT{ + command: "353", + params: messageParamsT{ + middle: []string{ u, "=", channel }, + trailing: u + " virtualuser", + }, + }, replyT{ + command: "366", + params: messageParamsT{ + middle: []string{ u, channel }, + trailing: "End of NAMES list", + }, + } }, nil + + + member, err := papod.queries.addMember( + *connection.user, + networkT{}, + newMemberT{}, + ) + if err != nil { + // FIXME: not allowed per RFC 1459 + return []replyT{ replyErrFileError }, nil + } + event := joinEvent(member) + + papod.metrics.nicksInChannel.Inc() + + go broadcastEvent(event, papod.receivers.get) + + reply := asReply(event) + return []replyT{ reply }, nil } -var ( - replyUnknown = replyT{ - command: 421, +func handleMODE( + papod papodT, + connection *connectionT, + msg messageT, +) ([]replyT, error) { + u := connection.user.username + channel := msg.params.middle[0] + return []replyT{ replyT{ + command: "324", + params: messageParamsT{ + middle: []string{ u, channel, "+Cnst" }, + trailing: "", + }, + } }, nil +} + +func handleWHOIS( + papod papodT, + connection *connectionT, + msg messageT, +) ([]replyT, error) { + u := connection.user.username + user := msg.params.middle[0] + return []replyT{ replyT{ + command: "311", + params: messageParamsT{ + middle: []string{ u, user, user, "samehost", "*" }, + trailing: "my real name is: " + user, + }, + }, replyT{ + command: "312", + params: messageParamsT{ + middle: []string{ u, user, "stillsamehost" }, + trailing: "some server info", + }, + }, replyT{ + command: "319", + params: messageParamsT{ + middle: []string{ u, user }, + trailing: "#default", + }, + }, replyT{ + command: "318", + params: messageParamsT{ + middle: []string{ u, user }, + trailing: "End of WHOIS list", + }, + } }, nil +} + +func handleAWAY( + papod papodT, + connection *connectionT, + msg messageT, +) ([]replyT, error) { + u := connection.user.username + + if msg.params.trailing == "" { + return []replyT{ replyT{ + command: "305", + params: messageParamsT{ + middle: []string{ u }, + trailing: "You are no longer marked as away", + }, + } }, nil + } else { + return []replyT{ replyT{ + command: "306", + params: messageParamsT{ + middle: []string{ u }, + trailing: "You have been marked as away", + }, + } }, nil + } +} + +func handlePING( + papod papodT, + connection *connectionT, + msg messageT, +) ([]replyT, error) { + return []replyT{ { + command: "PONG", params: messageParamsT{ middle: []string{}, - trailing: "Unknown command", + trailing: msg.params.middle[0], }, - } -) + } }, nil +} -var commands = map[string]func(*contextT, messageT) { - cmdUSER.command: handleUSER, - cmdPRIVMSG.command: handlePRIVMSG, - cmdJOIN.command: handleJOIN, +func handleQUIT( + papod papodT, + connection *connectionT, + msg messageT, +) ([]replyT, error) { + connection.conn.Close() + return []replyT{}, nil } -func actionFnFor(command string) func(*contextT, messageT) { +func handleCAP( + papod papodT, + connection *connectionT, + msg messageT, +) ([]replyT, error) { + if msg.params.middle[0] == "END" { + return nil, nil + } + + return []replyT{ replyT{ + command: "CAP", + params: messageParamsT{ + middle: []string { "*", "LS" }, + trailing: "", + }, + } }, nil +} + +func authRequired( + fn func(papodT, *connectionT, messageT) ([]replyT, error), +) func(papodT, *connectionT, messageT) ([]replyT, error) { + return func( + papod papodT, + connection *connectionT, + message messageT, + ) ([]replyT, error) { + if connection.user == nil { + return []replyT{ replyErrNotRegistered }, nil + } + + return fn(papod, connection, message) + } +} + +var commands = map[string]func( + papodT, + *connectionT, + messageT, +) ([]replyT, error) { + "USER": handleUSER, + "NICK": handleNICK, + "QUIT": handleQUIT, + "CAP": handleCAP, + "AWAY": authRequired(handleAWAY), + "PRIVMSG": authRequired(handlePRIVMSG), + "PING": authRequired(handlePING), + "JOIN": authRequired(handleJOIN), + "MODE": authRequired(handleMODE), + "TOPIC": authRequired(handleTOPIC), + "WHOIS": authRequired(handleWHOIS), +} + +func actionFnFor( + command string, +) func(papodT, *connectionT, messageT) ([]replyT, error) { fn := commands[command] if fn != nil { return fn @@ -2330,244 +2776,221 @@ func actionFnFor(command string) func(*contextT, messageT) { return handleUnknown } -func processMessage(ctx *contextT, connection *connectionT, rawMessage string) { - connection.lastReadFrom = time.Now() +func replyString(reply replyT) string { + if reply.params.trailing == "" { + return fmt.Sprintf( + "%s %s\r\n", + reply.command, + strings.Join(reply.params.middle, " "), + ) + } else { + return fmt.Sprintf( + "%s %s :%s\r\n", + reply.command, + strings.Join(reply.params.middle, " "), + reply.params.trailing, + ) + } +} +func processMessage(papod papodT, connection *connectionT, rawMessage string) { msg, err := parseMessage(rawMessage) if err != nil { g.Info( "Error processing message", "process-message", + "text", rawMessage, "err", err, ) return } - if msg.command == cmdUSER.command { - args := msg.params.middle - if len(args) == 0 { - go replyAnonymous() - return - } - connection.id = args[0] - connection.isAuthenticated = true - } - - if !connection.isAuthenticated { - go replyAnonymous() - return - } - - actionFnFor(msg.command)(ctx, msg) -} + papod.metrics.receivedMessage( + "message", fmt.Sprintf("%#v", msg), + "text", rawMessage, + ) -func readLoop(ctx *contextT, connection *connectionT) { - scanner := bufio.NewScanner(connection.conn) - scanner.Split(splitOnRawMessage) - for scanner.Scan() { - processMessage(ctx, connection, scanner.Text()) + var replyErrors []error + replies, actionErr := actionFnFor(msg.command)(papod, connection, msg) + for _, reply := range replies { + text := replyString(reply) + _, err = io.WriteString(connection.conn, text) + if err != nil { + replyErrors = append(replyErrors, err) + } + papod.metrics.sentReply( + "message", rawMessage, + "reply", fmt.Sprintf("%#v", reply), + "text", text, + ) } -} -func writeLoop(ctx *contextT, connection *connectionT) { - for message := range connection.replyChan { - _, err := io.WriteString(connection.conn, message) - if err != nil { - g.Error( - "Failed to send data to user", - "user-reply-error", + if actionErr != nil || len(replyErrors) != 0 { + if actionErr != nil { + g.Info( + "Handler returned error", "handler-error", + "from", "daemon", "err", err, ) - emitWriteToClientError() - continue } - connection.lastWrittenTo = time.Now() - } - - emitActiveConnection.Dec() - connection.conn.Close() -} - -func kill(ctx *contextT, connection *connectionT) { - // lock? - delete(ctx.state.users, connection.id) - // unlock? - close(connection.replyChan) - connection.conn.Close() // Ignore errors? -} - -const pingWindow = 30 * time.Second -func pingLoop(ctx *contextT, connection *connectionT) { - for { - time.Sleep(pingWindow) - if (time.Since(connection.lastReadFrom) <= pingWindow) { - continue - } - window := connection.lastWrittenTo.Sub(connection.lastReadFrom) - if (window <= pingWindow) { - connection.replyChan <- "PING" - continue + if len(replyErrors) != 0 { + g.Info( + "Failed to send reply", "send-reply-error", + "from", "daemon", + "err", replyErrors, + ) } - kill(ctx, connection) - break + // FIXME: Close the connection } } -func handleConnection(ctx *contextT, conn net.Conn) { - emitActiveConnection.Inc() - // FIXME: WaitGroup here? - now := time.Now() - connection := connectionT { +func handleConnection(papod papodT, conn net.Conn) { + connection := connectionT{ conn: conn, - isAuthenticated: false, - lastReadFrom: now, - lastWrittenTo: now, + uuid: guuid.New(), + // user: nil, // FIXME: SASL shenanigan probably goes here + user: &userT{}, + } + scanner := bufio.NewScanner(conn) + scanner.Split(splitOnRawMessage) + for scanner.Scan() { + processMessage(papod, &connection, scanner.Text()) } - go readLoop(ctx, &connection) - go writeLoop(ctx, &connection) - go pingLoop(ctx, &connection) } -func serverLoop(ctx *contextT, publicSocketPath string) { - listener, err := net.Listen("unix", publicSocketPath) - g.FatalIf(err) - g.Info("IRCd started", "component-up", "component", "ircd") +func handleCommand(papod papodT, conn net.Conn) { + // FIXME +} +func daemonLoop(papod papodT) { for { - conn, err := listener.Accept() + conn, err := papod.listeners.daemon.Accept() if err != nil { + if errors.Is(err, net.ErrClosed) { + break + } + g.Warning( - "Error accepting a public IRCd connection", + "Error accepting daemon connection", "accept-connection", + "from", "daemon", "err", err, ) - // conn.Close() // FIXME: is conn nil? continue } // FIXME: where does it get closed - go handleConnection(ctx, conn) + go handleConnection(papod, conn) } } -func commandListenerLoop(ctx *contextT, commandSocketPath string) { - listener, err := net.Listen("unix", commandSocketPath) - g.FatalIf(err) - g.Info( - "command listener started", - "component-up", - "component", "command-listener", - ) - +func commanderLoop(papod papodT) { for { - conn, err := listener.Accept() + conn, err := papod.listeners.commander.Accept() if err != nil { + if errors.Is(err, net.ErrClosed) { + break + } + g.Warning( - "Error accepting a command connection", - "accept-command", + "Error accepting commander connection", + "accept-connection", + "from", "commander", "err", err, ) continue } - defer conn.Close() - - // TODO: handle commands + go handleCommand(papod, conn) } } -func transactorLoop(ctx *contextT) { - g.Info("transactor started", "component-up", "component", "transactor") - emitActiveConnection.Inc() - - for tx := range ctx.tx { - fmt.Println(tx) +func mkbgrun() (func(func()), func()) { + var wg sync.WaitGroup + bgrun := func(f func()) { + wg.Add(1) + go func() { + f() + wg.Done() + }() } + return bgrun, wg.Wait } -func initDB2(databasePath string) *sql.DB { - db, err := sql.Open(golite.DriverName, databasePath) - g.FatalIf(err) - return db -} - -func start(ctx *contextT, publicSocketPath string, commandSocketPath string) { - /* - buildInfo, ok := debug.ReadBuildInfo() - if !ok { - g.Fatal(errors.New("error on debug.ReadBuildInfo()")) +func (papod papodT) Start() error { + err := registerConsumers( + papod.queue.Subscribe, + papod.queue.Unsubscribe, + papod, + papod.consumers, + ) + if err != nil { + return err } - */ - g.Info("-", "lifecycle-event", + // FIXME: papod.logger.Info + g.Info("Starting service", "lifecycle-event", "event", "starting-server", - /* slog.Group( - "go", - "version", buildInfo.GoVersion, - "settings", buildInfo.Settings, - "deps", buildInfo.Deps, + "versions", + "gobang", g.Version, + "cracha", cracha.Version, + "fiinha", fiinha.Version, + "golite", golite.Version, + "guuid", guuid.Version, + "papod", Version, + "this", Version, ), - */ ) - var wg sync.WaitGroup - bgRun := func(f func()) { - wg.Add(1) - go func() { - f() - wg.Done() - }() - } - bgRun(func() { serverLoop(ctx, publicSocketPath) }) - bgRun(func() { commandListenerLoop(ctx, commandSocketPath) }) - bgRun(func() { transactorLoop(ctx) }) - wg.Wait() + run, wait := mkbgrun() + run(func() { daemonLoop(papod) }) + run(func() { commanderLoop(papod) }) + wait() + + return nil } -func buildContext(databasePath string) *contextT { - db := initDB2(databasePath) - tx := make(chan int, 100) - return &contextT { - db: db, - tx: tx, +func (papod papodT) Close() error { + // FIXME: does this wait for current handlers to wait? Well, it should. + unregisterConsumers(papod.queue.Unsubscribe, papod.consumers) + return g.WrapErrors( + papod.listeners.close(), + // papod.connCloser.closeAll(), + papod.auth.Close(), + papod.queue.Close(), + papod.queries.close(), + ) +} + +func basePathFrom(args []string) (string, error) { + if len(args) < 2 { + return os.Getwd() + } else { + return args[1], nil } } -var ( - databasePath = flag.String( - "f", - "papod.db", - "The path to the database file", - ) - publicSocketPath = flag.String( - "s", - "papod.public.socket", - "The path to the socket that handles the public traffic", - ) - commandSocketPath = flag.String( - "S", - "papod.command.socket", - "The path to the private IPC commands socket", - ) -) func Main() { - g.Init(slog.Group( - "versions", - "cracha", cracha.Version, - "q", q.Version, - "golite", golite.Version, - "guuid", guuid.Version, - "gobang", g.Version, - "papod", Version, - "this", Version, - )) - flag.Parse() - ctx := buildContext(*databasePath) - start(ctx, *publicSocketPath, *commandSocketPath) -} - -// FIXME: review usage of g.Fatal() -// https://gist.github.com/xero/2d6e4b061b4ecbeb9f99 + g.Init("program", "papod") + + basePath, err := basePathFrom(os.Args) + if err != nil { + fmt.Printf("%v\n", err) + os.Exit(1) + } + + ipapod, err := New(basePath) + if err != nil { + fmt.Printf("Failed to create papod: %v\n", err) + os.Exit(1) + } + + err = ipapod.Start() + if err != nil { + fmt.Printf("Failed to start: %v\n", err) + os.Exit(1) + } +} |