diff options
Diffstat (limited to 'src/papod.go')
-rw-r--r-- | src/papod.go | 308 |
1 files changed, 301 insertions, 7 deletions
diff --git a/src/papod.go b/src/papod.go index 05b1d8e..220ef55 100644 --- a/src/papod.go +++ b/src/papod.go @@ -18,12 +18,308 @@ import ( "sync" "time" + "gracha" + "q" + // "golite" + "guuid" g "gobang" - "golite" ) +type tablesT struct{ + servers string + users string + channels string + channelEvents string +} + +type queriesT struct{ + addChannel func(string) error + eventsAfter func([]byte, func(eventT) error) error + addEvent func(string, string) error + close func() error +} + +type eventT struct{ + id int64 +} + +type PapoD struct{ + queries queriesT + auth gracha.IAuth + queue q.IQueue +} + +type consumerT struct{ + topic string + handlerFn func(PapoD) func(q.Message) error +} + + + +const ( + NEW_CHANNEL_EVENT = "new-channel-event" + + defaultPrefix = "papod" +) + + + +func tablesFrom(prefix string) (tablesT, error) { + err := g.ValidateSQLTablePrefix(prefix) + if err != nil { + return tablesT{}, err + } + + servers := prefix + "-servers" + users := prefix + "-users" + channels := prefix + "-channels" + channelEvents := prefix + "-channel-events" + return tablesT{ + servers: servers, + users: users, + channels: channels, + channelEvents: channelEvents, + }, nil +} + +func createTables(db *sql.DB, tables tablesT) error { + const tmpl = ` + BEGIN TRANSACTION; + CREATE TABLE IF NOT EXISTS "%s" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (%s), + uuid BLOB NOT NULL UNIQUE, + name TEXT NOT NULL, + description TEXT NOT NULL, + public BOOLEAN NOT NULL, + metadata TEXT + ); + CREATE TABLE IF NOT EXISTS "%s" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (%s), + uuid BLOB NOT NULL UNIQUE, + server_id INTEGER NOT NULL REFERENCES "%s"(id), + authuser_uuid BLOB NOT NULL UNIQUE, + human BOOLEAN NOT NULL, + visible BOOLEAN NOT NULL, + enabled BOOLEAN NOT NULL, + metadata TEXT + ); + CREATE TABLE IF NOT EXISTS "%s" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (%s), + uuid BLOB NOT NULL UNIQUE, + server_id INTEGER NOT NULL REFERENCES "%s"(id), + name TEXT, + description TEXT, + virtual BOOLEAN NOT NULL, + metadata TEXT + ); + -- FIXME: group conversations? + CREATE TABLE IF NOT EXISTS "%s" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (%s), + uuid BLOB NOT NULL UNIQUE, + channel_id INTEGER NOT NULL REFERENCES "%s"(id), + connection_id INTEGER NOT NULL + -- payload FIXME: vary by type? + ); + + + -- FIXME:indexes + COMMIT TRANSACTION; + ` + sql := fmt.Sprintf( + tmpl, + tables.servers, + g.SQLiteNow, + tables.users, + g.SQLiteNow, + tables.servers, + tables.channels, + g.SQLiteNow, + tables.servers, + tables.channelEvents, + g.SQLiteNow, + tables.channels, + ) + fmt.Println(sql) /// + + _, err := db.Exec(sql) + return err +} + +func addChannelQuery( + db *sql.DB, + tables tablesT, +) (func (string) error, func() error, error) { + const tmpl = ` + ` + sql := fmt.Sprintf(tmpl) + /// fmt.Println(sql) /// + + stmt, err := db.Prepare(sql) + if err != nil { + return nil, nil, err + } + + fn := func(name string) error { + _, err := stmt.Exec(name) + return err + } + + return fn, nil, nil +} + +func eventEach( + stmt *sql.Stmt, + uuid []byte, + callback func(eventT) error, +) error { + rows, err := stmt.Query(uuid) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var event eventT + err = rows.Scan( + &event.id, + ) + if err != nil { + return err + } + + err = callback(event) + if err != nil { + return err + } + } + + return rows.Err() +} + +func eventsAfterQuery( + db *sql.DB, + tables tablesT, +) (func ([]byte, func(eventT) error) error, func() error, error) { + const tmpl = ` + -- INSERT + ` + sql := fmt.Sprintf(tmpl) + /// fmt.Println(sql) /// + + stmt, err := db.Prepare(sql) + if err != nil { + return nil, nil, err + } + + fn := func(uuid []byte, callback func(eventT) error) error { + return eventEach(stmt, uuid, callback) + } + + return fn, stmt.Close, nil +} + +func addEventQuery( + db *sql.DB, + tables tablesT, +) (func (string, string) error, func() error, error) { + const tmpl = ` + ` + sql := fmt.Sprintf(tmpl) + /// fmt.Println(sql) /// + + stmt, err := db.Prepare(sql) + if err != nil { + return nil, nil, err + } + + fn := func(type_ string, payload string) error { + _, err := stmt.Exec(type_, payload) + return err + } + + return fn, stmt.Close, nil +} + +func initDB(db *sql.DB, tables tablesT) (queriesT, error) { + createTablesErr := createTables(db, tables) + addChannel, addChannelClose, addChannelErr := addChannelQuery(db, tables) + eventsAfter, eventsAfterClose, eventsAfterErr := eventsAfterQuery(db, tables) + addEvent, addEventClose, addEventErr := addEventQuery(db, tables) + + err := g.SomeError( + createTablesErr, + addChannelErr, + eventsAfterErr, + addEventErr, + ) + if err != nil { + return queriesT{}, err + } + + close := func() error { + return g.SomeFnError( + addChannelClose, + eventsAfterClose, + addEventClose, + ) + } + + return queriesT{ + addChannel: addChannel, + eventsAfter: eventsAfter, + addEvent: addEvent, + close: close, + }, nil +} + +func (papod PapoD) Close() error { + return papod.queries.close() +} + +var consumers = []consumerT{ +} +func registerConsumers(papod PapoD, consumers []consumerT) { + for _, consumer := range consumers { + papod.queue.Subscribe( + consumer.topic, + defaultPrefix + "-" + consumer.topic, + consumer.handlerFn(papod), + ) + } +} + +func NewWithPrefix(db *sql.DB, queue q.IQueue, prefix string) (PapoD, error) { + tables, err := tablesFrom(prefix) + if err != nil { + return PapoD{}, err + } + + queries, err := initDB(db, tables) + if err != nil { + return PapoD{}, err + } + + return PapoD{ + queries: queries, + queue: queue, + }, nil +} + +func New(db *sql.DB, queue q.IQueue) (PapoD, error) { + return NewWithPrefix(db, queue, defaultPrefix) +} + + + + + + // Global variables @@ -48,8 +344,6 @@ var EmitWriteToClientError = g.MakeCounter("write-to-client") const pingFrequency = time.Duration(30) * time.Second const pongMaxLatency = time.Duration(5) * time.Second -// type UUID string - type Channel struct { } @@ -196,7 +490,7 @@ func HandlePRIVMSG(ctx *Context, msg Message) { defer stmt.Close() ret, err := stmt.Exec( - g.NewUUID().String(), + guuid.New().String(), "FIXME", "FIXME", time.Now(), @@ -497,7 +791,7 @@ func RunMigrations(db *sql.DB) { func InitDB(databasePath string) *sql.DB { db, err := sql.Open("sqlite3", databasePath) g.FatalIf(err) - RunMigrations(db) + // RunMigrations(db) return db } @@ -505,8 +799,8 @@ func Init() { g.Init(slog.Group( "versions", "gobang", g.Version, - "golite", golite.Version, - "this", version, + // "golite", golite.Version, + "this", Version, )) SetEnvironmentVariables() } |