summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEuAndreh <eu@euandre.org>2024-10-27 09:34:59 -0300
committerEuAndreh <eu@euandre.org>2024-10-27 09:34:59 -0300
commit11718f5c08a821eded93be54399576e8836a9a37 (patch)
tree43debc63238d06a4458e91b33dee1475c0dac824 /src
parentBig bundle commit: adjust build project skeleton; include uncommitted code (diff)
downloadpapod-11718f5c08a821eded93be54399576e8836a9a37.tar.gz
papod-11718f5c08a821eded93be54399576e8836a9a37.tar.xz
src/papod.go: Make private most exported names and start actually implementing queriesT{}
Diffstat (limited to 'src')
-rw-r--r--src/papod.go1086
1 files changed, 688 insertions, 398 deletions
diff --git a/src/papod.go b/src/papod.go
index 220ef55..aedfb5f 100644
--- a/src/papod.go
+++ b/src/papod.go
@@ -3,259 +3,601 @@ package papod
import (
"bufio"
"bytes"
+ "context"
"database/sql"
"errors"
"flag"
"fmt"
"io"
- "io/ioutil"
"log/slog"
"net"
- "os"
"regexp"
- "sort"
"strings"
"sync"
"time"
- "gracha"
+ "cracha"
"q"
- // "golite"
+ "golite"
"guuid"
g "gobang"
)
-type tablesT struct{
- servers string
- users string
- channels string
- channelEvents string
+const (
+ defaultPrefix = "papod"
+ rollbackErrorFmt = "rollback error: %w; while executing: %w"
+
+ NEW_CHANNEL = "new-channel"
+)
+
+
+
+type queryT struct{
+ write string
+ read string
}
type queriesT struct{
- addChannel func(string) error
- eventsAfter func([]byte, func(eventT) error) error
- addEvent func(string, string) error
- close func() error
+ addNetwork func(guuid.UUID, newNetworkT) (networkT, error)
+ addChannel func(guuid.UUID, newChannelT) (channelT, error)
+ channels func(guuid.UUID, func(channelT) error) error
+ allAfter func(guuid.UUID, func(eventT) error) error
+ addEvent func(string, string) error
+ close func() error
+}
+
+type newNetworkT struct{
+ uuid guuid.UUID
+ name string
+}
+
+type networkT struct{
+ id int64
+ timestamp time.Time
+ uuid guuid.UUID
+ name string
+ createdBy guuid.UUID
+}
+
+type newChannelT struct{
+ name string
+ uuid guuid.UUID
+}
+
+type channelT struct {
+ id int64
+ timestamp time.Time
+ uuid guuid.UUID
+ name string
}
type eventT struct{
- id int64
+ id int64
+ timestamp time.Time
+ uuid guuid.UUID
}
-type PapoD struct{
- queries queriesT
- auth gracha.IAuth
+type papodT struct{
+ auth cracha.IAuth
queue q.IQueue
+ db *sql.DB
+ queries queriesT
}
type consumerT struct{
topic string
- handlerFn func(PapoD) func(q.Message) error
+ handlerFn func(papodT) func(q.Message) error
}
+type connectionT struct {
+ conn net.Conn
+ replyChan chan string
+ lastReadFrom time.Time
+ lastWrittenTo time.Time
+ // id *UUID
+ id string
+ isAuthenticated bool
+}
+type userT struct {
+ connections []connectionT
+}
-const (
- NEW_CHANNEL_EVENT = "new-channel-event"
+type stateT struct {
+ users map[string]*userT
+}
+
+type contextT struct {
+ db *sql.DB
+ state stateT
+ tx chan int
+}
+
+type messageParamsT struct {
+ middle []string
+ trailing string
+}
+
+type messageT struct {
+ prefix string
+ command string
+ params messageParamsT
+ raw string
+}
- defaultPrefix = "papod"
+type actionType int
+const (
+ actionReply actionType = iota
)
+type action interface {
+ typeOf() actionType
+}
+
+type replyT struct {
+ prefix string
+ command int
+ params messageParamsT
+}
+
+type IPapod interface{
+}
+
+
+
+func tryRollback(db *sql.DB, ctx context.Context, err error) error {
+ _, rollbackErr := db.ExecContext(ctx, "ROLLBACK;")
+ if rollbackErr != nil {
+ return fmt.Errorf(
+ rollbackErrorFmt,
+ rollbackErr,
+ err,
+ )
+ }
+
+ return err
+}
+
+// FIXME
+// See:
+// https://sqlite.org/forum/forumpost/2507664507
+func inTx(db *sql.DB, fn func(context.Context) error) error {
+ ctx := context.Background()
+ _, err := db.ExecContext(ctx, "BEGIN IMMEDIATE;")
+ if err != nil {
+ return err
+ }
-func tablesFrom(prefix string) (tablesT, error) {
- err := g.ValidateSQLTablePrefix(prefix)
+ err = fn(ctx)
if err != nil {
- return tablesT{}, err
- }
-
- servers := prefix + "-servers"
- users := prefix + "-users"
- channels := prefix + "-channels"
- channelEvents := prefix + "-channel-events"
- return tablesT{
- servers: servers,
- users: users,
- channels: channels,
- channelEvents: channelEvents,
- }, nil
+ return tryRollback(db, ctx, err)
+ }
+
+ _, err = db.ExecContext(ctx, "COMMIT;")
+ if err != nil {
+ return tryRollback(db, ctx, err)
+ }
+
+ return nil
}
-func createTables(db *sql.DB, tables tablesT) error {
- const tmpl = `
- BEGIN TRANSACTION;
- CREATE TABLE IF NOT EXISTS "%s" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (%s),
- uuid BLOB NOT NULL UNIQUE,
- name TEXT NOT NULL,
- description TEXT NOT NULL,
- public BOOLEAN NOT NULL,
- metadata TEXT
+/// "papod_users".uuid is the same as cracha_users.uuid. Not a foreign key to
+/// allow them to live in different physical locations. Adding it here feels
+/// less like an optimization related decision, and more of a coupling one. The
+/// way that New() works now uses the same databasePath for the q.IQueue *and*
+/// cracha.IAuth, but cracha in no way exposes where it stores the user UUID or
+/// how the it is handled. This has similarities to how events here don't
+/// reference the q.Message.ID via foreign keys either. They're treated only as
+/// opaque IDs.
+func createTablesSQL(prefix string) queryT {
+ const tmpl_write = `
+ CREATE TABLE IF NOT EXISTS "%s_workspaces" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (%s),
+ uuid BLOB NOT NULL UNIQUE,
+ name TEXT NOT NULL,
+ description TEXT NOT NULL,
+ -- "public", "private", "unlisted"
+ type TEXT NOT NULL
);
- CREATE TABLE IF NOT EXISTS "%s" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (%s),
- uuid BLOB NOT NULL UNIQUE,
- server_id INTEGER NOT NULL REFERENCES "%s"(id),
- authuser_uuid BLOB NOT NULL UNIQUE,
- human BOOLEAN NOT NULL,
- visible BOOLEAN NOT NULL,
- enabled BOOLEAN NOT NULL,
- metadata TEXT
+ CREATE TABLE IF NOT EXISTS "%s_workspace_changes (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (%s),
+ workspace_id INTEGER NOT NULL
+ REFERENCES "%s_workspaces"(id),
+ attribute TEXT NOT NULL,
+ value TEXT NOT NULL,
+ op BOOLEAN NOT NULL
);
- CREATE TABLE IF NOT EXISTS "%s" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (%s),
- uuid BLOB NOT NULL UNIQUE,
- server_id INTEGER NOT NULL REFERENCES "%s"(id),
- name TEXT,
- description TEXT,
- virtual BOOLEAN NOT NULL,
- metadata TEXT
+ CREATE TABLE IF NOT EXISTS "%s_users" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (%s),
+ -- provided by cracha
+ uuid BLOB NOT NULL UNIQUE,
+ username TEXT NOT NULL,
+ display_name TEXT NOT NULL,
+ picture_uuid BLOB NOT NULL UNIQUE,
+ deleted BOOLEAN NOT NULL
);
- -- FIXME: group conversations?
- CREATE TABLE IF NOT EXISTS "%s" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (%s),
- uuid BLOB NOT NULL UNIQUE,
- channel_id INTEGER NOT NULL REFERENCES "%s"(id),
- connection_id INTEGER NOT NULL
+ CREATE TABLE IF NOT EXISTS "%s_user_changes" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (%s),
+ user_id INTEGER NOT NULL REFERENCES "%s_users"(id),
+ attribute TEXT NOT NULL,
+ value TEXT NOT NULL,
+ op BOOLEAN NOT NULL
+ );
+ CREATE TABLE IF NOT EXISTS "%s_members" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (%s),
+ workspace_id INTEGER NOT NULL
+ REFERENCES "%s_workspaces"(id),
+ user_id INTEGER NOT NULL,
+ username TEXT NOT NULL,
+ display_name TEXT NOT NULL,
+ picture_uuid BLOB NOT NULL UNIQUE,
+ -- "active", "inactive", "removed"
+ status TEXT NOT NULL,
+ -- "active", always
+ active_uniq TEXT,
+ UNIQUE (workspace_id, username, active_uniq),
+ UNIQUE (workspace_id, user_id)
+ );
+ CREATE TABLE IF NOT EXISTS "%s_member_roles" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ member_id INTEGER NOT NULL
+ REFERENCES "%s_members"(id),
+ role TEXT NOT NULL,
+ UNIQUE (member_id, role)
+ );
+ CREATE TABLE IF NOT EXISTS "%s_member_changes" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (%s),
+ member_id INTEGER NOT NULL
+ REFERENCES "%s_members"(id),
+ attribute TEXT NOT NULL,
+ value TEXT NOT NULL,
+ op BOOLEAN NOT NULL
+ );
+ CREATE TABLE IF NOT EXISTS "%s_channels" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (%s),
+ uuid BLOB NOT NULL UNIQUE,
+ workspace_id INTEGER NOT NULL
+ REFERENCES "%s_workspaces"(id),
+ public_name TEXT UNIQUE,
+ label TEXT NOT NULL,
+ description TEXT,
+ virtual BOOLEAN NOT NULL
+ );
+ CREATE TABLE IF NOT EXISTS "%s_channel_changes" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (%s),
+ channel_id INTEGER NOT NULL
+ REFERENCES "%s_channels"(id),
+ attribute TEXT NOT NULL,
+ value TEXT NOT NULL,
+ op BOOLEAN NOT NULL
+ );
+ CREATE TABLE IF NOT EXISTS "%s_participants" (
+ member_id
+ );
+ CREATE TABLE IF NOT EXISTS "%s_channel_events" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (%s),
+ uuid BLOB NOT NULL UNIQUE,
+ channel_id INTEGER NOT NULL REFERENCES "%s"(id),
+ connection_id INTEGER NOT NULL,
-- payload FIXME: vary by type?
);
+ -- FIXME: group conversations?
+ -- user: person
+ -- member: workspace user
+ -- participant: channel member
+ `
+ return queryT{
+ write: fmt.Sprintf(
+ tmpl_write,
+ prefix,
+ g.SQLiteNow,
+ prefix,
+ g.SQLiteNow,
+ prefix,
+ prefix,
+ g.SQLiteNow,
+ prefix,
+ prefix,
+ g.SQLiteNow,
+ prefix,
+ ),
+ }
+}
+func createTables(db *sql.DB, prefix string) error {
+ q := createTablesSQL(prefix)
- -- FIXME:indexes
- COMMIT TRANSACTION;
- `
- sql := fmt.Sprintf(
- tmpl,
- tables.servers,
- g.SQLiteNow,
- tables.users,
- g.SQLiteNow,
- tables.servers,
- tables.channels,
- g.SQLiteNow,
- tables.servers,
- tables.channelEvents,
- g.SQLiteNow,
- tables.channels,
- )
- fmt.Println(sql) ///
+ return inTx(db, func(ctx context.Context) error {
+ _, err := db.ExecContext(ctx, q.write)
+ return err
+ })
+}
- _, err := db.Exec(sql)
- return err
+// addServer
+// addWorkspace
+// addNetwork FIXME
+func addNetworkSQL(prefix string) queryT {
+ const tmpl_write = `
+ -- FIXME
+ `
+ return queryT{
+ write: fmt.Sprintf(tmpl_write, prefix),
+ }
}
-func addChannelQuery(
+func addNetworkStmt(
db *sql.DB,
- tables tablesT,
-) (func (string) error, func() error, error) {
- const tmpl = `
+ prefix string,
+) (func(guuid.UUID, newNetworkT) (networkT, error), func() error, error) {
+ q := addNetworkSQL(prefix)
+
+ writeStmt, err := db.Prepare(q.write)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(
+ userID guuid.UUID,
+ newNetwork newNetworkT,
+ ) (networkT, error) {
+ network := networkT{
+ uuid: newNetwork.uuid,
+ name: newNetwork.name,
+ }
+
+ var timestr string
+ network_id_bytes := newNetwork.uuid[:]
+ user_id_bytes := userID[:]
+ err := writeStmt.QueryRow(
+ network_id_bytes,
+ newNetwork.name,
+ user_id_bytes,
+ ).Scan(&network.id, &timestr)
+ if err != nil {
+ return networkT{}, err
+ }
+
+ network.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
+ if err != nil {
+ return networkT{}, err
+ }
+
+ return network, nil
+ }
+
+ return fn, writeStmt.Close, nil
+}
+
+func addChannelSQL(prefix string) queryT {
+ const tmpl_write = `
+ -- FIXME
`
- sql := fmt.Sprintf(tmpl)
- /// fmt.Println(sql) ///
+ return queryT{
+ write: fmt.Sprintf(tmpl_write, prefix),
+ }
+}
+
+func addChannelStmt(
+ db *sql.DB,
+ prefix string,
+) (func (guuid.UUID, newChannelT) (channelT, error), func() error, error) {
+ q := addChannelSQL(prefix)
- stmt, err := db.Prepare(sql)
+ writeStmt, err := db.Prepare(q.write)
if err != nil {
return nil, nil, err
}
- fn := func(name string) error {
- _, err := stmt.Exec(name)
- return err
+ fn := func(
+ networkID guuid.UUID,
+ newChannel newChannelT,
+ ) (channelT, error) {
+ channel := channelT{
+ name: newChannel.name,
+ }
+
+ var timestr string
+ network_id_bytes := networkID[:]
+ err := writeStmt.QueryRow(
+ network_id_bytes,
+ newChannel.name,
+ ).Scan(&channel.id, &timestr)
+ if err != nil {
+ return channelT{}, err
+ }
+
+ channel.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
+ if err != nil {
+ return channelT{}, err
+ }
+
+ return channel, nil
}
return fn, nil, nil
}
-func eventEach(
- stmt *sql.Stmt,
- uuid []byte,
- callback func(eventT) error,
-) error {
- rows, err := stmt.Query(uuid)
+func channelEach(rows *sql.Rows, callback func(channelT) error) error {
+ if rows == nil {
+ return nil
+ }
+
+ for rows.Next() {
+ var (
+ channel channelT
+ timestr string
+ channel_id_bytes []byte
+ )
+ err := rows.Scan(
+ &channel.id,
+ &timestr,
+ &channel_id_bytes,
+ )
+ if err != nil {
+ g.WrapErrors(rows.Close(), err)
+ }
+
+ channel.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
+ if err != nil {
+ g.WrapErrors(rows.Close(), err)
+ }
+
+ err = callback(channel)
+ if err != nil {
+ g.WrapErrors(rows.Close(), err)
+ }
+ }
+
+ return g.WrapErrors(rows.Err(), rows.Close())
+}
+
+func channelsSQL(prefix string) queryT {
+ const tmpl_read = `
+ -- FIXME
+ `
+ return queryT{
+ read: fmt.Sprintf(tmpl_read, prefix),
+ }
+}
+
+func channelsStmt(
+ db *sql.DB,
+ prefix string,
+) (func(guuid.UUID) (*sql.Rows, error), func() error, error) {
+ q := channelsSQL(prefix)
+
+ readStmt, err := db.Prepare(q.read)
if err != nil {
- return err
+ return nil, nil, err
+ }
+
+ fn := func(workspaceID guuid.UUID) (*sql.Rows, error) {
+ return readStmt.Query(workspaceID)
+ }
+
+ return fn, readStmt.Close, nil
+}
+
+func eventEach(rows *sql.Rows, callback func(eventT) error) error {
+ if rows == nil {
+ return nil
}
- defer rows.Close()
for rows.Next() {
- var event eventT
- err = rows.Scan(
+ var (
+ event eventT
+ timestr string
+ event_id_bytes []byte
+ )
+ err := rows.Scan(
&event.id,
+ &timestr,
+ &event_id_bytes,
)
if err != nil {
- return err
+ return g.WrapErrors(rows.Close(), err)
+ }
+
+ event.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
+ if err != nil {
+ return g.WrapErrors(rows.Close(), err)
}
err = callback(event)
if err != nil {
- return err
+ return g.WrapErrors(rows.Close(), err)
}
}
- return rows.Err()
+ return g.WrapErrors(rows.Err(), rows.Close())
}
-func eventsAfterQuery(
- db *sql.DB,
- tables tablesT,
-) (func ([]byte, func(eventT) error) error, func() error, error) {
- const tmpl = `
- -- INSERT
+func allAfterSQL(prefix string) queryT {
+ const tmpl_read = `
+ -- FIXME
`
- sql := fmt.Sprintf(tmpl)
- /// fmt.Println(sql) ///
+ return queryT{
+ read: fmt.Sprintf(tmpl_read, prefix),
+ }
+}
+
+func allAfterStmt(
+ db *sql.DB,
+ prefix string,
+) (func (guuid.UUID) (*sql.Rows, error), func() error, error) {
+ q := allAfterSQL(prefix)
- stmt, err := db.Prepare(sql)
+ readStmt, err := db.Prepare(q.read)
if err != nil {
return nil, nil, err
}
- fn := func(uuid []byte, callback func(eventT) error) error {
- return eventEach(stmt, uuid, callback)
+ fn := func(eventID guuid.UUID) (*sql.Rows, error) {
+ return readStmt.Query(eventID)
}
- return fn, stmt.Close, nil
+ return fn, readStmt.Close, nil
+}
+
+func addEventSQL(prefix string) queryT {
+ const tmpl_write = `
+ -- FIXME
+ `
+ return queryT{
+ write: fmt.Sprintf(tmpl_write, prefix),
+ }
}
-func addEventQuery(
+func addEventStmt(
db *sql.DB,
- tables tablesT,
+ prefix string,
) (func (string, string) error, func() error, error) {
- const tmpl = `
- `
- sql := fmt.Sprintf(tmpl)
- /// fmt.Println(sql) ///
+ q := addEventSQL(prefix)
- stmt, err := db.Prepare(sql)
+ writeStmt, err := db.Prepare(q.write)
if err != nil {
return nil, nil, err
}
fn := func(type_ string, payload string) error {
- _, err := stmt.Exec(type_, payload)
+ _, err := writeStmt.Exec(type_, payload)
return err
}
- return fn, stmt.Close, nil
+ return fn, writeStmt.Close, nil
}
-func initDB(db *sql.DB, tables tablesT) (queriesT, error) {
- createTablesErr := createTables(db, tables)
- addChannel, addChannelClose, addChannelErr := addChannelQuery(db, tables)
- eventsAfter, eventsAfterClose, eventsAfterErr := eventsAfterQuery(db, tables)
- addEvent, addEventClose, addEventErr := addEventQuery(db, tables)
+func initDB(
+ db *sql.DB,
+ prefix string,
+) (queriesT, error) {
+ createTablesErr := createTables(db, prefix)
+// addWorkspace, addWorkspaceClose, addWorkspaceErr := addWorkspaceStmt(db, prefix)
+// addWorkspace, addWorkspaceClose, addWorkspaceErr := addWorkspaceQ(db, p)
+ addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(db, prefix)
+ addChannel, addChannelClose, addChannelErr := addChannelStmt(db, prefix)
+ channels, channelsClose, channelsErr := channelsStmt(db, prefix)
+ allAfter, allAfterClose, allAfterErr := allAfterStmt(db, prefix)
+ addEvent, addEventClose, addEventErr := addEventStmt(db, prefix)
err := g.SomeError(
createTablesErr,
+ addNetworkErr,
addChannelErr,
- eventsAfterErr,
+ channelsErr,
+ allAfterErr,
addEventErr,
)
if err != nil {
@@ -264,27 +606,82 @@ func initDB(db *sql.DB, tables tablesT) (queriesT, error) {
close := func() error {
return g.SomeFnError(
- addChannelClose,
- eventsAfterClose,
- addEventClose,
+ addNetworkClose,
+ addChannelClose,
+ channelsClose,
+ allAfterClose,
+ addEventClose,
)
}
+ var connMutex sync.RWMutex
return queriesT{
- addChannel: addChannel,
- eventsAfter: eventsAfter,
- addEvent: addEvent,
- close: close,
+ addNetwork: func(a guuid.UUID, b newNetworkT) (networkT, error) {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return addNetwork(a, b)
+ },
+ addChannel: func(
+ a guuid.UUID, b newChannelT,
+ ) (channelT, error) {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return addChannel(a, b)
+ },
+ channels: func(
+ a guuid.UUID,
+ callback func(channelT) error,
+ ) error {
+ var (
+ err error
+ rows *sql.Rows
+ )
+ {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ rows, err = channels(a)
+ }
+ if err != nil {
+ return err
+ }
+
+ return channelEach(rows, callback)
+ },
+ allAfter: func(
+ a guuid.UUID,
+ callback func(eventT) error,
+ ) error {
+ var (
+ err error
+ rows *sql.Rows
+ )
+ {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ rows, err = allAfter(a)
+ }
+ if err != nil {
+ return err
+ }
+
+ return eventEach(rows, callback)
+ },
+ addEvent: func(a string, b string) error {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return addEvent(a, b)
+ },
+ close: func() error {
+ connMutex.Lock()
+ defer connMutex.Unlock()
+ return close()
+ },
}, nil
}
-func (papod PapoD) Close() error {
- return papod.queries.close()
-}
-
var consumers = []consumerT{
}
-func registerConsumers(papod PapoD, consumers []consumerT) {
+func registerConsumers(papod papodT, consumers []consumerT) {
for _, consumer := range consumers {
papod.queue.Subscribe(
consumer.topic,
@@ -294,27 +691,52 @@ func registerConsumers(papod PapoD, consumers []consumerT) {
}
}
-func NewWithPrefix(db *sql.DB, queue q.IQueue, prefix string) (PapoD, error) {
- tables, err := tablesFrom(prefix)
+func NewWithPrefix(databasePath string, prefix string) (IPapod, error) {
+ queue, err := q.NewWithPrefix(databasePath, prefix)
if err != nil {
- return PapoD{}, err
+ return papodT{}, err
}
- queries, err := initDB(db, tables)
+ auth, err := cracha.NewWithPrefix(databasePath, prefix)
if err != nil {
- return PapoD{}, err
+ return papodT{}, err
}
- return PapoD{
- queries: queries,
+ db, err := sql.Open(golite.DriverName, databasePath)
+ if err != nil {
+ return papodT{}, err
+ }
+
+ err = g.ValidateSQLTablePrefix(prefix)
+ if err != nil {
+ return papodT{}, err
+ }
+
+ queries, err := initDB(db, prefix)
+ if err != nil {
+ return papodT{}, err
+ }
+
+ return papodT{
queue: queue,
+ auth: auth,
+ db: db,
+ queries: queries,
}, nil
}
-func New(db *sql.DB, queue q.IQueue) (PapoD, error) {
- return NewWithPrefix(db, queue, defaultPrefix)
+func New(databasePath string) (IPapod, error) {
+ return NewWithPrefix(databasePath, defaultPrefix)
}
+func (papod papodT) Close() error {
+ return g.WrapErrors(
+ papod.queries.close(),
+ papod.db.Close(),
+ papod.auth.Close(),
+ papod.queue.Close(),
+ )
+}
@@ -322,74 +744,26 @@ func New(db *sql.DB, queue q.IQueue) (PapoD, error) {
-// Global variables
-var Colour string
-
-func SetEnvironmentVariables() {
- Colour = os.Getenv("PAPOD_COLOUR")
- if Colour == "" {
- Colour = "PAPOD-COLOUR-UNKNOWN"
- }
-}
// FIXME: reorder
-var EmitActiveConnection = g.MakeGauge("active-connections")
-var EmitNicksInChannel = g.MakeGauge("nicks-in-channel")
-var EmitReceivedMessage = g.MakeCounter("received-message")
-var EmitWriteToClientError = g.MakeCounter("write-to-client")
+var emitActiveConnection = g.MakeGauge("active-connections")
+var emitNicksInChannel = g.MakeGauge("nicks-in-channel")
+var emitReceivedMessage = g.MakeCounter("received-message")
+var emitWriteToClientError = g.MakeCounter("write-to-client")
const pingFrequency = time.Duration(30) * time.Second
const pongMaxLatency = time.Duration(5) * time.Second
-type Channel struct {
-}
-
-type Connection struct {
- conn net.Conn
- replyChan chan string
- lastReadFrom time.Time
- lastWrittenTo time.Time
- // id *UUID
- id string
- isAuthenticated bool
-}
-
-type User struct {
- connections []Connection
-}
-
-type State struct {
- users map[string]*User
-}
-
-type Context struct {
- db *sql.DB
- state State
- tx chan int
-}
-
-type MessageParams struct {
- Middle []string
- Trailing string
-}
-
-type Message struct {
- Prefix string
- Command string
- Params MessageParams
- Raw string
-}
-
var (
- CmdUSER = Message { Command: "USER" }
- CmdPRIVMSG = Message { Command: "PRIVMSG" }
- CmdJOIN = Message { Command: "JOIN" }
+ cmdUSER = messageT{ command: "USER" }
+ cmdPRIVMSG = messageT{ command: "PRIVMSG" }
+ cmdJOIN = messageT{ command: "JOIN" }
)
-func SplitOnCRLF(data []byte, _atEOF bool) (int, []byte, error) {
+func splitOnCRLF(data []byte, _atEOF bool) (int, []byte, error) {
idx := bytes.Index(data, []byte { '\r', '\n' })
if idx == -1 {
return 0, nil, nil
@@ -398,8 +772,8 @@ func SplitOnCRLF(data []byte, _atEOF bool) (int, []byte, error) {
return idx + 2, data[0:idx], nil
}
-func SplitOnRawMessage(data []byte, atEOF bool) (int, []byte, error) {
- advance, token, error := SplitOnCRLF(data, atEOF)
+func splitOnRawMessage(data []byte, atEOF bool) (int, []byte, error) {
+ advance, token, error := splitOnCRLF(data, atEOF)
if len(token) == 0 {
return advance, nil, error
@@ -408,11 +782,11 @@ func SplitOnRawMessage(data []byte, atEOF bool) (int, []byte, error) {
return advance, token, error
}
-func SplitSpaces(r rune) bool {
+func splitSpaces(r rune) bool {
return r == ' '
}
-func ParseMessageParams(params string) MessageParams {
+func parseMessageParams(params string) messageParamsT {
const sep = " :"
var middle string
@@ -427,49 +801,49 @@ func ParseMessageParams(params string) MessageParams {
trailing = params[idx + len(sep):]
}
- return MessageParams {
- Middle: strings.FieldsFunc(middle, SplitSpaces),
- Trailing: trailing,
+ return messageParamsT{
+ middle: strings.FieldsFunc(middle, splitSpaces),
+ trailing: trailing,
}
}
-var MessageRegex = regexp.MustCompilePOSIX(
+var messageRegex = regexp.MustCompilePOSIX(
// <prefix> <command> <params>
//1 2 3 4
`^(:([^ ]+) +)?([a-zA-Z]+) *( .*)$`,
)
-func ParseMessage(rawMessage string) (Message, error) {
- var msg Message
+func parseMessage(rawMessage string) (messageT, error) {
+ var msg messageT
- components := MessageRegex.FindStringSubmatch(rawMessage)
+ components := messageRegex.FindStringSubmatch(rawMessage)
if components == nil {
return msg, errors.New("Can't parse message")
}
- msg = Message {
- Prefix: components[2],
- Command: components[3],
- Params: ParseMessageParams(components[4]),
- Raw: rawMessage,
+ msg = messageT{
+ prefix: components[2],
+ command: components[3],
+ params: parseMessageParams(components[4]),
+ raw: rawMessage,
}
return msg, nil
}
-func HandleUnknown(ctx *Context, msg Message) {
+func handleUnknown(ctx *contextT, msg messageT) {
g.Warning(
"Unsupported command", "unsupported-command",
- "command", msg.Command,
+ "command", msg.command,
)
- var r Reply = ReplyUnknown
- r.Prefix = "dunno"
+ var r replyT = replyUnknown
+ r.prefix = "dunno"
// return []Action { r }
}
-func HandleUSER(ctx *Context, msg Message) {
+func handleUSER(ctx *contextT, msg messageT) {
fmt.Printf("USER: %#v\n", msg)
}
-func HandlePRIVMSG(ctx *Context, msg Message) {
+func handlePRIVMSG(ctx *contextT, msg messageT) {
// . assert no missing params
// . write to DB: (after auth)
// . channel timeline: message from $USER
@@ -504,7 +878,7 @@ func HandlePRIVMSG(ctx *Context, msg Message) {
fmt.Println("ret: ", ret)
}
-func HandleJOIN(ctx *Context, msg Message) {
+func handleJOIN(ctx *contextT, msg messageT) {
fmt.Printf("JOIN: %#v\n", msg)
// . write to DB: (after auth)
@@ -514,60 +888,45 @@ func HandleJOIN(ctx *Context, msg Message) {
// . broadcast new timeline event to members of the channel
}
-func ReplyAnonymous() {
-}
-
-func PersistMessage(msg Message) {
-}
-
-type ActionType int
-const (
- ActionReply = iota
-)
-
-type Action interface {
- Type() ActionType
+func replyAnonymous() {
}
-type Reply struct {
- Prefix string
- Command int
- Params MessageParams
+func persistMessage(msg messageT) {
}
-func (reply Reply) Type() ActionType {
- return ActionReply
+func (reply replyT) typeOf() actionType {
+ return actionReply
}
var (
- ReplyUnknown = Reply {
- Command: 421,
- Params: MessageParams {
- Middle: []string { },
- Trailing: "Unknown command",
+ replyUnknown = replyT{
+ command: 421,
+ params: messageParamsT{
+ middle: []string{},
+ trailing: "Unknown command",
},
}
)
-var Commands = map[string]func(*Context, Message) {
- CmdUSER.Command: HandleUSER,
- CmdPRIVMSG.Command: HandlePRIVMSG,
- CmdJOIN.Command: HandleJOIN,
+var commands = map[string]func(*contextT, messageT) {
+ cmdUSER.command: handleUSER,
+ cmdPRIVMSG.command: handlePRIVMSG,
+ cmdJOIN.command: handleJOIN,
}
-func ActionFnFor(command string) func(*Context, Message) {
- fn := Commands[command]
+func actionFnFor(command string) func(*contextT, messageT) {
+ fn := commands[command]
if fn != nil {
return fn
}
- return HandleUnknown
+ return handleUnknown
}
-func ProcessMessage(ctx *Context, connection *Connection, rawMessage string) {
+func processMessage(ctx *contextT, connection *connectionT, rawMessage string) {
connection.lastReadFrom = time.Now()
- msg, err := ParseMessage(rawMessage)
+ msg, err := parseMessage(rawMessage)
if err != nil {
g.Info(
"Error processing message",
@@ -577,10 +936,10 @@ func ProcessMessage(ctx *Context, connection *Connection, rawMessage string) {
return
}
- if msg.Command == CmdUSER.Command {
- args := msg.Params.Middle
+ if msg.command == cmdUSER.command {
+ args := msg.params.middle
if len(args) == 0 {
- go ReplyAnonymous()
+ go replyAnonymous()
return
}
connection.id = args[0]
@@ -588,22 +947,22 @@ func ProcessMessage(ctx *Context, connection *Connection, rawMessage string) {
}
if !connection.isAuthenticated {
- go ReplyAnonymous()
+ go replyAnonymous()
return
}
- ActionFnFor(msg.Command)(ctx, msg)
+ actionFnFor(msg.command)(ctx, msg)
}
-func ReadLoop(ctx *Context, connection *Connection) {
+func readLoop(ctx *contextT, connection *connectionT) {
scanner := bufio.NewScanner(connection.conn)
- scanner.Split(SplitOnRawMessage)
+ scanner.Split(splitOnRawMessage)
for scanner.Scan() {
- ProcessMessage(ctx, connection, scanner.Text())
+ processMessage(ctx, connection, scanner.Text())
}
}
-func WriteLoop(ctx *Context, connection *Connection) {
+func writeLoop(ctx *contextT, connection *connectionT) {
for message := range connection.replyChan {
_, err := io.WriteString(connection.conn, message)
if err != nil {
@@ -612,18 +971,18 @@ func WriteLoop(ctx *Context, connection *Connection) {
"user-reply-error",
"err", err,
)
- EmitWriteToClientError()
+ emitWriteToClientError()
continue
}
connection.lastWrittenTo = time.Now()
}
- EmitActiveConnection.Dec()
+ emitActiveConnection.Dec()
connection.conn.Close()
}
-func Kill(ctx *Context, connection *Connection) {
+func kill(ctx *contextT, connection *connectionT) {
// lock?
delete(ctx.state.users, connection.id)
// unlock?
@@ -631,40 +990,40 @@ func Kill(ctx *Context, connection *Connection) {
connection.conn.Close() // Ignore errors?
}
-const PingWindow = 30 * time.Second
-func PingLoop(ctx *Context, connection *Connection) {
+const pingWindow = 30 * time.Second
+func pingLoop(ctx *contextT, connection *connectionT) {
for {
- time.Sleep(PingWindow)
- if (time.Since(connection.lastReadFrom) <= PingWindow) {
+ time.Sleep(pingWindow)
+ if (time.Since(connection.lastReadFrom) <= pingWindow) {
continue
}
window := connection.lastWrittenTo.Sub(connection.lastReadFrom)
- if (window <= PingWindow) {
+ if (window <= pingWindow) {
connection.replyChan <- "PING"
continue
}
- Kill(ctx, connection)
+ kill(ctx, connection)
break
}
}
-func HandleConnection(ctx *Context, conn net.Conn) {
- EmitActiveConnection.Inc()
+func handleConnection(ctx *contextT, conn net.Conn) {
+ emitActiveConnection.Inc()
// FIXME: WaitGroup here?
now := time.Now()
- connection := Connection {
+ connection := connectionT {
conn: conn,
isAuthenticated: false,
lastReadFrom: now,
lastWrittenTo: now,
}
- go ReadLoop(ctx, &connection)
- go WriteLoop(ctx, &connection)
- go PingLoop(ctx, &connection)
+ go readLoop(ctx, &connection)
+ go writeLoop(ctx, &connection)
+ go pingLoop(ctx, &connection)
}
-func IRCdLoop(ctx *Context, publicSocketPath string) {
+func serverLoop(ctx *contextT, publicSocketPath string) {
listener, err := net.Listen("unix", publicSocketPath)
g.FatalIf(err)
g.Info("IRCd started", "component-up", "component", "ircd")
@@ -681,11 +1040,11 @@ func IRCdLoop(ctx *Context, publicSocketPath string) {
continue
}
// FIXME: where does it get closed
- go HandleConnection(ctx, conn)
+ go handleConnection(ctx, conn)
}
}
-func CommandListenerLoop(ctx *Context, commandSocketPath string) {
+func commandListenerLoop(ctx *contextT, commandSocketPath string) {
listener, err := net.Listen("unix", commandSocketPath)
g.FatalIf(err)
g.Info(
@@ -710,102 +1069,22 @@ func CommandListenerLoop(ctx *Context, commandSocketPath string) {
}
}
-func TransactorLoop(ctx *Context) {
+func transactorLoop(ctx *contextT) {
g.Info("transactor started", "component-up", "component", "transactor")
- EmitActiveConnection.Inc()
+ emitActiveConnection.Inc()
for tx := range ctx.tx {
fmt.Println(tx)
}
}
-func InitMigrations(db *sql.DB) {
- _, err := db.Exec(`
- CREATE TABLE IF NOT EXISTS migrations (
- filename TEXT PRIMARY KEY
- );
- `)
- g.FatalIf(err)
-}
-
-const MIGRATIONS_DIR = "src/sql/migrations/"
-func PendingMigrations(db *sql.DB) []string {
- files, err := ioutil.ReadDir(MIGRATIONS_DIR)
+func initDB2(databasePath string) *sql.DB {
+ db, err := sql.Open(golite.DriverName, databasePath)
g.FatalIf(err)
-
- set := make(map[string]bool)
- for _, file := range files {
- set[file.Name()] = true
- }
-
- rows, err := db.Query(`SELECT filename FROM migrations;`)
- g.FatalIf(err)
- defer rows.Close()
-
- for rows.Next() {
- var filename string
- err := rows.Scan(&filename)
- g.FatalIf(err)
- delete(set, filename)
- }
- g.FatalIf(rows.Err())
-
- difference := make([]string, 0)
- for filename := range set {
- difference = append(difference, filename)
- }
-
- sort.Sort(sort.StringSlice(difference))
- return difference
-}
-
-func RunMigrations(db *sql.DB) {
- InitMigrations(db)
-
- stmt, err := db.Prepare(`INSERT INTO migrations (filename) VALUES (?);`)
- g.FatalIf(err)
- defer stmt.Close()
-
- for _, filename := range PendingMigrations(db) {
- g.Info("Running migration file", "exec-migration-file",
- "filename", filename,
- )
-
- tx, err := db.Begin()
- g.FatalIf(err)
-
- sql, err := os.ReadFile(MIGRATIONS_DIR + filename)
- g.FatalIf(err)
-
- _, err = tx.Exec(string(sql))
- g.FatalIf(err)
-
- _, err = tx.Stmt(stmt).Exec(filename)
- g.FatalIf(err)
-
- err = tx.Commit()
- g.FatalIf(err)
- }
-}
-
-func InitDB(databasePath string) *sql.DB {
- db, err := sql.Open("sqlite3", databasePath)
- g.FatalIf(err)
- // RunMigrations(db)
return db
}
-func Init() {
- g.Init(slog.Group(
- "versions",
- "gobang", g.Version,
- // "golite", golite.Version,
- "this", Version,
- ))
- SetEnvironmentVariables()
-}
-
-func Start(ctx *Context, publicSocketPath string, commandSocketPath string) {
+func start(ctx *contextT, publicSocketPath string, commandSocketPath string) {
/*
buildInfo, ok := debug.ReadBuildInfo()
if !ok {
@@ -833,16 +1112,16 @@ func Start(ctx *Context, publicSocketPath string, commandSocketPath string) {
wg.Done()
}()
}
- bgRun(func() { IRCdLoop(ctx, publicSocketPath) })
- bgRun(func() { CommandListenerLoop(ctx, commandSocketPath) })
- bgRun(func() { TransactorLoop(ctx) })
+ bgRun(func() { serverLoop(ctx, publicSocketPath) })
+ bgRun(func() { commandListenerLoop(ctx, commandSocketPath) })
+ bgRun(func() { transactorLoop(ctx) })
wg.Wait()
}
-func BuildContext(databasePath string) *Context {
- db := InitDB(databasePath)
+func buildContext(databasePath string) *contextT {
+ db := initDB2(databasePath)
tx := make(chan int, 100)
- return &Context {
+ return &contextT {
db: db,
tx: tx,
}
@@ -868,8 +1147,19 @@ var (
func Main() {
- Init()
+ g.Init(slog.Group(
+ "versions",
+ "cracha", cracha.Version,
+ "q", q.Version,
+ "golite", golite.Version,
+ "guuid", guuid.Version,
+ "gobang", g.Version,
+ "papod", Version,
+ "this", Version,
+ ))
flag.Parse()
- ctx := BuildContext(*databasePath)
- Start(ctx, *publicSocketPath, *commandSocketPath)
+ ctx := buildContext(*databasePath)
+ start(ctx, *publicSocketPath, *commandSocketPath)
}
+
+// FIXME: review usage of g.Fatal()