import assert from "node:assert/strict"; import child_process from "node:child_process"; import crypto from "node:crypto"; import fs from "node:fs"; import http from "node:http"; import process from "node:process"; import util from "node:util"; import * as u from "./utils.mjs"; export const loggerDefaults = { pid: process.pid, ppid: process.ppid, tool: "hero", }; export let loggerGlobals = {}; export const configLogger = o => loggerGlobals = o; export const logit = (writerFn, timestampFn, level, o) => writerFn(JSON.stringify({ ...loggerDefaults, ...loggerGlobals, level, timestamp: timestampFn(), ...o, })); const now = () => (new Date()).toISOString(); export const makeLogger = ({ writerFn = console.error, timestampFn = now, } = {}) => ({ debug: (...args) => process.env.DEBUG ? logit(writerFn, timestampFn, "DEBUG", ...args) : null, info: u.partial(logit, writerFn, timestampFn, "INFO"), warn: u.partial(logit, writerFn, timestampFn, "WARN"), error: u.partial(logit, writerFn, timestampFn, "ERROR"), }); export const log = makeLogger(); export const isValidMethod = method => method === "GET"; export const isValidUpgrade = val => val.toLowerCase() === "websocket"; export const isValidKey = key => /^[0-9a-zA-Z+/]{22}==$/.test(key); export const isValidVersion = n => n === 13; export const validateUpgrade = (method, headers) => { const upgrade = headers["upgrade"]; const key = headers["sec-websocket-key"]; const versionStr = headers["sec-websocket-version"]; const version = parseInt(versionStr); if (!isValidMethod(method)) { /// Unreachable by default, unless one is constructing tables /// manually. Otherwise `findHandler()` will return a 404 /// before the request gets here. return { isValid: false, response: { status: 405, }, }; } if (!upgrade) { return { isValid: false, response: { status: 400, body: 'Missing "Upgrade" header\n', }, }; } if (!isValidUpgrade(upgrade)) { return { isValid: false, response: { status: 400, body: 'Invalid "Upgrade" value\n', }, }; } if (!key) { return { isValid: false, response: { status: 400, body: 'Missing "Sec-WebSocket-Key" header\n', }, }; } if (!isValidKey(key)) { return { isValid: false, response: { status: 400, body: 'Invalid "Sec-WebSocket-Key" value\n', }, }; } if (!version) { return { isValid: false, response: { status: 400, body: 'Missing "Sec-WebSocket-Version" header\n', }, }; } if (!isValidVersion(version)) { return { isValid: false, response: { status: 400, body: 'Invalid "Sec-WebSocket-Version" value\n', }, }; } return { isValid: true, }; }; const GUID_MAGIC_NUMBER = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; export const computeHash = key => crypto .createHash("SHA1") .update(key + GUID_MAGIC_NUMBER) .digest("base64"); export const interceptorsFn = ({ uuidFn = crypto.randomUUID, logger = log, } = {}) => ({ requestId: (req, next) => next({ ...req, id: uuidFn() }), logged: async (req, next) => { const { id, url, method, upgrade } = req; logger.info({ id, url, method, upgrade, type: "in-request", }); const beforeDate = new Date(); const response = await next(req); const afterDate = new Date(); const { status } = response; const before = beforeDate.getTime(); const after = afterDate.getTime(); const duration = after - before; logger.info({ id, status, type: "in-response", timings: { ms: { before, after, duration }, }, }); return response; }, contentType: async (req, next) => { const response = await next(req); const { status, body, headers } = response; assert.equal(typeof status, "number"); const mappings = { string: () => [ "text/html", body ], undefined: () => [ "text/plain", http.STATUS_CODES[status] + "\n" ], FALLBACK: () => [ "application/json", JSON.stringify(body) ], }; const type = typeof body; assert.notEqual(type, "FALLBACK"); const [mimeType, renderedBody] = (mappings[type] || mappings.FALLBACK)(); return { ...response, body: renderedBody, headers: { "Content-Type": mimeType, "Content-Length": Buffer.byteLength(renderedBody), ...(response.headers || {}) }, }; }, serverError: async (req, next) => { try { const response = await next(req); assert.ok("status" in response, `Missing "status"`); return response; } catch (error) { logger.error({ id: req.id, type: "server-error-interceptor", message: error.message, stacktrace: error.stack }); return { status: 500, body: "Internal Server Error\n", }; } }, websocketHandshake: async (req, next) => { if (!req.upgrade) { return await next(req); } const { method, headers} = req; const { isValid, response } = validateUpgrade(method, headers); if (!isValid) { return response; } const _response = await next(req); const hash = computeHash(headers["sec-websocket-key"]); return { status: 101, headers: { "Connection": "Upgrade", "Upgrade": "websocket", "Sec-WebSocket-Accept": hash, }, }; }, }); export const interceptors = interceptorsFn(); export const defaultInterceptors = [ interceptors.serverError, interceptors.requestId, interceptors.logged, interceptors.contentType, interceptors.websocketHandshake, ]; export const chainInterceptors = arr => req => arr.length === 0 ? req : arr[0](req, chainInterceptors(arr.slice(1))); export const wrapHandler = (fn, arr) => arr.length === 0 ? fn : chainInterceptors(arr.concat([ (req, _next) => fn(req) ])); export const normalizeSegments = segments => segments.length === 1 && segments[0] === "" ? segments : segments.concat([""]); export const pathToSegments = path => normalizeSegments(path .replace(/^\/*/, "") .replace(/\/*$/, "") .replace(/\/+/, "/") .split("/")); export const hasPathParams = segments => segments.some(s => s.startsWith(":")); const HTTP_METHODS = new Set([ "GET", "HEAD", "POST", "PUT", "PATCH", "DELETE", "OPTIONS", ]); const HTTP_METHODS_ARR = [...HTTP_METHODS.keys()].sort(u.strSortFn); export const isValidLabel = name => HTTP_METHODS.has(name) || name === "WEBSOCKET"; export const comboForLabel = (label, keyword) => label === "WEBSOCKET" ? [ "websocket", "GET" ] : [ keyword, label ]; export const addRoute = (table, methods, path, handlerFn) => { if (methods === "*") { return addRoute(table, HTTP_METHODS_ARR, path, handlerFn); } if (!Array.isArray(methods)) { return addRoute(table, [methods], path, handlerFn); } assert.ok(methods.every(isValidLabel)); const segments = pathToSegments(path); const kw = hasPathParams(segments) ? "dynamic" : "static"; return methods.reduce( (acc, el) => u.assocIn( acc, comboForLabel(el, kw).concat(segments), handlerFn, ), table, ); }; export const findStaticHandler = (table, method, segments, section) => { const handlerFn = u.getIn(table, [section, method].concat(segments)); return !handlerFn ? null : { handlerFn, params: {} }; }; /** * "first" as is: * - depth-first, as we look for a full match and use it instead of searching * in parallel; * - the first param in the same level to show up alphabetically, e.g. * ":a-user" matches before ":id" does. */ export const firstParamMatch = (tree, segments, params) => { assert.notEqual(segments.length, 0); const [seg, ...nextSegments] = segments; if (segments.length === 1) { assert.equal(seg, ""); const handlerFn = tree[""]; return !handlerFn ? null : { handlerFn, params }; } const subtree = tree[seg]; if (subtree) { const submatch = firstParamMatch(subtree, nextSegments, params); /// propagation of the end of recursion if (submatch) { return submatch; } } /// literal matching failed, we now look for patterns that might match const paramOptions = Object.keys(tree) .filter(s => s.startsWith(":")) .sort(u.strSortFn); return u.findFirst(paramOptions, param => firstParamMatch(tree[param], nextSegments, { ...params, [param.slice(1)]: seg })); }; export const findDynamicHandler = (table, method, segments) => { const tree = table?.dynamic?.[method]; return !tree ? null : firstParamMatch(tree, segments, {}); }; export const findHandler = (table, method, path, upgrade) => { const segments = pathToSegments(path); return upgrade ? findStaticHandler(table, method, segments, "websocket") : ( findStaticHandler(table, method, segments, "static") || findDynamicHandler(table, method, segments) ); }; export const extractQueryParams = s => { const ret = {}; for (const [k, v] of new URLSearchParams(s)) { ret[k] = v; } return ret; }; export const renderStatus = code => `HTTP/1.1 ${code} ${http.STATUS_CODES[code]}` export const renderHeaders = (obj = {}) => Object.keys(obj) .sort(u.strSortFn) .map(name => `${name}: ${obj[name]}`); export const buildHeader = (status, headers) => [renderStatus(status)] .concat(renderHeaders(headers)) .concat(["\r\n"]) .join("\r\n"); export const writeHead = (socket, status, headers) => socket.write(buildHeader(status, headers)); export const handle404 = _req => ({ status: 404, body: "Not Found\n", }); export const make404Handler = interceptors => ({ params: {}, handlerFn: wrapHandler(handle404, interceptors), }); export const handleRequest = async (table, reqHandle) => { const { method, url, headers, upgrade, socket } = reqHandle; const [ path, queryParams ] = url.split("?"); const { params, handlerFn } = ( findHandler(table, method, path, upgrade) || make404Handler(table.interceptors) ); const request = { params: { path: params, query: extractQueryParams(queryParams), }, method, path, headers, upgrade, socket, handler: handlerFn, ref: reqHandle, }; return await handlerFn(request); }; export const makeRequestListener = table => async (req, res) => { const { status, headers, body } = await handleRequest(table, { ...req, upgrade: false, socket: null, headers: req.headers, /// API docs mention getHeaders(), but req doesn't have it... }); res.writeHead(status, headers); res.end(body); }; export const makeUpgradeListener = table => async (req, socket, _head) => { assert.ok(req.upgrade); const { status, headers, body } = await handleRequest(table, { ...req, socket, headers: req.headers, }); writeHead(socket, status, headers); socket.write(body); if (status !== 101) { socket.end(); } }; export const actionsFn = ({ logger = log, } = {}) => ({ "toggle-debug-env": action => { const before = process.env.DEBUG; if (process.env.DEBUG) { delete process.env.DEBUG; } else { process.env.DEBUG = "1"; } const after = process.env.DEBUG; logger.info({ type: "action-response", action, message: "toggle process.env.DEBUG", before: u.undefinedAsNull(before), after: u.undefinedAsNull(after), }); }, "config-dump": action => logger.info({ type: "action-response", action, data: { ...loggerDefaults, ...loggerGlobals, }, }), "ping": _ => logger.info({ message: "pong" }), }); export const actions = actionsFn(); export const lineHandlerFn = ({ logger = log, actionsMap = actions, } = {}) => line => { let cmd = null; try { cmd = JSON.parse(line); } catch (e) { logger.info({ type: "invalid-cmd-input", message: e.message, }); return; } if (typeof cmd?.action !== "string") { logger.info({ type: "missing-key-action", message: `missing the "action" key from the given object`, }); return; } const fn = actionsMap[cmd.action]; if (!fn) { logger.info({ type: "unsupported-action", message: `can't run given action: ${cmd.action}`, }); return; } return fn(cmd.action, ...(cmd?.args || [])); }; export const lineHandler = lineHandlerFn(); export const rmIf = path => { if (fs.existsSync(path)) { fs.unlinkSync(path); } }; export const mkfifo = path => child_process.execFileSync("mkfifo", [path]); export const makeLineEmitter = fn => { let data = ""; return chunk => { const segments = chunk.split("\n"); assert.ok(segments.length > 0); if (segments.length === 1) { data += segments[0]; return; } [ data + u.first(segments), ...u.butlast(u.rest(segments)), ].forEach(fn); data = u.last(segments); }; }; export const makeReopeningPipeReader = (shouldReopenPipe, path, { lineFn, logger, } = {}, out) => { const reader = fs.createReadStream(path, "UTF-8") out.ref = reader; reader.on("data", makeLineEmitter(lineFn)); reader.on("close", () => { if (shouldReopenPipe.ref) { logger.debug({ message: "pipe closed, reopening", }); makeReopeningPipeReader( shouldReopenPipe, path, { lineFn, logger }, out, ); return; } logger.debug({ message: "pipe closed, NOT reopening", }); }); }; export const makePipeReaderFn = ({ lineFn = lineHandler, logger = log, } = {}) => path => { mkfifo(path); let shouldReopenPipe = { ref: true }; const pipe = {}; makeReopeningPipeReader( shouldReopenPipe, path, { lineFn, logger }, pipe, ); return () => new Promise((resolve, reject) => { shouldReopenPipe.ref = false; fs.createWriteStream(path).end().close(); pipe.ref.on("close", resolve); }); }; export const makePipeReader = makePipeReaderFn(); export const buildRoutes = (routes, globalInterceptors = []) => routes.reduce( (acc, [methods, path, handlerFn, interceptors = []]) => addRoute( acc, methods, path, wrapHandler( handlerFn, globalInterceptors.concat(interceptors), ), ), {} ); export const buildTable = (routes, globalInterceptors = []) => u.assocIn( buildRoutes(routes, globalInterceptors), ["interceptors"], globalInterceptors, ); export const promisifyServer = (name, serverHandle, socket, pipe) => { let closePipeFn = null; return { ref: serverHandle, start: util.promisify((...args) => { assert.equal(typeof socket, "string"); assert.equal(typeof pipe, "string"); configLogger({ name }); log.info({ type: "starting-server", name, socket, pipe, node: { version: process.version, versions: process.versions, }, }); rmIf(pipe); closePipeFn = makePipeReader(pipe); rmIf(socket); return serverHandle.listen(socket, ...args) }), stop: util.promisify(async (...args) => { log.info({ type: "stopping-server", name, socket, pipe, node: { version: process.version, versions: process.versions, }, }); await closePipeFn(); return serverHandle.close(...args); }), events: serverHandle, }; }; export const buildServer = ({ name, routes = [], socket = `${name}.socket`, pipe = `${name}.pipe`, globalInterceptors = defaultInterceptors, }) => { const table = buildTable(routes, globalInterceptors); const requestListener = makeRequestListener(table); const server = http.createServer(requestListener); return promisifyServer(name, server, socket, pipe); };