summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEuAndreh <eu@euandre.org>2024-03-07 19:23:32 -0300
committerEuAndreh <eu@euandre.org>2024-03-07 19:23:43 -0300
commit76a41d1728fa8111d72e18293297528d35d5aa78 (patch)
tree3f80a6cd63b6fee9d2b852c3e897ec13e18d1227
parentsrc/hero.mjs: makePipeReader() now returns a function for closing it (diff)
downloadpapod-76a41d1728fa8111d72e18293297528d35d5aa78.tar.gz
papod-76a41d1728fa8111d72e18293297528d35d5aa78.tar.xz
src/hero.mjs: Add makeReopeningPipeReader()
This addresses point nÂș 2 from commit 40118a188fb05219d1188ff775ce71f66c8cb56a.
-rw-r--r--src/hero.mjs45
-rw-r--r--tests/js/hero.mjs114
2 files changed, 130 insertions, 29 deletions
diff --git a/src/hero.mjs b/src/hero.mjs
index ed105cd..716921f 100644
--- a/src/hero.mjs
+++ b/src/hero.mjs
@@ -350,14 +350,51 @@ export const makeLineEmitter = fn => {
};
};
+export const makeReopeningPipeReader = (shouldReopenPipe, path, {
+ lineFn,
+ logger,
+} = {}, out) => {
+ const reader = fs.createReadStream(path, "UTF-8")
+ out.ref = reader;
+ reader.on("data", makeLineEmitter(lineFn));
+ reader.on("close", () => {
+ if (shouldReopenPipe.ref) {
+ logger.debug({
+ message: "pipe closed, reopening",
+ });
+ makeReopeningPipeReader(
+ shouldReopenPipe,
+ path,
+ { lineFn, logger },
+ out,
+ );
+ return;
+ }
+
+ logger.debug({
+ message: "pipe closed, NOT reopening",
+ });
+ });
+};
+
export const makePipeReaderFn = ({
lineFn = lineHandler,
+ logger = log,
} = {}) => path => {
mkfifo(path);
- fs.createReadStream(path, "UTF-8").on("data", makeLineEmitter(lineFn));
- return () =>
- new Promise((resolve, reject) =>
- fs.createWriteStream(path).end().close(resolve));
+ let shouldReopenPipe = { ref: true };
+ const pipe = {};
+ makeReopeningPipeReader(
+ shouldReopenPipe,
+ path,
+ { lineFn, logger },
+ pipe,
+ );
+ return () => new Promise((resolve, reject) => {
+ shouldReopenPipe.ref = false;
+ fs.createWriteStream(path).end().close();
+ pipe.ref.on("close", resolve);
+ });
};
export const makePipeReader = makePipeReaderFn();
diff --git a/tests/js/hero.mjs b/tests/js/hero.mjs
index f01ced8..09c1030 100644
--- a/tests/js/hero.mjs
+++ b/tests/js/hero.mjs
@@ -33,6 +33,7 @@ import {
rmIf,
mkfifo,
makeLineEmitter,
+ makeReopeningPipeReader,
makePipeReaderFn,
buildRoutes,
promisifyServer,
@@ -1389,25 +1390,96 @@ const test_makeLineEmitter = async t => {
});
};
-const test_makePipeReaderFn = async t => {
- t.start("makePipeReaderFn()");
+const test_makeReopeningPipeReader = async t => {
+ t.start("makeReopeningPipeReader()");
- await t.test("we can close it manually with no data", async () => {
- const path = "tests/hero-makePipeReader-0.pipe";
- const lines = [];
+ await t.test("we can init to not reopen from the start", async () => {
+ const path = "tests/hero-makeReopeningPipeReader-0.pipe";
+ const shouldReopenPipe = { ref: false };
+ const lines = []
const logs = [];
- const lineFn = x => lines.push(x);
- const logger = { debug: x => logs.push(x) };
- const makePipeReader = makePipeReaderFn({ lineFn, logger });
+ const lineFn = x => lines.push(x);
+ const logger = { debug: x => logs.push(x) };
+
+ const previous = process.env.DEBUG;
+ delete process.env.DEBUG;
rmIf(path);
- await makePipeReader(path)();
+ mkfifo(path);
+ const pipe = {};
+ makeReopeningPipeReader(
+ shouldReopenPipe,
+ path,
+ { lineFn, logger },
+ pipe,
+ );
- assert.deepEqual(lines, []);
+ return new Promise((resolve, reject) => {
+ fs.createWriteStream(path).end().close();
+ pipe.ref.on("close", () => {
+ assert.deepEqual(lines, []);
+ assert.deepEqual(logs, [{
+ message: "pipe closed, NOT reopening",
+ }]);
+
+ process.env.DEBUG = previous;
+ resolve();
+ });
+ });
+ });
+
+ await t.test("we can reopen more than once", async () => {
+ const path = "tests/hero-makeReopeningPipeReader-1.pipe";
+ const shouldReopenPipe = { ref: true };
+ const lines = [];
+ const logs = [];
+ const lineFn = x => lines.push(x);
+ const logger = { debug: x => logs.push(x) };
+
+ const previous = process.env.DEBUG;
+ delete process.env.DEBUG;
+
+ rmIf(path);
+ mkfifo(path);
+ const pipe = {};
+ makeReopeningPipeReader(
+ shouldReopenPipe,
+ path,
+ { lineFn, logger },
+ pipe,
+ );
+ return new Promise((resolve, reject) => {
+ fs.createWriteStream(path).end("first\n").close();
+ pipe.ref.on("close", () => {
+ fs.createWriteStream(path).end("second\n").close();
+ pipe.ref.on("close", () => {
+ shouldReopenPipe.ref = false;
+ fs.createWriteStream(path).end("third\n").close();
+ pipe.ref.on("close", () => {
+ assert.deepEqual(lines, [
+ "first",
+ "second",
+ "third",
+ ]);
+ assert.deepEqual(logs, [
+ { message: "pipe closed, reopening" },
+ { message: "pipe closed, reopening" },
+ { message: "pipe closed, NOT reopening" },
+ ]);
+ process.env.DEBUG = previous;
+ resolve();
+ });
+ });
+ });
+ });
});
+};
- await t.test("closing on pipe EOF reopens", async () => {
- const path = "tests/hero-makePipeReader-1.pipe";
+const test_makePipeReaderFn = async t => {
+ t.start("makePipeReaderFn()");
+
+ await t.test("we can close it directly on creation with no data", async () => {
+ const path = "tests/hero-makePipeReader-0.pipe";
const lines = [];
const logs = [];
const lineFn = x => lines.push(x);
@@ -1415,19 +1487,10 @@ const test_makePipeReaderFn = async t => {
const makePipeReader = makePipeReaderFn({ lineFn, logger });
rmIf(path);
- const closeReader = makePipeReader(path);
- const writer = fs.createWriteStream(path);
- writer.end("first\nsecond\nthird\n");
- return new Promise((resolve, reject) =>
- writer.close(async () => {
- await closeReader();
- assert.deepEqual(lines, [
- "first",
- "second",
- "third",
- ]);
- resolve();
- }));
+ await makePipeReader(path)();
+
+ assert.deepEqual(lines, []);
+ assert.deepEqual(logs, [{ message: "pipe closed, NOT reopening" }]);
});
};
@@ -1668,6 +1731,7 @@ await runner.runTests([
test_rmIf,
test_mkfifo,
test_makeLineEmitter,
+ test_makeReopeningPipeReader,
test_makePipeReaderFn,
test_promisifyServer,
test_buildServer,