summaryrefslogtreecommitdiff
path: root/src/papod.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/papod.go')
-rw-r--r--src/papod.go1698
1 files changed, 1553 insertions, 145 deletions
diff --git a/src/papod.go b/src/papod.go
index aedfb5f..00086c3 100644
--- a/src/papod.go
+++ b/src/papod.go
@@ -3,7 +3,6 @@ package papod
import (
"bufio"
"bytes"
- "context"
"database/sql"
"errors"
"flag"
@@ -40,43 +39,119 @@ type queryT struct{
}
type queriesT struct{
- addNetwork func(guuid.UUID, newNetworkT) (networkT, error)
+ createUser func(newUserT) (userT, error)
+ userByUUID func(guuid.UUID) (userT, error)
+ updateUser func(userT) error
+ deleteUser func(guuid.UUID) error
+ addNetwork func(userT, newNetworkT) (networkT, error)
+ getNetwork func(userT, guuid.UUID) (networkT, error)
+ networks func(userT, func(networkT) error) error
+ setNetwork func(userT, networkT) error
+ nipNetwork func(userT, guuid.UUID) error
+ addMember func(userT, networkT, newMemberT) (memberT, error)
+ showMember func(userT, guuid.UUID) (memberT, error)
+ members func(userT, guuid.UUID, func(memberT) error) error
+ editMember func(userT, memberT) error
+ dropMember func(userT, guuid.UUID) error
addChannel func(guuid.UUID, newChannelT) (channelT, error)
channels func(guuid.UUID, func(channelT) error) error
+ topic func(channelT) error
+ endChannel func(guuid.UUID) error
+ join func(guuid.UUID, guuid.UUID) error
+ part func(guuid.UUID, guuid.UUID) error
+ names func(guuid.UUID, func(memberT) error) error
+ addEvent func(newEventT) (eventT, error)
allAfter func(guuid.UUID, func(eventT) error) error
- addEvent func(string, string) error
close func() error
}
+type newUserT struct{
+ uuid guuid.UUID
+ username string
+ displayName string
+}
+
+type userT struct{
+ id int64
+ timestamp time.Time
+ uuid guuid.UUID
+ username string
+ displayName string
+ pictureID *guuid.UUID
+}
+
+type NetworkType string
+const (
+ NetworkType_Public NetworkType = "public"
+ NetworkType_Private NetworkType = "private"
+ NetworkType_Unlisted NetworkType = "unlisted"
+)
+
type newNetworkT struct{
- uuid guuid.UUID
- name string
+ uuid guuid.UUID
+ name string
+ description string
+ type_ NetworkType
}
type networkT struct{
+ id int64
+ timestamp time.Time
+ uuid guuid.UUID
+ createdBy guuid.UUID
+ name string
+ description string
+ type_ NetworkType
+}
+
+type newMemberT struct{
+ userID guuid.UUID
+}
+
+type memberT struct{
id int64
timestamp time.Time
uuid guuid.UUID
- name string
- createdBy guuid.UUID
}
type newChannelT struct{
- name string
- uuid guuid.UUID
+ id int64
+ timestamp time.Time
+ uuid guuid.UUID
+ // networkID guuid.UUID FIXME
+ publicName string
+ label string
+ description string
+ virtual bool
}
type channelT struct {
- id int64
- timestamp time.Time
- uuid guuid.UUID
- name string
+ id int64
+ timestamp time.Time
+ uuid guuid.UUID
+ // networkID guuid.UUID FIXME
+ publicName string
+ label string
+ description string
+ virtual bool
+}
+
+type newEventT struct{
+ eventID guuid.UUID
+ channelID guuid.UUID
+ connectionID guuid.UUID
+ type_ string
+ payload string
}
type eventT struct{
- id int64
- timestamp time.Time
- uuid guuid.UUID
+ id int64
+ timestamp time.Time
+ uuid guuid.UUID
+ channelID guuid.UUID
+ connectionID guuid.UUID
+ type_ string
+ payload string
}
type papodT struct{
@@ -101,7 +176,7 @@ type connectionT struct {
isAuthenticated bool
}
-type userT struct {
+type userT2 struct {
connections []connectionT
}
@@ -147,8 +222,55 @@ type IPapod interface{
-func tryRollback(db *sql.DB, ctx context.Context, err error) error {
- _, rollbackErr := db.ExecContext(ctx, "ROLLBACK;")
+func serialized[A any, B any](callback func(...A) B) (func(...A) B, func()) {
+ in := make(chan []A)
+ out := make(chan B)
+
+ closed := false
+ var (
+ closeWg sync.WaitGroup
+ closeMutex sync.Mutex
+ )
+ closeWg.Add(1)
+
+ go func() {
+ for input := range in {
+ out <- callback(input...)
+ }
+ close(out)
+ closeWg.Done()
+ }()
+
+ fn := func(input ...A) B {
+ in <- input
+ return (<- out)
+ }
+
+ closeFn := func() {
+ closeMutex.Lock()
+ defer closeMutex.Unlock()
+ if closed {
+ return
+ }
+ close(in)
+ closed = true
+ closeWg.Wait()
+ }
+
+ return fn, closeFn
+}
+
+func execSerialized(query string, db *sql.DB) (func(...any) error, func()) {
+ return serialized(func(args ...any) error {
+ return inTx(db, func(tx *sql.Tx) error {
+ _, err := tx.Exec(query, args...)
+ return err
+ })
+ })
+}
+
+func tryRollback(tx *sql.Tx, err error) error {
+ rollbackErr := tx.Rollback()
if rollbackErr != nil {
return fmt.Errorf(
rollbackErrorFmt,
@@ -160,25 +282,20 @@ func tryRollback(db *sql.DB, ctx context.Context, err error) error {
return err
}
-// FIXME
-// See:
-// https://sqlite.org/forum/forumpost/2507664507
-func inTx(db *sql.DB, fn func(context.Context) error) error {
- ctx := context.Background()
-
- _, err := db.ExecContext(ctx, "BEGIN IMMEDIATE;")
+func inTx(db *sql.DB, fn func(*sql.Tx) error) error {
+ tx, err := db.Begin()
if err != nil {
return err
}
- err = fn(ctx)
+ err = fn(tx)
if err != nil {
- return tryRollback(db, ctx, err)
+ return tryRollback(tx, err)
}
- _, err = db.ExecContext(ctx, "COMMIT;")
+ err = tx.Commit()
if err != nil {
- return tryRollback(db, ctx, err)
+ return tryRollback(tx, err)
}
return nil
@@ -194,24 +311,13 @@ func inTx(db *sql.DB, fn func(context.Context) error) error {
/// opaque IDs.
func createTablesSQL(prefix string) queryT {
const tmpl_write = `
- CREATE TABLE IF NOT EXISTS "%s_workspaces" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (%s),
- uuid BLOB NOT NULL UNIQUE,
- name TEXT NOT NULL,
- description TEXT NOT NULL,
- -- "public", "private", "unlisted"
- type TEXT NOT NULL
- );
- CREATE TABLE IF NOT EXISTS "%s_workspace_changes (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (%s),
- workspace_id INTEGER NOT NULL
- REFERENCES "%s_workspaces"(id),
- attribute TEXT NOT NULL,
- value TEXT NOT NULL,
- op BOOLEAN NOT NULL
- );
+ -- FIXME: unconfirmed premise: statements within a trigger are
+ -- part of the transaction that caused it, and so are
+ -- atomic.
+ -- See also:
+ -- https://stackoverflow.com/questions/77441888/
+ -- https://stackoverflow.com/questions/30511116/
+
CREATE TABLE IF NOT EXISTS "%s_users" (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL DEFAULT (%s),
@@ -219,60 +325,168 @@ func createTablesSQL(prefix string) queryT {
uuid BLOB NOT NULL UNIQUE,
username TEXT NOT NULL,
display_name TEXT NOT NULL,
- picture_uuid BLOB NOT NULL UNIQUE,
- deleted BOOLEAN NOT NULL
- );
+ picture_uuid BLOB UNIQUE,
+ deleted INT NOT NULL
+ ) STRICT;
CREATE TABLE IF NOT EXISTS "%s_user_changes" (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL DEFAULT (%s),
user_id INTEGER NOT NULL REFERENCES "%s_users"(id),
- attribute TEXT NOT NULL,
+ attribute TEXT NOT NULL CHECK(
+ attribute IN (
+ 'username',
+ 'display_name',
+ 'picture_uuid',
+ 'deleted'
+ )
+ ),
value TEXT NOT NULL,
- op BOOLEAN NOT NULL
- );
+ op INT NOT NULL CHECK(op IN (0, 1))
+ ) STRICT;
+ CREATE TRIGGER IF NOT EXISTS "%s_user_creation"
+ AFTER INSERT ON "%s_users"
+ BEGIN
+ INSERT INTO "%s_user_changes" (
+ user_id, attribute, value, op
+ ) VALUES
+ (NEW.id, 'username', NEW.username, true),
+ (NEW.id, 'display_name', NEW.display_name, true),
+ (NEW.id, 'deleted', NEW.deleted, true)
+ ;
+ END;
+ CREATE TRIGGER IF NOT EXISTS "%s_user_creation_picture_uuid"
+ AFTER INSERT ON "%s_users"
+ WHEN NEW.picture_uuid != NULL
+ BEGIN
+ INSERT INTO "%s_user_changes" (
+ user_id, attribute, value, op
+ ) VALUES
+ (NEW.id, 'picture_uuid', NEW.picture_uuid, true)
+ ;
+ END;
+ CREATE TRIGGER IF NOT EXISTS "%s_user_update_username"
+ AFTER UPDATE ON "%s_users"
+ WHEN OLD.username != NEW.username
+ BEGIN
+ INSERT INTO "%s_user_changes" (
+ user_id, attribute, value, op
+ ) VALUES
+ (NEW.id, 'username', OLD.username, false),
+ (NEW.id, 'username', NEW.username, true)
+ ;
+ END;
+ CREATE TRIGGER IF NOT EXISTS "%s_user_update_display_name"
+ AFTER UPDATE ON "%s_users"
+ WHEN OLD.display_name != NEW.display_name
+ BEGIN
+ INSERT INTO "%s_user_changes" (
+ user_id, attribute, value, op
+ ) VALUES
+ (NEW.id, 'display_name', OLD.display_name, false),
+ (NEW.id, 'display_name', NEW.display_name, true)
+ ;
+ END;
+ CREATE TRIGGER IF NOT EXISTS "%s_user_update_picture_uuid"
+ AFTER UPDATE ON "%s_users"
+ WHEN OLD.picture_uuid != NEW.picture_uuid
+ BEGIN
+ INSERT INTO "%s_user_changes" (
+ user_id, attribute, value, op
+ ) VALUES
+ (NEW.id, 'picture_uuid', OLD.picture_uuid, false),
+ (NEW.id, 'picture_uuid', NEW.picture_uuid, true)
+ ;
+ END;
+ CREATE TRIGGER IF NOT EXISTS "%s_user_update_deleted"
+ AFTER UPDATE ON "%s_users"
+ WHEN OLD.deleted != NEW.deleted
+ BEGIN
+ INSERT INTO "%s_user_changes" (
+ user_id, attribute, value, op
+ ) VALUES
+ (NEW.id, 'deleted', OLD.deleted, false),
+ (NEW.id, 'deleted', NEW.deleted, true)
+ ;
+ END;
+
+ CREATE TABLE IF NOT EXISTS "%s_networks" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (%s),
+ uuid BLOB NOT NULL UNIQUE,
+ creator_id INTEGER NOT NULL REFERENCES "%s_users"(id),
+ name TEXT NOT NULL,
+ description TEXT NOT NULL,
+ type TEXT NOT NULL CHECK(
+ type IN ('public', 'private', 'unlisted')
+ )
+ ) STRICT;
+ CREATE TABLE IF NOT EXISTS "%s_network_changes" (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ timestamp TEXT NOT NULL DEFAULT (%s),
+ network_id INTEGER NOT NULL
+ REFERENCES "%s_networks"(id),
+ attribute TEXT NOT NULL CHECK(
+ attribute IN (
+ 'name',
+ 'description',
+ 'type'
+ )
+ ),
+ value TEXT NOT NULL,
+ op INT NOT NULL CHECK(op IN (0, 1))
+ ) STRICT;
+
CREATE TABLE IF NOT EXISTS "%s_members" (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL DEFAULT (%s),
- workspace_id INTEGER NOT NULL
- REFERENCES "%s_workspaces"(id),
+ network_id INTEGER NOT NULL
+ REFERENCES "%s_networks"(id),
user_id INTEGER NOT NULL,
username TEXT NOT NULL,
display_name TEXT NOT NULL,
- picture_uuid BLOB NOT NULL UNIQUE,
- -- "active", "inactive", "removed"
- status TEXT NOT NULL,
- -- "active", always
- active_uniq TEXT,
- UNIQUE (workspace_id, username, active_uniq),
- UNIQUE (workspace_id, user_id)
- );
+ picture_uuid BLOB UNIQUE,
+ status TEXT NOT NULL CHECK(
+ status IN ('active', 'inactive', 'removed')
+ ),
+ active_uniq TEXT CHECK(
+ active_uniq IN ('active', NULL)
+ ),
+ UNIQUE (network_id, username, active_uniq),
+ UNIQUE (network_id, user_id)
+ ) STRICT;
+
CREATE TABLE IF NOT EXISTS "%s_member_roles" (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
member_id INTEGER NOT NULL
REFERENCES "%s_members"(id),
role TEXT NOT NULL,
UNIQUE (member_id, role)
- );
+ ) STRICT;
+
+ -- FIXME: use a trigger
CREATE TABLE IF NOT EXISTS "%s_member_changes" (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL DEFAULT (%s),
- member_id INTEGER NOT NULL
+ member_id INTEGER NOT NULL
REFERENCES "%s_members"(id),
attribute TEXT NOT NULL,
value TEXT NOT NULL,
- op BOOLEAN NOT NULL
- );
+ op INT NOT NULL CHECK(op IN (0, 1))
+ ) STRICT;
+
CREATE TABLE IF NOT EXISTS "%s_channels" (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL DEFAULT (%s),
uuid BLOB NOT NULL UNIQUE,
- workspace_id INTEGER NOT NULL
- REFERENCES "%s_workspaces"(id),
+ network_id INTEGER -- FIXME NOT NULL
+ REFERENCES "%s_networks"(id),
public_name TEXT UNIQUE,
label TEXT NOT NULL,
- description TEXT,
- virtual BOOLEAN NOT NULL
- );
+ description TEXT NOT NULL,
+ virtual INT NOT NULL CHECK(virtual IN (0, 1))
+ ) STRICT;
+
+ -- FIXME: use a trigger
CREATE TABLE IF NOT EXISTS "%s_channel_changes" (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL DEFAULT (%s),
@@ -280,23 +494,40 @@ func createTablesSQL(prefix string) queryT {
REFERENCES "%s_channels"(id),
attribute TEXT NOT NULL,
value TEXT NOT NULL,
- op BOOLEAN NOT NULL
- );
+ op INT NOT NULL CHECK(op IN (0, 1))
+ ) STRICT;
+
CREATE TABLE IF NOT EXISTS "%s_participants" (
- member_id
- );
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ channel_id INTEGER NOT NULL
+ REFERENCES "%s_channels"(id),
+ member_id INTEGER NOT NULL
+ REFERENCES "%s_members"(id),
+ UNIQUE (channel_id, member_id)
+ ) STRICT;
+
+ -- FIXME: create table for connections?
+ -- A user can have multiple sessions (different browsers,
+ -- mobile, etc.), and each session has multiple connections, as
+ -- the user connects and disconnections using the same session
+ -- id, all while it is valid.
+ -- FIXME: can a connection have multiple sessions? A long-lived
+ -- connection that spans multiple sessions would fit into this.
CREATE TABLE IF NOT EXISTS "%s_channel_events" (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL DEFAULT (%s),
uuid BLOB NOT NULL UNIQUE,
- channel_id INTEGER NOT NULL REFERENCES "%s"(id),
- connection_id INTEGER NOT NULL,
- -- payload FIXME: vary by type?
- );
- -- FIXME: group conversations?
- -- user: person
- -- member: workspace user
- -- participant: channel member
+ channel_id INTEGER NOT NULL
+ REFERENCES "%s_channels"(id),
+ connection_uuid BLOB NOT NULL, -- FIXME: join
+ type TEXT NOT NULL CHECK(
+ type IN (
+ 'user-join',
+ 'user-message'
+ )
+ ),
+ payload TEXT NOT NULL
+ ) STRICT;
`
return queryT{
write: fmt.Sprintf(
@@ -307,11 +538,49 @@ func createTablesSQL(prefix string) queryT {
g.SQLiteNow,
prefix,
prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ g.SQLiteNow,
+ prefix,
+ prefix,
+ g.SQLiteNow,
+ prefix,
+ prefix,
+ g.SQLiteNow,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
g.SQLiteNow,
prefix,
prefix,
g.SQLiteNow,
prefix,
+ prefix,
+ g.SQLiteNow,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ g.SQLiteNow,
+ prefix,
),
}
}
@@ -319,28 +588,246 @@ func createTablesSQL(prefix string) queryT {
func createTables(db *sql.DB, prefix string) error {
q := createTablesSQL(prefix)
- return inTx(db, func(ctx context.Context) error {
- _, err := db.ExecContext(ctx, q.write)
+ return inTx(db, func(tx *sql.Tx) error {
+ _, err := tx.Exec(q.write)
return err
})
}
-// addServer
-// addWorkspace
-// addNetwork FIXME
-func addNetworkSQL(prefix string) queryT {
+func createUserSQL(prefix string) queryT {
const tmpl_write = `
- -- FIXME
+ INSERT INTO "%s_users" (
+ uuid, username, display_name, picture_uuid, deleted
+ ) VALUES (
+ ?, ?, ?, NULL, false
+ ) RETURNING id, timestamp;
+ `
+ return queryT{
+ write: fmt.Sprintf(tmpl_write, prefix),
+ }
+}
+
+func createUserStmt(
+ db *sql.DB,
+ prefix string,
+) (func(newUserT) (userT, error), func() error, error) {
+ q := createUserSQL(prefix)
+
+ writeStmt, err := db.Prepare(q.write)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(newUser newUserT) (userT, error) {
+ user := userT{
+ uuid: newUser.uuid,
+ username: newUser.username,
+ displayName: newUser.displayName,
+ }
+
+ var timestr string
+ err := writeStmt.QueryRow(
+ newUser.uuid[:],
+ newUser.username,
+ newUser.displayName,
+ ).Scan(&user.id, &timestr)
+ if err != nil {
+ return userT{}, err
+ }
+
+ user.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
+ if err != nil {
+ return userT{}, err
+ }
+
+ return user, nil
+ }
+
+ return fn, writeStmt.Close, nil
+}
+
+func userByUUIDSQL(prefix string) queryT {
+ const tmpl_read = `
+ SELECT
+ id,
+ timestamp,
+ username,
+ display_name,
+ picture_uuid
+ FROM "%s_users"
+ WHERE
+ uuid = ? AND
+ deleted = false;
+ `
+ return queryT{
+ read: fmt.Sprintf(tmpl_read, prefix),
+ }
+}
+
+func userByUUIDStmt(
+ db *sql.DB,
+ prefix string,
+) (func(guuid.UUID) (userT, error), func() error, error) {
+ q := userByUUIDSQL(prefix)
+
+ readStmt, err := db.Prepare(q.read)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(userID guuid.UUID) (userT, error) {
+ user := userT{
+ uuid: userID,
+ }
+
+ var (
+ timestr string
+ picture_id_bytes []byte
+ )
+ err := readStmt.QueryRow(userID[:]).Scan(
+ &user.id,
+ &timestr,
+ &user.username,
+ &user.displayName,
+ &picture_id_bytes,
+ )
+ if err != nil {
+ return userT{}, err
+ }
+ if picture_id_bytes != nil {
+ pictureID := guuid.UUID(picture_id_bytes)
+ user.pictureID = &pictureID
+ }
+
+ user.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
+ if err != nil {
+ return userT{}, err
+ }
+
+ return user, err
+ }
+
+ return fn, readStmt.Close, nil
+}
+
+func updateUserSQL(prefix string) queryT {
+ const tmpl_write = `
+ UPDATE "%s_users"
+ SET
+ username = ?,
+ display_name = ?,
+ picture_uuid = ?
+ WHERE
+ id = ? AND
+ deleted = false
+ RETURNING id;
+ `
+ return queryT{
+ write: fmt.Sprintf(tmpl_write, prefix),
+ }
+}
+
+func updateUserStmt(
+ db *sql.DB,
+ prefix string,
+) (func(userT) error, func() error, error) {
+ q := updateUserSQL(prefix)
+
+ writeStmt, err := db.Prepare(q.write)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(user userT) error {
+ var picture_id_bytes []byte = nil
+ if user.pictureID != nil {
+ picture_id_bytes = user.pictureID[:]
+ }
+ var _id int64
+ return writeStmt.QueryRow(
+ user.username,
+ user.displayName,
+ picture_id_bytes,
+ user.id,
+ ).Scan(&_id)
+ }
+
+ return fn, writeStmt.Close, nil
+}
+
+func deleteUserSQL(prefix string) queryT {
+ const tmpl_write = `
+ UPDATE "%s_users"
+ SET deleted = true
+ WHERE
+ uuid = ? AND
+ deleted = false
+ RETURNING id;
`
return queryT{
write: fmt.Sprintf(tmpl_write, prefix),
}
}
+func deleteUserStmt(
+ db *sql.DB,
+ prefix string,
+) (func(guuid.UUID) error, func() error, error) {
+ q := deleteUserSQL(prefix)
+
+ writeStmt, err := db.Prepare(q.write)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(userID guuid.UUID) error {
+ var _id int64
+ return writeStmt.QueryRow(userID[:]).Scan(&_id)
+ }
+
+ return fn, writeStmt.Close, nil
+}
+
+func addNetworkSQL(prefix string) queryT {
+ const tmpl_write = `
+ INSERT INTO "%s_networks" (
+ uuid, name, description, type, creator_id
+ )
+ VALUES (
+ ?,
+ ?,
+ ?,
+ ?,
+ (
+ SELECT id FROM "%s_users"
+ WHERE id = ? AND deleted = false
+ )
+ ) RETURNING id, timestamp;
+
+ INSERT INTO "%s_members" (
+ network_id, user_id, username, display_name,
+ picture_uuid, status, active_uniq
+ ) VALUES (
+ last_insert_rowid(),
+ ?,
+ (
+ SELECT username, display_name, picture_uuid
+ FROM "%s_users"
+ WHERE id = ? AND deleted = false
+ ),
+ 'active',
+ 'active'
+ ) RETURNING id, timestamp;
+ `
+ return queryT{
+ write: fmt.Sprintf(tmpl_write, prefix, prefix),
+ }
+}
+
func addNetworkStmt(
db *sql.DB,
prefix string,
-) (func(guuid.UUID, newNetworkT) (networkT, error), func() error, error) {
+) (func(userT, newNetworkT) (networkT, error), func() error, error) {
q := addNetworkSQL(prefix)
writeStmt, err := db.Prepare(q.write)
@@ -349,25 +836,172 @@ func addNetworkStmt(
}
fn := func(
- userID guuid.UUID,
+ user userT,
newNetwork newNetworkT,
) (networkT, error) {
network := networkT{
- uuid: newNetwork.uuid,
- name: newNetwork.name,
+ uuid: newNetwork.uuid,
+ createdBy: user.uuid,
+ name: newNetwork.name,
+ description: newNetwork.description,
+ type_: newNetwork.type_,
+ }
+
+ member := memberT{
}
var timestr string
- network_id_bytes := newNetwork.uuid[:]
- user_id_bytes := userID[:]
- err := writeStmt.QueryRow(
- network_id_bytes,
- newNetwork.name,
- user_id_bytes,
- ).Scan(&network.id, &timestr)
+ {
+ rows, err := writeStmt.Query(
+ newNetwork.uuid[:],
+ newNetwork.name,
+ newNetwork.description,
+ newNetwork.type_,
+ user.id,
+ )
+ if err != nil {
+ return networkT{}, err
+ }
+ defer rows.Close()
+
+ {
+ if !rows.Next() {
+ return networkT{}, sql.ErrNoRows
+ }
+
+ err := rows.Scan(&network.id, &timestr)
+ if err != nil {
+ return networkT{}, err
+ }
+
+ network.timestamp, err = time.Parse(
+ time.RFC3339Nano,
+ timestr,
+ )
+ if err != nil {
+ return networkT{}, err
+ }
+ }
+
+ {
+ if !rows.Next() {
+ return networkT{}, sql.ErrNoRows
+ }
+
+ err := rows.Scan(&member.id, &timestr)
+ if err != nil {
+ return networkT{}, err
+ }
+
+ member.timestamp, err = time.Parse(
+ time.RFC3339Nano,
+ timestr,
+ )
+ if err != nil {
+ return networkT{}, err
+ }
+ }
+
+ {
+ if rows.Next() {
+ return networkT{}, errors.New("FIXME")
+ }
+ err := rows.Err()
+
+ if err != nil {
+ return networkT{}, err
+ }
+ }
+ }
+
+ return network, nil
+ }
+
+ return fn, writeStmt.Close, nil
+}
+
+func getNetworkSQL(prefix string) queryT {
+ const tmpl_read = `
+ SELECT
+ "%s_networks".id,
+ "%s_networks".timestamp,
+ "%s_users".uuid,
+ "%s_networks".name,
+ "%s_networks".description,
+ "%s_networks".type
+ FROM "%s_networks"
+ JOIN "%s_users" ON
+ "%s_users".id = "%s_networks".creator_id
+ WHERE
+ "%s_networks".uuid = $networkUUID AND
+ $userID IN (
+ SELECT id FROM "%s_users"
+ WHERE id = $userID AND deleted = false
+ ) AND
+ (
+ "%s_networks".type IN ('public', 'unlisted') OR
+ $userID IN (
+ SELECT user_id FROM "%s_members"
+ WHERE
+ user_id = $userID AND
+ network_id = "%s_networks".id
+ )
+ );
+ `
+ return queryT{
+ read: fmt.Sprintf(
+ tmpl_read,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ ),
+ }
+}
+
+func getNetworkStmt(
+ db *sql.DB,
+ prefix string,
+) (func(userT, guuid.UUID) (networkT, error), func() error, error) {
+ q := getNetworkSQL(prefix)
+
+ readStmt, err := db.Prepare(q.read)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(user userT, networkID guuid.UUID) (networkT, error) {
+ network := networkT{
+ uuid: networkID,
+ }
+
+ var (
+ timestr string
+ creator_id_bytes []byte
+ )
+ err := readStmt.QueryRow(networkID[:], user.id).Scan(
+ &network.id,
+ &timestr,
+ &creator_id_bytes,
+ &network.name,
+ &network.description,
+ &network.type_,
+ )
if err != nil {
return networkT{}, err
}
+ network.createdBy = guuid.UUID(creator_id_bytes)
network.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
if err != nil {
@@ -377,14 +1011,341 @@ func addNetworkStmt(
return network, nil
}
+ return fn, readStmt.Close, nil
+}
+
+func networkEach(rows *sql.Rows, callback func(networkT) error) error {
+ if rows == nil {
+ return nil
+ }
+
+ for rows.Next() {
+ var (
+ network networkT
+ timestr string
+ network_id_bytes []byte
+ )
+ err := rows.Scan(
+ &network.id,
+ &timestr,
+ &network_id_bytes,
+ )
+ if err != nil {
+ return g.WrapErrors(rows.Close(), err)
+ }
+ network.uuid = guuid.UUID(network_id_bytes)
+
+ network.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
+ if err != nil {
+ return g.WrapErrors(rows.Close(), err)
+ }
+
+ err = callback(network)
+ if err != nil {
+ return g.WrapErrors(rows.Close(), err)
+ }
+ }
+
+ return g.WrapErrors(rows.Err(), rows.Close())
+}
+
+func networksSQL(prefix string) queryT {
+ const tmpl_read = `
+ -- FIXME
+ `
+ return queryT{
+ read: fmt.Sprintf(tmpl_read, prefix),
+ }
+}
+
+func networksStmt(
+ db *sql.DB,
+ prefix string,
+) (func(userT) (*sql.Rows, error), func() error, error) {
+ q := networksSQL(prefix)
+
+ readStmt, err := db.Prepare(q.read)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(user userT) (*sql.Rows, error) {
+ return readStmt.Query(user.uuid[:])
+ }
+
+ return fn, readStmt.Close, nil
+}
+
+func setNetworkSQL(prefix string) queryT {
+ const tmpl_write = `
+ `
+ return queryT{
+ write: fmt.Sprintf(tmpl_write, prefix),
+ }
+}
+
+func setNetworkStmt(
+ db *sql.DB,
+ prefix string,
+) (func(userT, networkT) error, func() error, error) {
+ q := setNetworkSQL(prefix)
+
+ writeStmt, err := db.Prepare(q.write)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(user userT, network networkT) error {
+ _, err := writeStmt.Exec(network)
+ return err
+ }
+
return fn, writeStmt.Close, nil
}
-func addChannelSQL(prefix string) queryT {
+func nipNetworkSQL(prefix string) queryT {
+ const tmpl_write = `
+ `
+ return queryT{
+ write: fmt.Sprintf(tmpl_write, prefix),
+ }
+}
+
+func nipNetworkStmt(
+ db *sql.DB,
+ prefix string,
+) (func(userT, guuid.UUID) error, func() error, error) {
+ q := nipNetworkSQL(prefix)
+
+ writeStmt, err := db.Prepare(q.write)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(user userT, networkID guuid.UUID) error {
+ _, err := writeStmt.Exec(networkID[:])
+ return err
+ }
+
+ return fn, writeStmt.Close, nil
+}
+
+
+func addMemberSQL(prefix string) queryT {
const tmpl_write = `
-- FIXME
`
return queryT{
+ write: fmt.Sprintf(tmpl_write),
+ }
+}
+
+func addMemberStmt(
+ db *sql.DB,
+ prefix string,
+) (func(userT, networkT, newMemberT) (memberT, error), func() error, error) {
+ q := addMemberSQL(prefix)
+
+ writeStmt, err := db.Prepare(q.write)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(
+ user userT,
+ network networkT,
+ newMember newMemberT,
+ ) (memberT, error) {
+ member := memberT{
+ }
+
+ var timestr string
+ err := writeStmt.QueryRow(network.uuid[:], newMember).Scan(
+ &member.id,
+ &timestr,
+ )
+ if err != nil {
+ return memberT{}, err
+ }
+
+ member.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
+ if err != nil {
+ return memberT{}, err
+ }
+
+ return member, nil
+ }
+
+ return fn, writeStmt.Close, nil
+}
+
+func showMemberSQL(prefix string) queryT {
+ const tmpl_read = `
+ `
+ return queryT{
+ read: fmt.Sprintf(tmpl_read, prefix),
+ }
+}
+
+func showMemberStmt(
+ db *sql.DB,
+ prefix string,
+) (func(userT, guuid.UUID) (memberT, error), func() error, error) {
+ q := showMemberSQL(prefix)
+
+ readStmt, err := db.Prepare(q.read)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(user userT, memberID guuid.UUID) (memberT, error) {
+ member := memberT{
+ uuid: memberID,
+ }
+
+ var timestr string
+ err := readStmt.QueryRow(memberID[:]).Scan(
+ &member.id,
+ &timestr,
+ )
+ if err != nil {
+ return memberT{}, err
+ }
+
+ member.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
+ if err != nil {
+ return memberT{}, err
+ }
+
+ return member, err
+ }
+
+ return fn, readStmt.Close, nil
+}
+
+func memberEach(rows *sql.Rows, callback func(memberT) error) error {
+ if rows == nil {
+ return nil
+ }
+
+ for rows.Next() {
+ var (
+ member memberT
+ timestr string
+ member_id_bytes []byte
+ )
+ err := rows.Scan(
+ &member.id,
+ &timestr,
+ &member_id_bytes,
+ )
+ if err != nil {
+ return g.WrapErrors(rows.Close(), err)
+ }
+ // member.uuid = guuid.UUID(member_id_bytes) FIXME
+
+ member.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
+ if err != nil {
+ return g.WrapErrors(rows.Close(), err)
+ }
+
+ err = callback(member)
+ if err != nil {
+ return g.WrapErrors(rows.Close(), err)
+ }
+ }
+
+ return g.WrapErrors(rows.Err(), rows.Close())
+}
+
+func membersSQL(prefix string) queryT {
+ const tmpl_read = `
+ -- FIXME
+ `
+ return queryT{
+ read: fmt.Sprintf(tmpl_read, prefix),
+ }
+}
+
+func membersStmt(
+ db *sql.DB,
+ prefix string,
+) (func(userT, guuid.UUID) (*sql.Rows, error), func() error, error) {
+ q := membersSQL(prefix)
+
+ readStmt, err := db.Prepare(q.read)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(user userT, networkID guuid.UUID) (*sql.Rows, error) {
+ return readStmt.Query(networkID[:])
+ }
+
+ return fn, readStmt.Close, nil
+}
+
+func editMemberSQL(prefix string) queryT {
+ const tmpl_write = `
+ `
+ return queryT{
+ write: fmt.Sprintf(tmpl_write, prefix),
+ }
+}
+
+func editMemberStmt(
+ db *sql.DB,
+ prefix string,
+) (func(userT, memberT) error, func() error, error) {
+ q := editMemberSQL(prefix)
+
+ writeStmt, err := db.Prepare(q.write)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(user userT, member memberT) error {
+ _, err := writeStmt.Exec(member)
+ return err
+ }
+
+ return fn, writeStmt.Close, nil
+}
+
+func dropMemberSQL(prefix string) queryT {
+ const tmpl_write = `
+ `
+ return queryT{
+ write: fmt.Sprintf(tmpl_write),
+ }
+}
+
+func dropMemberStmt(
+ db *sql.DB,
+ prefix string,
+) (func(userT, guuid.UUID) error, func() error, error) {
+ q := dropMemberSQL(prefix)
+
+ writeStmt, err := db.Prepare(q.write)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(user userT, memberID guuid.UUID) error {
+ _, err := writeStmt.Exec(memberID[:])
+ return err
+ }
+
+ return fn, writeStmt.Close, nil
+}
+
+func addChannelSQL(prefix string) queryT {
+ const tmpl_write = `
+ INSERT INTO "%s_channels" (
+ uuid, public_name, label, description, virtual
+ ) VALUES (?, ?, ?, ?, ?) RETURNING id, timestamp;
+ `
+ return queryT{
write: fmt.Sprintf(tmpl_write, prefix),
}
}
@@ -405,14 +1366,21 @@ func addChannelStmt(
newChannel newChannelT,
) (channelT, error) {
channel := channelT{
- name: newChannel.name,
+ uuid: newChannel.uuid,
+ // networkID[:],
+ publicName: newChannel.publicName,
+ label: newChannel.label,
+ description: newChannel.description,
+ virtual: newChannel.virtual,
}
var timestr string
- network_id_bytes := networkID[:]
err := writeStmt.QueryRow(
- network_id_bytes,
- newChannel.name,
+ newChannel.uuid[:],
+ newChannel.publicName,
+ newChannel.label,
+ newChannel.description,
+ newChannel.virtual,
).Scan(&channel.id, &timestr)
if err != nil {
return channelT{}, err
@@ -426,7 +1394,7 @@ func addChannelStmt(
return channel, nil
}
- return fn, nil, nil
+ return fn, writeStmt.Close, nil
}
func channelEach(rows *sql.Rows, callback func(channelT) error) error {
@@ -446,17 +1414,18 @@ func channelEach(rows *sql.Rows, callback func(channelT) error) error {
&channel_id_bytes,
)
if err != nil {
- g.WrapErrors(rows.Close(), err)
+ return g.WrapErrors(rows.Close(), err)
}
+ channel.uuid = guuid.UUID(channel_id_bytes)
channel.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
if err != nil {
- g.WrapErrors(rows.Close(), err)
+ return g.WrapErrors(rows.Close(), err)
}
err = callback(channel)
if err != nil {
- g.WrapErrors(rows.Close(), err)
+ return g.WrapErrors(rows.Close(), err)
}
}
@@ -483,39 +1452,150 @@ func channelsStmt(
return nil, nil, err
}
- fn := func(workspaceID guuid.UUID) (*sql.Rows, error) {
- return readStmt.Query(workspaceID)
+ fn := func(networkID guuid.UUID) (*sql.Rows, error) {
+ return readStmt.Query(networkID[:])
}
return fn, readStmt.Close, nil
}
-func eventEach(rows *sql.Rows, callback func(eventT) error) error {
+func topicSQL(prefix string) queryT {
+ const tmpl_write = `
+ `
+ return queryT{
+ write: fmt.Sprintf(tmpl_write, prefix),
+ }
+}
+
+func topicStmt(
+ db *sql.DB,
+ prefix string,
+) (func(channelT) error, func() error, error) {
+ q := topicSQL(prefix)
+
+ writeStmt, err := db.Prepare(q.write)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(channel channelT) error {
+ _, err := writeStmt.Exec(channel)
+ return err
+ }
+
+ return fn, writeStmt.Close, nil
+}
+
+func endChannelSQL(prefix string) queryT {
+ const tmpl_write = `
+ `
+ return queryT{
+ write: fmt.Sprintf(tmpl_write, prefix),
+ }
+}
+
+func endChannelStmt(
+ db *sql.DB,
+ prefix string,
+) (func(guuid.UUID) error, func() error, error) {
+ q := endChannelSQL(prefix)
+
+ writeStmt, err := db.Prepare(q.write)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(channelID guuid.UUID) error {
+ _, err := writeStmt.Exec(channelID[:])
+ return err
+ }
+
+ return fn, writeStmt.Close, nil
+}
+
+func joinSQL(prefix string) queryT {
+ const tmpl_write = `
+ -- FIXME
+ `
+ return queryT{
+ write: fmt.Sprintf(tmpl_write, prefix),
+ }
+}
+
+func joinStmt(
+ db *sql.DB,
+ prefix string,
+) (func(guuid.UUID, guuid.UUID) error, func() error, error) {
+ q := joinSQL(prefix)
+
+ writeStmt, err := db.Prepare(q.write)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(memberID guuid.UUID, channelID guuid.UUID) error {
+ _, err := writeStmt.Exec(memberID[:], channelID[:])
+ return err
+ }
+
+ return fn, writeStmt.Close, nil
+}
+
+func partSQL(prefix string) queryT {
+ const tmpl_write = `
+ -- FIXME
+ `
+ return queryT{
+ write: fmt.Sprintf(tmpl_write, prefix),
+ }
+}
+
+func partStmt(
+ db *sql.DB,
+ prefix string,
+) (func(guuid.UUID, guuid.UUID) error, func() error, error) {
+ q := partSQL(prefix)
+
+ writeStmt, err := db.Prepare(q.write)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(memberID guuid.UUID, channelID guuid.UUID) error {
+ _, err := writeStmt.Exec(memberID[:], channelID[:])
+ return err
+ }
+
+ return fn, writeStmt.Close, nil
+}
+
+func nameEach(rows *sql.Rows, callback func(memberT) error) error {
if rows == nil {
return nil
}
for rows.Next() {
var (
- event eventT
- timestr string
- event_id_bytes []byte
+ member memberT
+ timestr string
+ member_id_bytes []byte
)
err := rows.Scan(
- &event.id,
+ &member.id,
&timestr,
- &event_id_bytes,
+ &member_id_bytes,
)
if err != nil {
return g.WrapErrors(rows.Close(), err)
}
+ member.uuid = guuid.UUID(member_id_bytes)
- event.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
+ member.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
if err != nil {
return g.WrapErrors(rows.Close(), err)
}
- err = callback(event)
+ err = callback(member)
if err != nil {
return g.WrapErrors(rows.Close(), err)
}
@@ -524,7 +1604,7 @@ func eventEach(rows *sql.Rows, callback func(eventT) error) error {
return g.WrapErrors(rows.Err(), rows.Close())
}
-func allAfterSQL(prefix string) queryT {
+func namesSQL(prefix string) queryT {
const tmpl_read = `
-- FIXME
`
@@ -533,19 +1613,19 @@ func allAfterSQL(prefix string) queryT {
}
}
-func allAfterStmt(
+func namesStmt(
db *sql.DB,
prefix string,
-) (func (guuid.UUID) (*sql.Rows, error), func() error, error) {
- q := allAfterSQL(prefix)
+) (func(guuid.UUID) (*sql.Rows, error), func() error, error) {
+ q := namesSQL(prefix)
readStmt, err := db.Prepare(q.read)
if err != nil {
return nil, nil, err
}
- fn := func(eventID guuid.UUID) (*sql.Rows, error) {
- return readStmt.Query(eventID)
+ fn := func(channelID guuid.UUID) (*sql.Rows, error) {
+ return readStmt.Query(channelID[:])
}
return fn, readStmt.Close, nil
@@ -553,17 +1633,25 @@ func allAfterStmt(
func addEventSQL(prefix string) queryT {
const tmpl_write = `
- -- FIXME
+ INSERT INTO "%s_channel_events" (
+ uuid, channel_id, connection_uuid, type, payload
+ ) VALUES (
+ ?,
+ (SELECT id FROM "%s_channels" WHERE uuid = ?),
+ ?,
+ ?,
+ ?
+ ) RETURNING id, timestamp;
`
return queryT{
- write: fmt.Sprintf(tmpl_write, prefix),
+ write: fmt.Sprintf(tmpl_write, prefix, prefix),
}
}
func addEventStmt(
db *sql.DB,
prefix string,
-) (func (string, string) error, func() error, error) {
+) (func (newEventT) (eventT, error), func() error, error) {
q := addEventSQL(prefix)
writeStmt, err := db.Prepare(q.write)
@@ -571,34 +1659,201 @@ func addEventStmt(
return nil, nil, err
}
- fn := func(type_ string, payload string) error {
- _, err := writeStmt.Exec(type_, payload)
- return err
+ fn := func(newEvent newEventT) (eventT, error) {
+ event := eventT{
+ uuid: newEvent.eventID,
+ channelID: newEvent.channelID,
+ connectionID: newEvent.connectionID,
+ type_: newEvent.type_,
+ payload: newEvent.payload,
+ }
+
+ var timestr string
+ err := writeStmt.QueryRow(
+ newEvent.eventID[:],
+ newEvent.channelID[:],
+ newEvent.connectionID[:],
+ newEvent.type_,
+ newEvent.payload,
+ ).Scan(&event.id, &timestr)
+ if err != nil {
+ return eventT{}, err
+ }
+
+ event.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
+ if err != nil {
+ return eventT{}, err
+ }
+
+ return event, nil
}
return fn, writeStmt.Close, nil
}
+func eventEach(rows *sql.Rows, callback func(eventT) error) error {
+ if rows == nil {
+ return nil
+ }
+
+ for rows.Next() {
+ var (
+ event eventT
+ timestr string
+ event_id_bytes []byte
+ channel_id_bytes []byte
+ connection_id_bytes []byte
+ )
+ err := rows.Scan(
+ &event.id,
+ &timestr,
+ &event_id_bytes,
+ &channel_id_bytes,
+ &connection_id_bytes,
+ &event.type_,
+ &event.payload,
+ )
+ if err != nil {
+ rows.Close()
+ return err
+ }
+ event.uuid = guuid.UUID(event_id_bytes)
+ event.channelID = guuid.UUID(channel_id_bytes)
+ event.connectionID = guuid.UUID(connection_id_bytes)
+
+ event.timestamp, err = time.Parse(time.RFC3339Nano, timestr)
+ if err != nil {
+ rows.Close()
+ return err
+ }
+
+ err = callback(event)
+ if err != nil {
+ rows.Close()
+ return err
+ }
+ }
+
+ return g.WrapErrors(rows.Err(), rows.Close())
+}
+
+func allAfterSQL(prefix string) queryT {
+ const tmpl_read = `
+ WITH landmark_event AS (
+ SELECT id, channel_id
+ FROM "%s_channel_events"
+ WHERE uuid = ?
+ )
+ SELECT
+ "%s_channel_events".id,
+ "%s_channel_events".timestamp,
+ "%s_channel_events".uuid,
+ "%s_channels".uuid,
+ "%s_channel_events".connection_uuid,
+ "%s_channel_events".type,
+ "%s_channel_events".payload
+ FROM "%s_channel_events"
+ JOIN "%s_channels" ON
+ "%s_channel_events".channel_id = "%s_channels".id
+ WHERE
+ "%s_channel_events".id > (
+ SELECT id FROM landmark_event
+ ) AND channel_id = (
+ SELECT channel_id FROM landmark_event
+ );
+ `
+ return queryT{
+ read: fmt.Sprintf(
+ tmpl_read,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ prefix,
+ ),
+ }
+}
+
+func allAfterStmt(
+ db *sql.DB,
+ prefix string,
+) (func (guuid.UUID) (*sql.Rows, error), func() error, error) {
+ q := allAfterSQL(prefix)
+
+ readStmt, err := db.Prepare(q.read)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(eventID guuid.UUID) (*sql.Rows, error) {
+ return readStmt.Query(eventID[:])
+ }
+
+ return fn, readStmt.Close, nil
+}
+
func initDB(
db *sql.DB,
prefix string,
) (queriesT, error) {
createTablesErr := createTables(db, prefix)
-// addWorkspace, addWorkspaceClose, addWorkspaceErr := addWorkspaceStmt(db, prefix)
-// addWorkspace, addWorkspaceClose, addWorkspaceErr := addWorkspaceQ(db, p)
+ createUser, createUserClose, createUserErr := createUserStmt(db, prefix)
+ userByUUID, userByUUIDClose, userByUUIDErr := userByUUIDStmt(db, prefix)
+ updateUser, updateUserClose, updateUserErr := updateUserStmt(db, prefix)
+ deleteUser, deleteUserClose, deleteUserErr := deleteUserStmt(db, prefix)
addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(db, prefix)
+ getNetwork, getNetworkClose, getNetworkErr := getNetworkStmt(db, prefix)
+ networks, networksClose, networksErr := networksStmt(db, prefix)
+ setNetwork, setNetworkClose, setNetworkErr := setNetworkStmt(db, prefix)
+ nipNetwork, nipNetworkClose, nipNetworkErr := nipNetworkStmt(db, prefix)
+ addMember, addMemberClose, addMemberErr := addMemberStmt(db, prefix)
+ showMember, showMemberClose, showMemberErr := showMemberStmt(db, prefix)
+ members, membersClose, membersErr := membersStmt(db, prefix)
+ editMember, editMemberClose, editMemberErr := editMemberStmt(db, prefix)
+ dropMember, dropMemberClose, dropMemberErr := dropMemberStmt(db, prefix)
addChannel, addChannelClose, addChannelErr := addChannelStmt(db, prefix)
channels, channelsClose, channelsErr := channelsStmt(db, prefix)
- allAfter, allAfterClose, allAfterErr := allAfterStmt(db, prefix)
+ topic, topicClose, topicErr := topicStmt(db, prefix)
+ endChannel, endChannelClose, endChannelErr := endChannelStmt(db, prefix)
+ join, joinClose, joinErr := joinStmt(db, prefix)
+ part, partClose, partErr := partStmt(db, prefix)
+ names, namesClose, namesErr := namesStmt(db, prefix)
addEvent, addEventClose, addEventErr := addEventStmt(db, prefix)
+ allAfter, allAfterClose, allAfterErr := allAfterStmt(db, prefix)
err := g.SomeError(
createTablesErr,
+ createUserErr,
+ userByUUIDErr,
+ updateUserErr,
+ deleteUserErr,
addNetworkErr,
+ getNetworkErr,
+ networksErr,
+ setNetworkErr,
+ nipNetworkErr,
+ addMemberErr,
+ showMemberErr,
+ membersErr,
+ editMemberErr,
+ dropMemberErr,
addChannelErr,
channelsErr,
- allAfterErr,
+ topicErr,
+ endChannelErr,
+ joinErr,
+ partErr,
+ namesErr,
addEventErr,
+ allAfterErr,
)
if err != nil {
return queriesT{}, err
@@ -606,21 +1861,137 @@ func initDB(
close := func() error {
return g.SomeFnError(
+ createUserClose,
+ userByUUIDClose,
+ updateUserClose,
+ deleteUserClose,
addNetworkClose,
+ getNetworkClose,
+ networksClose,
+ setNetworkClose,
+ nipNetworkClose,
+ addMemberClose,
+ showMemberClose,
+ membersClose,
+ editMemberClose,
+ dropMemberClose,
addChannelClose,
channelsClose,
- allAfterClose,
+ topicClose,
+ endChannelClose,
+ joinClose,
+ partClose,
+ namesClose,
addEventClose,
+ allAfterClose,
)
}
var connMutex sync.RWMutex
return queriesT{
- addNetwork: func(a guuid.UUID, b newNetworkT) (networkT, error) {
+ createUser: func(a newUserT) (userT, error) {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return createUser(a)
+ },
+ userByUUID: func(a guuid.UUID) (userT, error) {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return userByUUID(a)
+ },
+ updateUser: func(a userT) error {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return updateUser(a)
+ },
+ deleteUser: func(a guuid.UUID) error {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return deleteUser(a)
+ },
+ addNetwork: func(a userT, b newNetworkT) (networkT, error) {
connMutex.RLock()
defer connMutex.RUnlock()
return addNetwork(a, b)
},
+ getNetwork: func(a userT, b guuid.UUID) (networkT, error) {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return getNetwork(a, b)
+ },
+ networks: func(
+ a userT,
+ callback func(networkT) error,
+ ) error {
+ var (
+ err error
+ rows *sql.Rows
+ )
+ {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ rows, err = networks(a)
+ }
+ if err != nil {
+ return err
+ }
+
+ return networkEach(rows, callback)
+ },
+ setNetwork: func(a userT, b networkT) error {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return setNetwork(a, b)
+ },
+ nipNetwork: func(a userT, b guuid.UUID) error {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return nipNetwork(a, b)
+ },
+ addMember: func(
+ a userT,
+ b networkT,
+ c newMemberT,
+ ) (memberT, error) {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return addMember(a, b, c)
+ },
+ showMember: func(a userT, b guuid.UUID) (memberT, error) {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return showMember(a, b)
+ },
+ members: func(
+ a userT,
+ b guuid.UUID,
+ callback func(memberT) error,
+ ) error {
+ var (
+ err error
+ rows *sql.Rows
+ )
+ {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ rows, err = members(a, b)
+ }
+ if err != nil {
+ return err
+ }
+
+ return memberEach(rows, callback)
+ },
+ editMember: func(a userT, b memberT) error {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return editMember(a, b)
+ },
+ dropMember: func(a userT, b guuid.UUID) error {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return dropMember(a, b)
+ },
addChannel: func(
a guuid.UUID, b newChannelT,
) (channelT, error) {
@@ -647,6 +2018,47 @@ func initDB(
return channelEach(rows, callback)
},
+ topic: func(a channelT) error {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return topic(a)
+ },
+ endChannel: func(a guuid.UUID) error {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return endChannel(a)
+ },
+ join: func(a guuid.UUID, b guuid.UUID) error {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return join(a, b)
+ },
+ part: func(a guuid.UUID, b guuid.UUID) error {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return part(a, b)
+ },
+ names: func(a guuid.UUID, callback func(memberT) error) error {
+ var (
+ err error
+ rows *sql.Rows
+ )
+ {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ rows, err = names(a)
+ }
+ if err != nil {
+ return err
+ }
+
+ return nameEach(rows, callback)
+ },
+ addEvent: func(a newEventT) (eventT, error) {
+ connMutex.RLock()
+ defer connMutex.RUnlock()
+ return addEvent(a)
+ },
allAfter: func(
a guuid.UUID,
callback func(eventT) error,
@@ -666,11 +2078,6 @@ func initDB(
return eventEach(rows, callback)
},
- addEvent: func(a string, b string) error {
- connMutex.RLock()
- defer connMutex.RUnlock()
- return addEvent(a, b)
- },
close: func() error {
connMutex.Lock()
defer connMutex.Unlock()
@@ -1163,3 +2570,4 @@ func Main() {
}
// FIXME: review usage of g.Fatal()
+// https://gist.github.com/xero/2d6e4b061b4ecbeb9f99