package papod import ( "bufio" "bytes" "database/sql" "errors" "flag" "fmt" "io" "io/ioutil" "log/slog" "net" "os" "regexp" "runtime/debug" "sort" "strings" "sync" "time" g "euandre.org/gobang/src" _ "github.com/mattn/go-sqlite3" ) /* Global variables */ var ( Hostname string Version string Colour string ) var EmitActiveConnection = g.MakeGauge("active-connections") var EmitNicksInChannel = g.MakeGauge("nicks-in-channel") var EmitReceivedMessage = g.MakeCounter("received-message") var EmitWriteToClientError = g.MakeCounter("write-to-client") const pingFrequency = time.Duration(30) * time.Second const pongMaxLatency = time.Duration(5) * time.Second type Channel struct { } type Context struct { db *sql.DB tx chan int } type Connection struct { conn net.Conn replyChan chan string // id *UUID id string isAuthenticated bool } type MessageParams struct { Middle []string Trailing string } type Message struct { Prefix string Command string Params MessageParams Raw string } var ( CmdUSER = Message { Command: "USER" } CmdPRIVMSG = Message { Command: "PRIVMSG" } CmdJOIN = Message { Command: "JOIN" } ) func SplitOnCRLF(data []byte, _atEOF bool) (int, []byte, error) { idx := bytes.Index(data, []byte { '\r', '\n' }) if idx == -1 { return 0, nil, nil } return idx + 2, data[0:idx], nil } func SplitOnRawMessage(data []byte, atEOF bool) (int, []byte, error) { advance, token, error := SplitOnCRLF(data, atEOF) if len(token) == 0 { return advance, nil, error } return advance, token, error } func SplitSpaces(r rune) bool { return r == ' ' } func ParseMessageParams(params string) MessageParams { const sep = " :" var middle string var trailing string idx := strings.Index(params, sep) if idx == -1 { middle = params trailing = "" } else { middle = params[:idx] trailing = params[idx + len(sep):] } return MessageParams { Middle: strings.FieldsFunc(middle, SplitSpaces), Trailing: trailing, } } var MessageRegex = regexp.MustCompilePOSIX( // //1 2 3 4 `^(:([^ ]+) +)?([a-zA-Z]+) *( .*)$`, ) func ParseMessage(rawMessage string) (Message, error) { var msg Message components := MessageRegex.FindStringSubmatch(rawMessage) if components == nil { return msg, errors.New("Can't parse message") } msg = Message { Prefix: components[2], Command: components[3], Params: ParseMessageParams(components[4]), Raw: rawMessage, } return msg, nil } func HandleUnknown(ctx *Context, msg Message) { g.Warning( "Unsupported command", "unsupported-command", "command", msg.Command, ) var r Reply = ReplyUnknown r.Prefix = "dunno" // return []Action { r } } func HandleUSER(ctx *Context, msg Message) { fmt.Printf("USER: %#v\n", msg) } func HandlePRIVMSG(ctx *Context, msg Message) { // . assert no missing params // . write to DB: (after auth) // . channel timeline: message from $USER // . reply to $USER // . broadcast new timeline event to members of the channel stmt, err := ctx.db.Prepare(` INSERT INTO messages (id, sender_id, body, timestamp) VALUES (?, ?, ?, ? ); `) if err != nil { // FIXME: reply error fmt.Println("can't prepare: ", err) return } defer stmt.Close() ret, err := stmt.Exec(g.NewUUID().ToString(), "FIXME", "FIXME", time.Now()) if err != nil { // FIXME: reply error fmt.Println("xablau can't prepare: ", err) return } fmt.Println("ret: ", ret) } func HandleJOIN(ctx *Context, msg Message) { fmt.Printf("JOIN: %#v\n", msg) // . write to DB: (after auth) // . $USER now in channel // . channel timeline: $USER joined // . reply to $USER // . broadcast new timeline event to members of the channel } func ReplyAnonymous() { } func PersistMessage(msg Message) { } type ActionType int const ( ActionReply = iota ) type Action interface { Type() ActionType } type Reply struct { Prefix string Command int Params MessageParams } func (reply Reply) Type() ActionType { return ActionReply } var ( ReplyUnknown = Reply { Command: 421, Params: MessageParams { Middle: []string { }, Trailing: "Unknown command", }, } ) var Commands = map[string]func(*Context, Message) { CmdUSER.Command: HandleUSER, CmdPRIVMSG.Command: HandlePRIVMSG, CmdJOIN.Command: HandleJOIN, } func ActionFnFor(command string) func(*Context, Message) { fn := Commands[command] if fn != nil { return fn } return HandleUnknown } func ProcessMessage(ctx *Context, connection *Connection, rawMessage string) { msg, err := ParseMessage(rawMessage) if err != nil { g.Info( "Error processing message", "process-message", "err", err, ) return } if msg.Command == CmdUSER.Command { args := msg.Params.Middle if len(args) == 0 { go ReplyAnonymous() return } connection.id = args[0] connection.isAuthenticated = true } if !connection.isAuthenticated { go ReplyAnonymous() return } ActionFnFor(msg.Command)(ctx, msg) } func ReadLoop(ctx *Context, connection *Connection) { scanner := bufio.NewScanner(connection.conn) scanner.Split(SplitOnRawMessage) for scanner.Scan() { ProcessMessage(ctx, connection, scanner.Text()) } } func WriteLoop(ctx *Context, connection *Connection) { for message := range connection.replyChan { _, err := io.WriteString(connection.conn, message) if err != nil { g.Error( "Failed to send data to user", "user-reply-error", "err", err, ) EmitWriteToClientError() continue } } EmitActiveConnection.Dec() connection.conn.Close() } func PingLoop(ctx *Context, connection *Connection) { // fmt.Println("PingLoop") } func HandleConnection(ctx *Context, conn net.Conn) { EmitActiveConnection.Inc() // FIXME: WaitGroup here? connection := Connection { conn: conn, isAuthenticated: false, } go ReadLoop(ctx, &connection) go WriteLoop(ctx, &connection) go PingLoop(ctx, &connection) } func IRCdLoop(ctx *Context, publicSocketPath string) { listener, err := net.Listen("unix", publicSocketPath) g.FatalIf(err) g.Info("IRCd started", "component-up", "component", "ircd") for { conn, err := listener.Accept() if err != nil { g.Warning( "Error accepting a public IRCd connection", "accept-connection", "err", err, ) // conn.Close() // FIXME: is conn nil? continue } go HandleConnection(ctx, conn) // FIXME: where does it get closed } } func CommandListenerLoop(ctx *Context, commandSocketPath string) { listener, err := net.Listen("unix", commandSocketPath) g.FatalIf(err) g.Info("command listener started", "component-up", "component", "command-listener") for { conn, err := listener.Accept() if err != nil { g.Warning( "Error accepting a command connection", "accept-command", "err", err, ) continue } defer conn.Close() // TODO: handle commands } } func TransactorLoop(ctx *Context) { g.Info("transactor started", "component-up", "component", "transactor") EmitActiveConnection.Inc() for tx := range ctx.tx { fmt.Println(tx) } } func SetHostname() { var err error Hostname, err = os.Hostname() g.FatalIf(err) } func SetEnvironmentVariables() { Version = os.Getenv("PAPOD_VERSION") if Version == "" { Version = "PAPOD-VERSION-UNKNOWN" } Colour = os.Getenv("PAPOD_COLOUR") if Colour == "" { Colour = "PAPOD-COLOUR-UNKNOWN" } } func InitMigrations(db *sql.DB) { _, err := db.Exec(` CREATE TABLE IF NOT EXISTS migrations ( filename TEXT PRIMARY KEY ); `) g.FatalIf(err) } const MIGRATIONS_DIR = "src/sql/migrations/" func PendingMigrations(db *sql.DB) []string { files, err := ioutil.ReadDir(MIGRATIONS_DIR) g.FatalIf(err) set := make(map[string]bool) for _, file := range files { set[file.Name()] = true } rows, err := db.Query(`SELECT filename FROM migrations;`) g.FatalIf(err) defer rows.Close() for rows.Next() { var filename string err := rows.Scan(&filename) g.FatalIf(err) delete(set, filename) } g.FatalIf(rows.Err()) difference := make([]string, 0) for filename := range set { difference = append(difference, filename) } sort.Sort(sort.StringSlice(difference)) return difference } func RunMigrations(db *sql.DB) { InitMigrations(db) stmt, err := db.Prepare(`INSERT INTO migrations (filename) VALUES (?);`) g.FatalIf(err) defer stmt.Close() for _, filename := range PendingMigrations(db) { g.Info("Running migration file", "exec-migration-file", "filename", filename, ) tx, err := db.Begin() g.FatalIf(err) sql, err := os.ReadFile(MIGRATIONS_DIR + filename) g.FatalIf(err) _, err = tx.Exec(string(sql)) g.FatalIf(err) _, err = tx.Stmt(stmt).Exec(filename) g.FatalIf(err) err = tx.Commit() g.FatalIf(err) } } func InitDB(databasePath string) *sql.DB { db, err := sql.Open("sqlite3", databasePath) g.FatalIf(err) RunMigrations(db) return db } func Init() { g.Init() SetHostname() SetEnvironmentVariables() } func Start(ctx *Context, publicSocketPath string, commandSocketPath string) { buildInfo, ok := debug.ReadBuildInfo() if !ok { g.Fatal(errors.New("error on debug.ReadBuildInfo()")) } g.Info("-", "lifecycle-event", "event", "starting-server", slog.Group( "go", "version", buildInfo.GoVersion, "settings", buildInfo.Settings, "deps", buildInfo.Deps, ), ) var wg sync.WaitGroup bgRun := func(f func()) { wg.Add(1) go func() { f() wg.Done() }() } bgRun(func() { IRCdLoop(ctx, publicSocketPath) }) bgRun(func() { CommandListenerLoop(ctx, commandSocketPath) }) bgRun(func() { TransactorLoop(ctx) }) wg.Wait() } func BuildContext(databasePath string) *Context { db := InitDB(databasePath) tx := make(chan int, 100) return &Context { db: db, tx: tx, } } var ( databasePath = flag.String( "f", "papod.db", "The path to the database file", ) publicSocketPath = flag.String( "s", "papod.public.socket", "The path to the socket that handles the public traffic", ) commandSocketPath = flag.String( "S", "papod.command.socket", "The path to the private IPC commands socket", ) ) func Main() { Init() flag.Parse() ctx := BuildContext(*databasePath) Start(ctx, *publicSocketPath, *commandSocketPath) }