From 76a41d1728fa8111d72e18293297528d35d5aa78 Mon Sep 17 00:00:00 2001 From: EuAndreh Date: Thu, 7 Mar 2024 19:23:32 -0300 Subject: src/hero.mjs: Add makeReopeningPipeReader() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This addresses point nÂș 2 from commit 40118a188fb05219d1188ff775ce71f66c8cb56a. --- src/hero.mjs | 45 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 41 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/hero.mjs b/src/hero.mjs index ed105cd..716921f 100644 --- a/src/hero.mjs +++ b/src/hero.mjs @@ -350,14 +350,51 @@ export const makeLineEmitter = fn => { }; }; +export const makeReopeningPipeReader = (shouldReopenPipe, path, { + lineFn, + logger, +} = {}, out) => { + const reader = fs.createReadStream(path, "UTF-8") + out.ref = reader; + reader.on("data", makeLineEmitter(lineFn)); + reader.on("close", () => { + if (shouldReopenPipe.ref) { + logger.debug({ + message: "pipe closed, reopening", + }); + makeReopeningPipeReader( + shouldReopenPipe, + path, + { lineFn, logger }, + out, + ); + return; + } + + logger.debug({ + message: "pipe closed, NOT reopening", + }); + }); +}; + export const makePipeReaderFn = ({ lineFn = lineHandler, + logger = log, } = {}) => path => { mkfifo(path); - fs.createReadStream(path, "UTF-8").on("data", makeLineEmitter(lineFn)); - return () => - new Promise((resolve, reject) => - fs.createWriteStream(path).end().close(resolve)); + let shouldReopenPipe = { ref: true }; + const pipe = {}; + makeReopeningPipeReader( + shouldReopenPipe, + path, + { lineFn, logger }, + pipe, + ); + return () => new Promise((resolve, reject) => { + shouldReopenPipe.ref = false; + fs.createWriteStream(path).end().close(); + pipe.ref.on("close", resolve); + }); }; export const makePipeReader = makePipeReaderFn(); -- cgit v1.2.3