import assert from "node:assert/strict"; import child_process from "node:child_process"; import crypto from "node:crypto"; import fs from "node:fs"; import http from "node:http"; import process from "node:process"; import util from "node:util"; import * as u from "./utils.mjs"; export const loggerDefaults = { pid: process.pid, ppid: process.ppid, tool: "hero", }; export let loggerGlobals = {}; export const configLogger = o => loggerGlobals = o; export const logit = (writerFn, level, o) => writerFn(JSON.stringify({ ...loggerDefaults, ...loggerGlobals, level, ...o, })); export const makeLogger = (writerFn = console.error) => ({ debug: (...args) => process.env.DEBUG ? logit(writerFn, "DEBUG", ...args) : null, info: u.partial(logit, writerFn, "INFO"), warn: u.partial(logit, writerFn, "WARN"), error: u.partial(logit, writerFn, "ERROR"), }); export const log = makeLogger(); export const interceptorsFn = ({ uuidFn = crypto.randomUUID, logger = log, } = {}) => ({ requestId: (req, next) => next({ ...req, id: uuidFn() }), logged: async (req, next) => { const { id, url, method, upgrade } = req; logger.info({ id, url, method, upgrade, type: "in-request", }); const beforeDate = new Date(); const response = await next(req); const afterDate = new Date(); const { status } = response; const before = beforeDate.getTime(); const after = afterDate.getTime(); const duration = after - before; logger.info({ id, status, type: "in-response", timings: { ms: { before, after, duration }, }, }); return response; }, contentType: async (req, next) => { const response = await next(req); const { status, body, headers } = response; assert.equal(typeof status, "number"); const mappings = { string: () => [ "text/html", body ], undefined: () => [ "text/plain", http.STATUS_CODES[status] + "\n" ], FALLBACK: () => [ "application/json", JSON.stringify(body) ], }; const type = typeof body; assert.notEqual(type, "FALLBACK"); const [mimeType, renderedBody] = (mappings[type] || mappings.FALLBACK)(); return { ...response, body: renderedBody, headers: { "Content-Type": mimeType, "Content-Length": Buffer.byteLength(renderedBody), ...(response.headers || {}) }, }; }, serverError: async (req, next) => { try { const response = await next(req); assert.ok("status" in response, `Missing "status"`); return response; } catch (error) { logger.error({ id: req.id, type: "server-error-interceptor", message: error.message, stacktrace: error.stack }); return { status: 500, body: "Internal Server Error\n", }; } }, }); export const interceptors = interceptorsFn(); export const defaultInterceptors = [ interceptors.serverError, interceptors.contentType, interceptors.requestId, interceptors.logged, ]; export const chainInterceptors = arr => req => arr.length === 0 ? req : arr[0](req, chainInterceptors(arr.slice(1))); export const wrapHandler = (fn, arr) => arr.length === 0 ? fn : chainInterceptors(arr.concat([ (req, _next) => fn(req) ])); export const normalizeSegments = segments => segments.length === 1 && segments[0] === "" ? segments : segments.concat([""]); export const pathToSegments = path => normalizeSegments(path .replace(/^\/*/, "") .replace(/\/*$/, "") .replace(/\/+/, "/") .split("/")); export const hasPathParams = segments => segments.some(s => s.startsWith(":")); const HTTP_METHODS = new Set([ "GET", "HEAD", "POST", "PUT", "PATCH", "DELETE", "OPTIONS", ]); const HTTP_METHODS_ARR = [...HTTP_METHODS.keys()].sort(u.strSortFn); const WEBSOCKET_LABEL = "WEBSOCKET"; const WEBSOCKET_METHOD = "GET"; const WEBSOCKET_KEYWORD = "websocket"; assert.ok(HTTP_METHODS.has(WEBSOCKET_METHOD)); export const isValidLabel = name => HTTP_METHODS.has(name) || name === WEBSOCKET_LABEL; export const comboForLabel = (label, keyword) => label === WEBSOCKET_LABEL ? [ WEBSOCKET_KEYWORD, WEBSOCKET_METHOD ] : [ keyword, label ]; export const addRoute = (table, methods, path, handlerFn) => { if (methods === "*") { return addRoute(table, HTTP_METHODS_ARR, path, handlerFn); } if (!Array.isArray(methods)) { return addRoute(table, [methods], path, handlerFn); } assert.ok(methods.every(isValidLabel)); const segments = pathToSegments(path); const kw = hasPathParams(segments) ? "dynamic" : "static"; return methods.reduce( (acc, el) => u.assocIn( acc, comboForLabel(el, kw).concat(segments), handlerFn, ), table, ); }; export const findStaticHandler = (table, method, segments) => { const handlerFn = u.getIn(table, ["static", method].concat(segments)); return !handlerFn ? null : { handlerFn, params: {} }; }; /** * "first" as is: * - depth-first, as we look for a full match and use it instead of searching * in parallel; * - the first param in the same level to show up alphabetically, e.g. * ":a-user" matches before ":id" does. */ export const firstParamMatch = (tree, segments, params) => { assert.notEqual(segments.length, 0); const [seg, ...nextSegments] = segments; if (segments.length === 1) { assert.equal(seg, ""); const handlerFn = tree[""]; return !handlerFn ? null : { handlerFn, params }; } const subtree = tree[seg]; if (subtree) { const submatch = firstParamMatch(subtree, nextSegments, params); /// propagation of the end of recursion if (submatch) { return submatch; } } /// literal matching failed, we now look for patterns that might match const paramOptions = Object.keys(tree) .filter(s => s.startsWith(":")) .sort(u.strSortFn); return u.findFirst(paramOptions, param => firstParamMatch(tree[param], nextSegments, { ...params, [param.slice(1)]: seg })); }; export const findDynamicHandler = (table, method, segments) => { const tree = table?.dynamic?.[method]; return !tree ? null : firstParamMatch(tree, segments, {}); }; export const findHandler = (table, method, path) => { const segments = pathToSegments(path); return ( findStaticHandler(table, method, segments) || findDynamicHandler(table, method, segments) ); }; export const extractQueryParams = s => { const ret = {}; for (const [k, v] of new URLSearchParams(s)) { ret[k] = v; } return ret; }; export const handle404 = _req => ({ status: 404, body: "Not Found\n", }); export const make404Handler = interceptors => ({ params: {}, handlerFn: wrapHandler(handle404, interceptors), }); export const handleRequest = async (table, reqHandle) => { const { method, url, headers, upgrade, socket } = reqHandle; const [ path, queryParams ] = url.split("?"); const { params, handlerFn } = ( findHandler(table, method, path) || make404Handler(table.interceptors) ); const request = { params: { path: params, query: extractQueryParams(queryParams), }, method, path, headers, upgrade, socket, handler: handlerFn, ref: reqHandle, }; return await handlerFn(request); }; export const emitHeaders = obj => Object.keys(obj) .sort(u.strSortFn) .map(name => `${name}: ${obj[name]}`) .join("\r\n"); const STATUS_CODES = { 404: "Not Found", 405: "Method Not Allowed", }; export const buildHttpPayload = (code, { headers = {}, message = `${STATUS_CODES[code]}\n`, } = {}) => `HTTP/1.1 ${code} ${STATUS_CODES[code]}\r\n` + emitHeaders({ "Connection": "close", "Content-Type": "text/plain; charset=UTF-8", "Content-Length": Buffer.byteLength(message), ...headers, }) + "\r\n\r\n" + message; export const fallback404Handler = (_req, socket) => socket.end(buildHttpPayload(404)).destroySoon(); export const fallback405Handler = (_req, socket) => socket.end(buildHttpPayload(405)).destroySoon(); export const handlerForConnection = (table, method, path) => { if (method !== WEBSOCKET_METHOD) { return fallback405Handler; } const segments = pathToSegments(path); const handlerFn = u.getIn(table, [WEBSOCKET_KEYWORD, WEBSOCKET_METHOD].concat(segments)); return handlerFn || fallback404Handler; }; export const makeUpgradeListener = table => async (req, socket, _head) => { const { method, url, headers } = req; const [ path, queryParams ] = url.split("?"); const handlerFn = handlerForConnection(table, method, path); const request = { params: { path: {}, query: extractQueryParams(queryParams), }, method, path, headers, handler: handlerFn, ref: req, }; return await handlerFn(request, socket); }; export const makeRequestListener = table => async (req, res) => { const { status, headers, body } = await handleRequest(table, { ...req, upgrade: false, socket: null, }); res.writeHead(status, headers); res.end(body); }; export const actionsFn = ({ logger = log, } = {}) => ({ "toggle-debug-env": action => { const before = process.env.DEBUG; if (process.env.DEBUG) { delete process.env.DEBUG; } else { process.env.DEBUG = "1"; } const after = process.env.DEBUG; logger.info({ type: "action-response", action, message: "toggle process.env.DEBUG", before: u.undefinedAsNull(before), after: u.undefinedAsNull(after), }); }, "config-dump": action => logger.info({ type: "action-response", action, data: { ...loggerDefaults, ...loggerGlobals, }, }), "ping": _ => logger.info({ message: "pong" }), }); export const actions = actionsFn(); export const lineHandlerFn = ({ logger = log, actionsMap = actions, } = {}) => line => { let cmd = null; try { cmd = JSON.parse(line); } catch (e) { logger.info({ type: "invalid-cmd-input", message: e.message, }); return; } if (typeof cmd?.action !== "string") { logger.info({ type: "missing-key-action", message: `missing the "action" key from the given object`, }); return; } const fn = actionsMap[cmd.action]; if (!fn) { logger.info({ type: "unsupported-action", message: `can't run given action: ${cmd.action}`, }); return; } return fn(cmd.action, ...(cmd?.args || [])); }; export const lineHandler = lineHandlerFn(); export const rmIf = path => { if (fs.existsSync(path)) { fs.unlinkSync(path); } }; export const mkfifo = path => child_process.execFileSync("mkfifo", [path]); export const makeLineEmitter = fn => { let data = ""; return chunk => { const segments = chunk.split("\n"); assert.ok(segments.length > 0); if (segments.length === 1) { data += segments[0]; return; } [ data + u.first(segments), ...u.butlast(u.rest(segments)), ].forEach(fn); data = u.last(segments); }; }; export const makeReopeningPipeReader = (shouldReopenPipe, path, { lineFn, logger, } = {}, out) => { const reader = fs.createReadStream(path, "UTF-8") out.ref = reader; reader.on("data", makeLineEmitter(lineFn)); reader.on("close", () => { if (shouldReopenPipe.ref) { logger.debug({ message: "pipe closed, reopening", }); makeReopeningPipeReader( shouldReopenPipe, path, { lineFn, logger }, out, ); return; } logger.debug({ message: "pipe closed, NOT reopening", }); }); }; export const makePipeReaderFn = ({ lineFn = lineHandler, logger = log, } = {}) => path => { mkfifo(path); let shouldReopenPipe = { ref: true }; const pipe = {}; makeReopeningPipeReader( shouldReopenPipe, path, { lineFn, logger }, pipe, ); return () => new Promise((resolve, reject) => { shouldReopenPipe.ref = false; fs.createWriteStream(path).end().close(); pipe.ref.on("close", resolve); }); }; export const makePipeReader = makePipeReaderFn(); export const buildRoutes = (routes, globalInterceptors = []) => routes.reduce( (acc, [methods, path, handlerFn, interceptors = []]) => addRoute( acc, methods, path, wrapHandler( handlerFn, globalInterceptors.concat(interceptors), ), ), {} ); export const buildTable = (routes, globalInterceptors = []) => u.assocIn( buildRoutes(routes, globalInterceptors), ["interceptors"], globalInterceptors, ); export const promisifyServer = (name, serverHandle, socket, pipe) => { let closePipeFn = null; return { ref: serverHandle, start: util.promisify((...args) => { assert.equal(typeof socket, "string"); assert.equal(typeof pipe, "string"); configLogger({ name }); rmIf(pipe); closePipeFn = makePipeReader(pipe); rmIf(socket); return serverHandle.listen(socket, ...args) }), stop: util.promisify(async (...args) => { await closePipeFn(); return serverHandle.close(...args); }), events: serverHandle, }; }; export const buildServer = ({ name, routes = [], socket = `${name}.socket`, pipe = `${name}.pipe`, globalInterceptors = defaultInterceptors, }) => { const table = buildTable(routes, globalInterceptors); const requestListener = makeRequestListener(table); const server = http.createServer(requestListener); return promisifyServer(name, server, socket, pipe); };