// Package papod contains the storage layer and the socket-facing server loops
// of an IRC-style daemon (see serverLoop, which announces itself as "IRCd").
package papod

import (
	"bufio"
	"bytes"
	"context"
	"database/sql"
	"errors"
	"flag"
	"fmt"
	"io"
	"log/slog"
	"net"
	"regexp"
	"strings"
	"sync"
	"time"

	"cracha"
	"q"
	"golite"
	"guuid"
	g "gobang"
)

const (
	// defaultPrefix is the table/topic prefix used by New().
	defaultPrefix = "papod"

	// rollbackErrorFmt wraps, in this order, the error returned by the
	// ROLLBACK itself and the error that triggered the rollback.
	rollbackErrorFmt = "rollback error: %w; while executing: %w"

	NEW_CHANNEL = "new-channel"
)

// queryT holds the SQL text generated for one operation.  Only the field
// matching the kind of operation (write or read) is filled in.
type queryT struct {
	write string
	read  string
}

// queriesT is the set of database operations built by initDB().  Each field
// is a closure over an already prepared statement; close releases them all.
type queriesT struct {
	addNetwork func(guuid.UUID, newNetworkT) (networkT, error)
	addChannel func(guuid.UUID, newChannelT) (channelT, error)
	channels   func(guuid.UUID, func(channelT) error) error
	allAfter   func(guuid.UUID, func(eventT) error) error
	addEvent   func(string, string) error
	close      func() error
}

// newNetworkT is the caller-provided data for creating a network.
type newNetworkT struct {
	uuid guuid.UUID
	name string
}

// networkT is a stored network, including the database-assigned id and
// timestamp.
type networkT struct {
	id        int64
	timestamp time.Time
	uuid      guuid.UUID
	name      string
	createdBy guuid.UUID
}

// newChannelT is the caller-provided data for creating a channel.
type newChannelT struct {
	name string
	uuid guuid.UUID
}

// channelT is a stored channel, including the database-assigned id and
// timestamp.
type channelT struct {
	id        int64
	timestamp time.Time
	uuid      guuid.UUID
	name      string
}

// eventT is a stored event row.
type eventT struct {
	id        int64
	timestamp time.Time
	uuid      guuid.UUID
}

// papodT is the concrete value returned by New()/NewWithPrefix(); it owns the
// auth service, the queue, the database handle and the prepared queries.
type papodT struct {
	auth    cracha.IAuth
	queue   q.IQueue
	db      *sql.DB
	queries queriesT
}

// consumerT describes a queue subscription: the topic to listen on and a
// factory that builds the message handler for a given papodT.
type consumerT struct {
	topic     string
	handlerFn func(papodT) func(q.Message) error
}

// connectionT is the per-client state shared by the read, write and ping
// loops of a single network connection.
type connectionT struct {
	conn          net.Conn
	replyChan     chan string
	lastReadFrom  time.Time
	lastWrittenTo time.Time
	// id *UUID
	id              string
	isAuthenticated bool
}

// userT groups the connections that belong to one user.
type userT struct {
	connections []connectionT
}

// stateT is the in-memory server state, keyed by connection/user id.
type stateT struct {
	users map[string]*userT
}

// contextT is the server-wide context handed to every loop and handler.
type contextT struct {
	db    *sql.DB
	state stateT
	tx    chan int
}

// messageParamsT is the parameter section of a message: the space separated
// "middle" parameters and the free-form "trailing" one.
type messageParamsT struct {
	middle   []string
	trailing string
}

// messageT is a parsed client message; raw keeps the original text.
type messageT struct {
	prefix  string
	command string
	params  messageParamsT
	raw     string
}

// actionType enumerates the kinds of action a handler may produce.
type actionType int

const (
	actionReply actionType = iota
)

// action is implemented by every value a handler may return as a result.
type action interface {
	typeOf() actionType
}

// replyT is a numeric reply to be sent back to a client.
type replyT struct {
	prefix  string
	command int
	params  messageParamsT
}

// IPapod is the public handle returned by New() and NewWithPrefix().  It
// currently declares no methods.
type IPapod interface {
}

