summaryrefslogtreecommitdiff
path: root/src/papod.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/papod.go')
-rw-r--r--src/papod.go583
1 files changed, 583 insertions, 0 deletions
diff --git a/src/papod.go b/src/papod.go
new file mode 100644
index 0000000..91e1145
--- /dev/null
+++ b/src/papod.go
@@ -0,0 +1,583 @@
+package papod
+
+import (
+ "bufio"
+ "bytes"
+ "database/sql"
+ "errors"
+ "flag"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net"
+ "os"
+ "regexp"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+
+ g "gobang"
+ _ "golite"
+)
+
+
+
+
+
+// Global variables
+var (
+ Version string
+ Colour string
+)
+
+
+
+func SetEnvironmentVariables() {
+ Version = os.Getenv("PAPOD_VERSION")
+ if Version == "" {
+ Version = "PAPOD-VERSION-UNKNOWN"
+ }
+
+ Colour = os.Getenv("PAPOD_COLOUR")
+ if Colour == "" {
+ Colour = "PAPOD-COLOUR-UNKNOWN"
+ }
+}
+
+
+// FIXME: reorder
+var EmitActiveConnection = g.MakeGauge("active-connections")
+var EmitNicksInChannel = g.MakeGauge("nicks-in-channel")
+var EmitReceivedMessage = g.MakeCounter("received-message")
+var EmitWriteToClientError = g.MakeCounter("write-to-client")
+
+const pingFrequency = time.Duration(30) * time.Second
+const pongMaxLatency = time.Duration(5) * time.Second
+
+// type UUID string
+
+type Channel struct {
+}
+
+type Connection struct {
+ conn net.Conn
+ replyChan chan string
+ lastReadFrom time.Time
+ lastWrittenTo time.Time
+ // id *UUID
+ id string
+ isAuthenticated bool
+}
+
+type User struct {
+ connections []Connection
+}
+
+type State struct {
+ users map[string]*User
+}
+
+type Context struct {
+ db *sql.DB
+ state State
+ tx chan int
+}
+
+type MessageParams struct {
+ Middle []string
+ Trailing string
+}
+
+type Message struct {
+ Prefix string
+ Command string
+ Params MessageParams
+ Raw string
+}
+
+var (
+ CmdUSER = Message { Command: "USER" }
+ CmdPRIVMSG = Message { Command: "PRIVMSG" }
+ CmdJOIN = Message { Command: "JOIN" }
+)
+
+func SplitOnCRLF(data []byte, _atEOF bool) (int, []byte, error) {
+ idx := bytes.Index(data, []byte { '\r', '\n' })
+ if idx == -1 {
+ return 0, nil, nil
+ }
+
+ return idx + 2, data[0:idx], nil
+}
+
+func SplitOnRawMessage(data []byte, atEOF bool) (int, []byte, error) {
+ advance, token, error := SplitOnCRLF(data, atEOF)
+
+ if len(token) == 0 {
+ return advance, nil, error
+ }
+
+ return advance, token, error
+}
+
+func SplitSpaces(r rune) bool {
+ return r == ' '
+}
+
+func ParseMessageParams(params string) MessageParams {
+ const sep = " :"
+
+ var middle string
+ var trailing string
+
+ idx := strings.Index(params, sep)
+ if idx == -1 {
+ middle = params
+ trailing = ""
+ } else {
+ middle = params[:idx]
+ trailing = params[idx + len(sep):]
+ }
+
+ return MessageParams {
+ Middle: strings.FieldsFunc(middle, SplitSpaces),
+ Trailing: trailing,
+ }
+}
+
+var MessageRegex = regexp.MustCompilePOSIX(
+ // <prefix> <command> <params>
+ //1 2 3 4
+ `^(:([^ ]+) +)?([a-zA-Z]+) *( .*)$`,
+)
+func ParseMessage(rawMessage string) (Message, error) {
+ var msg Message
+
+ components := MessageRegex.FindStringSubmatch(rawMessage)
+ if components == nil {
+ return msg, errors.New("Can't parse message")
+ }
+
+ msg = Message {
+ Prefix: components[2],
+ Command: components[3],
+ Params: ParseMessageParams(components[4]),
+ Raw: rawMessage,
+ }
+ return msg, nil
+}
+
+func HandleUnknown(ctx *Context, msg Message) {
+ g.Warning(
+ "Unsupported command", "unsupported-command",
+ "command", msg.Command,
+ )
+ var r Reply = ReplyUnknown
+ r.Prefix = "dunno"
+ // return []Action { r }
+}
+
+func HandleUSER(ctx *Context, msg Message) {
+ fmt.Printf("USER: %#v\n", msg)
+}
+
+func HandlePRIVMSG(ctx *Context, msg Message) {
+ // . assert no missing params
+ // . write to DB: (after auth)
+ // . channel timeline: message from $USER
+ // . reply to $USER
+ // . broadcast new timeline event to members of the channel
+
+ stmt, err := ctx.db.Prepare(`
+ INSERT INTO messages
+ (id, sender_id, body, timestamp)
+ VALUES
+ (?, ?, ?, ? );
+ `)
+ if err != nil {
+ // FIXME: reply error
+ fmt.Println("can't prepare: ", err)
+ return
+ }
+ defer stmt.Close()
+
+ ret, err := stmt.Exec(
+ g.NewUUID().String(),
+ "FIXME",
+ "FIXME",
+ time.Now(),
+ )
+ if err != nil {
+ // FIXME: reply error
+ fmt.Println("xablau can't prepare: ", err)
+ return
+ }
+
+ fmt.Println("ret: ", ret)
+}
+
+func HandleJOIN(ctx *Context, msg Message) {
+ fmt.Printf("JOIN: %#v\n", msg)
+
+ // . write to DB: (after auth)
+ // . $USER now in channel
+ // . channel timeline: $USER joined
+ // . reply to $USER
+ // . broadcast new timeline event to members of the channel
+}
+
+func ReplyAnonymous() {
+}
+
+func PersistMessage(msg Message) {
+}
+
+type ActionType int
+const (
+ ActionReply = iota
+)
+
+type Action interface {
+ Type() ActionType
+}
+
+type Reply struct {
+ Prefix string
+ Command int
+ Params MessageParams
+}
+
+func (reply Reply) Type() ActionType {
+ return ActionReply
+}
+
+var (
+ ReplyUnknown = Reply {
+ Command: 421,
+ Params: MessageParams {
+ Middle: []string { },
+ Trailing: "Unknown command",
+ },
+ }
+)
+
+var Commands = map[string]func(*Context, Message) {
+ CmdUSER.Command: HandleUSER,
+ CmdPRIVMSG.Command: HandlePRIVMSG,
+ CmdJOIN.Command: HandleJOIN,
+}
+
+func ActionFnFor(command string) func(*Context, Message) {
+ fn := Commands[command]
+ if fn != nil {
+ return fn
+ }
+
+ return HandleUnknown
+}
+
+func ProcessMessage(ctx *Context, connection *Connection, rawMessage string) {
+ connection.lastReadFrom = time.Now()
+
+ msg, err := ParseMessage(rawMessage)
+ if err != nil {
+ g.Info(
+ "Error processing message",
+ "process-message",
+ "err", err,
+ )
+ return
+ }
+
+ if msg.Command == CmdUSER.Command {
+ args := msg.Params.Middle
+ if len(args) == 0 {
+ go ReplyAnonymous()
+ return
+ }
+ connection.id = args[0]
+ connection.isAuthenticated = true
+ }
+
+ if !connection.isAuthenticated {
+ go ReplyAnonymous()
+ return
+ }
+
+ ActionFnFor(msg.Command)(ctx, msg)
+}
+
+func ReadLoop(ctx *Context, connection *Connection) {
+ scanner := bufio.NewScanner(connection.conn)
+ scanner.Split(SplitOnRawMessage)
+ for scanner.Scan() {
+ ProcessMessage(ctx, connection, scanner.Text())
+ }
+}
+
+func WriteLoop(ctx *Context, connection *Connection) {
+ for message := range connection.replyChan {
+ _, err := io.WriteString(connection.conn, message)
+ if err != nil {
+ g.Error(
+ "Failed to send data to user",
+ "user-reply-error",
+ "err", err,
+ )
+ EmitWriteToClientError()
+ continue
+ }
+
+ connection.lastWrittenTo = time.Now()
+ }
+
+ EmitActiveConnection.Dec()
+ connection.conn.Close()
+}
+
+func Kill(ctx *Context, connection *Connection) {
+ // lock?
+ delete(ctx.state.users, connection.id)
+ // unlock?
+ close(connection.replyChan)
+ connection.conn.Close() // Ignore errors?
+}
+
+const PingWindow = 30 * time.Second
+func PingLoop(ctx *Context, connection *Connection) {
+ for {
+ time.Sleep(PingWindow)
+ if (time.Since(connection.lastReadFrom) <= PingWindow) {
+ continue
+ }
+ window := connection.lastWrittenTo.Sub(connection.lastReadFrom)
+ if (window <= PingWindow) {
+ connection.replyChan <- "PING"
+ continue
+ }
+
+ Kill(ctx, connection)
+ break
+ }
+}
+
+func HandleConnection(ctx *Context, conn net.Conn) {
+ EmitActiveConnection.Inc()
+ // FIXME: WaitGroup here?
+ now := time.Now()
+ connection := Connection {
+ conn: conn,
+ isAuthenticated: false,
+ lastReadFrom: now,
+ lastWrittenTo: now,
+ }
+ go ReadLoop(ctx, &connection)
+ go WriteLoop(ctx, &connection)
+ go PingLoop(ctx, &connection)
+}
+
+func IRCdLoop(ctx *Context, publicSocketPath string) {
+ listener, err := net.Listen("unix", publicSocketPath)
+ g.FatalIf(err)
+ g.Info("IRCd started", "component-up", "component", "ircd")
+
+ for {
+ conn, err := listener.Accept()
+ if err != nil {
+ g.Warning(
+ "Error accepting a public IRCd connection",
+ "accept-connection",
+ "err", err,
+ )
+ // conn.Close() // FIXME: is conn nil?
+ continue
+ }
+ // FIXME: where does it get closed
+ go HandleConnection(ctx, conn)
+ }
+}
+
+func CommandListenerLoop(ctx *Context, commandSocketPath string) {
+ listener, err := net.Listen("unix", commandSocketPath)
+ g.FatalIf(err)
+ g.Info(
+ "command listener started",
+ "component-up",
+ "component", "command-listener",
+ )
+
+ for {
+ conn, err := listener.Accept()
+ if err != nil {
+ g.Warning(
+ "Error accepting a command connection",
+ "accept-command",
+ "err", err,
+ )
+ continue
+ }
+ defer conn.Close()
+
+ // TODO: handle commands
+ }
+}
+
+func TransactorLoop(ctx *Context) {
+ g.Info("transactor started", "component-up", "component", "transactor")
+ EmitActiveConnection.Inc()
+
+ for tx := range ctx.tx {
+ fmt.Println(tx)
+ }
+}
+
+func InitMigrations(db *sql.DB) {
+ _, err := db.Exec(`
+ CREATE TABLE IF NOT EXISTS migrations (
+ filename TEXT PRIMARY KEY
+ );
+ `)
+ g.FatalIf(err)
+}
+
+const MIGRATIONS_DIR = "src/sql/migrations/"
+func PendingMigrations(db *sql.DB) []string {
+ files, err := ioutil.ReadDir(MIGRATIONS_DIR)
+ g.FatalIf(err)
+
+ set := make(map[string]bool)
+ for _, file := range files {
+ set[file.Name()] = true
+ }
+
+ rows, err := db.Query(`SELECT filename FROM migrations;`)
+ g.FatalIf(err)
+ defer rows.Close()
+
+ for rows.Next() {
+ var filename string
+ err := rows.Scan(&filename)
+ g.FatalIf(err)
+ delete(set, filename)
+ }
+ g.FatalIf(rows.Err())
+
+ difference := make([]string, 0)
+ for filename := range set {
+ difference = append(difference, filename)
+ }
+
+ sort.Sort(sort.StringSlice(difference))
+ return difference
+}
+
+func RunMigrations(db *sql.DB) {
+ InitMigrations(db)
+
+ stmt, err := db.Prepare(`INSERT INTO migrations (filename) VALUES (?);`)
+ g.FatalIf(err)
+ defer stmt.Close()
+
+ for _, filename := range PendingMigrations(db) {
+ g.Info("Running migration file", "exec-migration-file",
+ "filename", filename,
+ )
+
+ tx, err := db.Begin()
+ g.FatalIf(err)
+
+ sql, err := os.ReadFile(MIGRATIONS_DIR + filename)
+ g.FatalIf(err)
+
+ _, err = tx.Exec(string(sql))
+ g.FatalIf(err)
+
+ _, err = tx.Stmt(stmt).Exec(filename)
+ g.FatalIf(err)
+
+ err = tx.Commit()
+ g.FatalIf(err)
+ }
+}
+
+func InitDB(databasePath string) *sql.DB {
+ db, err := sql.Open("sqlite3", databasePath)
+ g.FatalIf(err)
+ RunMigrations(db)
+ return db
+}
+
+func Init() {
+ g.Init()
+ SetEnvironmentVariables()
+}
+
+func Start(ctx *Context, publicSocketPath string, commandSocketPath string) {
+ /*
+ buildInfo, ok := debug.ReadBuildInfo()
+ if !ok {
+ g.Fatal(errors.New("error on debug.ReadBuildInfo()"))
+ }
+ */
+
+ g.Info("-", "lifecycle-event",
+ "event", "starting-server",
+ /*
+ slog.Group(
+ "go",
+ "version", buildInfo.GoVersion,
+ "settings", buildInfo.Settings,
+ "deps", buildInfo.Deps,
+ ),
+ */
+ )
+
+ var wg sync.WaitGroup
+ bgRun := func(f func()) {
+ wg.Add(1)
+ go func() {
+ f()
+ wg.Done()
+ }()
+ }
+ bgRun(func() { IRCdLoop(ctx, publicSocketPath) })
+ bgRun(func() { CommandListenerLoop(ctx, commandSocketPath) })
+ bgRun(func() { TransactorLoop(ctx) })
+ wg.Wait()
+}
+
+func BuildContext(databasePath string) *Context {
+ db := InitDB(databasePath)
+ tx := make(chan int, 100)
+ return &Context {
+ db: db,
+ tx: tx,
+ }
+}
+
+var (
+ databasePath = flag.String(
+ "f",
+ "papod.db",
+ "The path to the database file",
+ )
+ publicSocketPath = flag.String(
+ "s",
+ "papod.public.socket",
+ "The path to the socket that handles the public traffic",
+ )
+ commandSocketPath = flag.String(
+ "S",
+ "papod.command.socket",
+ "The path to the private IPC commands socket",
+ )
+)
+
+
+func Main() {
+ Init()
+ flag.Parse()
+ ctx := BuildContext(*databasePath)
+ Start(ctx, *publicSocketPath, *commandSocketPath)
+}