package papod import ( "bufio" "bytes" "database/sql" "errors" "flag" "fmt" "io" "io/ioutil" "log/slog" "net" "os" "regexp" "sort" "strings" "sync" "time" "gracha" "q" // "golite" "guuid" g "gobang" ) type tablesT struct{ servers string users string channels string channelEvents string } type queriesT struct{ addChannel func(string) error eventsAfter func([]byte, func(eventT) error) error addEvent func(string, string) error close func() error } type eventT struct{ id int64 } type PapoD struct{ queries queriesT auth gracha.IAuth queue q.IQueue } type consumerT struct{ topic string handlerFn func(PapoD) func(q.Message) error } const ( NEW_CHANNEL_EVENT = "new-channel-event" defaultPrefix = "papod" ) func tablesFrom(prefix string) (tablesT, error) { err := g.ValidateSQLTablePrefix(prefix) if err != nil { return tablesT{}, err } servers := prefix + "-servers" users := prefix + "-users" channels := prefix + "-channels" channelEvents := prefix + "-channel-events" return tablesT{ servers: servers, users: users, channels: channels, channelEvents: channelEvents, }, nil } func createTables(db *sql.DB, tables tablesT) error { const tmpl = ` BEGIN TRANSACTION; CREATE TABLE IF NOT EXISTS "%s" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), uuid BLOB NOT NULL UNIQUE, name TEXT NOT NULL, description TEXT NOT NULL, public BOOLEAN NOT NULL, metadata TEXT ); CREATE TABLE IF NOT EXISTS "%s" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), uuid BLOB NOT NULL UNIQUE, server_id INTEGER NOT NULL REFERENCES "%s"(id), authuser_uuid BLOB NOT NULL UNIQUE, human BOOLEAN NOT NULL, visible BOOLEAN NOT NULL, enabled BOOLEAN NOT NULL, metadata TEXT ); CREATE TABLE IF NOT EXISTS "%s" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), uuid BLOB NOT NULL UNIQUE, server_id INTEGER NOT NULL REFERENCES "%s"(id), name TEXT, description TEXT, virtual BOOLEAN NOT NULL, metadata TEXT ); -- FIXME: group conversations? CREATE TABLE IF NOT EXISTS "%s" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), uuid BLOB NOT NULL UNIQUE, channel_id INTEGER NOT NULL REFERENCES "%s"(id), connection_id INTEGER NOT NULL -- payload FIXME: vary by type? ); -- FIXME:indexes COMMIT TRANSACTION; ` sql := fmt.Sprintf( tmpl, tables.servers, g.SQLiteNow, tables.users, g.SQLiteNow, tables.servers, tables.channels, g.SQLiteNow, tables.servers, tables.channelEvents, g.SQLiteNow, tables.channels, ) fmt.Println(sql) /// _, err := db.Exec(sql) return err } func addChannelQuery( db *sql.DB, tables tablesT, ) (func (string) error, func() error, error) { const tmpl = ` ` sql := fmt.Sprintf(tmpl) /// fmt.Println(sql) /// stmt, err := db.Prepare(sql) if err != nil { return nil, nil, err } fn := func(name string) error { _, err := stmt.Exec(name) return err } return fn, nil, nil } func eventEach( stmt *sql.Stmt, uuid []byte, callback func(eventT) error, ) error { rows, err := stmt.Query(uuid) if err != nil { return err } defer rows.Close() for rows.Next() { var event eventT err = rows.Scan( &event.id, ) if err != nil { return err } err = callback(event) if err != nil { return err } } return rows.Err() } func eventsAfterQuery( db *sql.DB, tables tablesT, ) (func ([]byte, func(eventT) error) error, func() error, error) { const tmpl = ` -- INSERT ` sql := fmt.Sprintf(tmpl) /// fmt.Println(sql) /// stmt, err := db.Prepare(sql) if err != nil { return nil, nil, err } fn := func(uuid []byte, callback func(eventT) error) error { return eventEach(stmt, uuid, callback) } return fn, stmt.Close, nil } func addEventQuery( db *sql.DB, tables tablesT, ) (func (string, string) error, func() error, error) { const tmpl = ` ` sql := fmt.Sprintf(tmpl) /// fmt.Println(sql) /// stmt, err := db.Prepare(sql) if err != nil { return nil, nil, err } fn := func(type_ string, payload string) error { _, err := stmt.Exec(type_, payload) return err } return fn, stmt.Close, nil } func initDB(db *sql.DB, tables tablesT) (queriesT, error) { createTablesErr := createTables(db, tables) addChannel, addChannelClose, addChannelErr := addChannelQuery(db, tables) eventsAfter, eventsAfterClose, eventsAfterErr := eventsAfterQuery(db, tables) addEvent, addEventClose, addEventErr := addEventQuery(db, tables) err := g.SomeError( createTablesErr, addChannelErr, eventsAfterErr, addEventErr, ) if err != nil { return queriesT{}, err } close := func() error { return g.SomeFnError( addChannelClose, eventsAfterClose, addEventClose, ) } return queriesT{ addChannel: addChannel, eventsAfter: eventsAfter, addEvent: addEvent, close: close, }, nil } func (papod PapoD) Close() error { return papod.queries.close() } var consumers = []consumerT{ } func registerConsumers(papod PapoD, consumers []consumerT) { for _, consumer := range consumers { papod.queue.Subscribe( consumer.topic, defaultPrefix + "-" + consumer.topic, consumer.handlerFn(papod), ) } } func NewWithPrefix(db *sql.DB, queue q.IQueue, prefix string) (PapoD, error) { tables, err := tablesFrom(prefix) if err != nil { return PapoD{}, err } queries, err := initDB(db, tables) if err != nil { return PapoD{}, err } return PapoD{ queries: queries, queue: queue, }, nil } func New(db *sql.DB, queue q.IQueue) (PapoD, error) { return NewWithPrefix(db, queue, defaultPrefix) } // Global variables var Colour string func SetEnvironmentVariables() { Colour = os.Getenv("PAPOD_COLOUR") if Colour == "" { Colour = "PAPOD-COLOUR-UNKNOWN" } } // FIXME: reorder var EmitActiveConnection = g.MakeGauge("active-connections") var EmitNicksInChannel = g.MakeGauge("nicks-in-channel") var EmitReceivedMessage = g.MakeCounter("received-message") var EmitWriteToClientError = g.MakeCounter("write-to-client") const pingFrequency = time.Duration(30) * time.Second const pongMaxLatency = time.Duration(5) * time.Second type Channel struct { } type Connection struct { conn net.Conn replyChan chan string lastReadFrom time.Time lastWrittenTo time.Time // id *UUID id string isAuthenticated bool } type User struct { connections []Connection } type State struct { users map[string]*User } type Context struct { db *sql.DB state State tx chan int } type MessageParams struct { Middle []string Trailing string } type Message struct { Prefix string Command string Params MessageParams Raw string } var ( CmdUSER = Message { Command: "USER" } CmdPRIVMSG = Message { Command: "PRIVMSG" } CmdJOIN = Message { Command: "JOIN" } ) func SplitOnCRLF(data []byte, _atEOF bool) (int, []byte, error) { idx := bytes.Index(data, []byte { '\r', '\n' }) if idx == -1 { return 0, nil, nil } return idx + 2, data[0:idx], nil } func SplitOnRawMessage(data []byte, atEOF bool) (int, []byte, error) { advance, token, error := SplitOnCRLF(data, atEOF) if len(token) == 0 { return advance, nil, error } return advance, token, error } func SplitSpaces(r rune) bool { return r == ' ' } func ParseMessageParams(params string) MessageParams { const sep = " :" var middle string var trailing string idx := strings.Index(params, sep) if idx == -1 { middle = params trailing = "" } else { middle = params[:idx] trailing = params[idx + len(sep):] } return MessageParams { Middle: strings.FieldsFunc(middle, SplitSpaces), Trailing: trailing, } } var MessageRegex = regexp.MustCompilePOSIX( // //1 2 3 4 `^(:([^ ]+) +)?([a-zA-Z]+) *( .*)$`, ) func ParseMessage(rawMessage string) (Message, error) { var msg Message components := MessageRegex.FindStringSubmatch(rawMessage) if components == nil { return msg, errors.New("Can't parse message") } msg = Message { Prefix: components[2], Command: components[3], Params: ParseMessageParams(components[4]), Raw: rawMessage, } return msg, nil } func HandleUnknown(ctx *Context, msg Message) { g.Warning( "Unsupported command", "unsupported-command", "command", msg.Command, ) var r Reply = ReplyUnknown r.Prefix = "dunno" // return []Action { r } } func HandleUSER(ctx *Context, msg Message) { fmt.Printf("USER: %#v\n", msg) } func HandlePRIVMSG(ctx *Context, msg Message) { // . assert no missing params // . write to DB: (after auth) // . channel timeline: message from $USER // . reply to $USER // . broadcast new timeline event to members of the channel stmt, err := ctx.db.Prepare(` INSERT INTO messages (id, sender_id, body, timestamp) VALUES (?, ?, ?, ? ); `) if err != nil { // FIXME: reply error fmt.Println("can't prepare: ", err) return } defer stmt.Close() ret, err := stmt.Exec( guuid.New().String(), "FIXME", "FIXME", time.Now(), ) if err != nil { // FIXME: reply error fmt.Println("xablau can't prepare: ", err) return } fmt.Println("ret: ", ret) } func HandleJOIN(ctx *Context, msg Message) { fmt.Printf("JOIN: %#v\n", msg) // . write to DB: (after auth) // . $USER now in channel // . channel timeline: $USER joined // . reply to $USER // . broadcast new timeline event to members of the channel } func ReplyAnonymous() { } func PersistMessage(msg Message) { } type ActionType int const ( ActionReply = iota ) type Action interface { Type() ActionType } type Reply struct { Prefix string Command int Params MessageParams } func (reply Reply) Type() ActionType { return ActionReply } var ( ReplyUnknown = Reply { Command: 421, Params: MessageParams { Middle: []string { }, Trailing: "Unknown command", }, } ) var Commands = map[string]func(*Context, Message) { CmdUSER.Command: HandleUSER, CmdPRIVMSG.Command: HandlePRIVMSG, CmdJOIN.Command: HandleJOIN, } func ActionFnFor(command string) func(*Context, Message) { fn := Commands[command] if fn != nil { return fn } return HandleUnknown } func ProcessMessage(ctx *Context, connection *Connection, rawMessage string) { connection.lastReadFrom = time.Now() msg, err := ParseMessage(rawMessage) if err != nil { g.Info( "Error processing message", "process-message", "err", err, ) return } if msg.Command == CmdUSER.Command { args := msg.Params.Middle if len(args) == 0 { go ReplyAnonymous() return } connection.id = args[0] connection.isAuthenticated = true } if !connection.isAuthenticated { go ReplyAnonymous() return } ActionFnFor(msg.Command)(ctx, msg) } func ReadLoop(ctx *Context, connection *Connection) { scanner := bufio.NewScanner(connection.conn) scanner.Split(SplitOnRawMessage) for scanner.Scan() { ProcessMessage(ctx, connection, scanner.Text()) } } func WriteLoop(ctx *Context, connection *Connection) { for message := range connection.replyChan { _, err := io.WriteString(connection.conn, message) if err != nil { g.Error( "Failed to send data to user", "user-reply-error", "err", err, ) EmitWriteToClientError() continue } connection.lastWrittenTo = time.Now() } EmitActiveConnection.Dec() connection.conn.Close() } func Kill(ctx *Context, connection *Connection) { // lock? delete(ctx.state.users, connection.id) // unlock? close(connection.replyChan) connection.conn.Close() // Ignore errors? } const PingWindow = 30 * time.Second func PingLoop(ctx *Context, connection *Connection) { for { time.Sleep(PingWindow) if (time.Since(connection.lastReadFrom) <= PingWindow) { continue } window := connection.lastWrittenTo.Sub(connection.lastReadFrom) if (window <= PingWindow) { connection.replyChan <- "PING" continue } Kill(ctx, connection) break } } func HandleConnection(ctx *Context, conn net.Conn) { EmitActiveConnection.Inc() // FIXME: WaitGroup here? now := time.Now() connection := Connection { conn: conn, isAuthenticated: false, lastReadFrom: now, lastWrittenTo: now, } go ReadLoop(ctx, &connection) go WriteLoop(ctx, &connection) go PingLoop(ctx, &connection) } func IRCdLoop(ctx *Context, publicSocketPath string) { listener, err := net.Listen("unix", publicSocketPath) g.FatalIf(err) g.Info("IRCd started", "component-up", "component", "ircd") for { conn, err := listener.Accept() if err != nil { g.Warning( "Error accepting a public IRCd connection", "accept-connection", "err", err, ) // conn.Close() // FIXME: is conn nil? continue } // FIXME: where does it get closed go HandleConnection(ctx, conn) } } func CommandListenerLoop(ctx *Context, commandSocketPath string) { listener, err := net.Listen("unix", commandSocketPath) g.FatalIf(err) g.Info( "command listener started", "component-up", "component", "command-listener", ) for { conn, err := listener.Accept() if err != nil { g.Warning( "Error accepting a command connection", "accept-command", "err", err, ) continue } defer conn.Close() // TODO: handle commands } } func TransactorLoop(ctx *Context) { g.Info("transactor started", "component-up", "component", "transactor") EmitActiveConnection.Inc() for tx := range ctx.tx { fmt.Println(tx) } } func InitMigrations(db *sql.DB) { _, err := db.Exec(` CREATE TABLE IF NOT EXISTS migrations ( filename TEXT PRIMARY KEY ); `) g.FatalIf(err) } const MIGRATIONS_DIR = "src/sql/migrations/" func PendingMigrations(db *sql.DB) []string { files, err := ioutil.ReadDir(MIGRATIONS_DIR) g.FatalIf(err) set := make(map[string]bool) for _, file := range files { set[file.Name()] = true } rows, err := db.Query(`SELECT filename FROM migrations;`) g.FatalIf(err) defer rows.Close() for rows.Next() { var filename string err := rows.Scan(&filename) g.FatalIf(err) delete(set, filename) } g.FatalIf(rows.Err()) difference := make([]string, 0) for filename := range set { difference = append(difference, filename) } sort.Sort(sort.StringSlice(difference)) return difference } func RunMigrations(db *sql.DB) { InitMigrations(db) stmt, err := db.Prepare(`INSERT INTO migrations (filename) VALUES (?);`) g.FatalIf(err) defer stmt.Close() for _, filename := range PendingMigrations(db) { g.Info("Running migration file", "exec-migration-file", "filename", filename, ) tx, err := db.Begin() g.FatalIf(err) sql, err := os.ReadFile(MIGRATIONS_DIR + filename) g.FatalIf(err) _, err = tx.Exec(string(sql)) g.FatalIf(err) _, err = tx.Stmt(stmt).Exec(filename) g.FatalIf(err) err = tx.Commit() g.FatalIf(err) } } func InitDB(databasePath string) *sql.DB { db, err := sql.Open("sqlite3", databasePath) g.FatalIf(err) // RunMigrations(db) return db } func Init() { g.Init(slog.Group( "versions", "gobang", g.Version, // "golite", golite.Version, "this", Version, )) SetEnvironmentVariables() } func Start(ctx *Context, publicSocketPath string, commandSocketPath string) { /* buildInfo, ok := debug.ReadBuildInfo() if !ok { g.Fatal(errors.New("error on debug.ReadBuildInfo()")) } */ g.Info("-", "lifecycle-event", "event", "starting-server", /* slog.Group( "go", "version", buildInfo.GoVersion, "settings", buildInfo.Settings, "deps", buildInfo.Deps, ), */ ) var wg sync.WaitGroup bgRun := func(f func()) { wg.Add(1) go func() { f() wg.Done() }() } bgRun(func() { IRCdLoop(ctx, publicSocketPath) }) bgRun(func() { CommandListenerLoop(ctx, commandSocketPath) }) bgRun(func() { TransactorLoop(ctx) }) wg.Wait() } func BuildContext(databasePath string) *Context { db := InitDB(databasePath) tx := make(chan int, 100) return &Context { db: db, tx: tx, } } var ( databasePath = flag.String( "f", "papod.db", "The path to the database file", ) publicSocketPath = flag.String( "s", "papod.public.socket", "The path to the socket that handles the public traffic", ) commandSocketPath = flag.String( "S", "papod.command.socket", "The path to the private IPC commands socket", ) ) func Main() { Init() flag.Parse() ctx := BuildContext(*databasePath) Start(ctx, *publicSocketPath, *commandSocketPath) }