Node - Closing Streams Properly after Pipeline

Let's say I have the following code:

try {
    let size = 0;

    await pipeline(
        fs.createReadStream('lowercase.txt'),
        async function* (source) {
            for await (const chunk of source) {
                size += chunk.length;
           
                if (size >= 1000000) {
                    throw new Error('File is too big');
                }

                yield String(chunk).toUpperCase();
            }
        },
        fs.createWriteStream('uppercase.txt')
    );

    console.log('Pipeline succeeded.');
} catch (error) {
    console.log('got error:', error);
}

How do I make sure I am properly closing the streams in every single case? The Node docs aren't much help; they just tell me that I am going to have dangling event listeners:

stream.pipeline() will call stream.destroy(err) on all streams except:

Readable streams which have emitted 'end' or 'close'.

Writable streams which have emitted 'finish' or 'close'.

stream.pipeline() leaves dangling event listeners on the streams after the callback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors.

Untouchable answered 7/4, 2020 at 3:42 Comment(2)
I've also noticed that the doc is oddly incomplete on this topic, and that's why I've never used pipeline() myself. I figured that I just had to wrap my own promises around things where I could totally control the error handling. I'm the kind of guy who would rather code it myself than wrestle with pre-made, but poorly documented, code. The devil you know and control vs. the pre-made code you don't understand how to use properly. - Polyzoarium
@Polyzoarium That makes sense to me. If it's not too much to ask, would you be able to write a version of the above code using promises and event listeners? I'd appreciate it and accept it as an answer :) - Untouchable

So, I find many of the node.js stream compound operations such as pipeline() and .pipe() to be really bad/incomplete at error handling. For example, if you just do this:

fs.createReadStream("input.txt")
  .pipe(fs.createWriteStream("output.txt"))
  .on('error', err => {
      console.log(err);
  }).on('finish', () => {
      console.log("all done");
  });

You would expect that if there was an error opening the readStream, you'd get that error in your error handler here, but no, that is not the case. An error opening that input file will go unhandled. There's some logic to that, since .pipe() returns the output stream and an input error isn't an error on the output stream, but because the error isn't passed through, it becomes very easy to miss errors on the input stream. The .pipe() operation could have listened for errors on the input stream and passed them through (even as a pipeErr or something different), and it could also have cleaned up the writeStream properly upon a read error. But .pipe() wasn't implemented that thoroughly. It seems to assume that there will never be an error on the input stream.

Instead, you have to separately save the readStream object and attach an error handler to it directly in order to see that error. So I just don't trust this compound stuff anymore, and the docs never really explain how to do proper error handling. I tried to look at the code for pipeline() to see if I could understand its error handling, and that did not prove to be a fruitful endeavor.
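
For reference, here is a minimal sketch of that workaround applied to the .pipe() example above (file names are just placeholders): keep a reference to the readStream so its errors can be handled directly, and tear down the writeStream by hand.

const fs = require('fs');

const readStream = fs.createReadStream("input.txt");
const writeStream = fs.createWriteStream("output.txt");

// .pipe() won't forward this error, so handle it on the readStream itself
readStream.on('error', err => {
    console.log("read error:", err);
    // clean up the write side manually on a read error
    writeStream.destroy();
});

readStream.pipe(writeStream)
    .on('error', err => console.log("write error:", err))
    .on('finish', () => console.log("all done"));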

So, your particular problem seems like it could be done with a transform stream:

const fs = require('fs');
const { Transform } = require('stream');

// make a fresh Transform for each call; a given stream instance
// can only be piped through once
function createUpperCaseTransform() {
    return new Transform({
        transform: function(chunk, encoding, callback) {
            let str = chunk.toString('utf8');
            this.push(str.toUpperCase());
            callback();
        }
    });
}

function upperFile(input, output) {
    return new Promise((resolve, reject) => {
        // common function for cleaning up a partial output file
        function errCleanup(err) {
            fs.unlink(output, function(e) {
                if (e) console.log(e);
                reject(err);
            });
        }

        let inputStream = fs.createReadStream(input, {encoding: 'utf8'});
        let outputStream = fs.createWriteStream(output, {emitClose: true});

        // have to separately listen for read/open errors
        inputStream.on("error", err => {
            // have to manually close writeStream when there was an error reading
            if (outputStream) outputStream.destroy();
            errCleanup(err);
        });
        inputStream.pipe(createUpperCaseTransform())
            .pipe(outputStream)
            .on("error", errCleanup)
            .on("close", resolve);        
    });
}

// sample usage
upperFile("input.txt", "output.txt").then(() => {
    console.log("all done");
}).catch(err => {
    console.log("got error", err);
});

As you can see, about 2/3 of this code is dealing with errors in a robust manner (the part that the built-in operations don't do properly).

Polyzoarium answered 8/4, 2020 at 1:0 Comment(0)

TLDR;

  • pipe has those problems
  • pipeline was created to solve them all, and it does
  • pipeline is great if you have all of the parts from start to finish, but if not:
    • a future version of node will have a stream.compose function to address that
    • the stream-chain library is a good option until then

Long-winded answer:

The accepted answer just brushes off pipeline, but it's specifically designed to solve this problem. pipe absolutely suffered from it (more below), but I've not found a case where pipeline doesn't properly close streams around files, http, etc. YMMV with random npm packages, but if a stream has a close or destroy function, as well as an 'error' event, it should be fine.

To demonstrate, this makes a call to the shell to see if our test files are open:

import { exec } from 'child_process';
import { promisify } from 'util';

const listOpenFiles = async () => {
  const { stdout } = await promisify(exec)("lsof -c node | awk '{print $9}'");

  // only show our test files
  const openFiles = stdout.split('\n').filter((str) => str.endsWith('case.txt'));
  console.log('***** open files:\n', openFiles, '\n-------------');
};

If you call that inside the loop in the example above:

for await (const chunk of source) {
  await listOpenFiles();

The output will keep repeating:

***** open files:
[
  '/path/to/lowercase.txt',
  '/path/to/uppercase.txt'
]

If you call it again after your catch, you can see that everything is closed.

***** open files:
 [] 
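
Concretely, the check goes right after the try/catch from the question (a sketch; listOpenFiles is the helper defined above):

try {
  await pipeline(/* ...the same streams as in the question... */);
  console.log('Pipeline succeeded.');
} catch (error) {
  console.log('got error:', error);
}

// whether the pipeline succeeded or threw, both file streams are closed by now
await listOpenFiles(); // prints: ***** open files: []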

About the docs referenced:

What the pipeline docs are referring to in the first 2 bullet points is that it won't close streams that have already closed because... well, they're already closed. As for the dangling listeners, those are indeed left on the individual streams passed to pipeline. However, in your example (a typical case), you're not keeping a reference to the individual streams anyway; they'll be garbage collected immediately after the pipeline completes. It's a warning about potential side effects if you have, for example, a constant reference to one of them.

// using this same instance over and over will end up with tons of dangling listeners
export const capitalizer = new Transform(// ...
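
To see that warning in action, here is a small throwaway sketch (mine, not from the docs): run one pipeline through a long-lived PassThrough and inspect its listeners afterwards.

import { PassThrough, Readable, Writable } from 'stream';
import { pipeline } from 'stream/promises';

// stand-in for a long-lived instance like the `capitalizer` constant above
const reused = new PassThrough();

await pipeline(
  Readable.from(['hello']),
  reused,
  new Writable({ write(chunk, encoding, callback) { callback(); } })
);

// the pipeline has finished, but (per the docs quoted in the question)
// its listeners are still attached to the long-lived stream
console.log(reused.listenerCount('error'), reused.listenerCount('close'));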

Instead, it's better to have "clean" instances. Now that generator functions are easy to chain, it's less common to even have a reference to transforms at all, but you can simply make a function that returns a new instance rather than having a constant one:

export const createCapitalizer = () => new Transform(// ...

In short, the above example is fine on all 3 points.

More info on pipe

pipe, on the other hand, does indeed have the above problems with propagation.

const csvStream = (file) => {
  // does not expose file errors, nor clean up the file stream on parsing errors!!!
  return fs.createReadStream(file).pipe(createCsvTransform());
};

It's widely agreed that it's painful/unintuitive, but it's too late to change it now. I try to avoid it where I can, and I recommend pipeline where possible. However, it's important to note that pipeline requires having all of the parts together. So e.g. for the above, you need the final Writable target as well. You still have to use pipe in cases like this if you want to build just part of a chain. The workaround for this is easier to reason about in isolation:

const csvStream = (file) => {
  const fileStream = fs.createReadStream(file);
  const transform = createCsvTransform();
  // pass file errors forward
  fileStream.on('error', (error) => transform.emit('error', error));
  // close file stream on parsing errors
  transform.on('error', () => fileStream.close());

  return transform;
}

However, there is good news. It's still experimental, but soon stream will expose a stream.compose function. It has all of the propagation/cleanup advantages of pipeline, but just returns a new stream. Essentially, it's what most people thought that pipe would do. ;)

// NO propagation or cleanup
readable.pipe(transform);

// automatic propagation and cleanup
stream.compose(readable, transform);

Until then, check out https://www.npmjs.com/package/stream-chain
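
A rough sketch of the csvStream helper built on stream-chain might look like the following; the chain() helper and its array-of-steps signature are taken from that package's README, so verify them there before relying on this:

const { chain } = require('stream-chain');
const fs = require('fs');

// rough equivalent of the csvStream helper above, composed with stream-chain
const csvStream = (file) =>
  chain([
    fs.createReadStream(file),
    createCsvTransform(), // the same (hypothetical) transform as in the examples above
  ]);

The returned chain acts like a single stream, so the caller can attach one error handler to it instead of wiring up the cross-stream forwarding by hand.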

A note on pipeline and await

Note that the example above uses await pipeline(//..., but the docs quoted in the question describe the callback version from 'stream'. That version doesn't return a promise, so the await does nothing. From Node 15 onward, you will generally want the stream/promises API here: https://nodejs.org/api/stream.html#streams-promises-api

import { pipeline } from 'stream/promises'; // NOT 'stream'

Before Node 15, you can make it a promise with util's promisify:

import { pipeline } from 'stream';
import { promisify } from 'util';

await promisify(pipeline)(// ...

Or, to make it simpler for a whole file:

import * as stream from 'stream';
import { promisify } from 'util';

const pipeline = promisify(stream.pipeline);

I only mention this because, if you were to use await with the callback version, the pipeline wouldn't actually be complete after the try/catch, which might give the false impression that it failed to clean up when, in fact, it simply had not finished yet.

Comparable answered 22/1, 2022 at 5:45 Comment(0)