diff options
Diffstat (limited to 'src/papod.go')
-rw-r--r-- | src/papod.go | 583 |
1 files changed, 583 insertions, 0 deletions
diff --git a/src/papod.go b/src/papod.go new file mode 100644 index 0000000..91e1145 --- /dev/null +++ b/src/papod.go @@ -0,0 +1,583 @@ +package papod + +import ( + "bufio" + "bytes" + "database/sql" + "errors" + "flag" + "fmt" + "io" + "io/ioutil" + "net" + "os" + "regexp" + "sort" + "strings" + "sync" + "time" + + g "gobang" + _ "golite" +) + + + + + +// Global variables +var ( + Version string + Colour string +) + + + +func SetEnvironmentVariables() { + Version = os.Getenv("PAPOD_VERSION") + if Version == "" { + Version = "PAPOD-VERSION-UNKNOWN" + } + + Colour = os.Getenv("PAPOD_COLOUR") + if Colour == "" { + Colour = "PAPOD-COLOUR-UNKNOWN" + } +} + + +// FIXME: reorder +var EmitActiveConnection = g.MakeGauge("active-connections") +var EmitNicksInChannel = g.MakeGauge("nicks-in-channel") +var EmitReceivedMessage = g.MakeCounter("received-message") +var EmitWriteToClientError = g.MakeCounter("write-to-client") + +const pingFrequency = time.Duration(30) * time.Second +const pongMaxLatency = time.Duration(5) * time.Second + +// type UUID string + +type Channel struct { +} + +type Connection struct { + conn net.Conn + replyChan chan string + lastReadFrom time.Time + lastWrittenTo time.Time + // id *UUID + id string + isAuthenticated bool +} + +type User struct { + connections []Connection +} + +type State struct { + users map[string]*User +} + +type Context struct { + db *sql.DB + state State + tx chan int +} + +type MessageParams struct { + Middle []string + Trailing string +} + +type Message struct { + Prefix string + Command string + Params MessageParams + Raw string +} + +var ( + CmdUSER = Message { Command: "USER" } + CmdPRIVMSG = Message { Command: "PRIVMSG" } + CmdJOIN = Message { Command: "JOIN" } +) + +func SplitOnCRLF(data []byte, _atEOF bool) (int, []byte, error) { + idx := bytes.Index(data, []byte { '\r', '\n' }) + if idx == -1 { + return 0, nil, nil + } + + return idx + 2, data[0:idx], nil +} + +func SplitOnRawMessage(data []byte, atEOF bool) (int, []byte, error) { + advance, token, error := SplitOnCRLF(data, atEOF) + + if len(token) == 0 { + return advance, nil, error + } + + return advance, token, error +} + +func SplitSpaces(r rune) bool { + return r == ' ' +} + +func ParseMessageParams(params string) MessageParams { + const sep = " :" + + var middle string + var trailing string + + idx := strings.Index(params, sep) + if idx == -1 { + middle = params + trailing = "" + } else { + middle = params[:idx] + trailing = params[idx + len(sep):] + } + + return MessageParams { + Middle: strings.FieldsFunc(middle, SplitSpaces), + Trailing: trailing, + } +} + +var MessageRegex = regexp.MustCompilePOSIX( + // <prefix> <command> <params> + //1 2 3 4 + `^(:([^ ]+) +)?([a-zA-Z]+) *( .*)$`, +) +func ParseMessage(rawMessage string) (Message, error) { + var msg Message + + components := MessageRegex.FindStringSubmatch(rawMessage) + if components == nil { + return msg, errors.New("Can't parse message") + } + + msg = Message { + Prefix: components[2], + Command: components[3], + Params: ParseMessageParams(components[4]), + Raw: rawMessage, + } + return msg, nil +} + +func HandleUnknown(ctx *Context, msg Message) { + g.Warning( + "Unsupported command", "unsupported-command", + "command", msg.Command, + ) + var r Reply = ReplyUnknown + r.Prefix = "dunno" + // return []Action { r } +} + +func HandleUSER(ctx *Context, msg Message) { + fmt.Printf("USER: %#v\n", msg) +} + +func HandlePRIVMSG(ctx *Context, msg Message) { + // . assert no missing params + // . write to DB: (after auth) + // . channel timeline: message from $USER + // . reply to $USER + // . broadcast new timeline event to members of the channel + + stmt, err := ctx.db.Prepare(` + INSERT INTO messages + (id, sender_id, body, timestamp) + VALUES + (?, ?, ?, ? ); + `) + if err != nil { + // FIXME: reply error + fmt.Println("can't prepare: ", err) + return + } + defer stmt.Close() + + ret, err := stmt.Exec( + g.NewUUID().String(), + "FIXME", + "FIXME", + time.Now(), + ) + if err != nil { + // FIXME: reply error + fmt.Println("xablau can't prepare: ", err) + return + } + + fmt.Println("ret: ", ret) +} + +func HandleJOIN(ctx *Context, msg Message) { + fmt.Printf("JOIN: %#v\n", msg) + + // . write to DB: (after auth) + // . $USER now in channel + // . channel timeline: $USER joined + // . reply to $USER + // . broadcast new timeline event to members of the channel +} + +func ReplyAnonymous() { +} + +func PersistMessage(msg Message) { +} + +type ActionType int +const ( + ActionReply = iota +) + +type Action interface { + Type() ActionType +} + +type Reply struct { + Prefix string + Command int + Params MessageParams +} + +func (reply Reply) Type() ActionType { + return ActionReply +} + +var ( + ReplyUnknown = Reply { + Command: 421, + Params: MessageParams { + Middle: []string { }, + Trailing: "Unknown command", + }, + } +) + +var Commands = map[string]func(*Context, Message) { + CmdUSER.Command: HandleUSER, + CmdPRIVMSG.Command: HandlePRIVMSG, + CmdJOIN.Command: HandleJOIN, +} + +func ActionFnFor(command string) func(*Context, Message) { + fn := Commands[command] + if fn != nil { + return fn + } + + return HandleUnknown +} + +func ProcessMessage(ctx *Context, connection *Connection, rawMessage string) { + connection.lastReadFrom = time.Now() + + msg, err := ParseMessage(rawMessage) + if err != nil { + g.Info( + "Error processing message", + "process-message", + "err", err, + ) + return + } + + if msg.Command == CmdUSER.Command { + args := msg.Params.Middle + if len(args) == 0 { + go ReplyAnonymous() + return + } + connection.id = args[0] + connection.isAuthenticated = true + } + + if !connection.isAuthenticated { + go ReplyAnonymous() + return + } + + ActionFnFor(msg.Command)(ctx, msg) +} + +func ReadLoop(ctx *Context, connection *Connection) { + scanner := bufio.NewScanner(connection.conn) + scanner.Split(SplitOnRawMessage) + for scanner.Scan() { + ProcessMessage(ctx, connection, scanner.Text()) + } +} + +func WriteLoop(ctx *Context, connection *Connection) { + for message := range connection.replyChan { + _, err := io.WriteString(connection.conn, message) + if err != nil { + g.Error( + "Failed to send data to user", + "user-reply-error", + "err", err, + ) + EmitWriteToClientError() + continue + } + + connection.lastWrittenTo = time.Now() + } + + EmitActiveConnection.Dec() + connection.conn.Close() +} + +func Kill(ctx *Context, connection *Connection) { + // lock? + delete(ctx.state.users, connection.id) + // unlock? + close(connection.replyChan) + connection.conn.Close() // Ignore errors? +} + +const PingWindow = 30 * time.Second +func PingLoop(ctx *Context, connection *Connection) { + for { + time.Sleep(PingWindow) + if (time.Since(connection.lastReadFrom) <= PingWindow) { + continue + } + window := connection.lastWrittenTo.Sub(connection.lastReadFrom) + if (window <= PingWindow) { + connection.replyChan <- "PING" + continue + } + + Kill(ctx, connection) + break + } +} + +func HandleConnection(ctx *Context, conn net.Conn) { + EmitActiveConnection.Inc() + // FIXME: WaitGroup here? + now := time.Now() + connection := Connection { + conn: conn, + isAuthenticated: false, + lastReadFrom: now, + lastWrittenTo: now, + } + go ReadLoop(ctx, &connection) + go WriteLoop(ctx, &connection) + go PingLoop(ctx, &connection) +} + +func IRCdLoop(ctx *Context, publicSocketPath string) { + listener, err := net.Listen("unix", publicSocketPath) + g.FatalIf(err) + g.Info("IRCd started", "component-up", "component", "ircd") + + for { + conn, err := listener.Accept() + if err != nil { + g.Warning( + "Error accepting a public IRCd connection", + "accept-connection", + "err", err, + ) + // conn.Close() // FIXME: is conn nil? + continue + } + // FIXME: where does it get closed + go HandleConnection(ctx, conn) + } +} + +func CommandListenerLoop(ctx *Context, commandSocketPath string) { + listener, err := net.Listen("unix", commandSocketPath) + g.FatalIf(err) + g.Info( + "command listener started", + "component-up", + "component", "command-listener", + ) + + for { + conn, err := listener.Accept() + if err != nil { + g.Warning( + "Error accepting a command connection", + "accept-command", + "err", err, + ) + continue + } + defer conn.Close() + + // TODO: handle commands + } +} + +func TransactorLoop(ctx *Context) { + g.Info("transactor started", "component-up", "component", "transactor") + EmitActiveConnection.Inc() + + for tx := range ctx.tx { + fmt.Println(tx) + } +} + +func InitMigrations(db *sql.DB) { + _, err := db.Exec(` + CREATE TABLE IF NOT EXISTS migrations ( + filename TEXT PRIMARY KEY + ); + `) + g.FatalIf(err) +} + +const MIGRATIONS_DIR = "src/sql/migrations/" +func PendingMigrations(db *sql.DB) []string { + files, err := ioutil.ReadDir(MIGRATIONS_DIR) + g.FatalIf(err) + + set := make(map[string]bool) + for _, file := range files { + set[file.Name()] = true + } + + rows, err := db.Query(`SELECT filename FROM migrations;`) + g.FatalIf(err) + defer rows.Close() + + for rows.Next() { + var filename string + err := rows.Scan(&filename) + g.FatalIf(err) + delete(set, filename) + } + g.FatalIf(rows.Err()) + + difference := make([]string, 0) + for filename := range set { + difference = append(difference, filename) + } + + sort.Sort(sort.StringSlice(difference)) + return difference +} + +func RunMigrations(db *sql.DB) { + InitMigrations(db) + + stmt, err := db.Prepare(`INSERT INTO migrations (filename) VALUES (?);`) + g.FatalIf(err) + defer stmt.Close() + + for _, filename := range PendingMigrations(db) { + g.Info("Running migration file", "exec-migration-file", + "filename", filename, + ) + + tx, err := db.Begin() + g.FatalIf(err) + + sql, err := os.ReadFile(MIGRATIONS_DIR + filename) + g.FatalIf(err) + + _, err = tx.Exec(string(sql)) + g.FatalIf(err) + + _, err = tx.Stmt(stmt).Exec(filename) + g.FatalIf(err) + + err = tx.Commit() + g.FatalIf(err) + } +} + +func InitDB(databasePath string) *sql.DB { + db, err := sql.Open("sqlite3", databasePath) + g.FatalIf(err) + RunMigrations(db) + return db +} + +func Init() { + g.Init() + SetEnvironmentVariables() +} + +func Start(ctx *Context, publicSocketPath string, commandSocketPath string) { + /* + buildInfo, ok := debug.ReadBuildInfo() + if !ok { + g.Fatal(errors.New("error on debug.ReadBuildInfo()")) + } + */ + + g.Info("-", "lifecycle-event", + "event", "starting-server", + /* + slog.Group( + "go", + "version", buildInfo.GoVersion, + "settings", buildInfo.Settings, + "deps", buildInfo.Deps, + ), + */ + ) + + var wg sync.WaitGroup + bgRun := func(f func()) { + wg.Add(1) + go func() { + f() + wg.Done() + }() + } + bgRun(func() { IRCdLoop(ctx, publicSocketPath) }) + bgRun(func() { CommandListenerLoop(ctx, commandSocketPath) }) + bgRun(func() { TransactorLoop(ctx) }) + wg.Wait() +} + +func BuildContext(databasePath string) *Context { + db := InitDB(databasePath) + tx := make(chan int, 100) + return &Context { + db: db, + tx: tx, + } +} + +var ( + databasePath = flag.String( + "f", + "papod.db", + "The path to the database file", + ) + publicSocketPath = flag.String( + "s", + "papod.public.socket", + "The path to the socket that handles the public traffic", + ) + commandSocketPath = flag.String( + "S", + "papod.command.socket", + "The path to the private IPC commands socket", + ) +) + + +func Main() { + Init() + flag.Parse() + ctx := BuildContext(*databasePath) + Start(ctx, *publicSocketPath, *commandSocketPath) +} |