summaryrefslogtreecommitdiff
path: root/src/papod.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/papod.go')
-rw-r--r--src/papod.go308
1 files changed, 301 insertions, 7 deletions
diff --git a/src/papod.go b/src/papod.go
index 05b1d8e..220ef55 100644
--- a/src/papod.go
+++ b/src/papod.go
@@ -18,12 +18,308 @@ import (
"sync"
"time"
+ "gracha"
+ "q"
+ // "golite"
+ "guuid"
g "gobang"
- "golite"
)
+type tablesT struct{
+ servers string
+ users string
+ channels string
+ channelEvents string
+}
+
+type queriesT struct{
+ addChannel func(string) error
+ eventsAfter func([]byte, func(eventT) error) error
+ addEvent func(string, string) error
+ close func() error
+}
+
+type eventT struct{
+ id int64
+}
+
+type PapoD struct{
+ queries queriesT
+ auth gracha.IAuth
+ queue q.IQueue
+}
+
+type consumerT struct{
+ topic string
+ handlerFn func(PapoD) func(q.Message) error
+}
+
+
+
+const (
+ NEW_CHANNEL_EVENT = "new-channel-event"
+
+ defaultPrefix = "papod"
+)
+
+
+
+func tablesFrom(prefix string) (tablesT, error) {
+ err := g.ValidateSQLTablePrefix(prefix)
+ if err != nil {
+ return tablesT{}, err
+ }
+
+ servers := prefix + "-servers"
+ users := prefix + "-users"
+ channels := prefix + "-channels"
+ channelEvents := prefix + "-channel-events"
+ return tablesT{
+ servers: servers,
+ users: users,
+ channels: channels,
+ channelEvents: channelEvents,
+ }, nil
+}
+
+func createTables(db *sql.DB, tables tablesT) error {
+ const tmpl = `
+ BEGIN TRANSACTION;
+ CREATE TABLE IF NOT EXISTS "%s" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (%s),
+ uuid BLOB NOT NULL UNIQUE,
+ name TEXT NOT NULL,
+ description TEXT NOT NULL,
+ public BOOLEAN NOT NULL,
+ metadata TEXT
+ );
+ CREATE TABLE IF NOT EXISTS "%s" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (%s),
+ uuid BLOB NOT NULL UNIQUE,
+ server_id INTEGER NOT NULL REFERENCES "%s"(id),
+ authuser_uuid BLOB NOT NULL UNIQUE,
+ human BOOLEAN NOT NULL,
+ visible BOOLEAN NOT NULL,
+ enabled BOOLEAN NOT NULL,
+ metadata TEXT
+ );
+ CREATE TABLE IF NOT EXISTS "%s" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (%s),
+ uuid BLOB NOT NULL UNIQUE,
+ server_id INTEGER NOT NULL REFERENCES "%s"(id),
+ name TEXT,
+ description TEXT,
+ virtual BOOLEAN NOT NULL,
+ metadata TEXT
+ );
+ -- FIXME: group conversations?
+ CREATE TABLE IF NOT EXISTS "%s" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (%s),
+ uuid BLOB NOT NULL UNIQUE,
+ channel_id INTEGER NOT NULL REFERENCES "%s"(id),
+ connection_id INTEGER NOT NULL
+ -- payload FIXME: vary by type?
+ );
+
+
+ -- FIXME:indexes
+ COMMIT TRANSACTION;
+ `
+ sql := fmt.Sprintf(
+ tmpl,
+ tables.servers,
+ g.SQLiteNow,
+ tables.users,
+ g.SQLiteNow,
+ tables.servers,
+ tables.channels,
+ g.SQLiteNow,
+ tables.servers,
+ tables.channelEvents,
+ g.SQLiteNow,
+ tables.channels,
+ )
+ fmt.Println(sql) ///
+
+ _, err := db.Exec(sql)
+ return err
+}
+
+func addChannelQuery(
+ db *sql.DB,
+ tables tablesT,
+) (func (string) error, func() error, error) {
+ const tmpl = `
+ `
+ sql := fmt.Sprintf(tmpl)
+ /// fmt.Println(sql) ///
+
+ stmt, err := db.Prepare(sql)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(name string) error {
+ _, err := stmt.Exec(name)
+ return err
+ }
+
+ return fn, nil, nil
+}
+
+func eventEach(
+ stmt *sql.Stmt,
+ uuid []byte,
+ callback func(eventT) error,
+) error {
+ rows, err := stmt.Query(uuid)
+ if err != nil {
+ return err
+ }
+ defer rows.Close()
+
+ for rows.Next() {
+ var event eventT
+ err = rows.Scan(
+ &event.id,
+ )
+ if err != nil {
+ return err
+ }
+
+ err = callback(event)
+ if err != nil {
+ return err
+ }
+ }
+
+ return rows.Err()
+}
+
+func eventsAfterQuery(
+ db *sql.DB,
+ tables tablesT,
+) (func ([]byte, func(eventT) error) error, func() error, error) {
+ const tmpl = `
+ -- INSERT
+ `
+ sql := fmt.Sprintf(tmpl)
+ /// fmt.Println(sql) ///
+
+ stmt, err := db.Prepare(sql)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(uuid []byte, callback func(eventT) error) error {
+ return eventEach(stmt, uuid, callback)
+ }
+
+ return fn, stmt.Close, nil
+}
+
+func addEventQuery(
+ db *sql.DB,
+ tables tablesT,
+) (func (string, string) error, func() error, error) {
+ const tmpl = `
+ `
+ sql := fmt.Sprintf(tmpl)
+ /// fmt.Println(sql) ///
+
+ stmt, err := db.Prepare(sql)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(type_ string, payload string) error {
+ _, err := stmt.Exec(type_, payload)
+ return err
+ }
+
+ return fn, stmt.Close, nil
+}
+
+func initDB(db *sql.DB, tables tablesT) (queriesT, error) {
+ createTablesErr := createTables(db, tables)
+ addChannel, addChannelClose, addChannelErr := addChannelQuery(db, tables)
+ eventsAfter, eventsAfterClose, eventsAfterErr := eventsAfterQuery(db, tables)
+ addEvent, addEventClose, addEventErr := addEventQuery(db, tables)
+
+ err := g.SomeError(
+ createTablesErr,
+ addChannelErr,
+ eventsAfterErr,
+ addEventErr,
+ )
+ if err != nil {
+ return queriesT{}, err
+ }
+
+ close := func() error {
+ return g.SomeFnError(
+ addChannelClose,
+ eventsAfterClose,
+ addEventClose,
+ )
+ }
+
+ return queriesT{
+ addChannel: addChannel,
+ eventsAfter: eventsAfter,
+ addEvent: addEvent,
+ close: close,
+ }, nil
+}
+
+func (papod PapoD) Close() error {
+ return papod.queries.close()
+}
+
+var consumers = []consumerT{
+}
+func registerConsumers(papod PapoD, consumers []consumerT) {
+ for _, consumer := range consumers {
+ papod.queue.Subscribe(
+ consumer.topic,
+ defaultPrefix + "-" + consumer.topic,
+ consumer.handlerFn(papod),
+ )
+ }
+}
+
+func NewWithPrefix(db *sql.DB, queue q.IQueue, prefix string) (PapoD, error) {
+ tables, err := tablesFrom(prefix)
+ if err != nil {
+ return PapoD{}, err
+ }
+
+ queries, err := initDB(db, tables)
+ if err != nil {
+ return PapoD{}, err
+ }
+
+ return PapoD{
+ queries: queries,
+ queue: queue,
+ }, nil
+}
+
+func New(db *sql.DB, queue q.IQueue) (PapoD, error) {
+ return NewWithPrefix(db, queue, defaultPrefix)
+}
+
+
+
+
+
+
// Global variables
@@ -48,8 +344,6 @@ var EmitWriteToClientError = g.MakeCounter("write-to-client")
const pingFrequency = time.Duration(30) * time.Second
const pongMaxLatency = time.Duration(5) * time.Second
-// type UUID string
-
type Channel struct {
}
@@ -196,7 +490,7 @@ func HandlePRIVMSG(ctx *Context, msg Message) {
defer stmt.Close()
ret, err := stmt.Exec(
- g.NewUUID().String(),
+ guuid.New().String(),
"FIXME",
"FIXME",
time.Now(),
@@ -497,7 +791,7 @@ func RunMigrations(db *sql.DB) {
func InitDB(databasePath string) *sql.DB {
db, err := sql.Open("sqlite3", databasePath)
g.FatalIf(err)
- RunMigrations(db)
+ // RunMigrations(db)
return db
}
@@ -505,8 +799,8 @@ func Init() {
g.Init(slog.Group(
"versions",
"gobang", g.Version,
- "golite", golite.Version,
- "this", version,
+ // "golite", golite.Version,
+ "this", Version,
))
SetEnvironmentVariables()
}