Piping data from child to parent in nodejs
Asked Answered
C

3

11

I have a nodejs parent process that starts up another nodejs child process. The child process executes some logic and then returns output to the parent. The output is large and I'm trying to use pipes to communicate, as suggested in documentation for child.send() method (which works fine BTW).

I would like someone to suggest how to properly build this communication channel. I want to be able to send data from parent to child and also to be able to send data from child to parent. I've started it a bit, but it is incomplete (sends message only from parent to child) and throws an error.

Parent File Code:

var child_process = require('child_process');

var opts = {
    stdio: [process.stdin, process.stdout, process.stderr, 'pipe']
};
var child = child_process.spawn('node', ['./b.js'], opts);

require('streamifier').createReadStream('test 2').pipe(child.stdio[3]);

Child file code:

var fs =  require('fs');

// read from it
var readable = fs.createReadStream(null, {fd: 3});

var chunks = []; 

readable.on('data', function(chunk) {
    chunks.push(chunk);
});

readable.on('end', function() {
    console.log(chunks.join().toString());
})

The above code prints expected output ("test 2") along with the following error:

events.js:85
      throw er; // Unhandled 'error' event
            ^
Error: shutdown ENOTCONN
    at exports._errnoException (util.js:746:11)
    at Socket.onSocketFinish (net.js:232:26)
    at Socket.emit (events.js:129:20)
    at finishMaybe (_stream_writable.js:484:14)
    at afterWrite (_stream_writable.js:362:3)
    at _stream_writable.js:349:9
    at process._tickCallback (node.js:355:11)
    at Function.Module.runMain (module.js:503:11)
    at startup (node.js:129:16)
    at node.js:814:3

Full Answer:

Parent's code:

var child_process = require('child_process');

var opts = {
    stdio: [process.stdin, process.stdout, process.stderr, 'pipe', 'pipe']
};
var child = child_process.spawn('node', ['./b.js'], opts);

child.stdio[3].write('First message.\n', 'utf8', function() {
    child.stdio[3].write('Second message.\n', 'utf8', function() {

    });
}); 

child.stdio[4].pipe(process.stdout);

Child's code:

var fs =  require('fs');

// read from it
var readable = fs.createReadStream(null, {fd: 3});

readable.pipe(process.stdout);
fs.createWriteStream(null, {fd: 4}).write('Sending a message back.');
Coextensive answered 9/3, 2015 at 16:21 Comment(1)
Can you include the error it throws please?Sackman
T
7

Your code works, but by using the streamifier package to create a read stream from a string, your communication channel is automatically closed after that string is transmitted, which is the reason you get an ENOTCONN error.

To be able to send multiple messages over the stream, consider using .write on it. You can call this as often as you like:

child.stdio[3].write('First message.\n');
child.stdio[3].write('Second message.\n');

If you want to use this method to send multiple discrete messages (which I believe is the case based on your remark of using child.send() before), it's a good idea to use some separator symbol to be able to split the messages when the stream is read in the child. In the above example, I used newlines for that. A useful package for helping with this splitting is event-stream.

Now, in order to create another communication channel from the child in the parent, just add another 'pipe' to your stdio.

You can write to it in the child:

fs.createWriteStream(null, {fd: 4}).write('Sending a message back.');

And read from it in the parent:

child.stdio[4].pipe(process.stdout);

This will print 'Sending a message back.' to the console.

Thorn answered 9/3, 2015 at 17:57 Comment(3)
Thank you for your input. I would like however to be able to communicate in both directions, and possibly multiple times. I would prefer to use stdout/stderr to handle other type of data, that's why I used the 'pipe' there.Coextensive
I completely rewrote my answer to better fit the question you asked. Let me know if this answers your question :).Thorn
Thank you again for the input. This helped me solve my problem !Coextensive
L
2

You can do this with fork()

I just solved this one for myself...fork() is the the higher level version of spawn, and it's recommended to use fork() instead of spawn() in general.

if you use the {silent:true} option, stdio will be piped to the parent process

          const cp = require('child_process');

          const n = cp.fork(<path>, args, {
              cwd: path.resolve(__dirname),
              detached: true,
           });

          n.stdout.setEncoding('utf8');

          // here we can listen to the stream of data coming from the child process:
          n.stdout.on('data', (data) => {
            ee.emit('data',data);
          });

          //you can also listen to other events emitted by the child process
          n.on('error', function (err) {
            console.error(err.stack);
            ee.emit('error', err);
          });

          n.on('message', function (msg) {
            ee.emit('message', msg);
          });

          n.on('uncaughtException', function (err) {
            console.error(err.stack);
            ee.emit('error', err);
          });


          n.once('exit', function (err) {
             console.error(err.stack);
             ee.emit('exit', err);
          });
Livery answered 13/1, 2016 at 0:30 Comment(0)
S
2

I was running into the same issue and used the {end:false} option to fix the error. Unfortunately the accepted answer works only while handling discrete writes of short amounts of data. In case you have a lot of data (rather than just short messages), you need to handle flow control and using the .write() is not the best. For scenarios like this (large data transfers), its better you use the .pipe() function as originally in your code to handle flow control.

The error is thrown because the readable stream in your parent process is trying to end and close the writable stream input pipe of your child process. You should use the {end: false} option in the parent process pipe:

Original Code: require('streamifier').createReadStream('test 2').pipe(child.stdio[3]);

Suggested Modification: require('streamifier').createReadStream('test 2').pipe(child.stdio[3], {end:false});

See details here from the NodeJs documentation: https://nodejs.org/dist/latest-v5.x/docs/api/stream.html#stream_readable_pipe_destination_options

Hope this helps someone else facing this problem.

Sottish answered 17/2, 2016 at 10:27 Comment(1)
Thank you for this input. I'll test it later, and if it works, I'll flag your answer as accepted !Coextensive

© 2022 - 2024 — McMap. All rights reserved.