How to create a Duplex stream in nodejs?

I create a new Duplex stream like this

const Duplex = require('stream').Duplex;
let myStream = new Duplex()

Via a Websocket I receive chunks/buffer which I add to the stream like this every time a new chunk comes in via the Websocket:

myStream.push(buffer)

I then pipe the stream to another process (ffmpeg in this example)

myStream.pipe(process.stdout);

This causes the error NodeError: The _read() method is not implemented. I understand what the error says, but I don't understand why I need to implement _read() or how I should do it. I also see that you can pass a read function to the Duplex constructor, but why is this necessary? I just want to continuously push chunks into the stream and then pipe it to another process.

Lammers answered 7/11, 2019 at 11:31 Comment(0)

The nodejs Duplex stream requires the implementer to specify both a write and a read method:

import stream from 'stream';

const duplex = new stream.Duplex({
  write(chunk, encoding, next) {
    // Handles data written or piped into this duplex stream. Do
    // something with the chunk, then call next() to signal that it
    // has been processed. The writable and readable sides are
    // independent: written data only reaches readers if this
    // implementation pushes it.
    next();
  },
  read(size) {
    // Add new data to be read by streams piped from this duplex
    this.push("some data");
  }
});

The official nodejs documentation on streams is available here: API for Stream Implementers
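
To make the two sides easier to see, here is a minimal runnable sketch. It uses require as in the question, and the names outgoing, received, and readChunks are made up for illustration. The writable side only records what is written to it, while the readable side serves its own separate data, so nothing written here shows up on the read side.

```javascript
const { Duplex } = require('node:stream');

// Hypothetical data served by the readable side.
const outgoing = ['alpha', 'beta'];
// Everything written into the writable side ends up here.
const received = [];
// Everything read out of the readable side ends up here.
const readChunks = [];

const duplex = new Duplex({
  write(chunk, encoding, callback) {
    // Writable side: record the chunk and signal that it was handled.
    received.push(chunk.toString());
    callback();
  },
  read(size) {
    // Readable side: hand out one item per call; null ends the readable side.
    this.push(outgoing.length > 0 ? outgoing.shift() : null);
  },
});

// Feed the writable side, then close it.
duplex.write('written by a producer');
duplex.end();

// Consume the readable side.
duplex.on('data', (chunk) => readChunks.push(chunk.toString()));
duplex.on('end', () => {
  console.log('read side produced:', readChunks);
  console.log('write side received:', received);
});
```

If written data should come back out of the read side, the write implementation has to push it itself, which is roughly what a Transform stream does for you.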

The websocket scenario
The websocket example described above should probably use a Readable rather than a duplex stream. Duplex streams are useful in store-and-forward or process-and-forward scenarios. However, it sounds like the stream in the websocket example is used solely to move data from the websocket to a stream interface. This can be achieved using a Readable:


import stream from 'stream';

const onSocketConnection = ( socket ) => {
    const readable = new stream.Readable({
      // The read logic is omitted since the data is pushed to the socket
      // outside of the script's control. However, the read() function 
      // must be defined.
      read(){}
    });

    socket.on('message', ( data ) => {
        // Push the data on the readable queue
        readable.push( data );
    });

    readable.pipe( ffmpeg );
}
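
For trying this out without a real WebSocket or ffmpeg, the following sketch may help. It makes several assumptions: an EventEmitter stands in for the socket, the 'close' event name is assumed (check what your WebSocket library emits), and a small Writable named sink stands in for the destination. It also pushes null when the socket closes, which the snippet above leaves out, so that the destination gets ended.

```javascript
const { EventEmitter } = require('node:events');
const { Readable, Writable } = require('node:stream');

// Stand-in for the WebSocket connection (assumption: it emits
// 'message' with a Buffer and 'close' when the connection ends).
const socket = new EventEmitter();

// Stand-in for the destination, e.g. the stdin of a child process.
const collected = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    collected.push(chunk);
    callback();
  },
});

// read() is a no-op because data arrives whenever the socket delivers it.
const readable = new Readable({ read() {} });

socket.on('message', (data) => readable.push(data));
// Pushing null ends the readable, which lets pipe() end the sink as well.
socket.on('close', () => readable.push(null));

readable.pipe(sink);
sink.on('finish', () => {
  console.log('sink received:', Buffer.concat(collected).toString());
});

// Simulate two incoming messages followed by a disconnect.
socket.emit('message', Buffer.from('chunk-1 '));
socket.emit('message', Buffer.from('chunk-2'));
socket.emit('close');
```

With a process started through child_process.spawn, the pipe destination would be the child's stdin stream rather than the child process object itself.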
Vigilant answered 9/11, 2019 at 7:59 Comment(4)
I see, thanks for the explanation! But I still wonder why you should push inside read(). Also, when you continuously write (push) to a Readable stream, that feels like a Readable + Writable. How is that different from a Duplex? I'm not sure why the example code I have uses Duplex streams, since it just pipes the stream to sox and then to ffmpeg. If I use a Readable that behaves like you describe, that should work fine, I guess. - Lammers
Doing a push means sending data down the pipe, while having some other component call read on your Readable signals that the respective data sink is ready for more data. I think the easiest way to see why a Readable is the right choice in this scenario is that you don't want the two buffers (write and read) that a Duplex would introduce; only the read buffer is necessary (which is where the pushed data goes). Personally, I only became comfortable with when to use duplexes/transforms and when to use something else after having used the stream API for some time. - Vigilant
Okay, so do I understand it right that my stream is Readable because "someone" else reads from it via .pipe, but it's not writable because I define myself how the read buffer is filled (via readable.push)? My confusion lies in the naming, since I first thought the "read" function reads in the data that comes into the stream, which is not the case, since it is the function called when someone wants to get data from it. But then in some way my Readable stream is kind of "writable" because I perform a custom push from the outside, right? I write something into the stream; that confused me! - Lammers
Yes! That's well put. I misunderstood the documentation the same way initially :) - Vigilant
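
The push/read distinction discussed in these comments can be observed directly. In this small sketch (the tiny highWaterMark and the readCalls counter are only there for illustration), push() fills the read buffer from the outside and its return value reports whether the buffer is still below the limit, while read() is what a consumer calls to take data out.

```javascript
const { Readable } = require('node:stream');

// Counts how often the stream asks for more data.
let readCalls = 0;

const readable = new Readable({
  highWaterMark: 4, // deliberately tiny so the limit is easy to reach
  read() {
    // Called by the stream when a consumer wants data; nothing to do
    // here because data is pushed from the outside.
    readCalls += 1;
  },
});

// push() places data in the internal read buffer. It returns false once
// the buffered amount has reached highWaterMark, as a hint to slow down.
const firstPush = readable.push('ab'); // 2 bytes buffered, below the limit
const secondPush = readable.push('cdef'); // 6 bytes buffered, over the limit

// A consumer pulls the buffered data out again with read().
const drained = readable.read();

console.log({ firstPush, secondPush, drained: drained.toString(), readCalls });
```

A no-op read() ignores that hint, so a fast producer and a slow consumer can make the buffer grow; whether that matters depends on how much data the socket delivers.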
