diff options
Diffstat (limited to 'src/papo.go')
-rw-r--r-- | src/papo.go | 454 |
1 files changed, 443 insertions, 11 deletions
diff --git a/src/papo.go b/src/papo.go index aa55df3..413b902 100644 --- a/src/papo.go +++ b/src/papo.go @@ -1,28 +1,460 @@ package papo import ( + // "bufio" + "crypto/rand" "database/sql" + "encoding/hex" + "errors" + "flag" "fmt" - "log" + "io" + // "log" + "log/slog" + "math/big" + "net" "os" + "runtime/debug" + "sync" + "syscall" + "time" _ "github.com/mattn/go-sqlite3" ) -func Hello(name string) string { - message := fmt.Sprintf("Hi, %v. Welcome!", name) - return message + +/* Global variables */ +var ( + Hostname string + Version string + Colour string +) + + +// FIXME: finish rewriting +// +// lastV7time is the last time we returned stored as: +// +// 52 bits of time in milliseconds since epoch +// 12 bits of (fractional nanoseconds) >> 8 +var lastV7Time int64 +var timeMu sync.Mutex +// getV7Time returns the time in milliseconds and nanoseconds / 256. +// The returned (milli << (12 + seq)) is guaranteed to be greater than +// (milli << (12 + seq)) returned by any previous call to getV7Time. +// `seq` Sequence number is between 0 and 3906 (nanoPerMilli >> 8) +func getV7Time(nano int64) (int64, int64) { + const nanoPerMilli = 1000000 + + milli := nano / nanoPerMilli + seq := (nano - (milli * nanoPerMilli)) >> 8 + now := milli << (12 + seq) + + timeMu.Lock() + defer timeMu.Unlock() + if now <= lastV7Time { + now = lastV7Time + 1 + milli = now >> 12 + seq = now & 0xfff + } + lastV7Time = now + return milli, seq } -func Main() { - fmt.Println(Hello("papo")) +const lengthUUID = 16 + +type UUID struct { + bytes [lengthUUID]byte +} + +func NewUUID() UUID { + var buf [lengthUUID]byte + _, err := io.ReadFull(rand.Reader, buf[7:]) + FatalIf(err) + + buf[6] = (buf[6] & 0x0f) | 0x40 // Version 4 + buf[8] = (buf[8] & 0x3f) | 0x80 // Variant is 10 + + t, s := getV7Time(time.Now().UnixNano()) + + buf[0] = byte(t >> 40) + buf[1] = byte(t >> 32) + buf[2] = byte(t >> 24) + buf[3] = byte(t >> 16) + buf[4] = byte(t >> 8) + buf[5] = byte(t >> 0) + + buf[6] = 0x70 | (0x0f & byte(s >> 8)) + buf[7] = byte(s) + return UUID { bytes: buf } +} + +func UUIDToString(uuid UUID) string { + const dashCount = 4 + const encodedLength = (lengthUUID * 2) + dashCount + dst := [encodedLength]byte { + 0, 0, 0, 0, + 0, 0, 0, 0, + '-', + 0, 0, 0, 0, + '-', + 0, 0, 0, 0, + '-', + 0, 0, 0, 0, + '-', + 0, 0, 0, 0, + 0, 0, 0, 0, + 0, 0, 0, 0, + } + + hex.Encode(dst[ 0:8], uuid.bytes[0:4]) + hex.Encode(dst[ 9:13], uuid.bytes[4:6]) + hex.Encode(dst[14:18], uuid.bytes[6:8]) + hex.Encode(dst[19:23], uuid.bytes[8:10]) + hex.Encode(dst[24:36], uuid.bytes[10:]) + + return string(dst[:]) +} + + +func Debug(message string, type_ string, args ...any) { + slog.Debug( + message, + append( + []any { + "id", UUIDToString(NewUUID()), + "kind", "log", + "type", type_, + }, + args..., + )..., + ) +} + +func Info(message string, type_ string, args ...any) { + slog.Info( + message, + append( + []any { + "id", UUIDToString(NewUUID()), + "kind", "log", + "type", type_, + }, + args..., + )..., + ) +} + +func Warning(message string, type_ string, args ...any) { + slog.Warn( + message, + append( + []any { + "id", UUIDToString(NewUUID()), + "kind", "log", + "type", type_, + }, + args..., + )..., + ) +} + +func Error(message string, type_ string, args ...any) { + slog.Error( + message, + append( + []any { + "id", UUIDToString(NewUUID()), + "kind", "log", + "type", type_, + }, + args..., + )..., + ) +} + +func Metric(type_ string, label string, args ...any) { + slog.Info( + "_", + append( + []any { + "id", UUIDToString(NewUUID()), + "kind", "metric", + "type", type_, + "label", label, + }, + args..., + )..., + ) +} + +type Gauge struct { + Inc func(...any) + Dec func(...any) +} + +var zero = big.NewInt(0) +var one = big.NewInt(1) +func MakeGauge(label string, staticArgs ...any) Gauge { + count := big.NewInt(0) + emitGauge := func(dynamicArgs ...any) { + if count.Cmp(zero) == -1 { + Error( + "Gauge went negative", + "process-metric", + append( + []any { "value", count }, + append( + staticArgs, + dynamicArgs..., + )..., + )..., + ) + return // avoid wrong metrics being emitted + } + Metric( + "gauge", label, + // TODO: we'll have slices.Concat on Go 1.22 + append( + []any { "value", count }, + append( + staticArgs, + dynamicArgs..., + )..., + )..., + ) + } + return Gauge { + Inc: func(dynamicArgs ...any) { + count.Add(count, one) + emitGauge(dynamicArgs...) + }, + Dec: func(dynamicArgs ...any) { + count.Sub(count, one) + emitGauge(dynamicArgs...) + }, + } +} + +func MakeCounter(label string) func(...any) { + return func(args ...any) { + Metric( + "counter", label, + append([]any { "value", 1 }, args...)..., + ) + } +} + +var EmitActiveConnection = MakeGauge("active-connections") +var EmitNicksInChannel = MakeGauge("nicks-in-channel") +var EmitReceivedCommand = MakeCounter("received-command") + +const pingFrequency = time.Duration(30) * time.Second +const pongMaxLatency = time.Duration(5) * time.Second - path := "./foo.db" - os.Remove(path) - db, err := sql.Open("sqlite3", path) +func Fatal(err error) { + slog.Error("fatal-error", "error", err, "stack", string(debug.Stack())) + syscall.Kill(os.Getpid(), syscall.SIGABRT) + os.Exit(3) +} + +func FatalIf(err error) { if err != nil { - log.Fatal(err) + Fatal(err) } - defer db.Close() +} + + +type Channel struct { +} + +type Context struct { + dbConn *sql.DB + tx chan int +} + +type Message struct { + Prefix string +} + +func ReadLoop(ctx *Context, conn net.Conn) { + fmt.Println("ReadLoop") +} + +func WriteLoop(ctx *Context, conn net.Conn) { + fmt.Println("WriteLoop") +} + +func PingLoop(ctx *Context, conn net.Conn) { + fmt.Println("PingLoop") +} + +func HandleConnection(ctx *Context, conn net.Conn) { + EmitActiveConnection.Inc() + // FIXME: WaitGroup here? + go ReadLoop(ctx, conn) + go WriteLoop(ctx, conn) + go PingLoop(ctx, conn) +} + +func IRCdLoop(ctx *Context, publicSocketPath string) { + listener, err := net.Listen("unix", publicSocketPath) + FatalIf(err) + Info("IRCd started", "component-up", "component", "ircd") + + for { + conn, err := listener.Accept() + if err != nil { + slog.Warn( + "Error accepting a public IRCd connection", + "err", err, + ) + // conn.Close() // FIXME: is conn nil? + continue + } + go HandleConnection(ctx, conn) // FIXME: where does it get closed + } +} + +func CommandListenerLoop(ctx *Context, commandSocketPath string) { + listener, err := net.Listen("unix", commandSocketPath) + FatalIf(err) + Info("command listener started", "component-up", "component", "command-listener") + + for { + conn, err := listener.Accept() + if err != nil { + slog.Warn( + "Error accepting a command connection", + "err", err, + ) + continue + } + defer conn.Close() + + // TODO: handle commands + } +} + +func TransactorLoop(ctx *Context) { + Info("transactor started", "component-up", "component", "transactor") + EmitActiveConnection.Inc() + + for tx := range ctx.tx { + fmt.Println(tx) + } +} + +func SetLoggerOutput(w io.Writer) { + slog.SetDefault(slog.New(slog.NewJSONHandler(w, &slog.HandlerOptions { + AddSource: true, + })).With( + slog.Group( + "info", + "pid", os.Getpid(), + "ppid", os.Getppid(), + "puuid", UUIDToString(NewUUID()), + ), + )) +} + +func SetTraceback() { + if os.Getenv("GOTRACEBACK") == "" { + debug.SetTraceback("crash") + } +} + +func SetHostname() { + var err error + Hostname, err = os.Hostname() + FatalIf(err) +} + +func SetEnvironmentVariables() { + Version = os.Getenv("PAPO_VERSION") + if Version == "" { + Version = "PAPO-VERSION-UNKNOWN" + } + + Colour = os.Getenv("PAPO_COLOUR") + if Colour == "" { + Colour = "PAPO-COLOUR-UNKNOWN" + } +} + +func InitDB(databasePath string) *sql.DB { + DB, err := sql.Open("sqlite3", databasePath) + FatalIf(err) + return DB +} + +func init() { + SetLoggerOutput(os.Stdout) + SetTraceback() + SetHostname() + SetEnvironmentVariables() +} + +func Start(ctx *Context, publicSocketPath string, commandSocketPath string) { + buildInfo, ok := debug.ReadBuildInfo() + if !ok { + Fatal(errors.New("error on debug.ReadBuildInfo()")) + } + + Info("-", "lifecycle-event", + "event", "starting-server", + slog.Group( + "go", + "version", buildInfo.GoVersion, + "settings", buildInfo.Settings, + "deps", buildInfo.Deps, + ), + ) + + var wg sync.WaitGroup + bgRun := func(f func()) { + wg.Add(1) + go func() { + f() + wg.Done() + }() + } + bgRun(func() { IRCdLoop(ctx, publicSocketPath) }) + bgRun(func() { CommandListenerLoop(ctx, commandSocketPath) }) + bgRun(func() { TransactorLoop(ctx) }) + wg.Wait() +} + +func BuildContext(databasePath string) *Context { + dbConn := InitDB(databasePath) + tx := make(chan int, 100) + return &Context { + dbConn, + tx, + } +} + +var ( + databasePath = flag.String( + "f", + "papo.db", + "The path to the database file", + ) + publicSocketPath = flag.String( + "s", + "papo.public.socket", + "The path to the socket that handles the public traffic", + ) + commandSocketPath = flag.String( + "S", + "papo.command.socket", + "The path to the private IPC commands socket", + ) +) + +func Main() { + flag.Parse() + ctx := BuildContext(*databasePath) + Start(ctx, *publicSocketPath, *commandSocketPath) } |