diff options
author | EuAndreh <eu@euandre.org> | 2024-10-27 09:34:59 -0300 |
---|---|---|
committer | EuAndreh <eu@euandre.org> | 2024-10-27 09:34:59 -0300 |
commit | 11718f5c08a821eded93be54399576e8836a9a37 (patch) | |
tree | 43debc63238d06a4458e91b33dee1475c0dac824 /src | |
parent | Big bundle commit: adjust build project skeleton; include uncommitted code (diff) | |
download | papod-11718f5c08a821eded93be54399576e8836a9a37.tar.gz papod-11718f5c08a821eded93be54399576e8836a9a37.tar.xz |
src/papod.go: Make private most exported names and start actually implementing queriesT{}
Diffstat (limited to 'src')
-rw-r--r-- | src/papod.go | 1086 |
1 files changed, 688 insertions, 398 deletions
diff --git a/src/papod.go b/src/papod.go index 220ef55..aedfb5f 100644 --- a/src/papod.go +++ b/src/papod.go @@ -3,259 +3,601 @@ package papod import ( "bufio" "bytes" + "context" "database/sql" "errors" "flag" "fmt" "io" - "io/ioutil" "log/slog" "net" - "os" "regexp" - "sort" "strings" "sync" "time" - "gracha" + "cracha" "q" - // "golite" + "golite" "guuid" g "gobang" ) -type tablesT struct{ - servers string - users string - channels string - channelEvents string +const ( + defaultPrefix = "papod" + rollbackErrorFmt = "rollback error: %w; while executing: %w" + + NEW_CHANNEL = "new-channel" +) + + + +type queryT struct{ + write string + read string } type queriesT struct{ - addChannel func(string) error - eventsAfter func([]byte, func(eventT) error) error - addEvent func(string, string) error - close func() error + addNetwork func(guuid.UUID, newNetworkT) (networkT, error) + addChannel func(guuid.UUID, newChannelT) (channelT, error) + channels func(guuid.UUID, func(channelT) error) error + allAfter func(guuid.UUID, func(eventT) error) error + addEvent func(string, string) error + close func() error +} + +type newNetworkT struct{ + uuid guuid.UUID + name string +} + +type networkT struct{ + id int64 + timestamp time.Time + uuid guuid.UUID + name string + createdBy guuid.UUID +} + +type newChannelT struct{ + name string + uuid guuid.UUID +} + +type channelT struct { + id int64 + timestamp time.Time + uuid guuid.UUID + name string } type eventT struct{ - id int64 + id int64 + timestamp time.Time + uuid guuid.UUID } -type PapoD struct{ - queries queriesT - auth gracha.IAuth +type papodT struct{ + auth cracha.IAuth queue q.IQueue + db *sql.DB + queries queriesT } type consumerT struct{ topic string - handlerFn func(PapoD) func(q.Message) error + handlerFn func(papodT) func(q.Message) error } +type connectionT struct { + conn net.Conn + replyChan chan string + lastReadFrom time.Time + lastWrittenTo time.Time + // id *UUID + id string + isAuthenticated bool +} +type userT struct { + connections []connectionT +} -const ( - NEW_CHANNEL_EVENT = "new-channel-event" +type stateT struct { + users map[string]*userT +} + +type contextT struct { + db *sql.DB + state stateT + tx chan int +} + +type messageParamsT struct { + middle []string + trailing string +} + +type messageT struct { + prefix string + command string + params messageParamsT + raw string +} - defaultPrefix = "papod" +type actionType int +const ( + actionReply actionType = iota ) +type action interface { + typeOf() actionType +} + +type replyT struct { + prefix string + command int + params messageParamsT +} + +type IPapod interface{ +} + + + +func tryRollback(db *sql.DB, ctx context.Context, err error) error { + _, rollbackErr := db.ExecContext(ctx, "ROLLBACK;") + if rollbackErr != nil { + return fmt.Errorf( + rollbackErrorFmt, + rollbackErr, + err, + ) + } + + return err +} + +// FIXME +// See: +// https://sqlite.org/forum/forumpost/2507664507 +func inTx(db *sql.DB, fn func(context.Context) error) error { + ctx := context.Background() + _, err := db.ExecContext(ctx, "BEGIN IMMEDIATE;") + if err != nil { + return err + } -func tablesFrom(prefix string) (tablesT, error) { - err := g.ValidateSQLTablePrefix(prefix) + err = fn(ctx) if err != nil { - return tablesT{}, err - } - - servers := prefix + "-servers" - users := prefix + "-users" - channels := prefix + "-channels" - channelEvents := prefix + "-channel-events" - return tablesT{ - servers: servers, - users: users, - channels: channels, - channelEvents: channelEvents, - }, nil + return tryRollback(db, ctx, err) + } + + _, err = db.ExecContext(ctx, "COMMIT;") + if err != nil { + return tryRollback(db, ctx, err) + } + + return nil } -func createTables(db *sql.DB, tables tablesT) error { - const tmpl = ` - BEGIN TRANSACTION; - CREATE TABLE IF NOT EXISTS "%s" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL DEFAULT (%s), - uuid BLOB NOT NULL UNIQUE, - name TEXT NOT NULL, - description TEXT NOT NULL, - public BOOLEAN NOT NULL, - metadata TEXT +/// "papod_users".uuid is the same as cracha_users.uuid. Not a foreign key to +/// allow them to live in different physical locations. Adding it here feels +/// less like an optimization related decision, and more of a coupling one. The +/// way that New() works now uses the same databasePath for the q.IQueue *and* +/// cracha.IAuth, but cracha in no way exposes where it stores the user UUID or +/// how the it is handled. This has similarities to how events here don't +/// reference the q.Message.ID via foreign keys either. They're treated only as +/// opaque IDs. +func createTablesSQL(prefix string) queryT { + const tmpl_write = ` + CREATE TABLE IF NOT EXISTS "%s_workspaces" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (%s), + uuid BLOB NOT NULL UNIQUE, + name TEXT NOT NULL, + description TEXT NOT NULL, + -- "public", "private", "unlisted" + type TEXT NOT NULL ); - CREATE TABLE IF NOT EXISTS "%s" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL DEFAULT (%s), - uuid BLOB NOT NULL UNIQUE, - server_id INTEGER NOT NULL REFERENCES "%s"(id), - authuser_uuid BLOB NOT NULL UNIQUE, - human BOOLEAN NOT NULL, - visible BOOLEAN NOT NULL, - enabled BOOLEAN NOT NULL, - metadata TEXT + CREATE TABLE IF NOT EXISTS "%s_workspace_changes ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (%s), + workspace_id INTEGER NOT NULL + REFERENCES "%s_workspaces"(id), + attribute TEXT NOT NULL, + value TEXT NOT NULL, + op BOOLEAN NOT NULL ); - CREATE TABLE IF NOT EXISTS "%s" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL DEFAULT (%s), - uuid BLOB NOT NULL UNIQUE, - server_id INTEGER NOT NULL REFERENCES "%s"(id), - name TEXT, - description TEXT, - virtual BOOLEAN NOT NULL, - metadata TEXT + CREATE TABLE IF NOT EXISTS "%s_users" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (%s), + -- provided by cracha + uuid BLOB NOT NULL UNIQUE, + username TEXT NOT NULL, + display_name TEXT NOT NULL, + picture_uuid BLOB NOT NULL UNIQUE, + deleted BOOLEAN NOT NULL ); - -- FIXME: group conversations? - CREATE TABLE IF NOT EXISTS "%s" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL DEFAULT (%s), - uuid BLOB NOT NULL UNIQUE, - channel_id INTEGER NOT NULL REFERENCES "%s"(id), - connection_id INTEGER NOT NULL + CREATE TABLE IF NOT EXISTS "%s_user_changes" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (%s), + user_id INTEGER NOT NULL REFERENCES "%s_users"(id), + attribute TEXT NOT NULL, + value TEXT NOT NULL, + op BOOLEAN NOT NULL + ); + CREATE TABLE IF NOT EXISTS "%s_members" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (%s), + workspace_id INTEGER NOT NULL + REFERENCES "%s_workspaces"(id), + user_id INTEGER NOT NULL, + username TEXT NOT NULL, + display_name TEXT NOT NULL, + picture_uuid BLOB NOT NULL UNIQUE, + -- "active", "inactive", "removed" + status TEXT NOT NULL, + -- "active", always + active_uniq TEXT, + UNIQUE (workspace_id, username, active_uniq), + UNIQUE (workspace_id, user_id) + ); + CREATE TABLE IF NOT EXISTS "%s_member_roles" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + member_id INTEGER NOT NULL + REFERENCES "%s_members"(id), + role TEXT NOT NULL, + UNIQUE (member_id, role) + ); + CREATE TABLE IF NOT EXISTS "%s_member_changes" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (%s), + member_id INTEGER NOT NULL + REFERENCES "%s_members"(id), + attribute TEXT NOT NULL, + value TEXT NOT NULL, + op BOOLEAN NOT NULL + ); + CREATE TABLE IF NOT EXISTS "%s_channels" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (%s), + uuid BLOB NOT NULL UNIQUE, + workspace_id INTEGER NOT NULL + REFERENCES "%s_workspaces"(id), + public_name TEXT UNIQUE, + label TEXT NOT NULL, + description TEXT, + virtual BOOLEAN NOT NULL + ); + CREATE TABLE IF NOT EXISTS "%s_channel_changes" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (%s), + channel_id INTEGER NOT NULL + REFERENCES "%s_channels"(id), + attribute TEXT NOT NULL, + value TEXT NOT NULL, + op BOOLEAN NOT NULL + ); + CREATE TABLE IF NOT EXISTS "%s_participants" ( + member_id + ); + CREATE TABLE IF NOT EXISTS "%s_channel_events" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (%s), + uuid BLOB NOT NULL UNIQUE, + channel_id INTEGER NOT NULL REFERENCES "%s"(id), + connection_id INTEGER NOT NULL, -- payload FIXME: vary by type? ); + -- FIXME: group conversations? + -- user: person + -- member: workspace user + -- participant: channel member + ` + return queryT{ + write: fmt.Sprintf( + tmpl_write, + prefix, + g.SQLiteNow, + prefix, + g.SQLiteNow, + prefix, + prefix, + g.SQLiteNow, + prefix, + prefix, + g.SQLiteNow, + prefix, + ), + } +} +func createTables(db *sql.DB, prefix string) error { + q := createTablesSQL(prefix) - -- FIXME:indexes - COMMIT TRANSACTION; - ` - sql := fmt.Sprintf( - tmpl, - tables.servers, - g.SQLiteNow, - tables.users, - g.SQLiteNow, - tables.servers, - tables.channels, - g.SQLiteNow, - tables.servers, - tables.channelEvents, - g.SQLiteNow, - tables.channels, - ) - fmt.Println(sql) /// + return inTx(db, func(ctx context.Context) error { + _, err := db.ExecContext(ctx, q.write) + return err + }) +} - _, err := db.Exec(sql) - return err +// addServer +// addWorkspace +// addNetwork FIXME +func addNetworkSQL(prefix string) queryT { + const tmpl_write = ` + -- FIXME + ` + return queryT{ + write: fmt.Sprintf(tmpl_write, prefix), + } } -func addChannelQuery( +func addNetworkStmt( db *sql.DB, - tables tablesT, -) (func (string) error, func() error, error) { - const tmpl = ` + prefix string, +) (func(guuid.UUID, newNetworkT) (networkT, error), func() error, error) { + q := addNetworkSQL(prefix) + + writeStmt, err := db.Prepare(q.write) + if err != nil { + return nil, nil, err + } + + fn := func( + userID guuid.UUID, + newNetwork newNetworkT, + ) (networkT, error) { + network := networkT{ + uuid: newNetwork.uuid, + name: newNetwork.name, + } + + var timestr string + network_id_bytes := newNetwork.uuid[:] + user_id_bytes := userID[:] + err := writeStmt.QueryRow( + network_id_bytes, + newNetwork.name, + user_id_bytes, + ).Scan(&network.id, ×tr) + if err != nil { + return networkT{}, err + } + + network.timestamp, err = time.Parse(time.RFC3339Nano, timestr) + if err != nil { + return networkT{}, err + } + + return network, nil + } + + return fn, writeStmt.Close, nil +} + +func addChannelSQL(prefix string) queryT { + const tmpl_write = ` + -- FIXME ` - sql := fmt.Sprintf(tmpl) - /// fmt.Println(sql) /// + return queryT{ + write: fmt.Sprintf(tmpl_write, prefix), + } +} + +func addChannelStmt( + db *sql.DB, + prefix string, +) (func (guuid.UUID, newChannelT) (channelT, error), func() error, error) { + q := addChannelSQL(prefix) - stmt, err := db.Prepare(sql) + writeStmt, err := db.Prepare(q.write) if err != nil { return nil, nil, err } - fn := func(name string) error { - _, err := stmt.Exec(name) - return err + fn := func( + networkID guuid.UUID, + newChannel newChannelT, + ) (channelT, error) { + channel := channelT{ + name: newChannel.name, + } + + var timestr string + network_id_bytes := networkID[:] + err := writeStmt.QueryRow( + network_id_bytes, + newChannel.name, + ).Scan(&channel.id, ×tr) + if err != nil { + return channelT{}, err + } + + channel.timestamp, err = time.Parse(time.RFC3339Nano, timestr) + if err != nil { + return channelT{}, err + } + + return channel, nil } return fn, nil, nil } -func eventEach( - stmt *sql.Stmt, - uuid []byte, - callback func(eventT) error, -) error { - rows, err := stmt.Query(uuid) +func channelEach(rows *sql.Rows, callback func(channelT) error) error { + if rows == nil { + return nil + } + + for rows.Next() { + var ( + channel channelT + timestr string + channel_id_bytes []byte + ) + err := rows.Scan( + &channel.id, + ×tr, + &channel_id_bytes, + ) + if err != nil { + g.WrapErrors(rows.Close(), err) + } + + channel.timestamp, err = time.Parse(time.RFC3339Nano, timestr) + if err != nil { + g.WrapErrors(rows.Close(), err) + } + + err = callback(channel) + if err != nil { + g.WrapErrors(rows.Close(), err) + } + } + + return g.WrapErrors(rows.Err(), rows.Close()) +} + +func channelsSQL(prefix string) queryT { + const tmpl_read = ` + -- FIXME + ` + return queryT{ + read: fmt.Sprintf(tmpl_read, prefix), + } +} + +func channelsStmt( + db *sql.DB, + prefix string, +) (func(guuid.UUID) (*sql.Rows, error), func() error, error) { + q := channelsSQL(prefix) + + readStmt, err := db.Prepare(q.read) if err != nil { - return err + return nil, nil, err + } + + fn := func(workspaceID guuid.UUID) (*sql.Rows, error) { + return readStmt.Query(workspaceID) + } + + return fn, readStmt.Close, nil +} + +func eventEach(rows *sql.Rows, callback func(eventT) error) error { + if rows == nil { + return nil } - defer rows.Close() for rows.Next() { - var event eventT - err = rows.Scan( + var ( + event eventT + timestr string + event_id_bytes []byte + ) + err := rows.Scan( &event.id, + ×tr, + &event_id_bytes, ) if err != nil { - return err + return g.WrapErrors(rows.Close(), err) + } + + event.timestamp, err = time.Parse(time.RFC3339Nano, timestr) + if err != nil { + return g.WrapErrors(rows.Close(), err) } err = callback(event) if err != nil { - return err + return g.WrapErrors(rows.Close(), err) } } - return rows.Err() + return g.WrapErrors(rows.Err(), rows.Close()) } -func eventsAfterQuery( - db *sql.DB, - tables tablesT, -) (func ([]byte, func(eventT) error) error, func() error, error) { - const tmpl = ` - -- INSERT +func allAfterSQL(prefix string) queryT { + const tmpl_read = ` + -- FIXME ` - sql := fmt.Sprintf(tmpl) - /// fmt.Println(sql) /// + return queryT{ + read: fmt.Sprintf(tmpl_read, prefix), + } +} + +func allAfterStmt( + db *sql.DB, + prefix string, +) (func (guuid.UUID) (*sql.Rows, error), func() error, error) { + q := allAfterSQL(prefix) - stmt, err := db.Prepare(sql) + readStmt, err := db.Prepare(q.read) if err != nil { return nil, nil, err } - fn := func(uuid []byte, callback func(eventT) error) error { - return eventEach(stmt, uuid, callback) + fn := func(eventID guuid.UUID) (*sql.Rows, error) { + return readStmt.Query(eventID) } - return fn, stmt.Close, nil + return fn, readStmt.Close, nil +} + +func addEventSQL(prefix string) queryT { + const tmpl_write = ` + -- FIXME + ` + return queryT{ + write: fmt.Sprintf(tmpl_write, prefix), + } } -func addEventQuery( +func addEventStmt( db *sql.DB, - tables tablesT, + prefix string, ) (func (string, string) error, func() error, error) { - const tmpl = ` - ` - sql := fmt.Sprintf(tmpl) - /// fmt.Println(sql) /// + q := addEventSQL(prefix) - stmt, err := db.Prepare(sql) + writeStmt, err := db.Prepare(q.write) if err != nil { return nil, nil, err } fn := func(type_ string, payload string) error { - _, err := stmt.Exec(type_, payload) + _, err := writeStmt.Exec(type_, payload) return err } - return fn, stmt.Close, nil + return fn, writeStmt.Close, nil } -func initDB(db *sql.DB, tables tablesT) (queriesT, error) { - createTablesErr := createTables(db, tables) - addChannel, addChannelClose, addChannelErr := addChannelQuery(db, tables) - eventsAfter, eventsAfterClose, eventsAfterErr := eventsAfterQuery(db, tables) - addEvent, addEventClose, addEventErr := addEventQuery(db, tables) +func initDB( + db *sql.DB, + prefix string, +) (queriesT, error) { + createTablesErr := createTables(db, prefix) +// addWorkspace, addWorkspaceClose, addWorkspaceErr := addWorkspaceStmt(db, prefix) +// addWorkspace, addWorkspaceClose, addWorkspaceErr := addWorkspaceQ(db, p) + addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(db, prefix) + addChannel, addChannelClose, addChannelErr := addChannelStmt(db, prefix) + channels, channelsClose, channelsErr := channelsStmt(db, prefix) + allAfter, allAfterClose, allAfterErr := allAfterStmt(db, prefix) + addEvent, addEventClose, addEventErr := addEventStmt(db, prefix) err := g.SomeError( createTablesErr, + addNetworkErr, addChannelErr, - eventsAfterErr, + channelsErr, + allAfterErr, addEventErr, ) if err != nil { @@ -264,27 +606,82 @@ func initDB(db *sql.DB, tables tablesT) (queriesT, error) { close := func() error { return g.SomeFnError( - addChannelClose, - eventsAfterClose, - addEventClose, + addNetworkClose, + addChannelClose, + channelsClose, + allAfterClose, + addEventClose, ) } + var connMutex sync.RWMutex return queriesT{ - addChannel: addChannel, - eventsAfter: eventsAfter, - addEvent: addEvent, - close: close, + addNetwork: func(a guuid.UUID, b newNetworkT) (networkT, error) { + connMutex.RLock() + defer connMutex.RUnlock() + return addNetwork(a, b) + }, + addChannel: func( + a guuid.UUID, b newChannelT, + ) (channelT, error) { + connMutex.RLock() + defer connMutex.RUnlock() + return addChannel(a, b) + }, + channels: func( + a guuid.UUID, + callback func(channelT) error, + ) error { + var ( + err error + rows *sql.Rows + ) + { + connMutex.RLock() + defer connMutex.RUnlock() + rows, err = channels(a) + } + if err != nil { + return err + } + + return channelEach(rows, callback) + }, + allAfter: func( + a guuid.UUID, + callback func(eventT) error, + ) error { + var ( + err error + rows *sql.Rows + ) + { + connMutex.RLock() + defer connMutex.RUnlock() + rows, err = allAfter(a) + } + if err != nil { + return err + } + + return eventEach(rows, callback) + }, + addEvent: func(a string, b string) error { + connMutex.RLock() + defer connMutex.RUnlock() + return addEvent(a, b) + }, + close: func() error { + connMutex.Lock() + defer connMutex.Unlock() + return close() + }, }, nil } -func (papod PapoD) Close() error { - return papod.queries.close() -} - var consumers = []consumerT{ } -func registerConsumers(papod PapoD, consumers []consumerT) { +func registerConsumers(papod papodT, consumers []consumerT) { for _, consumer := range consumers { papod.queue.Subscribe( consumer.topic, @@ -294,27 +691,52 @@ func registerConsumers(papod PapoD, consumers []consumerT) { } } -func NewWithPrefix(db *sql.DB, queue q.IQueue, prefix string) (PapoD, error) { - tables, err := tablesFrom(prefix) +func NewWithPrefix(databasePath string, prefix string) (IPapod, error) { + queue, err := q.NewWithPrefix(databasePath, prefix) if err != nil { - return PapoD{}, err + return papodT{}, err } - queries, err := initDB(db, tables) + auth, err := cracha.NewWithPrefix(databasePath, prefix) if err != nil { - return PapoD{}, err + return papodT{}, err } - return PapoD{ - queries: queries, + db, err := sql.Open(golite.DriverName, databasePath) + if err != nil { + return papodT{}, err + } + + err = g.ValidateSQLTablePrefix(prefix) + if err != nil { + return papodT{}, err + } + + queries, err := initDB(db, prefix) + if err != nil { + return papodT{}, err + } + + return papodT{ queue: queue, + auth: auth, + db: db, + queries: queries, }, nil } -func New(db *sql.DB, queue q.IQueue) (PapoD, error) { - return NewWithPrefix(db, queue, defaultPrefix) +func New(databasePath string) (IPapod, error) { + return NewWithPrefix(databasePath, defaultPrefix) } +func (papod papodT) Close() error { + return g.WrapErrors( + papod.queries.close(), + papod.db.Close(), + papod.auth.Close(), + papod.queue.Close(), + ) +} @@ -322,74 +744,26 @@ func New(db *sql.DB, queue q.IQueue) (PapoD, error) { -// Global variables -var Colour string - -func SetEnvironmentVariables() { - Colour = os.Getenv("PAPOD_COLOUR") - if Colour == "" { - Colour = "PAPOD-COLOUR-UNKNOWN" - } -} // FIXME: reorder -var EmitActiveConnection = g.MakeGauge("active-connections") -var EmitNicksInChannel = g.MakeGauge("nicks-in-channel") -var EmitReceivedMessage = g.MakeCounter("received-message") -var EmitWriteToClientError = g.MakeCounter("write-to-client") +var emitActiveConnection = g.MakeGauge("active-connections") +var emitNicksInChannel = g.MakeGauge("nicks-in-channel") +var emitReceivedMessage = g.MakeCounter("received-message") +var emitWriteToClientError = g.MakeCounter("write-to-client") const pingFrequency = time.Duration(30) * time.Second const pongMaxLatency = time.Duration(5) * time.Second -type Channel struct { -} - -type Connection struct { - conn net.Conn - replyChan chan string - lastReadFrom time.Time - lastWrittenTo time.Time - // id *UUID - id string - isAuthenticated bool -} - -type User struct { - connections []Connection -} - -type State struct { - users map[string]*User -} - -type Context struct { - db *sql.DB - state State - tx chan int -} - -type MessageParams struct { - Middle []string - Trailing string -} - -type Message struct { - Prefix string - Command string - Params MessageParams - Raw string -} - var ( - CmdUSER = Message { Command: "USER" } - CmdPRIVMSG = Message { Command: "PRIVMSG" } - CmdJOIN = Message { Command: "JOIN" } + cmdUSER = messageT{ command: "USER" } + cmdPRIVMSG = messageT{ command: "PRIVMSG" } + cmdJOIN = messageT{ command: "JOIN" } ) -func SplitOnCRLF(data []byte, _atEOF bool) (int, []byte, error) { +func splitOnCRLF(data []byte, _atEOF bool) (int, []byte, error) { idx := bytes.Index(data, []byte { '\r', '\n' }) if idx == -1 { return 0, nil, nil @@ -398,8 +772,8 @@ func SplitOnCRLF(data []byte, _atEOF bool) (int, []byte, error) { return idx + 2, data[0:idx], nil } -func SplitOnRawMessage(data []byte, atEOF bool) (int, []byte, error) { - advance, token, error := SplitOnCRLF(data, atEOF) +func splitOnRawMessage(data []byte, atEOF bool) (int, []byte, error) { + advance, token, error := splitOnCRLF(data, atEOF) if len(token) == 0 { return advance, nil, error @@ -408,11 +782,11 @@ func SplitOnRawMessage(data []byte, atEOF bool) (int, []byte, error) { return advance, token, error } -func SplitSpaces(r rune) bool { +func splitSpaces(r rune) bool { return r == ' ' } -func ParseMessageParams(params string) MessageParams { +func parseMessageParams(params string) messageParamsT { const sep = " :" var middle string @@ -427,49 +801,49 @@ func ParseMessageParams(params string) MessageParams { trailing = params[idx + len(sep):] } - return MessageParams { - Middle: strings.FieldsFunc(middle, SplitSpaces), - Trailing: trailing, + return messageParamsT{ + middle: strings.FieldsFunc(middle, splitSpaces), + trailing: trailing, } } -var MessageRegex = regexp.MustCompilePOSIX( +var messageRegex = regexp.MustCompilePOSIX( // <prefix> <command> <params> //1 2 3 4 `^(:([^ ]+) +)?([a-zA-Z]+) *( .*)$`, ) -func ParseMessage(rawMessage string) (Message, error) { - var msg Message +func parseMessage(rawMessage string) (messageT, error) { + var msg messageT - components := MessageRegex.FindStringSubmatch(rawMessage) + components := messageRegex.FindStringSubmatch(rawMessage) if components == nil { return msg, errors.New("Can't parse message") } - msg = Message { - Prefix: components[2], - Command: components[3], - Params: ParseMessageParams(components[4]), - Raw: rawMessage, + msg = messageT{ + prefix: components[2], + command: components[3], + params: parseMessageParams(components[4]), + raw: rawMessage, } return msg, nil } -func HandleUnknown(ctx *Context, msg Message) { +func handleUnknown(ctx *contextT, msg messageT) { g.Warning( "Unsupported command", "unsupported-command", - "command", msg.Command, + "command", msg.command, ) - var r Reply = ReplyUnknown - r.Prefix = "dunno" + var r replyT = replyUnknown + r.prefix = "dunno" // return []Action { r } } -func HandleUSER(ctx *Context, msg Message) { +func handleUSER(ctx *contextT, msg messageT) { fmt.Printf("USER: %#v\n", msg) } -func HandlePRIVMSG(ctx *Context, msg Message) { +func handlePRIVMSG(ctx *contextT, msg messageT) { // . assert no missing params // . write to DB: (after auth) // . channel timeline: message from $USER @@ -504,7 +878,7 @@ func HandlePRIVMSG(ctx *Context, msg Message) { fmt.Println("ret: ", ret) } -func HandleJOIN(ctx *Context, msg Message) { +func handleJOIN(ctx *contextT, msg messageT) { fmt.Printf("JOIN: %#v\n", msg) // . write to DB: (after auth) @@ -514,60 +888,45 @@ func HandleJOIN(ctx *Context, msg Message) { // . broadcast new timeline event to members of the channel } -func ReplyAnonymous() { -} - -func PersistMessage(msg Message) { -} - -type ActionType int -const ( - ActionReply = iota -) - -type Action interface { - Type() ActionType +func replyAnonymous() { } -type Reply struct { - Prefix string - Command int - Params MessageParams +func persistMessage(msg messageT) { } -func (reply Reply) Type() ActionType { - return ActionReply +func (reply replyT) typeOf() actionType { + return actionReply } var ( - ReplyUnknown = Reply { - Command: 421, - Params: MessageParams { - Middle: []string { }, - Trailing: "Unknown command", + replyUnknown = replyT{ + command: 421, + params: messageParamsT{ + middle: []string{}, + trailing: "Unknown command", }, } ) -var Commands = map[string]func(*Context, Message) { - CmdUSER.Command: HandleUSER, - CmdPRIVMSG.Command: HandlePRIVMSG, - CmdJOIN.Command: HandleJOIN, +var commands = map[string]func(*contextT, messageT) { + cmdUSER.command: handleUSER, + cmdPRIVMSG.command: handlePRIVMSG, + cmdJOIN.command: handleJOIN, } -func ActionFnFor(command string) func(*Context, Message) { - fn := Commands[command] +func actionFnFor(command string) func(*contextT, messageT) { + fn := commands[command] if fn != nil { return fn } - return HandleUnknown + return handleUnknown } -func ProcessMessage(ctx *Context, connection *Connection, rawMessage string) { +func processMessage(ctx *contextT, connection *connectionT, rawMessage string) { connection.lastReadFrom = time.Now() - msg, err := ParseMessage(rawMessage) + msg, err := parseMessage(rawMessage) if err != nil { g.Info( "Error processing message", @@ -577,10 +936,10 @@ func ProcessMessage(ctx *Context, connection *Connection, rawMessage string) { return } - if msg.Command == CmdUSER.Command { - args := msg.Params.Middle + if msg.command == cmdUSER.command { + args := msg.params.middle if len(args) == 0 { - go ReplyAnonymous() + go replyAnonymous() return } connection.id = args[0] @@ -588,22 +947,22 @@ func ProcessMessage(ctx *Context, connection *Connection, rawMessage string) { } if !connection.isAuthenticated { - go ReplyAnonymous() + go replyAnonymous() return } - ActionFnFor(msg.Command)(ctx, msg) + actionFnFor(msg.command)(ctx, msg) } -func ReadLoop(ctx *Context, connection *Connection) { +func readLoop(ctx *contextT, connection *connectionT) { scanner := bufio.NewScanner(connection.conn) - scanner.Split(SplitOnRawMessage) + scanner.Split(splitOnRawMessage) for scanner.Scan() { - ProcessMessage(ctx, connection, scanner.Text()) + processMessage(ctx, connection, scanner.Text()) } } -func WriteLoop(ctx *Context, connection *Connection) { +func writeLoop(ctx *contextT, connection *connectionT) { for message := range connection.replyChan { _, err := io.WriteString(connection.conn, message) if err != nil { @@ -612,18 +971,18 @@ func WriteLoop(ctx *Context, connection *Connection) { "user-reply-error", "err", err, ) - EmitWriteToClientError() + emitWriteToClientError() continue } connection.lastWrittenTo = time.Now() } - EmitActiveConnection.Dec() + emitActiveConnection.Dec() connection.conn.Close() } -func Kill(ctx *Context, connection *Connection) { +func kill(ctx *contextT, connection *connectionT) { // lock? delete(ctx.state.users, connection.id) // unlock? @@ -631,40 +990,40 @@ func Kill(ctx *Context, connection *Connection) { connection.conn.Close() // Ignore errors? } -const PingWindow = 30 * time.Second -func PingLoop(ctx *Context, connection *Connection) { +const pingWindow = 30 * time.Second +func pingLoop(ctx *contextT, connection *connectionT) { for { - time.Sleep(PingWindow) - if (time.Since(connection.lastReadFrom) <= PingWindow) { + time.Sleep(pingWindow) + if (time.Since(connection.lastReadFrom) <= pingWindow) { continue } window := connection.lastWrittenTo.Sub(connection.lastReadFrom) - if (window <= PingWindow) { + if (window <= pingWindow) { connection.replyChan <- "PING" continue } - Kill(ctx, connection) + kill(ctx, connection) break } } -func HandleConnection(ctx *Context, conn net.Conn) { - EmitActiveConnection.Inc() +func handleConnection(ctx *contextT, conn net.Conn) { + emitActiveConnection.Inc() // FIXME: WaitGroup here? now := time.Now() - connection := Connection { + connection := connectionT { conn: conn, isAuthenticated: false, lastReadFrom: now, lastWrittenTo: now, } - go ReadLoop(ctx, &connection) - go WriteLoop(ctx, &connection) - go PingLoop(ctx, &connection) + go readLoop(ctx, &connection) + go writeLoop(ctx, &connection) + go pingLoop(ctx, &connection) } -func IRCdLoop(ctx *Context, publicSocketPath string) { +func serverLoop(ctx *contextT, publicSocketPath string) { listener, err := net.Listen("unix", publicSocketPath) g.FatalIf(err) g.Info("IRCd started", "component-up", "component", "ircd") @@ -681,11 +1040,11 @@ func IRCdLoop(ctx *Context, publicSocketPath string) { continue } // FIXME: where does it get closed - go HandleConnection(ctx, conn) + go handleConnection(ctx, conn) } } -func CommandListenerLoop(ctx *Context, commandSocketPath string) { +func commandListenerLoop(ctx *contextT, commandSocketPath string) { listener, err := net.Listen("unix", commandSocketPath) g.FatalIf(err) g.Info( @@ -710,102 +1069,22 @@ func CommandListenerLoop(ctx *Context, commandSocketPath string) { } } -func TransactorLoop(ctx *Context) { +func transactorLoop(ctx *contextT) { g.Info("transactor started", "component-up", "component", "transactor") - EmitActiveConnection.Inc() + emitActiveConnection.Inc() for tx := range ctx.tx { fmt.Println(tx) } } -func InitMigrations(db *sql.DB) { - _, err := db.Exec(` - CREATE TABLE IF NOT EXISTS migrations ( - filename TEXT PRIMARY KEY - ); - `) - g.FatalIf(err) -} - -const MIGRATIONS_DIR = "src/sql/migrations/" -func PendingMigrations(db *sql.DB) []string { - files, err := ioutil.ReadDir(MIGRATIONS_DIR) +func initDB2(databasePath string) *sql.DB { + db, err := sql.Open(golite.DriverName, databasePath) g.FatalIf(err) - - set := make(map[string]bool) - for _, file := range files { - set[file.Name()] = true - } - - rows, err := db.Query(`SELECT filename FROM migrations;`) - g.FatalIf(err) - defer rows.Close() - - for rows.Next() { - var filename string - err := rows.Scan(&filename) - g.FatalIf(err) - delete(set, filename) - } - g.FatalIf(rows.Err()) - - difference := make([]string, 0) - for filename := range set { - difference = append(difference, filename) - } - - sort.Sort(sort.StringSlice(difference)) - return difference -} - -func RunMigrations(db *sql.DB) { - InitMigrations(db) - - stmt, err := db.Prepare(`INSERT INTO migrations (filename) VALUES (?);`) - g.FatalIf(err) - defer stmt.Close() - - for _, filename := range PendingMigrations(db) { - g.Info("Running migration file", "exec-migration-file", - "filename", filename, - ) - - tx, err := db.Begin() - g.FatalIf(err) - - sql, err := os.ReadFile(MIGRATIONS_DIR + filename) - g.FatalIf(err) - - _, err = tx.Exec(string(sql)) - g.FatalIf(err) - - _, err = tx.Stmt(stmt).Exec(filename) - g.FatalIf(err) - - err = tx.Commit() - g.FatalIf(err) - } -} - -func InitDB(databasePath string) *sql.DB { - db, err := sql.Open("sqlite3", databasePath) - g.FatalIf(err) - // RunMigrations(db) return db } -func Init() { - g.Init(slog.Group( - "versions", - "gobang", g.Version, - // "golite", golite.Version, - "this", Version, - )) - SetEnvironmentVariables() -} - -func Start(ctx *Context, publicSocketPath string, commandSocketPath string) { +func start(ctx *contextT, publicSocketPath string, commandSocketPath string) { /* buildInfo, ok := debug.ReadBuildInfo() if !ok { @@ -833,16 +1112,16 @@ func Start(ctx *Context, publicSocketPath string, commandSocketPath string) { wg.Done() }() } - bgRun(func() { IRCdLoop(ctx, publicSocketPath) }) - bgRun(func() { CommandListenerLoop(ctx, commandSocketPath) }) - bgRun(func() { TransactorLoop(ctx) }) + bgRun(func() { serverLoop(ctx, publicSocketPath) }) + bgRun(func() { commandListenerLoop(ctx, commandSocketPath) }) + bgRun(func() { transactorLoop(ctx) }) wg.Wait() } -func BuildContext(databasePath string) *Context { - db := InitDB(databasePath) +func buildContext(databasePath string) *contextT { + db := initDB2(databasePath) tx := make(chan int, 100) - return &Context { + return &contextT { db: db, tx: tx, } @@ -868,8 +1147,19 @@ var ( func Main() { - Init() + g.Init(slog.Group( + "versions", + "cracha", cracha.Version, + "q", q.Version, + "golite", golite.Version, + "guuid", guuid.Version, + "gobang", g.Version, + "papod", Version, + "this", Version, + )) flag.Parse() - ctx := BuildContext(*databasePath) - Start(ctx, *publicSocketPath, *commandSocketPath) + ctx := buildContext(*databasePath) + start(ctx, *publicSocketPath, *commandSocketPath) } + +// FIXME: review usage of g.Fatal() |