TLDR;

- `pipe` has those problems
- `pipeline` was created to solve them all, and it does
- `pipeline` is great if you have all of the parts from start to finish, but if not, see the section on `pipe` below
Long-winded answer:
The accepted answer just brushes off `pipeline`, but it's specifically designed to solve this problem. `pipe` absolutely suffered from it (more below), but I've not found a case where `pipeline` doesn't properly close streams around files, http, etc. YMMV with random npm packages, but if a stream has a `close` or `destroy` function, as well as an `'error'` event, it should be fine.
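For context, here is a rough, self-contained sketch of the kind of pipeline the question's example sets up; the file names and the inline generator transform are assumptions, not the asker's actual code:

```js
import fs from 'fs';
import { pipeline } from 'stream/promises';

try {
  await pipeline(
    fs.createReadStream('./lowercase.txt'),
    // upper-case each chunk on the way through
    async function* (source) {
      for await (const chunk of source) {
        yield chunk.toString().toUpperCase();
      }
    },
    fs.createWriteStream('./uppercase.txt'),
  );
} catch (error) {
  // even on failure, pipeline destroys both file streams for us
  console.error('pipeline failed:', error);
}
```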
To demonstrate, this makes a call to the shell to see if our test files are open:
```js
import { exec } from 'child_process';
import { promisify } from 'util';

const listOpenFiles = async () => {
  const { stdout } = await promisify(exec)("lsof -c node | awk '{print $9}'");
  // only show our test files
  const openFiles = stdout.split('\n').filter((str) => str.endsWith('case.txt'));
  console.log('***** open files:\n', openFiles, '\n-------------');
};
```
If you call that inside the loop in the example above:
```js
for await (const chunk of source) {
  await listOpenFiles();
  // ... rest of the loop body
}
```
The output will keep repeating:
```
***** open files:
 [
  '/path/to/lowercase.txt',
  '/path/to/uppercase.txt'
]
```
If you call it again after your catch, you can see that everything is closed.
```
***** open files:
 []
```
About the docs referenced:
What the `pipeline` docs are referring to in the first 2 bullet points is that it won't close streams that have already closed because... well, they're already closed. As for the dangling listeners, those are indeed left on the individual streams passed to `pipeline`. However, in your example (a typical case), you're not keeping a reference to the individual streams anyway; they'll be garbage collected immediately after the pipeline completes. It's a warning about potential side effects if you have, for example, a constant reference to one of them.
```js
// using this same instance over and over will end up with tons of dangling listeners
export const capitalizer = new Transform(// ...
```
Instead, it's better to have "clean" instances. Now that generator functions are easy to chain, it's less common to even have a reference to transforms at all, but you can simply make a function that returns a new instance rather than having a constant one:
```js
export const createCapitalizer = () => new Transform(// ...
```
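For completeness, a minimal sketch of what such a factory might look like; the transform body is an assumption, since the original elides it:

```js
import { Transform } from 'stream';

// each call returns a fresh Transform, so any listeners left behind by a
// finished pipeline die with that instance instead of piling up on a shared one
export const createCapitalizer = () =>
  new Transform({
    transform(chunk, encoding, callback) {
      callback(null, chunk.toString().toUpperCase());
    },
  });
```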
In short, the question's example is fine on all 3 points.
More info on `pipe`
`pipe`, on the other hand, does indeed have the above problems with propagation.
```js
const csvStream = (file) => {
  // does not expose file errors, nor clean up the file stream on parsing errors!!!
  return fs.createReadStream(file).pipe(createCsvTransform());
};
```
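To make the failure mode concrete, here is a hedged sketch of what that costs you (`createCsvTransform()` and the file name are the same assumptions as above):

```js
// pipe() returns the destination stream, so errors emitted on the file
// stream (e.g. ENOENT for a missing file) never reach this handler; they
// stay on the read stream and, with nothing listening there, crash the
// process as an unhandled 'error' event
csvStream('./missing.csv').on('error', (error) => {
  console.error('only parser errors land here:', error);
});
```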
It's widely agreed that it's painful/unintuitive, but it's too late to change it now. I try to avoid it where I can, and I recommend `pipeline` where possible. However, it's important to note that `pipeline` requires having all of the parts together. So e.g. for the above, you need the final `Writable` target as well. You still have to use `pipe` in cases like this if you want to build just part of a chain. The workaround for this is easier to reason about in isolation:
```js
const csvStream = (file) => {
  const fileStream = fs.createReadStream(file);
  const transform = createCsvTransform();
  // pass file errors forward
  fileStream.on('error', (error) => transform.emit('error', error));
  // close file stream on parsing errors
  transform.on('error', () => fileStream.close());
  return transform;
};
```
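A quick usage sketch (the CSV path is a stand-in) showing that a consumer now sees file errors as well as parser errors:

```js
try {
  // async iteration reads from the transform's readable side,
  // whether it is in object mode or buffer mode
  for await (const row of csvStream('./data.csv')) {
    console.log(row);
  }
} catch (error) {
  // a missing or unreadable file now surfaces here too,
  // because the file error was forwarded onto the transform
  console.error('csv processing failed:', error);
}
```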
However, there is good news. It's still experimental, but soon `stream` will expose a `stream.compose` function. It has all of the propagation/cleanup advantages of `pipeline`, but just returns a new stream. Essentially, it's what most people thought that `pipe` would do. ;)
```js
// NO propagation or cleanup
readable.pipe(transform);

// automatic propagation and cleanup
stream.compose(readable, transform);
```
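As an illustration (hedged, since `compose` is still experimental at the time of writing), the `csvStream` helper above could then shrink back to a one-liner without losing propagation or cleanup:

```js
import * as stream from 'stream';

// compose() returns a single duplex stream; an error in either part
// destroys both, and it is re-emitted on the composed stream
const csvStream = (file) =>
  stream.compose(fs.createReadStream(file), createCsvTransform());
```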
Until then, check out https://www.npmjs.com/package/stream-chain
A note on `pipeline` and `await`
Note that the example above uses `await pipeline(...)`, but the linked docs are for the callback version. That doesn't return a promise, so the `await` does nothing. From node 15 on up, you will generally want the `stream/promises` api here: https://nodejs.org/api/stream.html#streams-promises-api
```js
import { pipeline } from 'stream/promises'; // NOT 'stream'
```
Before node 15, you can make it a promise with util's `promisify`:
```js
import { pipeline } from 'stream';
import { promisify } from 'util';

await promisify(pipeline)(// ...
```
Or, to make it simpler for a whole file:
```js
import * as stream from 'stream';
import { promisify } from 'util';

const pipeline = promisify(stream.pipeline);
```
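Either way, the usage is the same. A minimal sketch, assuming the file names and the `createCapitalizer()` factory from earlier:

```js
try {
  await pipeline(
    fs.createReadStream('./lowercase.txt'),
    createCapitalizer(),
    fs.createWriteStream('./uppercase.txt'),
  );
  console.log('done, and both files are closed');
} catch (error) {
  // with the promise form, we only reach this after cleanup has finished
  console.error('pipeline failed:', error);
}
```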
I only mention this because, were you to use `await` with the callback version, it wouldn't actually be complete after the `try/catch`, so it might give the false impression that it failed to clean up when, in fact, it had yet to complete.