// tryRollback issues a ROLLBACK on db after err happened inside a
// transaction started by inTx().
//
// It returns err unchanged when the rollback succeeds.  When the rollback
// itself fails, it returns an error wrapping both the rollback error and err
// (see rollbackErrorFmt).
func tryRollback(db *sql.DB, ctx context.Context, err error) error {
	_, rollbackErr := db.ExecContext(ctx, "ROLLBACK;")
	if rollbackErr != nil {
		return fmt.Errorf(
			rollbackErrorFmt,
			rollbackErr,
			err,
		)
	}

	return err
}

// inTx runs fn between a "BEGIN IMMEDIATE;" and a "COMMIT;" executed on db.
// If fn or the COMMIT fails, a ROLLBACK is attempted via tryRollback() and the
// resulting error is returned; if the BEGIN fails its error is returned as is.
//
// NOTE(review): the BEGIN/COMMIT/ROLLBACK statements and whatever fn executes
// all go through *sql.DB, which is a connection pool.  Nothing here pins them
// to a single connection, so this is only a real transaction if the driver is
// limited to one connection - confirm before relying on it.
//
// FIXME
// See:
// https://sqlite.org/forum/forumpost/2507664507
func inTx(db *sql.DB, fn func(context.Context) error) error {
	ctx := context.Background()

	_, err := db.ExecContext(ctx, "BEGIN IMMEDIATE;")
	if err != nil {
		return err
	}

	err = fn(ctx)
	if err != nil {
		return tryRollback(db, ctx, err)
	}

	_, err = db.ExecContext(ctx, "COMMIT;")
	if err != nil {
		return tryRollback(db, ctx, err)
	}

	return nil
}

