summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/hero.mjs45
1 files changed, 41 insertions, 4 deletions
diff --git a/src/hero.mjs b/src/hero.mjs
index ed105cd..716921f 100644
--- a/src/hero.mjs
+++ b/src/hero.mjs
@@ -350,14 +350,51 @@ export const makeLineEmitter = fn => {
};
};
+export const makeReopeningPipeReader = (shouldReopenPipe, path, {
+ lineFn,
+ logger,
+} = {}, out) => {
+ const reader = fs.createReadStream(path, "UTF-8")
+ out.ref = reader;
+ reader.on("data", makeLineEmitter(lineFn));
+ reader.on("close", () => {
+ if (shouldReopenPipe.ref) {
+ logger.debug({
+ message: "pipe closed, reopening",
+ });
+ makeReopeningPipeReader(
+ shouldReopenPipe,
+ path,
+ { lineFn, logger },
+ out,
+ );
+ return;
+ }
+
+ logger.debug({
+ message: "pipe closed, NOT reopening",
+ });
+ });
+};
+
export const makePipeReaderFn = ({
lineFn = lineHandler,
+ logger = log,
} = {}) => path => {
mkfifo(path);
- fs.createReadStream(path, "UTF-8").on("data", makeLineEmitter(lineFn));
- return () =>
- new Promise((resolve, reject) =>
- fs.createWriteStream(path).end().close(resolve));
+ let shouldReopenPipe = { ref: true };
+ const pipe = {};
+ makeReopeningPipeReader(
+ shouldReopenPipe,
+ path,
+ { lineFn, logger },
+ pipe,
+ );
+ return () => new Promise((resolve, reject) => {
+ shouldReopenPipe.ref = false;
+ fs.createWriteStream(path).end().close();
+ pipe.ref.on("close", resolve);
+ });
};
export const makePipeReader = makePipeReaderFn();