diff options
author | EuAndreh <eu@euandre.org> | 2024-03-07 19:23:32 -0300 |
---|---|---|
committer | EuAndreh <eu@euandre.org> | 2024-03-07 19:23:43 -0300 |
commit | 76a41d1728fa8111d72e18293297528d35d5aa78 (patch) | |
tree | 3f80a6cd63b6fee9d2b852c3e897ec13e18d1227 | |
parent | src/hero.mjs: makePipeReader() now returns a function for closing it (diff) | |
download | papod-76a41d1728fa8111d72e18293297528d35d5aa78.tar.gz papod-76a41d1728fa8111d72e18293297528d35d5aa78.tar.xz |
src/hero.mjs: Add makeReopeningPipeReader()
This addresses point nÂș 2 from commit
40118a188fb05219d1188ff775ce71f66c8cb56a.
-rw-r--r-- | src/hero.mjs | 45 | ||||
-rw-r--r-- | tests/js/hero.mjs | 114 |
2 files changed, 130 insertions, 29 deletions
diff --git a/src/hero.mjs b/src/hero.mjs index ed105cd..716921f 100644 --- a/src/hero.mjs +++ b/src/hero.mjs @@ -350,14 +350,51 @@ export const makeLineEmitter = fn => { }; }; +export const makeReopeningPipeReader = (shouldReopenPipe, path, { + lineFn, + logger, +} = {}, out) => { + const reader = fs.createReadStream(path, "UTF-8") + out.ref = reader; + reader.on("data", makeLineEmitter(lineFn)); + reader.on("close", () => { + if (shouldReopenPipe.ref) { + logger.debug({ + message: "pipe closed, reopening", + }); + makeReopeningPipeReader( + shouldReopenPipe, + path, + { lineFn, logger }, + out, + ); + return; + } + + logger.debug({ + message: "pipe closed, NOT reopening", + }); + }); +}; + export const makePipeReaderFn = ({ lineFn = lineHandler, + logger = log, } = {}) => path => { mkfifo(path); - fs.createReadStream(path, "UTF-8").on("data", makeLineEmitter(lineFn)); - return () => - new Promise((resolve, reject) => - fs.createWriteStream(path).end().close(resolve)); + let shouldReopenPipe = { ref: true }; + const pipe = {}; + makeReopeningPipeReader( + shouldReopenPipe, + path, + { lineFn, logger }, + pipe, + ); + return () => new Promise((resolve, reject) => { + shouldReopenPipe.ref = false; + fs.createWriteStream(path).end().close(); + pipe.ref.on("close", resolve); + }); }; export const makePipeReader = makePipeReaderFn(); diff --git a/tests/js/hero.mjs b/tests/js/hero.mjs index f01ced8..09c1030 100644 --- a/tests/js/hero.mjs +++ b/tests/js/hero.mjs @@ -33,6 +33,7 @@ import { rmIf, mkfifo, makeLineEmitter, + makeReopeningPipeReader, makePipeReaderFn, buildRoutes, promisifyServer, @@ -1389,25 +1390,96 @@ const test_makeLineEmitter = async t => { }); }; -const test_makePipeReaderFn = async t => { - t.start("makePipeReaderFn()"); +const test_makeReopeningPipeReader = async t => { + t.start("makeReopeningPipeReader()"); - await t.test("we can close it manually with no data", async () => { - const path = "tests/hero-makePipeReader-0.pipe"; - const lines = []; + await t.test("we can init to not reopen from the start", async () => { + const path = "tests/hero-makeReopeningPipeReader-0.pipe"; + const shouldReopenPipe = { ref: false }; + const lines = [] const logs = []; - const lineFn = x => lines.push(x); - const logger = { debug: x => logs.push(x) }; - const makePipeReader = makePipeReaderFn({ lineFn, logger }); + const lineFn = x => lines.push(x); + const logger = { debug: x => logs.push(x) }; + + const previous = process.env.DEBUG; + delete process.env.DEBUG; rmIf(path); - await makePipeReader(path)(); + mkfifo(path); + const pipe = {}; + makeReopeningPipeReader( + shouldReopenPipe, + path, + { lineFn, logger }, + pipe, + ); - assert.deepEqual(lines, []); + return new Promise((resolve, reject) => { + fs.createWriteStream(path).end().close(); + pipe.ref.on("close", () => { + assert.deepEqual(lines, []); + assert.deepEqual(logs, [{ + message: "pipe closed, NOT reopening", + }]); + + process.env.DEBUG = previous; + resolve(); + }); + }); + }); + + await t.test("we can reopen more than once", async () => { + const path = "tests/hero-makeReopeningPipeReader-1.pipe"; + const shouldReopenPipe = { ref: true }; + const lines = []; + const logs = []; + const lineFn = x => lines.push(x); + const logger = { debug: x => logs.push(x) }; + + const previous = process.env.DEBUG; + delete process.env.DEBUG; + + rmIf(path); + mkfifo(path); + const pipe = {}; + makeReopeningPipeReader( + shouldReopenPipe, + path, + { lineFn, logger }, + pipe, + ); + return new Promise((resolve, reject) => { + fs.createWriteStream(path).end("first\n").close(); + pipe.ref.on("close", () => { + fs.createWriteStream(path).end("second\n").close(); + pipe.ref.on("close", () => { + shouldReopenPipe.ref = false; + fs.createWriteStream(path).end("third\n").close(); + pipe.ref.on("close", () => { + assert.deepEqual(lines, [ + "first", + "second", + "third", + ]); + assert.deepEqual(logs, [ + { message: "pipe closed, reopening" }, + { message: "pipe closed, reopening" }, + { message: "pipe closed, NOT reopening" }, + ]); + process.env.DEBUG = previous; + resolve(); + }); + }); + }); + }); }); +}; - await t.test("closing on pipe EOF reopens", async () => { - const path = "tests/hero-makePipeReader-1.pipe"; +const test_makePipeReaderFn = async t => { + t.start("makePipeReaderFn()"); + + await t.test("we can close it directly on creation with no data", async () => { + const path = "tests/hero-makePipeReader-0.pipe"; const lines = []; const logs = []; const lineFn = x => lines.push(x); @@ -1415,19 +1487,10 @@ const test_makePipeReaderFn = async t => { const makePipeReader = makePipeReaderFn({ lineFn, logger }); rmIf(path); - const closeReader = makePipeReader(path); - const writer = fs.createWriteStream(path); - writer.end("first\nsecond\nthird\n"); - return new Promise((resolve, reject) => - writer.close(async () => { - await closeReader(); - assert.deepEqual(lines, [ - "first", - "second", - "third", - ]); - resolve(); - })); + await makePipeReader(path)(); + + assert.deepEqual(lines, []); + assert.deepEqual(logs, [{ message: "pipe closed, NOT reopening" }]); }); }; @@ -1668,6 +1731,7 @@ await runner.runTests([ test_rmIf, test_mkfifo, test_makeLineEmitter, + test_makeReopeningPipeReader, test_makePipeReaderFn, test_promisifyServer, test_buildServer, |