Node.js: can you use asynchronous functions from within streams?
Consider the following:

var asyncFunction = function(data, callback) {
  doAsyncyThing(data, function(err, result) {
    // do some stuff with result
    return callback(err)
  })
}
fs.createReadStream('eupmc_lite_metadata_2016_04_15.json')
  .pipe(JSONstream.parse())
  .on('data', asyncFunction)   // <- how to let asyncFunction complete before continuing

How does the stream know when asyncFunction has completed? Is there any way to use asynchronous functions from within streams?

Mossberg answered 16/4, 2016 at 8:50 Comment(5)
I'm not sure how the stream would handle the callback, since the documentation doesn't show a two-parameter function for on('data', fn). If you do want to do something fancy, though, you can pause the stream, do your stuff, then resume the stream. – Equalize
@DaveBriand are you saying categorically that you cannot do this, or are you restating the question? :) – Mossberg
Categorically, you can't pass a two-argument function to the 'data' stream event. However, you can pause the stream on the 'data' event, do some asynchronous processing, then resume the stream when your processing is complete. – Equalize
Great! Is there a clean-ish way to code this? Could you give an example? – Mossberg
Just for clarity: yes, on('data', asyncFunction) cannot deal with callbacks, since asyncFunction must be of the form function(data). My point is: how, then, do you deal with callbacks? – Mossberg
I think this is enough:

const fs = require('node:fs')
const { Transform } = require('node:stream')
const JSONstream = require('JSONStream')

const deferTransform = new Transform({
  objectMode: true, // JSONstream.parse() emits parsed objects, not buffers
  transform: (chunk, encoding, next) => {
    // any async work can happen here; call next() when the chunk is done
    Promise.resolve(`${JSON.stringify(chunk).toUpperCase()} `).then((data) =>
      next(null, data)
    )
  },
})

fs.createReadStream('eupmc_lite_metadata_2016_04_15.json')
  .pipe(JSONstream.parse())
  .pipe(deferTransform)
Elitism answered 11/6, 2023 at 21:35 Comment(0)
Check out transform streams. They give you the ability to run async code on a chunk, and then call a callback when you are finished. Here are the docs: https://nodejs.org/api/stream.html#transform_transformchunk-encoding-callback

As a simple example, you can do something like:

const fs = require('fs')
const { Transform } = require('stream')
const JSONstream = require('JSONStream')

class WorkerThing extends Transform {
  constructor () {
    super({ objectMode: true }) // JSONstream.parse() emits parsed objects
  }

  _transform (chunk, encoding, cb) {
    // cb(err, result) tells the stream this chunk is finished,
    // so back-pressure is applied until asyncFunction completes.
    asyncFunction(chunk, cb)
  }
}

const workerThing = new WorkerThing()

fs.createReadStream('eupmc_lite_metadata_2016_04_15.json')
  .pipe(JSONstream.parse())
  .pipe(workerThing)
Interloper answered 5/5, 2016 at 3:43 Comment(1)
I didn't really understand what you were saying at first, but yes, I see now that transform streams are probably the way forward. Will try this out. – Mossberg
