summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEuAndreh <eu@euandre.org>2024-11-04 09:13:33 -0300
committerEuAndreh <eu@euandre.org>2024-11-04 15:50:31 -0300
commit1feab2105ac464747b231232f53f4ac335f4467f (patch)
treecb4fd127f2bb59aa70deb44a8f7a30944f9a7936
parentmv doc/rfc/ doc/rfcs; Add newer related rfcs (diff)
downloadpapod-1feab2105ac464747b231232f53f4ac335f4467f.tar.gz
papod-1feab2105ac464747b231232f53f4ac335f4467f.tar.xz
src/papod.go: Integrate db layer with network, create command handlers, simplify network loop
-rw-r--r--Makefile6
-rw-r--r--src/papod.go1577
-rw-r--r--tests/papod.go142
-rw-r--r--tests/queries.sql220
4 files changed, 1229 insertions, 716 deletions
diff --git a/Makefile b/Makefile
index 8eed63d..ddbe3cd 100644
--- a/Makefile
+++ b/Makefile
@@ -65,8 +65,8 @@ side-assets = \
tests/functional/*/*.go.db* \
tests/fuzz/corpus/ \
tests/benchmarks/*/main.txt \
- $(NAME).public.socket \
- $(NAME).command.socket \
+ $(NAME).daemon.sock \
+ $(NAME).commander.sock \
@@ -176,7 +176,7 @@ uninstall:
## Run it locally.
run: all
- rm -f $(NAME).public.socket $(NAME).command.socket
+ rm -f $(NAME).daemon.sock $(NAME).commander.sock
./$(NAME).bin
diff --git a/src/papod.go b/src/papod.go
index 00086c3..4bc5b4d 100644
--- a/src/papod.go
+++ b/src/papod.go
@@ -5,18 +5,18 @@ import (
"bytes"
"database/sql"
"errors"
- "flag"
"fmt"
"io"
"log/slog"
"net"
+ "os"
"regexp"
"strings"
"sync"
"time"
"cracha"
- "q"
+ "fiinha"
"golite"
"guuid"
g "gobang"
@@ -29,10 +29,19 @@ const (
rollbackErrorFmt = "rollback error: %w; while executing: %w"
NEW_CHANNEL = "new-channel"
+
+ pingFrequency = time.Duration(30) * time.Second
+ pongMaxLatency = time.Duration(5) * time.Second
)
+type dbconfigT struct{
+ shared *sql.DB
+ dbpath string
+ prefix string
+}
+
type queryT struct{
write string
read string
@@ -62,6 +71,7 @@ type queriesT struct{
names func(guuid.UUID, func(memberT) error) error
addEvent func(newEventT) (eventT, error)
allAfter func(guuid.UUID, func(eventT) error) error
+ logMessage func(userT, messageT) error
close func() error
}
@@ -125,7 +135,7 @@ type newChannelT struct{
virtual bool
}
-type channelT struct {
+type channelT struct{
id int64
timestamp time.Time
uuid guuid.UUID
@@ -152,72 +162,80 @@ type eventT struct{
connectionID guuid.UUID
type_ string
payload string
+ previous *eventT
+ isFist bool
}
-type papodT struct{
- auth cracha.IAuth
- queue q.IQueue
- db *sql.DB
- queries queriesT
+type messageParamsT struct{
+ middle []string
+ trailing string
}
-type consumerT struct{
- topic string
- handlerFn func(papodT) func(q.Message) error
+type messageT struct{
+ prefix string
+ command string
+ params messageParamsT
+ raw string
}
-type connectionT struct {
- conn net.Conn
- replyChan chan string
- lastReadFrom time.Time
- lastWrittenTo time.Time
- // id *UUID
- id string
- isAuthenticated bool
+type replyT struct{
+ command string
+ params messageParamsT
}
-type userT2 struct {
- connections []connectionT
+type listenersT struct{
+ daemon net.Listener
+ commander net.Listener
+ close func() error
}
-type stateT struct {
- users map[string]*userT
+type consumerT struct{
+ topic string
+ name string
+ // FIXME: use generic to avoid circular reference?
+ handlerFn func(papodT) func(fiinha.Message) error
}
-type contextT struct {
- db *sql.DB
- state stateT
- tx chan int
+type connectionT struct{
+ conn net.Conn
+ uuid guuid.UUID
+ user *userT
}
-type messageParamsT struct {
- middle []string
- trailing string
+type receiverT struct{
+ send func(messageT)
+ close func()
}
-type messageT struct {
- prefix string
- command string
- params messageParamsT
- raw string
+type receiversT struct{
+ add func(receiverT)
+ remove func(receiverT)
+ get func(guuid.UUID) []receiverT
+ close func()
}
-type actionType int
-const (
- actionReply actionType = iota
-)
-
-type action interface {
- typeOf() actionType
+type metricsT struct{
+ activeConnections g.Gauge
+ nicksInChannel g.Gauge
+ sendToClientError func(...any)
+ receivedMessage func(...any)
+ sentReply func(...any)
}
-type replyT struct {
- prefix string
- command int
- params messageParamsT
+type papodT struct{
+ auth cracha.IAuth
+ queue fiinha.IQueue
+ queries queriesT
+ listeners listenersT
+ consumers []consumerT
+ receivers receiversT
+ metrics metricsT
+ // logger g.Logger
}
type IPapod interface{
+ Start() error
+ Close() error
}
@@ -304,11 +322,11 @@ func inTx(db *sql.DB, fn func(*sql.Tx) error) error {
/// "papod_users".uuid is the same as cracha_users.uuid. Not a foreign key to
/// allow them to live in different physical locations. Adding it here feels
/// less like an optimization related decision, and more of a coupling one. The
-/// way that New() works now uses the same databasePath for the q.IQueue *and*
-/// cracha.IAuth, but cracha in no way exposes where it stores the user UUID or
-/// how the it is handled. This has similarities to how events here don't
-/// reference the q.Message.ID via foreign keys either. They're treated only as
-/// opaque IDs.
+/// way that New() works now uses the same databasePath for the fiinha.IQueue
+/// *and* cracha.IAuth, but cracha in no way exposes where it stores the user
+/// UUID or how the it is handled. This has similarities to how events here
+/// don't reference the fiinha.Message.ID via foreign keys either. They're
+/// treated only as opaque IDs.
func createTablesSQL(prefix string) queryT {
const tmpl_write = `
-- FIXME: unconfirmed premise: statements within a trigger are
@@ -326,88 +344,88 @@ func createTablesSQL(prefix string) queryT {
username TEXT NOT NULL,
display_name TEXT NOT NULL,
picture_uuid BLOB UNIQUE,
- deleted INT NOT NULL
+ deleted INT NOT NULL CHECK(deleted IN (0, 1))
) STRICT;
- CREATE TABLE IF NOT EXISTS "%s_user_changes" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (%s),
- user_id INTEGER NOT NULL REFERENCES "%s_users"(id),
- attribute TEXT NOT NULL CHECK(
- attribute IN (
- 'username',
- 'display_name',
- 'picture_uuid',
- 'deleted'
- )
- ),
- value TEXT NOT NULL,
- op INT NOT NULL CHECK(op IN (0, 1))
- ) STRICT;
- CREATE TRIGGER IF NOT EXISTS "%s_user_creation"
- AFTER INSERT ON "%s_users"
- BEGIN
- INSERT INTO "%s_user_changes" (
- user_id, attribute, value, op
- ) VALUES
- (NEW.id, 'username', NEW.username, true),
- (NEW.id, 'display_name', NEW.display_name, true),
- (NEW.id, 'deleted', NEW.deleted, true)
- ;
- END;
- CREATE TRIGGER IF NOT EXISTS "%s_user_creation_picture_uuid"
- AFTER INSERT ON "%s_users"
- WHEN NEW.picture_uuid != NULL
- BEGIN
- INSERT INTO "%s_user_changes" (
- user_id, attribute, value, op
- ) VALUES
- (NEW.id, 'picture_uuid', NEW.picture_uuid, true)
- ;
- END;
- CREATE TRIGGER IF NOT EXISTS "%s_user_update_username"
- AFTER UPDATE ON "%s_users"
- WHEN OLD.username != NEW.username
- BEGIN
- INSERT INTO "%s_user_changes" (
- user_id, attribute, value, op
- ) VALUES
- (NEW.id, 'username', OLD.username, false),
- (NEW.id, 'username', NEW.username, true)
- ;
- END;
- CREATE TRIGGER IF NOT EXISTS "%s_user_update_display_name"
- AFTER UPDATE ON "%s_users"
- WHEN OLD.display_name != NEW.display_name
- BEGIN
- INSERT INTO "%s_user_changes" (
- user_id, attribute, value, op
- ) VALUES
- (NEW.id, 'display_name', OLD.display_name, false),
- (NEW.id, 'display_name', NEW.display_name, true)
- ;
- END;
- CREATE TRIGGER IF NOT EXISTS "%s_user_update_picture_uuid"
- AFTER UPDATE ON "%s_users"
- WHEN OLD.picture_uuid != NEW.picture_uuid
- BEGIN
- INSERT INTO "%s_user_changes" (
- user_id, attribute, value, op
- ) VALUES
- (NEW.id, 'picture_uuid', OLD.picture_uuid, false),
- (NEW.id, 'picture_uuid', NEW.picture_uuid, true)
- ;
- END;
- CREATE TRIGGER IF NOT EXISTS "%s_user_update_deleted"
- AFTER UPDATE ON "%s_users"
- WHEN OLD.deleted != NEW.deleted
- BEGIN
- INSERT INTO "%s_user_changes" (
- user_id, attribute, value, op
- ) VALUES
- (NEW.id, 'deleted', OLD.deleted, false),
- (NEW.id, 'deleted', NEW.deleted, true)
- ;
- END;
+-- CREATE TABLE IF NOT EXISTS "%s_user_changes" (
+-- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+-- timestamp TEXT NOT NULL DEFAULT (%s),
+-- user_id INTEGER NOT NULL REFERENCES "%s_users"(id),
+-- attribute TEXT NOT NULL CHECK(
+-- attribute IN (
+-- 'username',
+-- 'display_name',
+-- 'picture_uuid',
+-- 'deleted'
+-- )
+-- ),
+-- value TEXT NOT NULL,
+-- op INT NOT NULL CHECK(op IN (0, 1))
+-- ) STRICT;
+-- CREATE TRIGGER IF NOT EXISTS "%s_user_creation"
+-- AFTER INSERT ON "%s_users"
+-- BEGIN
+-- INSERT INTO "%s_user_changes" (
+-- user_id, attribute, value, op
+-- ) VALUES
+-- (NEW.id, 'username', NEW.username, true),
+-- (NEW.id, 'display_name', NEW.display_name, true),
+-- (NEW.id, 'deleted', NEW.deleted, true)
+-- ;
+-- END;
+-- CREATE TRIGGER IF NOT EXISTS "%s_user_creation_picture_uuid"
+-- AFTER INSERT ON "%s_users"
+-- WHEN NEW.picture_uuid != NULL
+-- BEGIN
+-- INSERT INTO "%s_user_changes" (
+-- user_id, attribute, value, op
+-- ) VALUES
+-- (NEW.id, 'picture_uuid', NEW.picture_uuid, true)
+-- ;
+-- END;
+-- CREATE TRIGGER IF NOT EXISTS "%s_user_update_username"
+-- AFTER UPDATE ON "%s_users"
+-- WHEN OLD.username != NEW.username
+-- BEGIN
+-- INSERT INTO "%s_user_changes" (
+-- user_id, attribute, value, op
+-- ) VALUES
+-- (NEW.id, 'username', OLD.username, false),
+-- (NEW.id, 'username', NEW.username, true)
+-- ;
+-- END;
+-- CREATE TRIGGER IF NOT EXISTS "%s_user_update_display_name"
+-- AFTER UPDATE ON "%s_users"
+-- WHEN OLD.display_name != NEW.display_name
+-- BEGIN
+-- INSERT INTO "%s_user_changes" (
+-- user_id, attribute, value, op
+-- ) VALUES
+-- (NEW.id, 'display_name', OLD.display_name, false),
+-- (NEW.id, 'display_name', NEW.display_name, true)
+-- ;
+-- END;
+-- CREATE TRIGGER IF NOT EXISTS "%s_user_update_picture_uuid"
+-- AFTER UPDATE ON "%s_users"
+-- WHEN OLD.picture_uuid != NEW.picture_uuid
+-- BEGIN
+-- INSERT INTO "%s_user_changes" (
+-- user_id, attribute, value, op
+-- ) VALUES
+-- (NEW.id, 'picture_uuid', OLD.picture_uuid, false),
+-- (NEW.id, 'picture_uuid', NEW.picture_uuid, true)
+-- ;
+-- END;
+-- CREATE TRIGGER IF NOT EXISTS "%s_user_update_deleted"
+-- AFTER UPDATE ON "%s_users"
+-- WHEN OLD.deleted != NEW.deleted
+-- BEGIN
+-- INSERT INTO "%s_user_changes" (
+-- user_id, attribute, value, op
+-- ) VALUES
+-- (NEW.id, 'deleted', OLD.deleted, false),
+-- (NEW.id, 'deleted', NEW.deleted, true)
+-- ;
+-- END;
CREATE TABLE IF NOT EXISTS "%s_networks" (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
@@ -506,7 +524,7 @@ func createTablesSQL(prefix string) queryT {
UNIQUE (channel_id, member_id)
) STRICT;
- -- FIXME: create table for connections?
+ -- FIXME: create database table for connections?
-- A user can have multiple sessions (different browsers,
-- mobile, etc.), and each session has multiple connections, as
-- the user connects and disconnections using the same session
@@ -608,12 +626,11 @@ func createUserSQL(prefix string) queryT {
}
func createUserStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(newUserT) (userT, error), func() error, error) {
- q := createUserSQL(prefix)
+ q := createUserSQL(cfg.prefix)
- writeStmt, err := db.Prepare(q.write)
+ writeStmt, err := cfg.shared.Prepare(q.write)
if err != nil {
return nil, nil, err
}
@@ -665,12 +682,11 @@ func userByUUIDSQL(prefix string) queryT {
}
func userByUUIDStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(guuid.UUID) (userT, error), func() error, error) {
- q := userByUUIDSQL(prefix)
+ q := userByUUIDSQL(cfg.prefix)
- readStmt, err := db.Prepare(q.read)
+ readStmt, err := cfg.shared.Prepare(q.read)
if err != nil {
return nil, nil, err
}
@@ -728,12 +744,11 @@ func updateUserSQL(prefix string) queryT {
}
func updateUserStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(userT) error, func() error, error) {
- q := updateUserSQL(prefix)
+ q := updateUserSQL(cfg.prefix)
- writeStmt, err := db.Prepare(q.write)
+ writeStmt, err := cfg.shared.Prepare(q.write)
if err != nil {
return nil, nil, err
}
@@ -770,12 +785,11 @@ func deleteUserSQL(prefix string) queryT {
}
func deleteUserStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(guuid.UUID) error, func() error, error) {
- q := deleteUserSQL(prefix)
+ q := deleteUserSQL(cfg.prefix)
- writeStmt, err := db.Prepare(q.write)
+ writeStmt, err := cfg.shared.Prepare(q.write)
if err != nil {
return nil, nil, err
}
@@ -820,21 +834,22 @@ func addNetworkSQL(prefix string) queryT {
) RETURNING id, timestamp;
`
return queryT{
- write: fmt.Sprintf(tmpl_write, prefix, prefix),
+ write: fmt.Sprintf(tmpl_write, prefix, prefix, prefix, prefix),
}
}
func addNetworkStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(userT, newNetworkT) (networkT, error), func() error, error) {
- q := addNetworkSQL(prefix)
+ q := addNetworkSQL(cfg.prefix)
- writeStmt, err := db.Prepare(q.write)
+ privateDB, err := sql.Open(golite.DriverName, cfg.dbpath)
if err != nil {
return nil, nil, err
}
+ writeFn, writeFnClose := execSerialized(q.write, privateDB)
+
fn := func(
user userT,
newNetwork newNetworkT,
@@ -847,11 +862,26 @@ func addNetworkStmt(
type_: newNetwork.type_,
}
+ err := writeFn(
+ newNetwork.uuid[:],
+ newNetwork.name,
+ newNetwork.description,
+ newNetwork.type_,
+ user.id,
+ )
+ if err != nil {
+ return networkT{}, err
+ }
+
+ return network, nil
+
+ /*
member := memberT{
}
var timestr string
{
+ FIXME
rows, err := writeStmt.Query(
newNetwork.uuid[:],
newNetwork.name,
@@ -913,11 +943,15 @@ func addNetworkStmt(
}
}
}
+ */
+ }
- return network, nil
+ closeFn := func() error {
+ writeFnClose()
+ return privateDB.Close()
}
- return fn, writeStmt.Close, nil
+ return fn, closeFn, nil
}
func getNetworkSQL(prefix string) queryT {
@@ -971,12 +1005,11 @@ func getNetworkSQL(prefix string) queryT {
}
func getNetworkStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(userT, guuid.UUID) (networkT, error), func() error, error) {
- q := getNetworkSQL(prefix)
+ q := getNetworkSQL(cfg.prefix)
- readStmt, err := db.Prepare(q.read)
+ readStmt, err := cfg.shared.Prepare(q.read)
if err != nil {
return nil, nil, err
}
@@ -1051,7 +1084,7 @@ func networkEach(rows *sql.Rows, callback func(networkT) error) error {
func networksSQL(prefix string) queryT {
const tmpl_read = `
- -- FIXME
+ -- FIXME %s
`
return queryT{
read: fmt.Sprintf(tmpl_read, prefix),
@@ -1059,12 +1092,11 @@ func networksSQL(prefix string) queryT {
}
func networksStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(userT) (*sql.Rows, error), func() error, error) {
- q := networksSQL(prefix)
+ q := networksSQL(cfg.prefix)
- readStmt, err := db.Prepare(q.read)
+ readStmt, err := cfg.shared.Prepare(q.read)
if err != nil {
return nil, nil, err
}
@@ -1078,6 +1110,7 @@ func networksStmt(
func setNetworkSQL(prefix string) queryT {
const tmpl_write = `
+ -- FIXME %s
`
return queryT{
write: fmt.Sprintf(tmpl_write, prefix),
@@ -1085,12 +1118,11 @@ func setNetworkSQL(prefix string) queryT {
}
func setNetworkStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(userT, networkT) error, func() error, error) {
- q := setNetworkSQL(prefix)
+ q := setNetworkSQL(cfg.prefix)
- writeStmt, err := db.Prepare(q.write)
+ writeStmt, err := cfg.shared.Prepare(q.write)
if err != nil {
return nil, nil, err
}
@@ -1105,6 +1137,7 @@ func setNetworkStmt(
func nipNetworkSQL(prefix string) queryT {
const tmpl_write = `
+ -- FIXME %s
`
return queryT{
write: fmt.Sprintf(tmpl_write, prefix),
@@ -1112,12 +1145,11 @@ func nipNetworkSQL(prefix string) queryT {
}
func nipNetworkStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(userT, guuid.UUID) error, func() error, error) {
- q := nipNetworkSQL(prefix)
+ q := nipNetworkSQL(cfg.prefix)
- writeStmt, err := db.Prepare(q.write)
+ writeStmt, err := cfg.shared.Prepare(q.write)
if err != nil {
return nil, nil, err
}
@@ -1133,20 +1165,19 @@ func nipNetworkStmt(
func addMemberSQL(prefix string) queryT {
const tmpl_write = `
- -- FIXME
+ -- FIXME %s
`
return queryT{
- write: fmt.Sprintf(tmpl_write),
+ write: fmt.Sprintf(tmpl_write, prefix),
}
}
func addMemberStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(userT, networkT, newMemberT) (memberT, error), func() error, error) {
- q := addMemberSQL(prefix)
+ q := addMemberSQL(cfg.prefix)
- writeStmt, err := db.Prepare(q.write)
+ writeStmt, err := cfg.shared.Prepare(q.write)
if err != nil {
return nil, nil, err
}
@@ -1181,6 +1212,7 @@ func addMemberStmt(
func showMemberSQL(prefix string) queryT {
const tmpl_read = `
+ -- FIXME %s
`
return queryT{
read: fmt.Sprintf(tmpl_read, prefix),
@@ -1188,12 +1220,11 @@ func showMemberSQL(prefix string) queryT {
}
func showMemberStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(userT, guuid.UUID) (memberT, error), func() error, error) {
- q := showMemberSQL(prefix)
+ q := showMemberSQL(cfg.prefix)
- readStmt, err := db.Prepare(q.read)
+ readStmt, err := cfg.shared.Prepare(q.read)
if err != nil {
return nil, nil, err
}
@@ -1260,7 +1291,7 @@ func memberEach(rows *sql.Rows, callback func(memberT) error) error {
func membersSQL(prefix string) queryT {
const tmpl_read = `
- -- FIXME
+ -- FIXME %s
`
return queryT{
read: fmt.Sprintf(tmpl_read, prefix),
@@ -1268,12 +1299,11 @@ func membersSQL(prefix string) queryT {
}
func membersStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(userT, guuid.UUID) (*sql.Rows, error), func() error, error) {
- q := membersSQL(prefix)
+ q := membersSQL(cfg.prefix)
- readStmt, err := db.Prepare(q.read)
+ readStmt, err := cfg.shared.Prepare(q.read)
if err != nil {
return nil, nil, err
}
@@ -1287,6 +1317,7 @@ func membersStmt(
func editMemberSQL(prefix string) queryT {
const tmpl_write = `
+ -- FIXME %s
`
return queryT{
write: fmt.Sprintf(tmpl_write, prefix),
@@ -1294,12 +1325,11 @@ func editMemberSQL(prefix string) queryT {
}
func editMemberStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(userT, memberT) error, func() error, error) {
- q := editMemberSQL(prefix)
+ q := editMemberSQL(cfg.prefix)
- writeStmt, err := db.Prepare(q.write)
+ writeStmt, err := cfg.shared.Prepare(q.write)
if err != nil {
return nil, nil, err
}
@@ -1314,6 +1344,7 @@ func editMemberStmt(
func dropMemberSQL(prefix string) queryT {
const tmpl_write = `
+ -- FIXME
`
return queryT{
write: fmt.Sprintf(tmpl_write),
@@ -1321,12 +1352,11 @@ func dropMemberSQL(prefix string) queryT {
}
func dropMemberStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(userT, guuid.UUID) error, func() error, error) {
- q := dropMemberSQL(prefix)
+ q := dropMemberSQL(cfg.prefix)
- writeStmt, err := db.Prepare(q.write)
+ writeStmt, err := cfg.shared.Prepare(q.write)
if err != nil {
return nil, nil, err
}
@@ -1351,12 +1381,11 @@ func addChannelSQL(prefix string) queryT {
}
func addChannelStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func (guuid.UUID, newChannelT) (channelT, error), func() error, error) {
- q := addChannelSQL(prefix)
+ q := addChannelSQL(cfg.prefix)
- writeStmt, err := db.Prepare(q.write)
+ writeStmt, err := cfg.shared.Prepare(q.write)
if err != nil {
return nil, nil, err
}
@@ -1434,7 +1463,7 @@ func channelEach(rows *sql.Rows, callback func(channelT) error) error {
func channelsSQL(prefix string) queryT {
const tmpl_read = `
- -- FIXME
+ -- FIXME %s
`
return queryT{
read: fmt.Sprintf(tmpl_read, prefix),
@@ -1442,12 +1471,11 @@ func channelsSQL(prefix string) queryT {
}
func channelsStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(guuid.UUID) (*sql.Rows, error), func() error, error) {
- q := channelsSQL(prefix)
+ q := channelsSQL(cfg.prefix)
- readStmt, err := db.Prepare(q.read)
+ readStmt, err := cfg.shared.Prepare(q.read)
if err != nil {
return nil, nil, err
}
@@ -1461,6 +1489,7 @@ func channelsStmt(
func topicSQL(prefix string) queryT {
const tmpl_write = `
+ -- FIXME %s
`
return queryT{
write: fmt.Sprintf(tmpl_write, prefix),
@@ -1468,12 +1497,11 @@ func topicSQL(prefix string) queryT {
}
func topicStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(channelT) error, func() error, error) {
- q := topicSQL(prefix)
+ q := topicSQL(cfg.prefix)
- writeStmt, err := db.Prepare(q.write)
+ writeStmt, err := cfg.shared.Prepare(q.write)
if err != nil {
return nil, nil, err
}
@@ -1488,6 +1516,7 @@ func topicStmt(
func endChannelSQL(prefix string) queryT {
const tmpl_write = `
+ -- FIXME %s
`
return queryT{
write: fmt.Sprintf(tmpl_write, prefix),
@@ -1495,12 +1524,11 @@ func endChannelSQL(prefix string) queryT {
}
func endChannelStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(guuid.UUID) error, func() error, error) {
- q := endChannelSQL(prefix)
+ q := endChannelSQL(cfg.prefix)
- writeStmt, err := db.Prepare(q.write)
+ writeStmt, err := cfg.shared.Prepare(q.write)
if err != nil {
return nil, nil, err
}
@@ -1515,7 +1543,7 @@ func endChannelStmt(
func joinSQL(prefix string) queryT {
const tmpl_write = `
- -- FIXME
+ -- FIXME %s
`
return queryT{
write: fmt.Sprintf(tmpl_write, prefix),
@@ -1523,12 +1551,11 @@ func joinSQL(prefix string) queryT {
}
func joinStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(guuid.UUID, guuid.UUID) error, func() error, error) {
- q := joinSQL(prefix)
+ q := joinSQL(cfg.prefix)
- writeStmt, err := db.Prepare(q.write)
+ writeStmt, err := cfg.shared.Prepare(q.write)
if err != nil {
return nil, nil, err
}
@@ -1543,7 +1570,7 @@ func joinStmt(
func partSQL(prefix string) queryT {
const tmpl_write = `
- -- FIXME
+ -- FIXME %s
`
return queryT{
write: fmt.Sprintf(tmpl_write, prefix),
@@ -1551,12 +1578,11 @@ func partSQL(prefix string) queryT {
}
func partStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(guuid.UUID, guuid.UUID) error, func() error, error) {
- q := partSQL(prefix)
+ q := partSQL(cfg.prefix)
- writeStmt, err := db.Prepare(q.write)
+ writeStmt, err := cfg.shared.Prepare(q.write)
if err != nil {
return nil, nil, err
}
@@ -1606,7 +1632,7 @@ func nameEach(rows *sql.Rows, callback func(memberT) error) error {
func namesSQL(prefix string) queryT {
const tmpl_read = `
- -- FIXME
+ -- FIXME %s
`
return queryT{
read: fmt.Sprintf(tmpl_read, prefix),
@@ -1614,12 +1640,11 @@ func namesSQL(prefix string) queryT {
}
func namesStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func(guuid.UUID) (*sql.Rows, error), func() error, error) {
- q := namesSQL(prefix)
+ q := namesSQL(cfg.prefix)
- readStmt, err := db.Prepare(q.read)
+ readStmt, err := cfg.shared.Prepare(q.read)
if err != nil {
return nil, nil, err
}
@@ -1649,12 +1674,11 @@ func addEventSQL(prefix string) queryT {
}
func addEventStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func (newEventT) (eventT, error), func() error, error) {
- q := addEventSQL(prefix)
+ q := addEventSQL(cfg.prefix)
- writeStmt, err := db.Prepare(q.write)
+ writeStmt, err := cfg.shared.Prepare(q.write)
if err != nil {
return nil, nil, err
}
@@ -1783,12 +1807,11 @@ func allAfterSQL(prefix string) queryT {
}
func allAfterStmt(
- db *sql.DB,
- prefix string,
+ cfg dbconfigT,
) (func (guuid.UUID) (*sql.Rows, error), func() error, error) {
- q := allAfterSQL(prefix)
+ q := allAfterSQL(cfg.prefix)
- readStmt, err := db.Prepare(q.read)
+ readStmt, err := cfg.shared.Prepare(q.read)
if err != nil {
return nil, nil, err
}
@@ -1800,66 +1823,81 @@ func allAfterStmt(
return fn, readStmt.Close, nil
}
+func logMessageSQL(prefix string) queryT{
+ const tmpl_write = `
+ -- FIXME %s
+ `
+ return queryT{
+ write: fmt.Sprintf(tmpl_write, prefix),
+ }
+}
+
+func logMessageStmt(
+ cfg dbconfigT,
+) (func(userT, messageT) error, func() error, error) {
+ q := logMessageSQL(cfg.prefix)
+
+ writeStmt, err := cfg.shared.Prepare(q.write)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ fn := func(user userT, message messageT) error {
+ return nil // FIXME
+ _, err := writeStmt.Exec(user, message)
+ return err
+ }
+
+ return fn, writeStmt.Close, nil
+}
+
func initDB(
- db *sql.DB,
+ dbpath string,
prefix string,
) (queriesT, error) {
- createTablesErr := createTables(db, prefix)
- createUser, createUserClose, createUserErr := createUserStmt(db, prefix)
- userByUUID, userByUUIDClose, userByUUIDErr := userByUUIDStmt(db, prefix)
- updateUser, updateUserClose, updateUserErr := updateUserStmt(db, prefix)
- deleteUser, deleteUserClose, deleteUserErr := deleteUserStmt(db, prefix)
- addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(db, prefix)
- getNetwork, getNetworkClose, getNetworkErr := getNetworkStmt(db, prefix)
- networks, networksClose, networksErr := networksStmt(db, prefix)
- setNetwork, setNetworkClose, setNetworkErr := setNetworkStmt(db, prefix)
- nipNetwork, nipNetworkClose, nipNetworkErr := nipNetworkStmt(db, prefix)
- addMember, addMemberClose, addMemberErr := addMemberStmt(db, prefix)
- showMember, showMemberClose, showMemberErr := showMemberStmt(db, prefix)
- members, membersClose, membersErr := membersStmt(db, prefix)
- editMember, editMemberClose, editMemberErr := editMemberStmt(db, prefix)
- dropMember, dropMemberClose, dropMemberErr := dropMemberStmt(db, prefix)
- addChannel, addChannelClose, addChannelErr := addChannelStmt(db, prefix)
- channels, channelsClose, channelsErr := channelsStmt(db, prefix)
- topic, topicClose, topicErr := topicStmt(db, prefix)
- endChannel, endChannelClose, endChannelErr := endChannelStmt(db, prefix)
- join, joinClose, joinErr := joinStmt(db, prefix)
- part, partClose, partErr := partStmt(db, prefix)
- names, namesClose, namesErr := namesStmt(db, prefix)
- addEvent, addEventClose, addEventErr := addEventStmt(db, prefix)
- allAfter, allAfterClose, allAfterErr := allAfterStmt(db, prefix)
-
- err := g.SomeError(
- createTablesErr,
- createUserErr,
- userByUUIDErr,
- updateUserErr,
- deleteUserErr,
- addNetworkErr,
- getNetworkErr,
- networksErr,
- setNetworkErr,
- nipNetworkErr,
- addMemberErr,
- showMemberErr,
- membersErr,
- editMemberErr,
- dropMemberErr,
- addChannelErr,
- channelsErr,
- topicErr,
- endChannelErr,
- joinErr,
- partErr,
- namesErr,
- addEventErr,
- allAfterErr,
- )
+ err := g.ValidateSQLTablePrefix(prefix)
if err != nil {
return queriesT{}, err
}
- close := func() error {
+ shared, err := sql.Open(golite.DriverName, dbpath)
+ if err != nil {
+ return queriesT{}, err
+ }
+
+ cfg := dbconfigT{
+ shared: shared,
+ dbpath: dbpath,
+ prefix: prefix,
+ }
+
+ createTablesErr := createTables(shared, prefix)
+ createUser, createUserClose, createUserErr := createUserStmt(cfg)
+ userByUUID, userByUUIDClose, userByUUIDErr := userByUUIDStmt(cfg)
+ updateUser, updateUserClose, updateUserErr := updateUserStmt(cfg)
+ deleteUser, deleteUserClose, deleteUserErr := deleteUserStmt(cfg)
+ addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(cfg)
+ getNetwork, getNetworkClose, getNetworkErr := getNetworkStmt(cfg)
+ networks, networksClose, networksErr := networksStmt(cfg)
+ setNetwork, setNetworkClose, setNetworkErr := setNetworkStmt(cfg)
+ nipNetwork, nipNetworkClose, nipNetworkErr := nipNetworkStmt(cfg)
+ addMember, addMemberClose, addMemberErr := addMemberStmt(cfg)
+ showMember, showMemberClose, showMemberErr := showMemberStmt(cfg)
+ members, membersClose, membersErr := membersStmt(cfg)
+ editMember, editMemberClose, editMemberErr := editMemberStmt(cfg)
+ dropMember, dropMemberClose, dropMemberErr := dropMemberStmt(cfg)
+ addChannel, addChannelClose, addChannelErr := addChannelStmt(cfg)
+ channels, channelsClose, channelsErr := channelsStmt(cfg)
+ topic, topicClose, topicErr := topicStmt(cfg)
+ endChannel, endChannelClose, endChannelErr := endChannelStmt(cfg)
+ join, joinClose, joinErr := joinStmt(cfg)
+ part, partClose, partErr := partStmt(cfg)
+ names, namesClose, namesErr := namesStmt(cfg)
+ addEvent, addEventClose, addEventErr := addEventStmt(cfg)
+ allAfter, allAfterClose, allAfterErr := allAfterStmt(cfg)
+ logMessage, logMessageClose, logMessageErr := logMessageStmt(cfg)
+
+ closeFn := func() error {
return g.SomeFnError(
createUserClose,
userByUUIDClose,
@@ -1884,9 +1922,48 @@ func initDB(
namesClose,
addEventClose,
allAfterClose,
+ logMessageClose,
)
}
+ err = g.SomeError(
+ createTablesErr,
+ createUserErr,
+ userByUUIDErr,
+ updateUserErr,
+ deleteUserErr,
+ addNetworkErr,
+ getNetworkErr,
+ networksErr,
+ setNetworkErr,
+ nipNetworkErr,
+ addMemberErr,
+ showMemberErr,
+ membersErr,
+ editMemberErr,
+ dropMemberErr,
+ addChannelErr,
+ channelsErr,
+ topicErr,
+ endChannelErr,
+ joinErr,
+ partErr,
+ namesErr,
+ addEventErr,
+ allAfterErr,
+ logMessageErr,
+ )
+ if err != nil {
+ ferr := g.SomeError(
+
+ createUserErr,
+
+ )
+ fmt.Printf("ferr: %#v\n", ferr)
+ closeFn()
+ return queriesT{}, err
+ }
+
var connMutex sync.RWMutex
return queriesT{
createUser: func(a newUserT) (userT, error) {
@@ -2078,98 +2155,187 @@ func initDB(
return eventEach(rows, callback)
},
+ logMessage: func(a userT, b messageT) error {
+ connMutex.Lock()
+ defer connMutex.Unlock()
+ return logMessage(a, b)
+ },
close: func() error {
connMutex.Lock()
defer connMutex.Unlock()
- return close()
+ return closeFn()
},
}, nil
}
-var consumers = []consumerT{
+func newChannelHandler(papod papodT) func(fiinha.Message) error {
+ return func(message fiinha.Message) error {
+ return nil
+ }
}
-func registerConsumers(papod papodT, consumers []consumerT) {
- for _, consumer := range consumers {
- papod.queue.Subscribe(
- consumer.topic,
- defaultPrefix + "-" + consumer.topic,
- consumer.handlerFn(papod),
- )
+
+func buildConsumers(prefix string) []consumerT {
+ return []consumerT{
+ consumerT{
+ topic: NEW_CHANNEL,
+ name: prefix + NEW_CHANNEL,
+ handlerFn: newChannelHandler,
+ },
+ }
+}
+
+func unregisterConsumers(
+ unsubscribeFn func(string, string),
+ consumers []consumerT,
+) {
+ for _, c := range consumers {
+ unsubscribeFn(c.topic, c.name)
+ }
+}
+
+func registerConsumers(
+ subscribeFn func(string, string, func(fiinha.Message) error) error,
+ unsubscribeFn func(string, string),
+ papod papodT,
+ consumers []consumerT,
+) error {
+ for _, c := range consumers {
+ err := subscribeFn(c.topic, c.name, c.handlerFn(papod))
+ if err != nil {
+ unregisterConsumers(unsubscribeFn, consumers)
+ return err
+ }
}
+ return nil
}
-func NewWithPrefix(databasePath string, prefix string) (IPapod, error) {
- queue, err := q.NewWithPrefix(databasePath, prefix)
+func initListeners(
+ daemonSocketPath string,
+ commanderSocketPath string,
+) (listenersT, error) {
+ daemon, err := net.Listen("unix", daemonSocketPath)
if err != nil {
- return papodT{}, err
+ return listenersT{}, err
+ }
+
+ commander, err := net.Listen("unix", commanderSocketPath)
+ if err != nil {
+ daemon.Close()
+ return listenersT{}, err
+ }
+
+ return listenersT{
+ daemon: daemon,
+ commander: commander,
+ close: func() error {
+ return g.SomeError(
+ daemon.Close(),
+ commander.Close(),
+ )
+ },
+ }, nil
+}
+
+func makeReceivers() receiversT {
+ var rwmutex sync.Mutex
+ return receiversT{
+ add: func(receiver receiverT) {
+ },
+ remove: func(receiver receiverT) {
+ },
+ get: func(guuid.UUID) []receiverT{
+ return nil
+ },
+ close: func() {
+ rwmutex.Lock()
+ defer rwmutex.Unlock()
+ },
+ }
+}
+
+func buildMetrics(prefix string) metricsT {
+ return metricsT{
+ activeConnections: g.MakeGauge(
+ "active-connection",
+ "prefix", prefix,
+ ),
+ nicksInChannel: g.MakeGauge(
+ "nicks-in-channel",
+ "prefix", prefix,
+ ),
+ sendToClientError: g.MakeCounter(
+ "send-to-client-error",
+ "prefix", prefix,
+ ),
+ receivedMessage: g.MakeCounter(
+ "received-message",
+ "prefix", prefix,
+ ),
+ sentReply: g.MakeCounter(
+ "sent-reply",
+ "prefix", prefix,
+ ),
}
+}
- auth, err := cracha.NewWithPrefix(databasePath, prefix)
+func NewWithPrefix(
+ databasePath string,
+ prefix string,
+ daemonSocketPath string,
+ commanderSocketPath string,
+) (IPapod, error) {
+ queue, err := fiinha.New(databasePath)
if err != nil {
return papodT{}, err
}
- db, err := sql.Open(golite.DriverName, databasePath)
+ auth, err := cracha.New(databasePath)
if err != nil {
+ queue.Close()
return papodT{}, err
}
- err = g.ValidateSQLTablePrefix(prefix)
+ listeners, err := initListeners(daemonSocketPath, commanderSocketPath)
if err != nil {
+ queue.Close()
+ auth.Close()
return papodT{}, err
}
- queries, err := initDB(db, prefix)
+ queries, err := initDB(databasePath, prefix)
if err != nil {
+ queue.Close()
+ auth.Close()
+ listeners.close()
return papodT{}, err
}
+ consumers := buildConsumers(prefix)
+ receivers := makeReceivers()
+ metrics := buildMetrics(prefix)
+ // logger := g.NewLogger("prefix", prefix, "program", "papod")
+
return papodT{
- queue: queue,
- auth: auth,
- db: db,
- queries: queries,
+ queue: queue,
+ auth: auth,
+ queries: queries,
+ listeners: listeners,
+ consumers: consumers,
+ receivers: receivers,
+ metrics: metrics,
+ // logger: logger,
}, nil
}
-func New(databasePath string) (IPapod, error) {
- return NewWithPrefix(databasePath, defaultPrefix)
-}
-
-func (papod papodT) Close() error {
- return g.WrapErrors(
- papod.queries.close(),
- papod.db.Close(),
- papod.auth.Close(),
- papod.queue.Close(),
+func New(basePath string) (IPapod, error) {
+ return NewWithPrefix(
+ basePath + "/papod.db",
+ defaultPrefix,
+ basePath + "/papod.daemon.sock",
+ basePath + "/papod.commander.sock",
)
}
-
-
-
-
-
-
-
-
-
-
-// FIXME: reorder
-var emitActiveConnection = g.MakeGauge("active-connections")
-var emitNicksInChannel = g.MakeGauge("nicks-in-channel")
-var emitReceivedMessage = g.MakeCounter("received-message")
-var emitWriteToClientError = g.MakeCounter("write-to-client")
-
-const pingFrequency = time.Duration(30) * time.Second
-const pongMaxLatency = time.Duration(5) * time.Second
-
-var (
- cmdUSER = messageT{ command: "USER" }
- cmdPRIVMSG = messageT{ command: "PRIVMSG" }
- cmdJOIN = messageT{ command: "JOIN" }
-)
-
func splitOnCRLF(data []byte, _atEOF bool) (int, []byte, error) {
idx := bytes.Index(data, []byte { '\r', '\n' })
if idx == -1 {
@@ -2236,92 +2402,372 @@ func parseMessage(rawMessage string) (messageT, error) {
return msg, nil
}
-func handleUnknown(ctx *contextT, msg messageT) {
- g.Warning(
- "Unsupported command", "unsupported-command",
- "command", msg.command,
- )
- var r replyT = replyUnknown
- r.prefix = "dunno"
- // return []Action { r }
+func asNewEvent(msg messageT) newEventT {
+ return newEventT{}
}
-func handleUSER(ctx *contextT, msg messageT) {
- fmt.Printf("USER: %#v\n", msg)
+func joinEvent(member memberT) eventT {
+ return eventT{}
}
-func handlePRIVMSG(ctx *contextT, msg messageT) {
- // . assert no missing params
- // . write to DB: (after auth)
- // . channel timeline: message from $USER
- // . reply to $USER
- // . broadcast new timeline event to members of the channel
+func asMessage(event eventT) messageT {
+ return messageT{}
+}
- stmt, err := ctx.db.Prepare(`
- INSERT INTO messages
- (id, sender_id, body, timestamp)
- VALUES
- (?, ?, ?, ? );
- `)
- if err != nil {
- // FIXME: reply error
- fmt.Println("can't prepare: ", err)
- return
+func asReply(event eventT) replyT {
+ return replyT{}
+}
+
+func broadcastEvent(event eventT, receiversFn func(guuid.UUID) []receiverT) {
+ message := asMessage(event)
+ for _, receiver := range receiversFn(event.channelID) {
+ // FIXME:
+ // is this death by a thousand goroutines? Is the runtime
+ // able to handle the creation and destruction of hundreds of
+ // thousands of goroutines per second?
+ go receiver.send(message)
}
- defer stmt.Close()
+}
- ret, err := stmt.Exec(
- guuid.New().String(),
- "FIXME",
- "FIXME",
- time.Now(),
- )
+var (
+ replyErrUnknown = replyT{
+ command: "421",
+ params: messageParamsT{
+ middle: []string{},
+ trailing: "Unknown command",
+ },
+ }
+ replyErrNotRegistered = replyT{
+ command: "451",
+ params: messageParamsT{
+ middle: []string{},
+ trailing: "You have not registered",
+ },
+ }
+ replyErrFileError = replyT{
+ command: "424",
+ params: messageParamsT{
+ middle: []string{},
+ trailing: "File error doing query on database",
+ },
+ }
+ RPL_WELCOME = replyT{
+ command: "001",
+ params: messageParamsT{
+ middle: []string{},
+ trailing: "",
+ },
+ }
+)
+
+func handleUnknown(
+ papod papodT,
+ connection *connectionT,
+ msg messageT,
+) ([]replyT, error) {
+ // FIXME: user doesn't exist when unauthenticated
+ err := papod.queries.logMessage(userT{ }, msg)
if err != nil {
- // FIXME: reply error
- fmt.Println("xablau can't prepare: ", err)
- return
+ g.Warning(
+ "Failed to log message", fmt.Sprintf("%#v", msg),
+ "group-as", "db-write",
+ "handler-action", "log-and-ignore",
+ "connection", connection.uuid.String(),
+ "err", err,
+ )
}
- fmt.Println("ret: ", ret)
+ return []replyT{ replyErrUnknown }, nil
}
-func handleJOIN(ctx *contextT, msg messageT) {
- fmt.Printf("JOIN: %#v\n", msg)
-
- // . write to DB: (after auth)
- // . $USER now in channel
- // . channel timeline: $USER joined
- // . reply to $USER
- // . broadcast new timeline event to members of the channel
+func handleUSER(
+ papod papodT,
+ connection *connectionT,
+ msg messageT,
+) ([]replyT, error) {
+ u := connection.user.username
+ m := []string{ u }
+ return []replyT{ replyT{
+ command: "001",
+ params: messageParamsT{
+ middle: m,
+ trailing: "Welcome to the Internet Relay Network " + u,
+ },
+ }, replyT{
+ command: "002",
+ params: messageParamsT{
+ middle: m,
+ trailing: "Your host is FIXME, running version " +
+ Version,
+ },
+ }, replyT{
+ command: "003",
+ params: messageParamsT{
+ middle: m,
+ trailing: "This server was create FIXME",
+ },
+ }, replyT{
+ command: "004",
+ params: messageParamsT{
+ middle: m,
+ trailing: "FIXME " + Version + " i x",
+ },
+ }, }, nil
}
-func replyAnonymous() {
+func handleNICK(
+ papod papodT,
+ connection *connectionT,
+ msg messageT,
+) ([]replyT, error) {
+ connection.user.username = msg.params.middle[0]
+ return []replyT{}, nil
}
-func persistMessage(msg messageT) {
+func handlePRIVMSG(
+ papod papodT,
+ connection *connectionT,
+ msg messageT,
+) ([]replyT, error) {
+ // FIXME: check missing params
+ // FIXME: check if user is member of channel, and is authorized to post
+ // FIXME: adapt to handle multiple targets
+
+ return []replyT{}, nil
+
+ event, err := papod.queries.addEvent(asNewEvent(msg))
+ if err != nil {
+ // FIXME: not allowed reply per RFC 1459, check other specs
+ return []replyT{ replyErrFileError }, nil
+ }
+
+ go broadcastEvent(event, papod.receivers.get)
+
+ reply := asReply(event)
+ return []replyT{ reply }, nil
}
-func (reply replyT) typeOf() actionType {
- return actionReply
+func handleTOPIC(
+ papod papodT,
+ connection *connectionT,
+ msg messageT,
+) ([]replyT, error) {
+ return []replyT{ replyT{
+ command: "JOIN",
+ params: messageParamsT{
+ middle: []string{ msg.params.middle[0] },
+ trailing: "",
+ },
+ } }, nil
+}
+
+func handleJOIN(
+ papod papodT,
+ connection *connectionT,
+ msg messageT,
+) ([]replyT, error) {
+ u := connection.user.username
+ channel := msg.params.middle[0]
+ return []replyT{ replyT{
+ command: "JOIN",
+ params: messageParamsT{
+ middle: []string{ channel },
+ trailing: "",
+ },
+ }, replyT{
+ command: "331",
+ params: messageParamsT{
+ middle: []string{ u, channel },
+ trailing: "No topic is set",
+ },
+ }, replyT{
+ command: "353",
+ params: messageParamsT{
+ middle: []string{ u, "=", channel },
+ trailing: u + " virtualuser",
+ },
+ }, replyT{
+ command: "366",
+ params: messageParamsT{
+ middle: []string{ u, channel },
+ trailing: "End of NAMES list",
+ },
+ } }, nil
+
+
+ member, err := papod.queries.addMember(
+ *connection.user,
+ networkT{},
+ newMemberT{},
+ )
+ if err != nil {
+ // FIXME: not allowed per RFC 1459
+ return []replyT{ replyErrFileError }, nil
+ }
+ event := joinEvent(member)
+
+ papod.metrics.nicksInChannel.Inc()
+
+ go broadcastEvent(event, papod.receivers.get)
+
+ reply := asReply(event)
+ return []replyT{ reply }, nil
}
-var (
- replyUnknown = replyT{
- command: 421,
+func handleMODE(
+ papod papodT,
+ connection *connectionT,
+ msg messageT,
+) ([]replyT, error) {
+ u := connection.user.username
+ channel := msg.params.middle[0]
+ return []replyT{ replyT{
+ command: "324",
+ params: messageParamsT{
+ middle: []string{ u, channel, "+Cnst" },
+ trailing: "",
+ },
+ } }, nil
+}
+
+func handleWHOIS(
+ papod papodT,
+ connection *connectionT,
+ msg messageT,
+) ([]replyT, error) {
+ u := connection.user.username
+ user := msg.params.middle[0]
+ return []replyT{ replyT{
+ command: "311",
+ params: messageParamsT{
+ middle: []string{ u, user, user, "samehost", "*" },
+ trailing: "my real name is: " + user,
+ },
+ }, replyT{
+ command: "312",
+ params: messageParamsT{
+ middle: []string{ u, user, "stillsamehost" },
+ trailing: "some server info",
+ },
+ }, replyT{
+ command: "319",
+ params: messageParamsT{
+ middle: []string{ u, user },
+ trailing: "#default",
+ },
+ }, replyT{
+ command: "318",
+ params: messageParamsT{
+ middle: []string{ u, user },
+ trailing: "End of WHOIS list",
+ },
+ } }, nil
+}
+
+func handleAWAY(
+ papod papodT,
+ connection *connectionT,
+ msg messageT,
+) ([]replyT, error) {
+ u := connection.user.username
+
+ if msg.params.trailing == "" {
+ return []replyT{ replyT{
+ command: "305",
+ params: messageParamsT{
+ middle: []string{ u },
+ trailing: "You are no longer marked as away",
+ },
+ } }, nil
+ } else {
+ return []replyT{ replyT{
+ command: "306",
+ params: messageParamsT{
+ middle: []string{ u },
+ trailing: "You have been marked as away",
+ },
+ } }, nil
+ }
+}
+
+func handlePING(
+ papod papodT,
+ connection *connectionT,
+ msg messageT,
+) ([]replyT, error) {
+ return []replyT{ {
+ command: "PONG",
params: messageParamsT{
middle: []string{},
- trailing: "Unknown command",
+ trailing: msg.params.middle[0],
},
- }
-)
+ } }, nil
+}
-var commands = map[string]func(*contextT, messageT) {
- cmdUSER.command: handleUSER,
- cmdPRIVMSG.command: handlePRIVMSG,
- cmdJOIN.command: handleJOIN,
+func handleQUIT(
+ papod papodT,
+ connection *connectionT,
+ msg messageT,
+) ([]replyT, error) {
+ connection.conn.Close()
+ return []replyT{}, nil
}
-func actionFnFor(command string) func(*contextT, messageT) {
+func handleCAP(
+ papod papodT,
+ connection *connectionT,
+ msg messageT,
+) ([]replyT, error) {
+ if msg.params.middle[0] == "END" {
+ return nil, nil
+ }
+
+ return []replyT{ replyT{
+ command: "CAP",
+ params: messageParamsT{
+ middle: []string { "*", "LS" },
+ trailing: "",
+ },
+ } }, nil
+}
+
+func authRequired(
+ fn func(papodT, *connectionT, messageT) ([]replyT, error),
+) func(papodT, *connectionT, messageT) ([]replyT, error) {
+ return func(
+ papod papodT,
+ connection *connectionT,
+ message messageT,
+ ) ([]replyT, error) {
+ if connection.user == nil {
+ return []replyT{ replyErrNotRegistered }, nil
+ }
+
+ return fn(papod, connection, message)
+ }
+}
+
+var commands = map[string]func(
+ papodT,
+ *connectionT,
+ messageT,
+) ([]replyT, error) {
+ "USER": handleUSER,
+ "NICK": handleNICK,
+ "QUIT": handleQUIT,
+ "CAP": handleCAP,
+ "AWAY": authRequired(handleAWAY),
+ "PRIVMSG": authRequired(handlePRIVMSG),
+ "PING": authRequired(handlePING),
+ "JOIN": authRequired(handleJOIN),
+ "MODE": authRequired(handleMODE),
+ "TOPIC": authRequired(handleTOPIC),
+ "WHOIS": authRequired(handleWHOIS),
+}
+
+func actionFnFor(
+ command string,
+) func(papodT, *connectionT, messageT) ([]replyT, error) {
fn := commands[command]
if fn != nil {
return fn
@@ -2330,244 +2776,221 @@ func actionFnFor(command string) func(*contextT, messageT) {
return handleUnknown
}
-func processMessage(ctx *contextT, connection *connectionT, rawMessage string) {
- connection.lastReadFrom = time.Now()
+func replyString(reply replyT) string {
+ if reply.params.trailing == "" {
+ return fmt.Sprintf(
+ "%s %s\r\n",
+ reply.command,
+ strings.Join(reply.params.middle, " "),
+ )
+ } else {
+ return fmt.Sprintf(
+ "%s %s :%s\r\n",
+ reply.command,
+ strings.Join(reply.params.middle, " "),
+ reply.params.trailing,
+ )
+ }
+}
+func processMessage(papod papodT, connection *connectionT, rawMessage string) {
msg, err := parseMessage(rawMessage)
if err != nil {
g.Info(
"Error processing message",
"process-message",
+ "text", rawMessage,
"err", err,
)
return
}
- if msg.command == cmdUSER.command {
- args := msg.params.middle
- if len(args) == 0 {
- go replyAnonymous()
- return
- }
- connection.id = args[0]
- connection.isAuthenticated = true
- }
-
- if !connection.isAuthenticated {
- go replyAnonymous()
- return
- }
-
- actionFnFor(msg.command)(ctx, msg)
-}
+ papod.metrics.receivedMessage(
+ "message", fmt.Sprintf("%#v", msg),
+ "text", rawMessage,
+ )
-func readLoop(ctx *contextT, connection *connectionT) {
- scanner := bufio.NewScanner(connection.conn)
- scanner.Split(splitOnRawMessage)
- for scanner.Scan() {
- processMessage(ctx, connection, scanner.Text())
+ var replyErrors []error
+ replies, actionErr := actionFnFor(msg.command)(papod, connection, msg)
+ for _, reply := range replies {
+ text := replyString(reply)
+ _, err = io.WriteString(connection.conn, text)
+ if err != nil {
+ replyErrors = append(replyErrors, err)
+ }
+ papod.metrics.sentReply(
+ "message", rawMessage,
+ "reply", fmt.Sprintf("%#v", reply),
+ "text", text,
+ )
}
-}
-func writeLoop(ctx *contextT, connection *connectionT) {
- for message := range connection.replyChan {
- _, err := io.WriteString(connection.conn, message)
- if err != nil {
- g.Error(
- "Failed to send data to user",
- "user-reply-error",
+ if actionErr != nil || len(replyErrors) != 0 {
+ if actionErr != nil {
+ g.Info(
+ "Handler returned error", "handler-error",
+ "from", "daemon",
"err", err,
)
- emitWriteToClientError()
- continue
}
- connection.lastWrittenTo = time.Now()
- }
-
- emitActiveConnection.Dec()
- connection.conn.Close()
-}
-
-func kill(ctx *contextT, connection *connectionT) {
- // lock?
- delete(ctx.state.users, connection.id)
- // unlock?
- close(connection.replyChan)
- connection.conn.Close() // Ignore errors?
-}
-
-const pingWindow = 30 * time.Second
-func pingLoop(ctx *contextT, connection *connectionT) {
- for {
- time.Sleep(pingWindow)
- if (time.Since(connection.lastReadFrom) <= pingWindow) {
- continue
- }
- window := connection.lastWrittenTo.Sub(connection.lastReadFrom)
- if (window <= pingWindow) {
- connection.replyChan <- "PING"
- continue
+ if len(replyErrors) != 0 {
+ g.Info(
+ "Failed to send reply", "send-reply-error",
+ "from", "daemon",
+ "err", replyErrors,
+ )
}
- kill(ctx, connection)
- break
+ // FIXME: Close the connection
}
}
-func handleConnection(ctx *contextT, conn net.Conn) {
- emitActiveConnection.Inc()
- // FIXME: WaitGroup here?
- now := time.Now()
- connection := connectionT {
+func handleConnection(papod papodT, conn net.Conn) {
+ connection := connectionT{
conn: conn,
- isAuthenticated: false,
- lastReadFrom: now,
- lastWrittenTo: now,
+ uuid: guuid.New(),
+ // user: nil, // FIXME: SASL shenanigan probably goes here
+ user: &userT{},
+ }
+ scanner := bufio.NewScanner(conn)
+ scanner.Split(splitOnRawMessage)
+ for scanner.Scan() {
+ processMessage(papod, &connection, scanner.Text())
}
- go readLoop(ctx, &connection)
- go writeLoop(ctx, &connection)
- go pingLoop(ctx, &connection)
}
-func serverLoop(ctx *contextT, publicSocketPath string) {
- listener, err := net.Listen("unix", publicSocketPath)
- g.FatalIf(err)
- g.Info("IRCd started", "component-up", "component", "ircd")
+func handleCommand(papod papodT, conn net.Conn) {
+ // FIXME
+}
+func daemonLoop(papod papodT) {
for {
- conn, err := listener.Accept()
+ conn, err := papod.listeners.daemon.Accept()
if err != nil {
+ if errors.Is(err, net.ErrClosed) {
+ break
+ }
+
g.Warning(
- "Error accepting a public IRCd connection",
+ "Error accepting daemon connection",
"accept-connection",
+ "from", "daemon",
"err", err,
)
- // conn.Close() // FIXME: is conn nil?
continue
}
// FIXME: where does it get closed
- go handleConnection(ctx, conn)
+ go handleConnection(papod, conn)
}
}
-func commandListenerLoop(ctx *contextT, commandSocketPath string) {
- listener, err := net.Listen("unix", commandSocketPath)
- g.FatalIf(err)
- g.Info(
- "command listener started",
- "component-up",
- "component", "command-listener",
- )
-
+func commanderLoop(papod papodT) {
for {
- conn, err := listener.Accept()
+ conn, err := papod.listeners.commander.Accept()
if err != nil {
+ if errors.Is(err, net.ErrClosed) {
+ break
+ }
+
g.Warning(
- "Error accepting a command connection",
- "accept-command",
+ "Error accepting commander connection",
+ "accept-connection",
+ "from", "commander",
"err", err,
)
continue
}
- defer conn.Close()
-
- // TODO: handle commands
+ go handleCommand(papod, conn)
}
}
-func transactorLoop(ctx *contextT) {
- g.Info("transactor started", "component-up", "component", "transactor")
- emitActiveConnection.Inc()
-
- for tx := range ctx.tx {
- fmt.Println(tx)
+func mkbgrun() (func(func()), func()) {
+ var wg sync.WaitGroup
+ bgrun := func(f func()) {
+ wg.Add(1)
+ go func() {
+ f()
+ wg.Done()
+ }()
}
+ return bgrun, wg.Wait
}
-func initDB2(databasePath string) *sql.DB {
- db, err := sql.Open(golite.DriverName, databasePath)
- g.FatalIf(err)
- return db
-}
-
-func start(ctx *contextT, publicSocketPath string, commandSocketPath string) {
- /*
- buildInfo, ok := debug.ReadBuildInfo()
- if !ok {
- g.Fatal(errors.New("error on debug.ReadBuildInfo()"))
+func (papod papodT) Start() error {
+ err := registerConsumers(
+ papod.queue.Subscribe,
+ papod.queue.Unsubscribe,
+ papod,
+ papod.consumers,
+ )
+ if err != nil {
+ return err
}
- */
- g.Info("-", "lifecycle-event",
+ // FIXME: papod.logger.Info
+ g.Info("Starting service", "lifecycle-event",
"event", "starting-server",
- /*
slog.Group(
- "go",
- "version", buildInfo.GoVersion,
- "settings", buildInfo.Settings,
- "deps", buildInfo.Deps,
+ "versions",
+ "gobang", g.Version,
+ "cracha", cracha.Version,
+ "fiinha", fiinha.Version,
+ "golite", golite.Version,
+ "guuid", guuid.Version,
+ "papod", Version,
+ "this", Version,
),
- */
)
- var wg sync.WaitGroup
- bgRun := func(f func()) {
- wg.Add(1)
- go func() {
- f()
- wg.Done()
- }()
- }
- bgRun(func() { serverLoop(ctx, publicSocketPath) })
- bgRun(func() { commandListenerLoop(ctx, commandSocketPath) })
- bgRun(func() { transactorLoop(ctx) })
- wg.Wait()
+ run, wait := mkbgrun()
+ run(func() { daemonLoop(papod) })
+ run(func() { commanderLoop(papod) })
+ wait()
+
+ return nil
}
-func buildContext(databasePath string) *contextT {
- db := initDB2(databasePath)
- tx := make(chan int, 100)
- return &contextT {
- db: db,
- tx: tx,
+func (papod papodT) Close() error {
+ // FIXME: does this wait for current handlers to wait? Well, it should.
+ unregisterConsumers(papod.queue.Unsubscribe, papod.consumers)
+ return g.WrapErrors(
+ papod.listeners.close(),
+ // papod.connCloser.closeAll(),
+ papod.auth.Close(),
+ papod.queue.Close(),
+ papod.queries.close(),
+ )
+}
+
+func basePathFrom(args []string) (string, error) {
+ if len(args) < 2 {
+ return os.Getwd()
+ } else {
+ return args[1], nil
}
}
-var (
- databasePath = flag.String(
- "f",
- "papod.db",
- "The path to the database file",
- )
- publicSocketPath = flag.String(
- "s",
- "papod.public.socket",
- "The path to the socket that handles the public traffic",
- )
- commandSocketPath = flag.String(
- "S",
- "papod.command.socket",
- "The path to the private IPC commands socket",
- )
-)
func Main() {
- g.Init(slog.Group(
- "versions",
- "cracha", cracha.Version,
- "q", q.Version,
- "golite", golite.Version,
- "guuid", guuid.Version,
- "gobang", g.Version,
- "papod", Version,
- "this", Version,
- ))
- flag.Parse()
- ctx := buildContext(*databasePath)
- start(ctx, *publicSocketPath, *commandSocketPath)
-}
-
-// FIXME: review usage of g.Fatal()
-// https://gist.github.com/xero/2d6e4b061b4ecbeb9f99
+ g.Init("program", "papod")
+
+ basePath, err := basePathFrom(os.Args)
+ if err != nil {
+ fmt.Printf("%v\n", err)
+ os.Exit(1)
+ }
+
+ ipapod, err := New(basePath)
+ if err != nil {
+ fmt.Printf("Failed to create papod: %v\n", err)
+ os.Exit(1)
+ }
+
+ err = ipapod.Start()
+ if err != nil {
+ fmt.Printf("Failed to start: %v\n", err)
+ os.Exit(1)
+ }
+}
diff --git a/tests/papod.go b/tests/papod.go
index 60e670c..42974f4 100644
--- a/tests/papod.go
+++ b/tests/papod.go
@@ -139,6 +139,7 @@ func test_createUserStmt() {
g.TestStart("createUserStmt()")
const (
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
@@ -146,7 +147,12 @@ func test_createUserStmt() {
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- createUser, createUserClose, createUserErr := createUserStmt(db, prefix)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ }
+ createUser, createUserClose, createUserErr := createUserStmt(cfg)
g.TErrorIf(createUserErr)
defer g.SomeFnError(
createUserClose,
@@ -230,6 +236,7 @@ func test_userByUUIDStmt() {
g.TestStart("userByUUIDStmt()")
const (
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
@@ -237,9 +244,14 @@ func test_userByUUIDStmt() {
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- createUser, createUserClose, createUserErr := createUserStmt(db, prefix)
- userByUUID, userByUUIDClose, userByUUIDErr := userByUUIDStmt(db, prefix)
- deleteUser, deleteUserClose, deleteUserErr := deleteUserStmt(db, prefix)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ }
+ createUser, createUserClose, createUserErr := createUserStmt(cfg)
+ userByUUID, userByUUIDClose, userByUUIDErr := userByUUIDStmt(cfg)
+ deleteUser, deleteUserClose, deleteUserErr := deleteUserStmt(cfg)
g.TErrorIf(g.SomeError(
createUserErr,
deleteUserErr,
@@ -310,6 +322,7 @@ func test_updateUserStmt() {
g.TestStart("updateUserStmt()")
const (
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
@@ -317,10 +330,15 @@ func test_updateUserStmt() {
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- createUser, createUserClose, createUserErr := createUserStmt(db, prefix)
- userByUUID, userByUUIDClose, userByUUIDErr := userByUUIDStmt(db, prefix)
- updateUser, updateUserClose, updateUserErr := updateUserStmt(db, prefix)
- deleteUser, deleteUserClose, deleteUserErr := deleteUserStmt(db, prefix)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ }
+ createUser, createUserClose, createUserErr := createUserStmt(cfg)
+ userByUUID, userByUUIDClose, userByUUIDErr := userByUUIDStmt(cfg)
+ updateUser, updateUserClose, updateUserErr := updateUserStmt(cfg)
+ deleteUser, deleteUserClose, deleteUserErr := deleteUserStmt(cfg)
g.TErrorIf(g.SomeError(
createUserErr,
userByUUIDErr,
@@ -455,6 +473,7 @@ func test_deleteUserStmt() {
g.TestStart("deleteUserStmt()")
const (
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
@@ -462,8 +481,13 @@ func test_deleteUserStmt() {
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- createUser, createUserClose, createUserErr := createUserStmt(db, prefix)
- deleteUser, deleteUserClose, deleteUserErr := deleteUserStmt(db, prefix)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ }
+ createUser, createUserClose, createUserErr := createUserStmt(cfg)
+ deleteUser, deleteUserClose, deleteUserErr := deleteUserStmt(cfg)
g.TErrorIf(g.SomeError(
createUserErr,
deleteUserErr,
@@ -507,6 +531,7 @@ func test_addNetworkStmt() {
g.TestStart("addNetworkStmt()")
const (
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
@@ -514,9 +539,14 @@ func test_addNetworkStmt() {
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- createUser, createUserClose, createUserErr := createUserStmt(db, prefix)
- deleteUser, deleteUserClose, deleteUserErr := deleteUserStmt(db, prefix)
- addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(db, prefix)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ }
+ createUser, createUserClose, createUserErr := createUserStmt(cfg)
+ deleteUser, deleteUserClose, deleteUserErr := deleteUserStmt(cfg)
+ addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(cfg)
g.TErrorIf(g.SomeError(
createUserErr,
deleteUserErr,
@@ -650,6 +680,7 @@ func test_getNetworkStmt() {
g.TestStart("getNetworkStmt()")
const (
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
@@ -657,12 +688,17 @@ func test_getNetworkStmt() {
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- createUser, createUserClose, createUserErr := createUserStmt(db, prefix)
- deleteUser, deleteUserClose, deleteUserErr := deleteUserStmt(db, prefix)
- addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(db, prefix)
- getNetwork, getNetworkClose, getNetworkErr := getNetworkStmt(db, prefix)
- addMember, addMemberClose, addMemberErr := addMemberStmt(db, prefix)
- dropMember, dropMemberClose, dropMemberErr := dropMemberStmt(db, prefix)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ }
+ createUser, createUserClose, createUserErr := createUserStmt(cfg)
+ deleteUser, deleteUserClose, deleteUserErr := deleteUserStmt(cfg)
+ addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(cfg)
+ getNetwork, getNetworkClose, getNetworkErr := getNetworkStmt(cfg)
+ addMember, addMemberClose, addMemberErr := addMemberStmt(cfg)
+ dropMember, dropMemberClose, dropMemberErr := dropMemberStmt(cfg)
g.TErrorIf(g.SomeError(
createUserErr,
deleteUserErr,
@@ -897,6 +933,7 @@ func test_networksStmt() {
g.TestStart("networksStmt()")
const (
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
@@ -904,8 +941,8 @@ func test_networksStmt() {
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(db, prefix)
- networks, networksClose, networksErr := networksStmt(db, prefix)
+ addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(cfg)
+ networks, networksClose, networksErr := networksStmt(cfg)
g.TErrorIf(g.SomeError(
addNetworkErr,
networksErr,
@@ -966,6 +1003,7 @@ func test_setNetworkStmt() {
g.TestStart("setNetworkStmt()")
const (
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
@@ -973,10 +1011,15 @@ func test_setNetworkStmt() {
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- createUser, createUserClose, createUserErr := createUserStmt(db, prefix)
- addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(db, prefix)
- getNetwork, getNetworkClose, getNetworkErr := getNetworkStmt(db, prefix)
- setNetwork, setNetworkClose, setNetworkErr := setNetworkStmt(db, prefix)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ }
+ createUser, createUserClose, createUserErr := createUserStmt(cfg)
+ addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(cfg)
+ getNetwork, getNetworkClose, getNetworkErr := getNetworkStmt(cfg)
+ setNetwork, setNetworkClose, setNetworkErr := setNetworkStmt(cfg)
g.TErrorIf(g.SomeError(
createUserErr,
addNetworkErr,
@@ -1109,6 +1152,7 @@ func test_addMemberStmt() {
g.TestStart("addMember()")
const (
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
@@ -1116,8 +1160,8 @@ func test_addMemberStmt() {
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(db, prefix)
- addMember, addMemberClose, addMemberErr := addMemberStmt(db, prefix)
+ addNetwork, addNetworkClose, addNetworkErr := addNetworkStmt(cfg)
+ addMember, addMemberClose, addMemberErr := addMemberStmt(cfg)
g.TErrorIf(g.SomeError(
addNetworkErr,
addMemberErr,
@@ -1187,6 +1231,7 @@ func test_addChannelStmt() {
g.TestStart("addChannelStmt()")
const (
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
@@ -1194,8 +1239,13 @@ func test_addChannelStmt() {
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- addChannel, addChannelClose, addChannelErr := addChannelStmt(db, prefix)
- channels, channelsClose, channelsErr := channelsStmt(db, prefix)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ }
+ addChannel, addChannelClose, addChannelErr := addChannelStmt(cfg)
+ channels, channelsClose, channelsErr := channelsStmt(cfg)
g.TErrorIf(g.SomeError(
addChannelErr,
channelsErr,
@@ -1306,6 +1356,7 @@ func test_addEventStmt() {
g.TestStart("addEventStmt()")
const (
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
@@ -1313,7 +1364,12 @@ func test_addEventStmt() {
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- addEvent, addEventClose, addEventErr := addEventStmt(db, prefix)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ }
+ addEvent, addEventClose, addEventErr := addEventStmt(cfg)
g.TErrorIf(addEventErr)
defer g.SomeFnError(
addEventClose,
@@ -1389,6 +1445,7 @@ func test_allAfterStmt() {
g.TestStart("allAfter()")
const (
+ dbpath = golite.InMemory
prefix = defaultPrefix
)
@@ -1396,9 +1453,14 @@ func test_allAfterStmt() {
g.TErrorIf(err)
g.TErrorIf(createTables(db, prefix))
- addChannel, addChannelClose, addChannelErr := addChannelStmt(db, prefix)
- addEvent, addEventClose, addEventErr := addEventStmt(db, prefix)
- allAfter, allAfterClose, allAfterErr := allAfterStmt(db, prefix)
+ cfg := dbconfigT{
+ shared: db,
+ dbpath: dbpath,
+ prefix: prefix,
+ }
+ addChannel, addChannelClose, addChannelErr := addChannelStmt(cfg)
+ addEvent, addEventClose, addEventErr := addEventStmt(cfg)
+ allAfter, allAfterClose, allAfterErr := allAfterStmt(cfg)
g.TErrorIf(g.SomeError(
addChannelErr,
addEventErr,
@@ -1485,6 +1547,18 @@ func test_allAfterStmt() {
// FIXME
}
+func test_logMessageStmt() {
+ g.TestStart("logMessageStmt()")
+
+ // FIXME
+
+ g.Testing("no error if closed more than once", func() {
+ g.TErrorIf(g.SomeError(
+ // FIXME
+ ))
+ })
+}
+
func test_initDB() {
// FIXME
}
@@ -1987,6 +2061,7 @@ func dumpQueries() {
{ "names", namesSQL },
{ "addEvent", addEventSQL },
{ "allAfter", allAfterSQL },
+ { "logMessage", logMessageSQL },
}
for _, query := range queries {
q := query.fn(defaultPrefix)
@@ -2039,6 +2114,7 @@ func MainTest() {
test_addEventStmt()
test_eventEach()
test_allAfterStmt()
+ test_logMessageStmt()
test_initDB()
test_queriesTclose()
test_splitOnCRLF()
diff --git a/tests/queries.sql b/tests/queries.sql
index 3aa6586..c996f02 100644
--- a/tests/queries.sql
+++ b/tests/queries.sql
@@ -16,88 +16,88 @@
username TEXT NOT NULL,
display_name TEXT NOT NULL,
picture_uuid BLOB UNIQUE,
- deleted INT NOT NULL
+ deleted INT NOT NULL CHECK(deleted IN (0, 1))
) STRICT;
- CREATE TABLE IF NOT EXISTS "papod_user_changes" (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')),
- user_id INTEGER NOT NULL REFERENCES "papod_users"(id),
- attribute TEXT NOT NULL CHECK(
- attribute IN (
- 'username',
- 'display_name',
- 'picture_uuid',
- 'deleted'
- )
- ),
- value TEXT NOT NULL,
- op INT NOT NULL CHECK(op IN (0, 1))
- ) STRICT;
- CREATE TRIGGER IF NOT EXISTS "papod_user_creation"
- AFTER INSERT ON "papod_users"
- BEGIN
- INSERT INTO "papod_user_changes" (
- user_id, attribute, value, op
- ) VALUES
- (NEW.id, 'username', NEW.username, true),
- (NEW.id, 'display_name', NEW.display_name, true),
- (NEW.id, 'deleted', NEW.deleted, true)
- ;
- END;
- CREATE TRIGGER IF NOT EXISTS "papod_user_creation_picture_uuid"
- AFTER INSERT ON "papod_users"
- WHEN NEW.picture_uuid != NULL
- BEGIN
- INSERT INTO "papod_user_changes" (
- user_id, attribute, value, op
- ) VALUES
- (NEW.id, 'picture_uuid', NEW.picture_uuid, true)
- ;
- END;
- CREATE TRIGGER IF NOT EXISTS "papod_user_update_username"
- AFTER UPDATE ON "papod_users"
- WHEN OLD.username != NEW.username
- BEGIN
- INSERT INTO "papod_user_changes" (
- user_id, attribute, value, op
- ) VALUES
- (NEW.id, 'username', OLD.username, false),
- (NEW.id, 'username', NEW.username, true)
- ;
- END;
- CREATE TRIGGER IF NOT EXISTS "papod_user_update_display_name"
- AFTER UPDATE ON "papod_users"
- WHEN OLD.display_name != NEW.display_name
- BEGIN
- INSERT INTO "papod_user_changes" (
- user_id, attribute, value, op
- ) VALUES
- (NEW.id, 'display_name', OLD.display_name, false),
- (NEW.id, 'display_name', NEW.display_name, true)
- ;
- END;
- CREATE TRIGGER IF NOT EXISTS "papod_user_update_picture_uuid"
- AFTER UPDATE ON "papod_users"
- WHEN OLD.picture_uuid != NEW.picture_uuid
- BEGIN
- INSERT INTO "papod_user_changes" (
- user_id, attribute, value, op
- ) VALUES
- (NEW.id, 'picture_uuid', OLD.picture_uuid, false),
- (NEW.id, 'picture_uuid', NEW.picture_uuid, true)
- ;
- END;
- CREATE TRIGGER IF NOT EXISTS "papod_user_update_deleted"
- AFTER UPDATE ON "papod_users"
- WHEN OLD.deleted != NEW.deleted
- BEGIN
- INSERT INTO "papod_user_changes" (
- user_id, attribute, value, op
- ) VALUES
- (NEW.id, 'deleted', OLD.deleted, false),
- (NEW.id, 'deleted', NEW.deleted, true)
- ;
- END;
+-- CREATE TABLE IF NOT EXISTS "papod_user_changes" (
+-- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+-- timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f000000Z', 'now')),
+-- user_id INTEGER NOT NULL REFERENCES "papod_users"(id),
+-- attribute TEXT NOT NULL CHECK(
+-- attribute IN (
+-- 'username',
+-- 'display_name',
+-- 'picture_uuid',
+-- 'deleted'
+-- )
+-- ),
+-- value TEXT NOT NULL,
+-- op INT NOT NULL CHECK(op IN (0, 1))
+-- ) STRICT;
+-- CREATE TRIGGER IF NOT EXISTS "papod_user_creation"
+-- AFTER INSERT ON "papod_users"
+-- BEGIN
+-- INSERT INTO "papod_user_changes" (
+-- user_id, attribute, value, op
+-- ) VALUES
+-- (NEW.id, 'username', NEW.username, true),
+-- (NEW.id, 'display_name', NEW.display_name, true),
+-- (NEW.id, 'deleted', NEW.deleted, true)
+-- ;
+-- END;
+-- CREATE TRIGGER IF NOT EXISTS "papod_user_creation_picture_uuid"
+-- AFTER INSERT ON "papod_users"
+-- WHEN NEW.picture_uuid != NULL
+-- BEGIN
+-- INSERT INTO "papod_user_changes" (
+-- user_id, attribute, value, op
+-- ) VALUES
+-- (NEW.id, 'picture_uuid', NEW.picture_uuid, true)
+-- ;
+-- END;
+-- CREATE TRIGGER IF NOT EXISTS "papod_user_update_username"
+-- AFTER UPDATE ON "papod_users"
+-- WHEN OLD.username != NEW.username
+-- BEGIN
+-- INSERT INTO "papod_user_changes" (
+-- user_id, attribute, value, op
+-- ) VALUES
+-- (NEW.id, 'username', OLD.username, false),
+-- (NEW.id, 'username', NEW.username, true)
+-- ;
+-- END;
+-- CREATE TRIGGER IF NOT EXISTS "papod_user_update_display_name"
+-- AFTER UPDATE ON "papod_users"
+-- WHEN OLD.display_name != NEW.display_name
+-- BEGIN
+-- INSERT INTO "papod_user_changes" (
+-- user_id, attribute, value, op
+-- ) VALUES
+-- (NEW.id, 'display_name', OLD.display_name, false),
+-- (NEW.id, 'display_name', NEW.display_name, true)
+-- ;
+-- END;
+-- CREATE TRIGGER IF NOT EXISTS "papod_user_update_picture_uuid"
+-- AFTER UPDATE ON "papod_users"
+-- WHEN OLD.picture_uuid != NEW.picture_uuid
+-- BEGIN
+-- INSERT INTO "papod_user_changes" (
+-- user_id, attribute, value, op
+-- ) VALUES
+-- (NEW.id, 'picture_uuid', OLD.picture_uuid, false),
+-- (NEW.id, 'picture_uuid', NEW.picture_uuid, true)
+-- ;
+-- END;
+-- CREATE TRIGGER IF NOT EXISTS "papod_user_update_deleted"
+-- AFTER UPDATE ON "papod_users"
+-- WHEN OLD.deleted != NEW.deleted
+-- BEGIN
+-- INSERT INTO "papod_user_changes" (
+-- user_id, attribute, value, op
+-- ) VALUES
+-- (NEW.id, 'deleted', OLD.deleted, false),
+-- (NEW.id, 'deleted', NEW.deleted, true)
+-- ;
+-- END;
CREATE TABLE IF NOT EXISTS "papod_networks" (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
@@ -196,7 +196,7 @@
UNIQUE (channel_id, member_id)
) STRICT;
- -- FIXME: create table for connections?
+ -- FIXME: create database table for connections?
-- A user can have multiple sessions (different browsers,
-- mobile, etc.), and each session has multiple connections, as
-- the user connects and disconnections using the same session
@@ -292,7 +292,7 @@
)
) RETURNING id, timestamp;
- INSERT INTO "%!s(MISSING)_members" (
+ INSERT INTO "papod_members" (
network_id, user_id, username, display_name,
picture_uuid, status, active_uniq
) VALUES (
@@ -300,7 +300,7 @@
?,
(
SELECT username, display_name, picture_uuid
- FROM "%!s(MISSING)_users"
+ FROM "papod_users"
WHERE id = ? AND deleted = false
),
'active',
@@ -345,24 +345,26 @@
-- write:
-- read:
- -- FIXME
- %!(EXTRA string=papod)
+ -- FIXME papod
+
-- setNetwork.sql:
-- write:
- %!(EXTRA string=papod)
+ -- FIXME papod
+
-- read:
-- nipNetwork.sql:
-- write:
- %!(EXTRA string=papod)
+ -- FIXME papod
+
-- read:
-- addMember.sql:
-- write:
- -- FIXME
+ -- FIXME papod
-- read:
@@ -371,23 +373,26 @@
-- write:
-- read:
- %!(EXTRA string=papod)
+ -- FIXME papod
+
-- members.sql:
-- write:
-- read:
- -- FIXME
- %!(EXTRA string=papod)
+ -- FIXME papod
+
-- editMember.sql:
-- write:
- %!(EXTRA string=papod)
+ -- FIXME papod
+
-- read:
-- dropMember.sql:
-- write:
+ -- FIXME
-- read:
@@ -405,32 +410,34 @@
-- write:
-- read:
- -- FIXME
- %!(EXTRA string=papod)
+ -- FIXME papod
+
-- topic.sql:
-- write:
- %!(EXTRA string=papod)
+ -- FIXME papod
+
-- read:
-- endChannel.sql:
-- write:
- %!(EXTRA string=papod)
+ -- FIXME papod
+
-- read:
-- join.sql:
-- write:
- -- FIXME
- %!(EXTRA string=papod)
+ -- FIXME papod
+
-- read:
-- part.sql:
-- write:
- -- FIXME
- %!(EXTRA string=papod)
+ -- FIXME papod
+
-- read:
@@ -438,8 +445,8 @@
-- write:
-- read:
- -- FIXME
- %!(EXTRA string=papod)
+ -- FIXME papod
+
-- addEvent.sql:
-- write:
@@ -483,3 +490,10 @@
SELECT channel_id FROM landmark_event
);
+
+-- logMessage.sql:
+-- write:
+ -- FIXME papod
+
+
+-- read: