diff options
author | EuAndreh <eu@euandre.org> | 2024-10-31 15:50:56 -0300 |
---|---|---|
committer | EuAndreh <eu@euandre.org> | 2024-10-31 15:50:56 -0300 |
commit | 985430232e659e5d501a81a0ef74e563e5b9009a (patch) | |
tree | 24451e4dfb0c186574e745f6ac1f7f2e1fb53596 /src | |
parent | .gitignore: Include pattern for cgo (diff) | |
download | papod-985430232e659e5d501a81a0ef74e563e5b9009a.tar.gz papod-985430232e659e5d501a81a0ef74e563e5b9009a.tar.xz |
Add initial implementation of some of `queriesT` functions
Diffstat (limited to 'src')
-rw-r--r-- | src/papod.go | 1698 |
1 files changed, 1553 insertions, 145 deletions
diff --git a/src/papod.go b/src/papod.go index aedfb5f..00086c3 100644 --- a/src/papod.go +++ b/src/papod.go @@ -3,7 +3,6 @@ package papod import ( "bufio" "bytes" - "context" "database/sql" "errors" "flag" @@ -40,43 +39,119 @@ type queryT struct{ } type queriesT struct{ - addNetwork func(guuid.UUID, newNetworkT) (networkT, error) + createUser func(newUserT) (userT, error) + userByUUID func(guuid.UUID) (userT, error) + updateUser func(userT) error + deleteUser func(guuid.UUID) error + addNetwork func(userT, newNetworkT) (networkT, error) + getNetwork func(userT, guuid.UUID) (networkT, error) + networks func(userT, func(networkT) error) error + setNetwork func(userT, networkT) error + nipNetwork func(userT, guuid.UUID) error + addMember func(userT, networkT, newMemberT) (memberT, error) + showMember func(userT, guuid.UUID) (memberT, error) + members func(userT, guuid.UUID, func(memberT) error) error + editMember func(userT, memberT) error + dropMember func(userT, guuid.UUID) error addChannel func(guuid.UUID, newChannelT) (channelT, error) channels func(guuid.UUID, func(channelT) error) error + topic func(channelT) error + endChannel func(guuid.UUID) error + join func(guuid.UUID, guuid.UUID) error + part func(guuid.UUID, guuid.UUID) error + names func(guuid.UUID, func(memberT) error) error + addEvent func(newEventT) (eventT, error) allAfter func(guuid.UUID, func(eventT) error) error - addEvent func(string, string) error close func() error } +type newUserT struct{ + uuid guuid.UUID + username string + displayName string +} + +type userT struct{ + id int64 + timestamp time.Time + uuid guuid.UUID + username string + displayName string + pictureID *guuid.UUID +} + +type NetworkType string +const ( + NetworkType_Public NetworkType = "public" + NetworkType_Private NetworkType = "private" + NetworkType_Unlisted NetworkType = "unlisted" +) + type newNetworkT struct{ - uuid guuid.UUID - name string + uuid guuid.UUID + name string + description string + type_ NetworkType } type networkT struct{ + id int64 + timestamp time.Time + uuid guuid.UUID + createdBy guuid.UUID + name string + description string + type_ NetworkType +} + +type newMemberT struct{ + userID guuid.UUID +} + +type memberT struct{ id int64 timestamp time.Time uuid guuid.UUID - name string - createdBy guuid.UUID } type newChannelT struct{ - name string - uuid guuid.UUID + id int64 + timestamp time.Time + uuid guuid.UUID + // networkID guuid.UUID FIXME + publicName string + label string + description string + virtual bool } type channelT struct { - id int64 - timestamp time.Time - uuid guuid.UUID - name string + id int64 + timestamp time.Time + uuid guuid.UUID + // networkID guuid.UUID FIXME + publicName string + label string + description string + virtual bool +} + +type newEventT struct{ + eventID guuid.UUID + channelID guuid.UUID + connectionID guuid.UUID + type_ string + payload string } type eventT struct{ - id int64 - timestamp time.Time - uuid guuid.UUID + id int64 + timestamp time.Time + uuid guuid.UUID + channelID guuid.UUID + connectionID guuid.UUID + type_ string + payload string } type papodT struct{ @@ -101,7 +176,7 @@ type connectionT struct { isAuthenticated bool } -type userT struct { +type userT2 struct { connections []connectionT } @@ -147,8 +222,55 @@ type IPapod interface{ -func tryRollback(db *sql.DB, ctx context.Context, err error) error { - _, rollbackErr := db.ExecContext(ctx, "ROLLBACK;") +func serialized[A any, B any](callback func(...A) B) (func(...A) B, func()) { + in := make(chan []A) + out := make(chan B) + + closed := false + var ( + closeWg sync.WaitGroup + closeMutex sync.Mutex + ) + closeWg.Add(1) + + go func() { + for input := range in { + out <- callback(input...) + } + close(out) + closeWg.Done() + }() + + fn := func(input ...A) B { + in <- input + return (<- out) + } + + closeFn := func() { + closeMutex.Lock() + defer closeMutex.Unlock() + if closed { + return + } + close(in) + closed = true + closeWg.Wait() + } + + return fn, closeFn +} + +func execSerialized(query string, db *sql.DB) (func(...any) error, func()) { + return serialized(func(args ...any) error { + return inTx(db, func(tx *sql.Tx) error { + _, err := tx.Exec(query, args...) + return err + }) + }) +} + +func tryRollback(tx *sql.Tx, err error) error { + rollbackErr := tx.Rollback() if rollbackErr != nil { return fmt.Errorf( rollbackErrorFmt, @@ -160,25 +282,20 @@ func tryRollback(db *sql.DB, ctx context.Context, err error) error { return err } -// FIXME -// See: -// https://sqlite.org/forum/forumpost/2507664507 -func inTx(db *sql.DB, fn func(context.Context) error) error { - ctx := context.Background() - - _, err := db.ExecContext(ctx, "BEGIN IMMEDIATE;") +func inTx(db *sql.DB, fn func(*sql.Tx) error) error { + tx, err := db.Begin() if err != nil { return err } - err = fn(ctx) + err = fn(tx) if err != nil { - return tryRollback(db, ctx, err) + return tryRollback(tx, err) } - _, err = db.ExecContext(ctx, "COMMIT;") + err = tx.Commit() if err != nil { - return tryRollback(db, ctx, err) + return tryRollback(tx, err) } return nil @@ -194,24 +311,13 @@ func inTx(db *sql.DB, fn func(context.Context) error) error { /// opaque IDs. func createTablesSQL(prefix string) queryT { const tmpl_write = ` - CREATE TABLE IF NOT EXISTS "%s_workspaces" ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL DEFAULT (%s), - uuid BLOB NOT NULL UNIQUE, - name TEXT NOT NULL, - description TEXT NOT NULL, - -- "public", "private", "unlisted" - type TEXT NOT NULL - ); - CREATE TABLE IF NOT EXISTS "%s_workspace_changes ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL DEFAULT (%s), - workspace_id INTEGER NOT NULL - REFERENCES "%s_workspaces"(id), - attribute TEXT NOT NULL, - value TEXT NOT NULL, - op BOOLEAN NOT NULL - ); + -- FIXME: unconfirmed premise: statements within a trigger are + -- part of the transaction that caused it, and so are + -- atomic. + -- See also: + -- https://stackoverflow.com/questions/77441888/ + -- https://stackoverflow.com/questions/30511116/ + CREATE TABLE IF NOT EXISTS "%s_users" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), @@ -219,60 +325,168 @@ func createTablesSQL(prefix string) queryT { uuid BLOB NOT NULL UNIQUE, username TEXT NOT NULL, display_name TEXT NOT NULL, - picture_uuid BLOB NOT NULL UNIQUE, - deleted BOOLEAN NOT NULL - ); + picture_uuid BLOB UNIQUE, + deleted INT NOT NULL + ) STRICT; CREATE TABLE IF NOT EXISTS "%s_user_changes" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), user_id INTEGER NOT NULL REFERENCES "%s_users"(id), - attribute TEXT NOT NULL, + attribute TEXT NOT NULL CHECK( + attribute IN ( + 'username', + 'display_name', + 'picture_uuid', + 'deleted' + ) + ), value TEXT NOT NULL, - op BOOLEAN NOT NULL - ); + op INT NOT NULL CHECK(op IN (0, 1)) + ) STRICT; + CREATE TRIGGER IF NOT EXISTS "%s_user_creation" + AFTER INSERT ON "%s_users" + BEGIN + INSERT INTO "%s_user_changes" ( + user_id, attribute, value, op + ) VALUES + (NEW.id, 'username', NEW.username, true), + (NEW.id, 'display_name', NEW.display_name, true), + (NEW.id, 'deleted', NEW.deleted, true) + ; + END; + CREATE TRIGGER IF NOT EXISTS "%s_user_creation_picture_uuid" + AFTER INSERT ON "%s_users" + WHEN NEW.picture_uuid != NULL + BEGIN + INSERT INTO "%s_user_changes" ( + user_id, attribute, value, op + ) VALUES + (NEW.id, 'picture_uuid', NEW.picture_uuid, true) + ; + END; + CREATE TRIGGER IF NOT EXISTS "%s_user_update_username" + AFTER UPDATE ON "%s_users" + WHEN OLD.username != NEW.username + BEGIN + INSERT INTO "%s_user_changes" ( + user_id, attribute, value, op + ) VALUES + (NEW.id, 'username', OLD.username, false), + (NEW.id, 'username', NEW.username, true) + ; + END; + CREATE TRIGGER IF NOT EXISTS "%s_user_update_display_name" + AFTER UPDATE ON "%s_users" + WHEN OLD.display_name != NEW.display_name + BEGIN + INSERT INTO "%s_user_changes" ( + user_id, attribute, value, op + ) VALUES + (NEW.id, 'display_name', OLD.display_name, false), + (NEW.id, 'display_name', NEW.display_name, true) + ; + END; + CREATE TRIGGER IF NOT EXISTS "%s_user_update_picture_uuid" + AFTER UPDATE ON "%s_users" + WHEN OLD.picture_uuid != NEW.picture_uuid + BEGIN + INSERT INTO "%s_user_changes" ( + user_id, attribute, value, op + ) VALUES + (NEW.id, 'picture_uuid', OLD.picture_uuid, false), + (NEW.id, 'picture_uuid', NEW.picture_uuid, true) + ; + END; + CREATE TRIGGER IF NOT EXISTS "%s_user_update_deleted" + AFTER UPDATE ON "%s_users" + WHEN OLD.deleted != NEW.deleted + BEGIN + INSERT INTO "%s_user_changes" ( + user_id, attribute, value, op + ) VALUES + (NEW.id, 'deleted', OLD.deleted, false), + (NEW.id, 'deleted', NEW.deleted, true) + ; + END; + + CREATE TABLE IF NOT EXISTS "%s_networks" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (%s), + uuid BLOB NOT NULL UNIQUE, + creator_id INTEGER NOT NULL REFERENCES "%s_users"(id), + name TEXT NOT NULL, + description TEXT NOT NULL, + type TEXT NOT NULL CHECK( + type IN ('public', 'private', 'unlisted') + ) + ) STRICT; + CREATE TABLE IF NOT EXISTS "%s_network_changes" ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT (%s), + network_id INTEGER NOT NULL + REFERENCES "%s_networks"(id), + attribute TEXT NOT NULL CHECK( + attribute IN ( + 'name', + 'description', + 'type' + ) + ), + value TEXT NOT NULL, + op INT NOT NULL CHECK(op IN (0, 1)) + ) STRICT; + CREATE TABLE IF NOT EXISTS "%s_members" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), - workspace_id INTEGER NOT NULL - REFERENCES "%s_workspaces"(id), + network_id INTEGER NOT NULL + REFERENCES "%s_networks"(id), user_id INTEGER NOT NULL, username TEXT NOT NULL, display_name TEXT NOT NULL, - picture_uuid BLOB NOT NULL UNIQUE, - -- "active", "inactive", "removed" - status TEXT NOT NULL, - -- "active", always - active_uniq TEXT, - UNIQUE (workspace_id, username, active_uniq), - UNIQUE (workspace_id, user_id) - ); + picture_uuid BLOB UNIQUE, + status TEXT NOT NULL CHECK( + status IN ('active', 'inactive', 'removed') + ), + active_uniq TEXT CHECK( + active_uniq IN ('active', NULL) + ), + UNIQUE (network_id, username, active_uniq), + UNIQUE (network_id, user_id) + ) STRICT; + CREATE TABLE IF NOT EXISTS "%s_member_roles" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, member_id INTEGER NOT NULL REFERENCES "%s_members"(id), role TEXT NOT NULL, UNIQUE (member_id, role) - ); + ) STRICT; + + -- FIXME: use a trigger CREATE TABLE IF NOT EXISTS "%s_member_changes" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), - member_id INTEGER NOT NULL + member_id INTEGER NOT NULL REFERENCES "%s_members"(id), attribute TEXT NOT NULL, value TEXT NOT NULL, - op BOOLEAN NOT NULL - ); + op INT NOT NULL CHECK(op IN (0, 1)) + ) STRICT; + CREATE TABLE IF NOT EXISTS "%s_channels" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), uuid BLOB NOT NULL UNIQUE, - workspace_id INTEGER NOT NULL - REFERENCES "%s_workspaces"(id), + network_id INTEGER -- FIXME NOT NULL + REFERENCES "%s_networks"(id), public_name TEXT UNIQUE, label TEXT NOT NULL, - description TEXT, - virtual BOOLEAN NOT NULL - ); + description TEXT NOT NULL, + virtual INT NOT NULL CHECK(virtual IN (0, 1)) + ) STRICT; + + -- FIXME: use a trigger CREATE TABLE IF NOT EXISTS "%s_channel_changes" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), @@ -280,23 +494,40 @@ func createTablesSQL(prefix string) queryT { REFERENCES "%s_channels"(id), attribute TEXT NOT NULL, value TEXT NOT NULL, - op BOOLEAN NOT NULL - ); + op INT NOT NULL CHECK(op IN (0, 1)) + ) STRICT; + CREATE TABLE IF NOT EXISTS "%s_participants" ( - member_id - ); + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + channel_id INTEGER NOT NULL + REFERENCES "%s_channels"(id), + member_id INTEGER NOT NULL + REFERENCES "%s_members"(id), + UNIQUE (channel_id, member_id) + ) STRICT; + + -- FIXME: create table for connections? + -- A user can have multiple sessions (different browsers, + -- mobile, etc.), and each session has multiple connections, as + -- the user connects and disconnections using the same session + -- id, all while it is valid. + -- FIXME: can a connection have multiple sessions? A long-lived + -- connection that spans multiple sessions would fit into this. CREATE TABLE IF NOT EXISTS "%s_channel_events" ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (%s), uuid BLOB NOT NULL UNIQUE, - channel_id INTEGER NOT NULL REFERENCES "%s"(id), - connection_id INTEGER NOT NULL, - -- payload FIXME: vary by type? - ); - -- FIXME: group conversations? - -- user: person - -- member: workspace user - -- participant: channel member + channel_id INTEGER NOT NULL + REFERENCES "%s_channels"(id), + connection_uuid BLOB NOT NULL, -- FIXME: join + type TEXT NOT NULL CHECK( + type IN ( + 'user-join', + 'user-message' + ) + ), + payload TEXT NOT NULL + ) STRICT; ` return queryT{ write: fmt.Sprintf( @@ -307,11 +538,49 @@ func createTablesSQL(prefix string) queryT { g.SQLiteNow, prefix, prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + g.SQLiteNow, + prefix, + prefix, + g.SQLiteNow, + prefix, + prefix, + g.SQLiteNow, + prefix, + prefix, + prefix, + prefix, g.SQLiteNow, prefix, prefix, g.SQLiteNow, prefix, + prefix, + g.SQLiteNow, + prefix, + prefix, + prefix, + prefix, + prefix, + g.SQLiteNow, + prefix, ), } } @@ -319,28 +588,246 @@ func createTablesSQL(prefix string) queryT { func createTables(db *sql.DB, prefix string) error { q := createTablesSQL(prefix) - return inTx(db, func(ctx context.Context) error { - _, err := db.ExecContext(ctx, q.write) + return inTx(db, func(tx *sql.Tx) error { + _, err := tx.Exec(q.write) return err }) } -// addServer -// addWorkspace -// addNetwork FIXME -func addNetworkSQL(prefix string) queryT { +func createUserSQL(prefix string) queryT { const tmpl_write = ` - -- FIXME + INSERT INTO "%s_users" ( + uuid, username, display_name, picture_uuid, deleted + ) VALUES ( + ?, ?, ?, NULL, false + ) RETURNING id, timestamp; + ` + return queryT{ + write: fmt.Sprintf(tmpl_write, prefix), + } +} + +func createUserStmt( + db *sql.DB, + prefix string, +) (func(newUserT) (userT, error), func() error, error) { + q := createUserSQL(prefix) + + writeStmt, err := db.Prepare(q.write) + if err != nil { + return nil, nil, err + } + + fn := func(newUser newUserT) (userT, error) { + user := userT{ + uuid: newUser.uuid, + username: newUser.username, + displayName: newUser.displayName, + } + + var timestr string + err := writeStmt.QueryRow( + newUser.uuid[:], + newUser.username, + newUser.displayName, + ).Scan(&user.id, ×tr) + if err != nil { + return userT{}, err + } + + user.timestamp, err = time.Parse(time.RFC3339Nano, timestr) + if err != nil { + return userT{}, err + } + + return user, nil + } + + return fn, writeStmt.Close, nil +} + +func userByUUIDSQL(prefix string) queryT { + const tmpl_read = ` + SELECT + id, + timestamp, + username, + display_name, + picture_uuid + FROM "%s_users" + WHERE + uuid = ? AND + deleted = false; + ` + return queryT{ + read: fmt.Sprintf(tmpl_read, prefix), + } +} + +func userByUUIDStmt( + db *sql.DB, + prefix string, +) (func(guuid.UUID) (userT, error), func() error, error) { + q := userByUUIDSQL(prefix) + + readStmt, err := db.Prepare(q.read) + if err != nil { + return nil, nil, err + } + + fn := func(userID guuid.UUID) (userT, error) { + user := userT{ + uuid: userID, + } + + var ( + timestr string + picture_id_bytes []byte + ) + err := readStmt.QueryRow(userID[:]).Scan( + &user.id, + ×tr, + &user.username, + &user.displayName, + &picture_id_bytes, + ) + if err != nil { + return userT{}, err + } + if picture_id_bytes != nil { + pictureID := guuid.UUID(picture_id_bytes) + user.pictureID = &pictureID + } + + user.timestamp, err = time.Parse(time.RFC3339Nano, timestr) + if err != nil { + return userT{}, err + } + + return user, err + } + + return fn, readStmt.Close, nil +} + +func updateUserSQL(prefix string) queryT { + const tmpl_write = ` + UPDATE "%s_users" + SET + username = ?, + display_name = ?, + picture_uuid = ? + WHERE + id = ? AND + deleted = false + RETURNING id; + ` + return queryT{ + write: fmt.Sprintf(tmpl_write, prefix), + } +} + +func updateUserStmt( + db *sql.DB, + prefix string, +) (func(userT) error, func() error, error) { + q := updateUserSQL(prefix) + + writeStmt, err := db.Prepare(q.write) + if err != nil { + return nil, nil, err + } + + fn := func(user userT) error { + var picture_id_bytes []byte = nil + if user.pictureID != nil { + picture_id_bytes = user.pictureID[:] + } + var _id int64 + return writeStmt.QueryRow( + user.username, + user.displayName, + picture_id_bytes, + user.id, + ).Scan(&_id) + } + + return fn, writeStmt.Close, nil +} + +func deleteUserSQL(prefix string) queryT { + const tmpl_write = ` + UPDATE "%s_users" + SET deleted = true + WHERE + uuid = ? AND + deleted = false + RETURNING id; ` return queryT{ write: fmt.Sprintf(tmpl_write, prefix), } } +func deleteUserStmt( + db *sql.DB, + prefix string, +) (func(guuid.UUID) error, func() error, error) { + q := deleteUserSQL(prefix) + + writeStmt, err := db.Prepare(q.write) + if err != nil { + return nil, nil, err + } + + fn := func(userID guuid.UUID) error { + var _id int64 + return writeStmt.QueryRow(userID[:]).Scan(&_id) + } + + return fn, writeStmt.Close, nil +} + +func addNetworkSQL(prefix string) queryT { + const tmpl_write = ` + INSERT INTO "%s_networks" ( + uuid, name, description, type, creator_id + ) + VALUES ( + ?, + ?, + ?, + ?, + ( + SELECT id FROM "%s_users" + WHERE id = ? AND deleted = false + ) + ) RETURNING id, timestamp; + + INSERT INTO "%s_members" ( + network_id, user_id, username, display_name, + picture_uuid, status, active_uniq + ) VALUES ( + last_insert_rowid(), + ?, + ( + SELECT username, display_name, picture_uuid + FROM "%s_users" + WHERE id = ? AND deleted = false + ), + 'active', + 'active' + ) RETURNING id, timestamp; + ` + return queryT{ + write: fmt.Sprintf(tmpl_write, prefix, prefix), + } +} + func addNetworkStmt( db *sql.DB, prefix string, -) (func(guuid.UUID, newNetworkT) (networkT, error), func() error, error) { +) (func(userT, newNetworkT) (networkT, error), func() error, error) { q := addNetworkSQL(prefix) writeStmt, err := db.Prepare(q.write) @@ -349,25 +836,172 @@ func addNetworkStmt( } fn := func( - userID guuid.UUID, + user userT, newNetwork newNetworkT, ) (networkT, error) { network := networkT{ - uuid: newNetwork.uuid, - name: newNetwork.name, + uuid: newNetwork.uuid, + createdBy: user.uuid, + name: newNetwork.name, + description: newNetwork.description, + type_: newNetwork.type_, + } + + member := memberT{ } var timestr string - network_id_bytes := newNetwork.uuid[:] - user_id_bytes := userID[:] - err := writeStmt.QueryRow( - network_id_bytes, - newNetwork.name, - user_id_bytes, - ).Scan(&network.id, ×tr) + { + rows, err := writeStmt.Query( + newNetwork.uuid[:], + newNetwork.name, + newNetwork.description, + newNetwork.type_, + user.id, + ) + if err != nil { + return networkT{}, err + } + defer rows.Close() + + { + if !rows.Next() { + return networkT{}, sql.ErrNoRows + } + + err := rows.Scan(&network.id, ×tr) + if err != nil { + return networkT{}, err + } + + network.timestamp, err = time.Parse( + time.RFC3339Nano, + timestr, + ) + if err != nil { + return networkT{}, err + } + } + + { + if !rows.Next() { + return networkT{}, sql.ErrNoRows + } + + err := rows.Scan(&member.id, ×tr) + if err != nil { + return networkT{}, err + } + + member.timestamp, err = time.Parse( + time.RFC3339Nano, + timestr, + ) + if err != nil { + return networkT{}, err + } + } + + { + if rows.Next() { + return networkT{}, errors.New("FIXME") + } + err := rows.Err() + + if err != nil { + return networkT{}, err + } + } + } + + return network, nil + } + + return fn, writeStmt.Close, nil +} + +func getNetworkSQL(prefix string) queryT { + const tmpl_read = ` + SELECT + "%s_networks".id, + "%s_networks".timestamp, + "%s_users".uuid, + "%s_networks".name, + "%s_networks".description, + "%s_networks".type + FROM "%s_networks" + JOIN "%s_users" ON + "%s_users".id = "%s_networks".creator_id + WHERE + "%s_networks".uuid = $networkUUID AND + $userID IN ( + SELECT id FROM "%s_users" + WHERE id = $userID AND deleted = false + ) AND + ( + "%s_networks".type IN ('public', 'unlisted') OR + $userID IN ( + SELECT user_id FROM "%s_members" + WHERE + user_id = $userID AND + network_id = "%s_networks".id + ) + ); + ` + return queryT{ + read: fmt.Sprintf( + tmpl_read, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + ), + } +} + +func getNetworkStmt( + db *sql.DB, + prefix string, +) (func(userT, guuid.UUID) (networkT, error), func() error, error) { + q := getNetworkSQL(prefix) + + readStmt, err := db.Prepare(q.read) + if err != nil { + return nil, nil, err + } + + fn := func(user userT, networkID guuid.UUID) (networkT, error) { + network := networkT{ + uuid: networkID, + } + + var ( + timestr string + creator_id_bytes []byte + ) + err := readStmt.QueryRow(networkID[:], user.id).Scan( + &network.id, + ×tr, + &creator_id_bytes, + &network.name, + &network.description, + &network.type_, + ) if err != nil { return networkT{}, err } + network.createdBy = guuid.UUID(creator_id_bytes) network.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { @@ -377,14 +1011,341 @@ func addNetworkStmt( return network, nil } + return fn, readStmt.Close, nil +} + +func networkEach(rows *sql.Rows, callback func(networkT) error) error { + if rows == nil { + return nil + } + + for rows.Next() { + var ( + network networkT + timestr string + network_id_bytes []byte + ) + err := rows.Scan( + &network.id, + ×tr, + &network_id_bytes, + ) + if err != nil { + return g.WrapErrors(rows.Close(), err) + } + network.uuid = guuid.UUID(network_id_bytes) + + network.timestamp, err = time.Parse(time.RFC3339Nano, timestr) + if err != nil { + return g.WrapErrors(rows.Close(), err) + } + + err = callback(network) + if err != nil { + return g.WrapErrors(rows.Close(), err) + } + } + + return g.WrapErrors(rows.Err(), rows.Close()) +} + +func networksSQL(prefix string) queryT { + const tmpl_read = ` + -- FIXME + ` + return queryT{ + read: fmt.Sprintf(tmpl_read, prefix), + } +} + +func networksStmt( + db *sql.DB, + prefix string, +) (func(userT) (*sql.Rows, error), func() error, error) { + q := networksSQL(prefix) + + readStmt, err := db.Prepare(q.read) + if err != nil { + return nil, nil, err + } + + fn := func(user userT) (*sql.Rows, error) { + return readStmt.Query(user.uuid[:]) + } + + return fn, readStmt.Close, nil +} + +func setNetworkSQL(prefix string) queryT { + const tmpl_write = ` + ` + return queryT{ + write: fmt.Sprintf(tmpl_write, prefix), + } +} + +func setNetworkStmt( + db *sql.DB, + prefix string, +) (func(userT, networkT) error, func() error, error) { + q := setNetworkSQL(prefix) + + writeStmt, err := db.Prepare(q.write) + if err != nil { + return nil, nil, err + } + + fn := func(user userT, network networkT) error { + _, err := writeStmt.Exec(network) + return err + } + return fn, writeStmt.Close, nil } -func addChannelSQL(prefix string) queryT { +func nipNetworkSQL(prefix string) queryT { + const tmpl_write = ` + ` + return queryT{ + write: fmt.Sprintf(tmpl_write, prefix), + } +} + +func nipNetworkStmt( + db *sql.DB, + prefix string, +) (func(userT, guuid.UUID) error, func() error, error) { + q := nipNetworkSQL(prefix) + + writeStmt, err := db.Prepare(q.write) + if err != nil { + return nil, nil, err + } + + fn := func(user userT, networkID guuid.UUID) error { + _, err := writeStmt.Exec(networkID[:]) + return err + } + + return fn, writeStmt.Close, nil +} + + +func addMemberSQL(prefix string) queryT { const tmpl_write = ` -- FIXME ` return queryT{ + write: fmt.Sprintf(tmpl_write), + } +} + +func addMemberStmt( + db *sql.DB, + prefix string, +) (func(userT, networkT, newMemberT) (memberT, error), func() error, error) { + q := addMemberSQL(prefix) + + writeStmt, err := db.Prepare(q.write) + if err != nil { + return nil, nil, err + } + + fn := func( + user userT, + network networkT, + newMember newMemberT, + ) (memberT, error) { + member := memberT{ + } + + var timestr string + err := writeStmt.QueryRow(network.uuid[:], newMember).Scan( + &member.id, + ×tr, + ) + if err != nil { + return memberT{}, err + } + + member.timestamp, err = time.Parse(time.RFC3339Nano, timestr) + if err != nil { + return memberT{}, err + } + + return member, nil + } + + return fn, writeStmt.Close, nil +} + +func showMemberSQL(prefix string) queryT { + const tmpl_read = ` + ` + return queryT{ + read: fmt.Sprintf(tmpl_read, prefix), + } +} + +func showMemberStmt( + db *sql.DB, + prefix string, +) (func(userT, guuid.UUID) (memberT, error), func() error, error) { + q := showMemberSQL(prefix) + + readStmt, err := db.Prepare(q.read) + if err != nil { + return nil, nil, err + } + + fn := func(user userT, memberID guuid.UUID) (memberT, error) { + member := memberT{ + uuid: memberID, + } + + var timestr string + err := readStmt.QueryRow(memberID[:]).Scan( + &member.id, + ×tr, + ) + if err != nil { + return memberT{}, err + } + + member.timestamp, err = time.Parse(time.RFC3339Nano, timestr) + if err != nil { + return memberT{}, err + } + + return member, err + } + + return fn, readStmt.Close, nil +} + +func memberEach(rows *sql.Rows, callback func(memberT) error) error { + if rows == nil { + return nil + } + + for rows.Next() { + var ( + member memberT + timestr string + member_id_bytes []byte + ) + err := rows.Scan( + &member.id, + ×tr, + &member_id_bytes, + ) + if err != nil { + return g.WrapErrors(rows.Close(), err) + } + // member.uuid = guuid.UUID(member_id_bytes) FIXME + + member.timestamp, err = time.Parse(time.RFC3339Nano, timestr) + if err != nil { + return g.WrapErrors(rows.Close(), err) + } + + err = callback(member) + if err != nil { + return g.WrapErrors(rows.Close(), err) + } + } + + return g.WrapErrors(rows.Err(), rows.Close()) +} + +func membersSQL(prefix string) queryT { + const tmpl_read = ` + -- FIXME + ` + return queryT{ + read: fmt.Sprintf(tmpl_read, prefix), + } +} + +func membersStmt( + db *sql.DB, + prefix string, +) (func(userT, guuid.UUID) (*sql.Rows, error), func() error, error) { + q := membersSQL(prefix) + + readStmt, err := db.Prepare(q.read) + if err != nil { + return nil, nil, err + } + + fn := func(user userT, networkID guuid.UUID) (*sql.Rows, error) { + return readStmt.Query(networkID[:]) + } + + return fn, readStmt.Close, nil +} + +func editMemberSQL(prefix string) queryT { + const tmpl_write = ` + ` + return queryT{ + write: fmt.Sprintf(tmpl_write, prefix), + } +} + +func editMemberStmt( + db *sql.DB, + prefix string, +) (func(userT, memberT) error, func() error, error) { + q := editMemberSQL(prefix) + + writeStmt, err := db.Prepare(q.write) + if err != nil { + return nil, nil, err + } + + fn := func(user userT, member memberT) error { + _, err := writeStmt.Exec(member) + return err + } + + return fn, writeStmt.Close, nil +} + +func dropMemberSQL(prefix string) queryT { + const tmpl_write = ` + ` + return queryT{ + write: fmt.Sprintf(tmpl_write), + } +} + +func dropMemberStmt( + db *sql.DB, + prefix string, +) (func(userT, guuid.UUID) error, func() error, error) { + q := dropMemberSQL(prefix) + + writeStmt, err := db.Prepare(q.write) + if err != nil { + return nil, nil, err + } + + fn := func(user userT, memberID guuid.UUID) error { + _, err := writeStmt.Exec(memberID[:]) + return err + } + + return fn, writeStmt.Close, nil +} + +func addChannelSQL(prefix string) queryT { + const tmpl_write = ` + INSERT INTO "%s_channels" ( + uuid, public_name, label, description, virtual + ) VALUES (?, ?, ?, ?, ?) RETURNING id, timestamp; + ` + return queryT{ write: fmt.Sprintf(tmpl_write, prefix), } } @@ -405,14 +1366,21 @@ func addChannelStmt( newChannel newChannelT, ) (channelT, error) { channel := channelT{ - name: newChannel.name, + uuid: newChannel.uuid, + // networkID[:], + publicName: newChannel.publicName, + label: newChannel.label, + description: newChannel.description, + virtual: newChannel.virtual, } var timestr string - network_id_bytes := networkID[:] err := writeStmt.QueryRow( - network_id_bytes, - newChannel.name, + newChannel.uuid[:], + newChannel.publicName, + newChannel.label, + newChannel.description, + newChannel.virtual, ).Scan(&channel.id, ×tr) if err != nil { return channelT{}, err @@ -426,7 +1394,7 @@ func addChannelStmt( return channel, nil } - return fn, nil, nil + return fn, writeStmt.Close, nil } func channelEach(rows *sql.Rows, callback func(channelT) error) error { @@ -446,17 +1414,18 @@ func channelEach(rows *sql.Rows, callback func(channelT) error) error { &channel_id_bytes, ) if err != nil { - g.WrapErrors(rows.Close(), err) + return g.WrapErrors(rows.Close(), err) } + channel.uuid = guuid.UUID(channel_id_bytes) channel.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { - g.WrapErrors(rows.Close(), err) + return g.WrapErrors(rows.Close(), err) } err = callback(channel) if err != nil { - g.WrapErrors(rows.Close(), err) + return g.WrapErrors(rows.Close(), err) } } @@ -483,39 +1452,150 @@ func channelsStmt( return nil, nil, err } - fn := func(workspaceID guuid.UUID) (*sql.Rows, error) { - return readStmt.Query(workspaceID) + fn := func(networkID guuid.UUID) (*sql.Rows, error) { + return readStmt.Query(networkID[:]) } return fn, readStmt.Close, nil } -func eventEach(rows *sql.Rows, callback func(eventT) error) error { +func topicSQL(prefix string) queryT { + const tmpl_write = ` + ` + return queryT{ + write: fmt.Sprintf(tmpl_write, prefix), + } +} + +func topicStmt( + db *sql.DB, + prefix string, +) (func(channelT) error, func() error, error) { + q := topicSQL(prefix) + + writeStmt, err := db.Prepare(q.write) + if err != nil { + return nil, nil, err + } + + fn := func(channel channelT) error { + _, err := writeStmt.Exec(channel) + return err + } + + return fn, writeStmt.Close, nil +} + +func endChannelSQL(prefix string) queryT { + const tmpl_write = ` + ` + return queryT{ + write: fmt.Sprintf(tmpl_write, prefix), + } +} + +func endChannelStmt( + db *sql.DB, + prefix string, +) (func(guuid.UUID) error, func() error, error) { + q := endChannelSQL(prefix) + + writeStmt, err := db.Prepare(q.write) + if err != nil { + return nil, nil, err + } + + fn := func(channelID guuid.UUID) error { + _, err := writeStmt.Exec(channelID[:]) + return err + } + + return fn, writeStmt.Close, nil +} + +func joinSQL(prefix string) queryT { + const tmpl_write = ` + -- FIXME + ` + return queryT{ + write: fmt.Sprintf(tmpl_write, prefix), + } +} + +func joinStmt( + db *sql.DB, + prefix string, +) (func(guuid.UUID, guuid.UUID) error, func() error, error) { + q := joinSQL(prefix) + + writeStmt, err := db.Prepare(q.write) + if err != nil { + return nil, nil, err + } + + fn := func(memberID guuid.UUID, channelID guuid.UUID) error { + _, err := writeStmt.Exec(memberID[:], channelID[:]) + return err + } + + return fn, writeStmt.Close, nil +} + +func partSQL(prefix string) queryT { + const tmpl_write = ` + -- FIXME + ` + return queryT{ + write: fmt.Sprintf(tmpl_write, prefix), + } +} + +func partStmt( + db *sql.DB, + prefix string, +) (func(guuid.UUID, guuid.UUID) error, func() error, error) { + q := partSQL(prefix) + + writeStmt, err := db.Prepare(q.write) + if err != nil { + return nil, nil, err + } + + fn := func(memberID guuid.UUID, channelID guuid.UUID) error { + _, err := writeStmt.Exec(memberID[:], channelID[:]) + return err + } + + return fn, writeStmt.Close, nil +} + +func nameEach(rows *sql.Rows, callback func(memberT) error) error { if rows == nil { return nil } for rows.Next() { var ( - event eventT - timestr string - event_id_bytes []byte + member memberT + timestr string + member_id_bytes []byte ) err := rows.Scan( - &event.id, + &member.id, ×tr, - &event_id_bytes, + &member_id_bytes, ) if err != nil { return g.WrapErrors(rows.Close(), err) } + member.uuid = guuid.UUID(member_id_bytes) - event.timestamp, err = time.Parse(time.RFC3339Nano, timestr) + member.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { return g.WrapErrors(rows.Close(), err) } - err = callback(event) + err = callback(member) if err != nil { return g.WrapErrors(rows.Close(), err) } @@ -524,7 +1604,7 @@ func eventEach(rows *sql.Rows, callback func(eventT) error) error { return g.WrapErrors(rows.Err(), rows.Close()) } -func allAfterSQL(prefix string) queryT { +func namesSQL(prefix string) queryT { const tmpl_read = ` -- FIXME ` @@ -533,19 +1613,19 @@ func allAfterSQL(prefix string) queryT { } } -func allAfterStmt( +func namesStmt( db *sql.DB, prefix string, -) (func (guuid.UUID) (*sql.Rows, error), func() error, error) { - q := allAfterSQL(prefix) +) (func(guuid.UUID) (*sql.Rows, error), func() error, error) { + q := namesSQL(prefix) readStmt, err := db.Prepare(q.read) if err != nil { return nil, nil, err } - fn := func(eventID guuid.UUID) (*sql.Rows, error) { - return readStmt.Query(eventID) + fn := func(channelID guuid.UUID) (*sql.Rows, error) { + return readStmt.Query(channelID[:]) } return fn, readStmt.Close, nil @@ -553,17 +1633,25 @@ func allAfterStmt( func addEventSQL(prefix string) queryT { const tmpl_write = ` - -- FIXME + INSERT INTO "%s_channel_events" ( + uuid, channel_id, connection_uuid, type, payload + ) VALUES ( + ?, + (SELECT id FROM "%s_channels" WHERE uuid = ?), + ?, + ?, + ? + ) RETURNING id, timestamp; ` return queryT{ - write: fmt.Sprintf(tmpl_write, prefix), + write: fmt.Sprintf(tmpl_write, prefix, prefix), } } func addEventStmt( db *sql.DB, prefix string, -) (func (string, string) error, func() error, error) { +) (func (newEventT) (eventT, error), func() error, error) { q := addEventSQL(prefix) writeStmt, err := db.Prepare(q.write) @@ -571,34 +1659,201 @@ func addEventStmt( return nil, nil, err } - fn := func(type_ string, payload string) error { - _, err := writeStmt.Exec(type_, payload) - return err + fn := func(newEvent newEventT) (eventT, error) { + event := eventT{ + uuid: newEvent.eventID, + channelID: newEvent.channelID, + connectionID: newEvent.connectionID, + type_: newEvent.type_, + payload: newEvent.payload, + } + + var timestr string + err := writeStmt.QueryRow( + newEvent.eventID[:], + newEvent.channelID[:], + newEvent.connectionID[:], + newEvent.type_, + newEvent.payload, + ).Scan(&event.id, ×tr) + if err != nil { + return eventT{}, err + } + + event.timestamp, err = time.Parse(time.RFC3339Nano, timestr) + if err != nil { + return eventT{}, err + } + + return event, nil } return fn, writeStmt.Close, nil } +func eventEach(rows *sql.Rows, callback func(eventT) error) error { + if rows == nil { + return nil + } + + for rows.Next() { + var ( + event eventT + timestr string + event_id_bytes []byte + channel_id_bytes []byte + connection_id_bytes []byte + ) + err := rows.Scan( + &event.id, + ×tr, + &event_id_bytes, + &channel_id_bytes, + &connection_id_bytes, + &event.type_, + &event.payload, + ) + if err != nil { + rows.Close() + return err + } + event.uuid = guuid.UUID(event_id_bytes) + event.channelID = guuid.UUID(channel_id_bytes) + event.connectionID = guuid.UUID(connection_id_bytes) + + event.timestamp, err = time.Parse(time.RFC3339Nano, timestr) + if err != nil { + rows.Close() + return err + } + + err = callback(event) + if err != nil { + rows.Close() + return err + } + } + + return g.WrapErrors(rows.Err(), rows.Close()) +} + +func allAfterSQL(prefix string) queryT { + const tmpl_read = ` + WITH landmark_event AS ( + SELECT id, channel_id + FROM "%s_channel_events" + WHERE uuid = ? + ) + SELECT + "%s_channel_events".id, + "%s_channel_events".timestamp, + "%s_channel_events".uuid, + "%s_channels".uuid, + "%s_channel_events".connection_uuid, + "%s_channel_events".type, + "%s_channel_events".payload + FROM "%s_channel_events" + JOIN "%s_channels" ON + "%s_channel_events".channel_id = "%s_channels".id + WHERE + "%s_channel_events".id > ( + SELECT id FROM landmark_event + ) AND channel_id = ( + SELECT channel_id FROM landmark_event + ); + ` + return queryT{ + read: fmt.Sprintf( + tmpl_read, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + prefix, + ), + } +} + +func allAfterStmt( + db *sql.DB, + prefix string, +) (func (guuid.UUID) (*sql.Rows, error), func() error, error) { + q := allAfterSQL(prefix) + + readStmt, err := db.Prepare(q.read) + if err != nil { + return nil, nil, err + } + + fn := func(eventID guuid.UUID) (*sql.Rows, error) { + return readStmt.Query(eventID[:]) + } + + return fn, readStmt.Close, nil +} + func initDB( db *sql.DB, prefix string, ) (queriesT, error) { createTablesErr := createTables(db, prefix) -// addWorkspace, addWorkspaceClose, addWorkspaceErr := addWorkspaceStmt(db, prefix) -// addWorkspace, addWorkspaceClose, addWorkspaceErr := addWorkspaceQ(db, p) + createUser, createUserClose, createUserErr := createUserStmt(db, prefix) + userByUUID, userByUUIDClose, userByUUIDErr := userByUUIDStmt(db, prefix) + updateUser, updateUserClose, updateUserErr := updateUserStmt(db, prefix) + deleteUser, deleteUserClose, deleteUserErr := deleteUserStmt(db, prefix) addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(db, prefix) + getNetwork, getNetworkClose, getNetworkErr := getNetworkStmt(db, prefix) + networks, networksClose, networksErr := networksStmt(db, prefix) + setNetwork, setNetworkClose, setNetworkErr := setNetworkStmt(db, prefix) + nipNetwork, nipNetworkClose, nipNetworkErr := nipNetworkStmt(db, prefix) + addMember, addMemberClose, addMemberErr := addMemberStmt(db, prefix) + showMember, showMemberClose, showMemberErr := showMemberStmt(db, prefix) + members, membersClose, membersErr := membersStmt(db, prefix) + editMember, editMemberClose, editMemberErr := editMemberStmt(db, prefix) + dropMember, dropMemberClose, dropMemberErr := dropMemberStmt(db, prefix) addChannel, addChannelClose, addChannelErr := addChannelStmt(db, prefix) channels, channelsClose, channelsErr := channelsStmt(db, prefix) - allAfter, allAfterClose, allAfterErr := allAfterStmt(db, prefix) + topic, topicClose, topicErr := topicStmt(db, prefix) + endChannel, endChannelClose, endChannelErr := endChannelStmt(db, prefix) + join, joinClose, joinErr := joinStmt(db, prefix) + part, partClose, partErr := partStmt(db, prefix) + names, namesClose, namesErr := namesStmt(db, prefix) addEvent, addEventClose, addEventErr := addEventStmt(db, prefix) + allAfter, allAfterClose, allAfterErr := allAfterStmt(db, prefix) err := g.SomeError( createTablesErr, + createUserErr, + userByUUIDErr, + updateUserErr, + deleteUserErr, addNetworkErr, + getNetworkErr, + networksErr, + setNetworkErr, + nipNetworkErr, + addMemberErr, + showMemberErr, + membersErr, + editMemberErr, + dropMemberErr, addChannelErr, channelsErr, - allAfterErr, + topicErr, + endChannelErr, + joinErr, + partErr, + namesErr, addEventErr, + allAfterErr, ) if err != nil { return queriesT{}, err @@ -606,21 +1861,137 @@ func initDB( close := func() error { return g.SomeFnError( + createUserClose, + userByUUIDClose, + updateUserClose, + deleteUserClose, addNetworkClose, + getNetworkClose, + networksClose, + setNetworkClose, + nipNetworkClose, + addMemberClose, + showMemberClose, + membersClose, + editMemberClose, + dropMemberClose, addChannelClose, channelsClose, - allAfterClose, + topicClose, + endChannelClose, + joinClose, + partClose, + namesClose, addEventClose, + allAfterClose, ) } var connMutex sync.RWMutex return queriesT{ - addNetwork: func(a guuid.UUID, b newNetworkT) (networkT, error) { + createUser: func(a newUserT) (userT, error) { + connMutex.RLock() + defer connMutex.RUnlock() + return createUser(a) + }, + userByUUID: func(a guuid.UUID) (userT, error) { + connMutex.RLock() + defer connMutex.RUnlock() + return userByUUID(a) + }, + updateUser: func(a userT) error { + connMutex.RLock() + defer connMutex.RUnlock() + return updateUser(a) + }, + deleteUser: func(a guuid.UUID) error { + connMutex.RLock() + defer connMutex.RUnlock() + return deleteUser(a) + }, + addNetwork: func(a userT, b newNetworkT) (networkT, error) { connMutex.RLock() defer connMutex.RUnlock() return addNetwork(a, b) }, + getNetwork: func(a userT, b guuid.UUID) (networkT, error) { + connMutex.RLock() + defer connMutex.RUnlock() + return getNetwork(a, b) + }, + networks: func( + a userT, + callback func(networkT) error, + ) error { + var ( + err error + rows *sql.Rows + ) + { + connMutex.RLock() + defer connMutex.RUnlock() + rows, err = networks(a) + } + if err != nil { + return err + } + + return networkEach(rows, callback) + }, + setNetwork: func(a userT, b networkT) error { + connMutex.RLock() + defer connMutex.RUnlock() + return setNetwork(a, b) + }, + nipNetwork: func(a userT, b guuid.UUID) error { + connMutex.RLock() + defer connMutex.RUnlock() + return nipNetwork(a, b) + }, + addMember: func( + a userT, + b networkT, + c newMemberT, + ) (memberT, error) { + connMutex.RLock() + defer connMutex.RUnlock() + return addMember(a, b, c) + }, + showMember: func(a userT, b guuid.UUID) (memberT, error) { + connMutex.RLock() + defer connMutex.RUnlock() + return showMember(a, b) + }, + members: func( + a userT, + b guuid.UUID, + callback func(memberT) error, + ) error { + var ( + err error + rows *sql.Rows + ) + { + connMutex.RLock() + defer connMutex.RUnlock() + rows, err = members(a, b) + } + if err != nil { + return err + } + + return memberEach(rows, callback) + }, + editMember: func(a userT, b memberT) error { + connMutex.RLock() + defer connMutex.RUnlock() + return editMember(a, b) + }, + dropMember: func(a userT, b guuid.UUID) error { + connMutex.RLock() + defer connMutex.RUnlock() + return dropMember(a, b) + }, addChannel: func( a guuid.UUID, b newChannelT, ) (channelT, error) { @@ -647,6 +2018,47 @@ func initDB( return channelEach(rows, callback) }, + topic: func(a channelT) error { + connMutex.RLock() + defer connMutex.RUnlock() + return topic(a) + }, + endChannel: func(a guuid.UUID) error { + connMutex.RLock() + defer connMutex.RUnlock() + return endChannel(a) + }, + join: func(a guuid.UUID, b guuid.UUID) error { + connMutex.RLock() + defer connMutex.RUnlock() + return join(a, b) + }, + part: func(a guuid.UUID, b guuid.UUID) error { + connMutex.RLock() + defer connMutex.RUnlock() + return part(a, b) + }, + names: func(a guuid.UUID, callback func(memberT) error) error { + var ( + err error + rows *sql.Rows + ) + { + connMutex.RLock() + defer connMutex.RUnlock() + rows, err = names(a) + } + if err != nil { + return err + } + + return nameEach(rows, callback) + }, + addEvent: func(a newEventT) (eventT, error) { + connMutex.RLock() + defer connMutex.RUnlock() + return addEvent(a) + }, allAfter: func( a guuid.UUID, callback func(eventT) error, @@ -666,11 +2078,6 @@ func initDB( return eventEach(rows, callback) }, - addEvent: func(a string, b string) error { - connMutex.RLock() - defer connMutex.RUnlock() - return addEvent(a, b) - }, close: func() error { connMutex.Lock() defer connMutex.Unlock() @@ -1163,3 +2570,4 @@ func Main() { } // FIXME: review usage of g.Fatal() +// https://gist.github.com/xero/2d6e4b061b4ecbeb9f99 |