/// "papod_users".uuid is the same as cracha_users.uuid.  Not a foreign key to
/// allow them to live in different physical locations.  Adding it here feels
/// less like an optimization related decision, and more of a coupling one.  The
/// way that New() works now uses the same databasePath for the q.IQueue *and*
/// cracha.IAuth, but cracha in no way exposes where it stores the user UUID or
/// how it is handled.  This has similarities to how events here don't
/// reference the q.Message.ID via foreign keys either.  They're treated only as
/// opaque IDs.
func createTablesSQL(prefix string) queryT { const tmpl_write = ` CREATE TABLE IF NOT EXISTS "%s_workspaces" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), uuid BLOB NOT NULL UNIQUE, name TEXT NOT NULL, description TEXT NOT NULL, -- "public", "private", "unlisted" type TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS "%s_workspace_changes ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), workspace_id INTEGER NOT NULL REFERENCES "%s_workspaces"(id), attribute TEXT NOT NULL, value TEXT NOT NULL, op BOOLEAN NOT NULL ); CREATE TABLE IF NOT EXISTS "%s_users" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), -- provided by cracha uuid BLOB NOT NULL UNIQUE, username TEXT NOT NULL, display_name TEXT NOT NULL, picture_uuid BLOB NOT NULL UNIQUE, deleted BOOLEAN NOT NULL ); CREATE TABLE IF NOT EXISTS "%s_user_changes" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), user_id INTEGER NOT NULL REFERENCES "%s_users"(id), attribute TEXT NOT NULL, value TEXT NOT NULL, op BOOLEAN NOT NULL ); CREATE TABLE IF NOT EXISTS "%s_members" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), workspace_id INTEGER NOT NULL REFERENCES "%s_workspaces"(id), user_id INTEGER NOT NULL, username TEXT NOT NULL, display_name TEXT NOT NULL, picture_uuid BLOB NOT NULL UNIQUE, -- "active", "inactive", "removed" status TEXT NOT NULL, -- "active", always active_uniq TEXT, UNIQUE (workspace_id, username, active_uniq), UNIQUE (workspace_id, user_id) ); CREATE TABLE IF NOT EXISTS "%s_member_roles" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, member_id INTEGER NOT NULL REFERENCES "%s_members"(id), role TEXT NOT NULL, UNIQUE (member_id, role) ); CREATE TABLE IF NOT EXISTS "%s_member_changes" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), member_id INTEGER NOT NULL REFERENCES 
"%s_members"(id), attribute TEXT NOT NULL, value TEXT NOT NULL, op BOOLEAN NOT NULL ); CREATE TABLE IF NOT EXISTS "%s_channels" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), uuid BLOB NOT NULL UNIQUE, workspace_id INTEGER NOT NULL REFERENCES "%s_workspaces"(id), public_name TEXT UNIQUE, label TEXT NOT NULL, description TEXT, virtual BOOLEAN NOT NULL ); CREATE TABLE IF NOT EXISTS "%s_channel_changes" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), channel_id INTEGER NOT NULL REFERENCES "%s_channels"(id), attribute TEXT NOT NULL, value TEXT NOT NULL, op BOOLEAN NOT NULL ); CREATE TABLE IF NOT EXISTS "%s_participants" ( member_id ); CREATE TABLE IF NOT EXISTS "%s_channel_events" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), uuid BLOB NOT NULL UNIQUE, channel_id INTEGER NOT NULL REFERENCES "%s"(id), connection_id INTEGER NOT NULL, -- payload FIXME: vary by type? ); -- FIXME: group conversations? 
-- user: person -- member: workspace user -- participant: channel member ` return queryT{ write: fmt.Sprintf( tmpl_write, prefix, g.SQLiteNow, prefix, g.SQLiteNow, prefix, prefix, g.SQLiteNow, prefix, prefix, g.SQLiteNow, prefix, ), } } func createTables(db *sql.DB, prefix string) error { q := createTablesSQL(prefix) return inTx(db, func(ctx context.Context) error { _, err := db.ExecContext(ctx, q.write) return err }) } // addServer // addWorkspace // addNetwork FIXME func addNetworkSQL(prefix string) queryT { const tmpl_write = ` -- FIXME ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), } } func addNetworkStmt( db *sql.DB, prefix string, ) (func(guuid.UUID, newNetworkT) (networkT, error), func() error, error) { q := addNetworkSQL(prefix) writeStmt, err := db.Prepare(q.write) if err != nil { return nil, nil, err } fn := func( userID guuid.UUID, newNetwork newNetworkT, ) (networkT, error) { network := networkT{ uuid: newNetwork.uuid, name: newNetwork.name, } var timestr string network_id_bytes := newNetwork.uuid[:] user_id_bytes := userID[:] err := writeStmt.QueryRow( network_id_bytes, newNetwork.name, user_id_bytes, ).Scan(&network.id, ×tr) if err != nil { return networkT{}, err } network.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { return networkT{}, err } return network, nil } return fn, writeStmt.Close, nil } func addChannelSQL(prefix string) queryT { const tmpl_write = ` -- FIXME ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), } } func addChannelStmt( db *sql.DB, prefix string, ) (func (guuid.UUID, newChannelT) (channelT, error), func() error, error) { q := addChannelSQL(prefix) writeStmt, err := db.Prepare(q.write) if err != nil { return nil, nil, err } fn := func( networkID guuid.UUID, newChannel newChannelT, ) (channelT, error) { channel := channelT{ name: newChannel.name, } var timestr string network_id_bytes := networkID[:] err := writeStmt.QueryRow( network_id_bytes, newChannel.name, ).Scan(&channel.id, ×tr) if 
err != nil { return channelT{}, err } channel.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { return channelT{}, err } return channel, nil } return fn, nil, nil } func channelEach(rows *sql.Rows, callback func(channelT) error) error { if rows == nil { return nil } for rows.Next() { var ( channel channelT timestr string channel_id_bytes []byte ) err := rows.Scan( &channel.id, ×tr, &channel_id_bytes, ) if err != nil { g.WrapErrors(rows.Close(), err) } channel.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { g.WrapErrors(rows.Close(), err) } err = callback(channel) if err != nil { g.WrapErrors(rows.Close(), err) } } return g.WrapErrors(rows.Err(), rows.Close()) } func channelsSQL(prefix string) queryT { const tmpl_read = ` -- FIXME ` return queryT{ read: fmt.Sprintf(tmpl_read, prefix), } } func channelsStmt( db *sql.DB, prefix string, ) (func(guuid.UUID) (*sql.Rows, error), func() error, error) { q := channelsSQL(prefix) readStmt, err := db.Prepare(q.read) if err != nil { return nil, nil, err } fn := func(workspaceID guuid.UUID) (*sql.Rows, error) { return readStmt.Query(workspaceID) } return fn, readStmt.Close, nil } func eventEach(rows *sql.Rows, callback func(eventT) error) error { if rows == nil { return nil } for rows.Next() { var ( event eventT timestr string event_id_bytes []byte ) err := rows.Scan( &event.id, ×tr, &event_id_bytes, ) if err != nil { return g.WrapErrors(rows.Close(), err) } event.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { return g.WrapErrors(rows.Close(), err) } err = callback(event) if err != nil { return g.WrapErrors(rows.Close(), err) } } return g.WrapErrors(rows.Err(), rows.Close()) } func allAfterSQL(prefix string) queryT { const tmpl_read = ` -- FIXME ` return queryT{ read: fmt.Sprintf(tmpl_read, prefix), } } func allAfterStmt( db *sql.DB, prefix string, ) (func (guuid.UUID) (*sql.Rows, error), func() error, error) { q := allAfterSQL(prefix) readStmt, err := 
db.Prepare(q.read) if err != nil { return nil, nil, err } fn := func(eventID guuid.UUID) (*sql.Rows, error) { return readStmt.Query(eventID) } return fn, readStmt.Close, nil } func addEventSQL(prefix string) queryT { const tmpl_write = ` -- FIXME ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), } } func addEventStmt( db *sql.DB, prefix string, ) (func (string, string) error, func() error, error) { q := addEventSQL(prefix) writeStmt, err := db.Prepare(q.write) if err != nil { return nil, nil, err } fn := func(type_ string, payload string) error { _, err := writeStmt.Exec(type_, payload) return err } return fn, writeStmt.Close, nil } func initDB( db *sql.DB, prefix string, ) (queriesT, error) { createTablesErr := createTables(db, prefix) // addWorkspace, addWorkspaceClose, addWorkspaceErr := addWorkspaceStmt(db, prefix) // addWorkspace, addWorkspaceClose, addWorkspaceErr := addWorkspaceQ(db, p) addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(db, prefix) addChannel, addChannelClose, addChannelErr := addChannelStmt(db, prefix) channels, channelsClose, channelsErr := channelsStmt(db, prefix) allAfter, allAfterClose, allAfterErr := allAfterStmt(db, prefix) addEvent, addEventClose, addEventErr := addEventStmt(db, prefix) err := g.SomeError( createTablesErr, addNetworkErr, addChannelErr, channelsErr, allAfterErr, addEventErr, ) if err != nil { return queriesT{}, err } close := func() error { return g.SomeFnError( addNetworkClose, addChannelClose, channelsClose, allAfterClose, addEventClose, ) } var connMutex sync.RWMutex return queriesT{ addNetwork: func(a guuid.UUID, b newNetworkT) (networkT, error) { connMutex.RLock() defer connMutex.RUnlock() return addNetwork(a, b) }, addChannel: func( a guuid.UUID, b newChannelT, ) (channelT, error) { connMutex.RLock() defer connMutex.RUnlock() return addChannel(a, b) }, channels: func( a guuid.UUID, callback func(channelT) error, ) error { var ( err error rows *sql.Rows ) { connMutex.RLock() defer 
connMutex.RUnlock() rows, err = channels(a) } if err != nil { return err } return channelEach(rows, callback) }, allAfter: func( a guuid.UUID, callback func(eventT) error, ) error { var ( err error rows *sql.Rows ) { connMutex.RLock() defer connMutex.RUnlock() rows, err = allAfter(a) } if err != nil { return err } return eventEach(rows, callback) }, addEvent: func(a string, b string) error { connMutex.RLock() defer connMutex.RUnlock() return addEvent(a, b) }, close: func() error { connMutex.Lock() defer connMutex.Unlock() return close() }, }, nil } var consumers = []consumerT{ } func registerConsumers(papod papodT, consumers []consumerT) { for _, consumer := range consumers { papod.queue.Subscribe( consumer.topic, defaultPrefix + "-" + consumer.topic, consumer.handlerFn(papod), ) } } func NewWithPrefix(databasePath string, prefix string) (IPapod, error) { queue, err := q.NewWithPrefix(databasePath, prefix) if err != nil { return papodT{}, err } auth, err := cracha.NewWithPrefix(databasePath, prefix) if err != nil { return papodT{}, err } db, err := sql.Open(golite.DriverName, databasePath) if err != nil { return papodT{}, err } err = g.ValidateSQLTablePrefix(prefix) if err != nil { return papodT{}, err } queries, err := initDB(db, prefix) if err != nil { return papodT{}, err } return papodT{ queue: queue, auth: auth, db: db, queries: queries, }, nil } func New(databasePath string) (IPapod, error) { return NewWithPrefix(databasePath, defaultPrefix) } func (papod papodT) Close() error { return g.WrapErrors( papod.queries.close(), papod.db.Close(), papod.auth.Close(), papod.queue.Close(), ) } // FIXME: reorder var emitActiveConnection = g.MakeGauge("active-connections") var emitNicksInChannel = g.MakeGauge("nicks-in-channel") var emitReceivedMessage = g.MakeCounter("received-message") var emitWriteToClientError = g.MakeCounter("write-to-client") const pingFrequency = time.Duration(30) * time.Second const pongMaxLatency = time.Duration(5) * time.Second var ( cmdUSER = 
messageT{ command: "USER" } cmdPRIVMSG = messageT{ command: "PRIVMSG" } cmdJOIN = messageT{ command: "JOIN" } ) func splitOnCRLF(data []byte, _atEOF bool) (int, []byte, error) { idx := bytes.Index(data, []byte { '\r', '\n' }) if idx == -1 { return 0, nil, nil } return idx + 2, data[0:idx], nil } func splitOnRawMessage(data []byte, atEOF bool) (int, []byte, error) { advance, token, error := splitOnCRLF(data, atEOF) if len(token) == 0 { return advance, nil, error } return advance, token, error } func splitSpaces(r rune) bool { return r == ' ' } func parseMessageParams(params string) messageParamsT { const sep = " :" var middle string var trailing string idx := strings.Index(params, sep) if idx == -1 { middle = params trailing = "" } else { middle = params[:idx] trailing = params[idx + len(sep):] } return messageParamsT{ middle: strings.FieldsFunc(middle, splitSpaces), trailing: trailing, } } var messageRegex = regexp.MustCompilePOSIX( // //1 2 3 4 `^(:([^ ]+) +)?([a-zA-Z]+) *( .*)$`, ) func parseMessage(rawMessage string) (messageT, error) { var msg messageT components := messageRegex.FindStringSubmatch(rawMessage) if components == nil { return msg, errors.New("Can't parse message") } msg = messageT{ prefix: components[2], command: components[3], params: parseMessageParams(components[4]), raw: rawMessage, } return msg, nil } func handleUnknown(ctx *contextT, msg messageT) { g.Warning( "Unsupported command", "unsupported-command", "command", msg.command, ) var r replyT = replyUnknown r.prefix = "dunno" // return []Action { r } } func handleUSER(ctx *contextT, msg messageT) { fmt.Printf("USER: %#v\n", msg) } func handlePRIVMSG(ctx *contextT, msg messageT) { // . assert no missing params // . write to DB: (after auth) // . channel timeline: message from $USER // . reply to $USER // . broadcast new timeline event to members of the channel stmt, err := ctx.db.Prepare(` INSERT INTO messages (id, sender_id, body, timestamp) VALUES (?, ?, ?, ? 
); `) if err != nil { // FIXME: reply error fmt.Println("can't prepare: ", err) return } defer stmt.Close() ret, err := stmt.Exec( guuid.New().String(), "FIXME", "FIXME", time.Now(), ) if err != nil { // FIXME: reply error fmt.Println("xablau can't prepare: ", err) return } fmt.Println("ret: ", ret) } func handleJOIN(ctx *contextT, msg messageT) { fmt.Printf("JOIN: %#v\n", msg) // . write to DB: (after auth) // . $USER now in channel // . channel timeline: $USER joined // . reply to $USER // . broadcast new timeline event to members of the channel } func replyAnonymous() { } func persistMessage(msg messageT) { } func (reply replyT) typeOf() actionType { return actionReply } var ( replyUnknown = replyT{ command: 421, params: messageParamsT{ middle: []string{}, trailing: "Unknown command", }, } ) var commands = map[string]func(*contextT, messageT) { cmdUSER.command: handleUSER, cmdPRIVMSG.command: handlePRIVMSG, cmdJOIN.command: handleJOIN, } func actionFnFor(command string) func(*contextT, messageT) { fn := commands[command] if fn != nil { return fn } return handleUnknown } func processMessage(ctx *contextT, connection *connectionT, rawMessage string) { connection.lastReadFrom = time.Now() msg, err := parseMessage(rawMessage) if err != nil { g.Info( "Error processing message", "process-message", "err", err, ) return } if msg.command == cmdUSER.command { args := msg.params.middle if len(args) == 0 { go replyAnonymous() return } connection.id = args[0] connection.isAuthenticated = true } if !connection.isAuthenticated { go replyAnonymous() return } actionFnFor(msg.command)(ctx, msg) } func readLoop(ctx *contextT, connection *connectionT) { scanner := bufio.NewScanner(connection.conn) scanner.Split(splitOnRawMessage) for scanner.Scan() { processMessage(ctx, connection, scanner.Text()) } } func writeLoop(ctx *contextT, connection *connectionT) { for message := range connection.replyChan { _, err := io.WriteString(connection.conn, message) if err != nil { g.Error( 
"Failed to send data to user", "user-reply-error", "err", err, ) emitWriteToClientError() continue } connection.lastWrittenTo = time.Now() } emitActiveConnection.Dec() connection.conn.Close() } func kill(ctx *contextT, connection *connectionT) { // lock? delete(ctx.state.users, connection.id) // unlock? close(connection.replyChan) connection.conn.Close() // Ignore errors? } const pingWindow = 30 * time.Second func pingLoop(ctx *contextT, connection *connectionT) { for { time.Sleep(pingWindow) if (time.Since(connection.lastReadFrom) <= pingWindow) { continue } window := connection.lastWrittenTo.Sub(connection.lastReadFrom) if (window <= pingWindow) { connection.replyChan <- "PING" continue } kill(ctx, connection) break } } func handleConnection(ctx *contextT, conn net.Conn) { emitActiveConnection.Inc() // FIXME: WaitGroup here? now := time.Now() connection := connectionT { conn: conn, isAuthenticated: false, lastReadFrom: now, lastWrittenTo: now, } go readLoop(ctx, &connection) go writeLoop(ctx, &connection) go pingLoop(ctx, &connection) } func serverLoop(ctx *contextT, publicSocketPath string) { listener, err := net.Listen("unix", publicSocketPath) g.FatalIf(err) g.Info("IRCd started", "component-up", "component", "ircd") for { conn, err := listener.Accept() if err != nil { g.Warning( "Error accepting a public IRCd connection", "accept-connection", "err", err, ) // conn.Close() // FIXME: is conn nil? 
continue } // FIXME: where does it get closed go handleConnection(ctx, conn) } } func commandListenerLoop(ctx *contextT, commandSocketPath string) { listener, err := net.Listen("unix", commandSocketPath) g.FatalIf(err) g.Info( "command listener started", "component-up", "component", "command-listener", ) for { conn, err := listener.Accept() if err != nil { g.Warning( "Error accepting a command connection", "accept-command", "err", err, ) continue } defer conn.Close() // TODO: handle commands } } func transactorLoop(ctx *contextT) { g.Info("transactor started", "component-up", "component", "transactor") emitActiveConnection.Inc() for tx := range ctx.tx { fmt.Println(tx) } } func initDB2(databasePath string) *sql.DB { db, err := sql.Open(golite.DriverName, databasePath) g.FatalIf(err) return db } func start(ctx *contextT, publicSocketPath string, commandSocketPath string) { /* buildInfo, ok := debug.ReadBuildInfo() if !ok { g.Fatal(errors.New("error on debug.ReadBuildInfo()")) } */ g.Info("-", "lifecycle-event", "event", "starting-server", /* slog.Group( "go", "version", buildInfo.GoVersion, "settings", buildInfo.Settings, "deps", buildInfo.Deps, ), */ ) var wg sync.WaitGroup bgRun := func(f func()) { wg.Add(1) go func() { f() wg.Done() }() } bgRun(func() { serverLoop(ctx, publicSocketPath) }) bgRun(func() { commandListenerLoop(ctx, commandSocketPath) }) bgRun(func() { transactorLoop(ctx) }) wg.Wait() } func buildContext(databasePath string) *contextT { db := initDB2(databasePath) tx := make(chan int, 100) return &contextT { db: db, tx: tx, } } var ( databasePath = flag.String( "f", "papod.db", "The path to the database file", ) publicSocketPath = flag.String( "s", "papod.public.socket", "The path to the socket that handles the public traffic", ) commandSocketPath = flag.String( "S", "papod.command.socket", "The path to the private IPC commands socket", ) ) func Main() { g.Init(slog.Group( "versions", "cracha", cracha.Version, "q", q.Version, "golite", 
golite.Version, "guuid", guuid.Version, "gobang", g.Version, "papod", Version, "this", Version, )) flag.Parse() ctx := buildContext(*databasePath) start(ctx, *publicSocketPath, *commandSocketPath) } // FIXME: review usage of g.Fatal()