Node.js async/await inside createReadStream
I am reading a CSV file line by line and inserting/updating each row in MongoDB. For each row, the expected output order is: 1. console.log(row); 2. console.log(cursor); 3. console.log("stream");

But I am getting output like: console.log(row); console.log(row); console.log(row); ... repeated for many rows, and only then console.log(cursor); and console.log("stream");. Please let me know what I am missing here.

const csv = require('csv-parser');
const fs = require('fs');

var mongodb = require("mongodb");

var client = mongodb.MongoClient;
var url = "mongodb://localhost:27017/";
var collection;
client.connect(url,{ useUnifiedTopology: true }, function (err, client) {

  var db = client.db("UKCompanies");
  collection = db.collection("company");
  startRead();
});
var cursor={};

async function insertRec(row){
  console.log(row);
  cursor = await collection.update({CompanyNumber:23}, row, {upsert: true});
  if(cursor){
    console.log(cursor);
  }else{
    console.log('not exist')
  }
  console.log("stream");
}



async function startRead() {
  fs.createReadStream('./data/inside/6.csv')
    .pipe(csv())
    .on('data', async (row) => {
      await insertRec(row);
    })
    .on('end', () => {
      console.log('CSV file successfully processed');
    });
}
Profound answered 1/2, 2020 at 14:46 Comment(0)

In your startRead() function, await insertRec() does not stop more data events from flowing while insertRec() is processing. So, if you don't want the next data event to run until insertRec() is done, you need to pause the stream, then resume it.

async function startRead() {
  const stream = fs.createReadStream('./data/inside/6.csv')
    .pipe(csv())
    .on('data', async (row) => {
      try {
        stream.pause();            // stop further 'data' events while this row is handled
        await insertRec(row);
      } finally {
        stream.resume();           // let the next row flow once the insert has settled
      }
    })
    .on('end', () => {
      console.log('CSV file successfully processed');
    });
}

FYI, you also need some error handling if insertRec() fails.
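For example, a minimal sketch that reuses insertRec() from the question; destroying the stream on a failed insert is just one possible policy:

async function startRead() {
  const stream = fs.createReadStream('./data/inside/6.csv')
    .pipe(csv())
    .on('data', async (row) => {
      stream.pause();               // hold further rows while this one is inserted
      try {
        await insertRec(row);
      } catch (err) {
        stream.destroy(err);        // give up on the file; the 'error' handler below fires
        return;
      }
      stream.resume();              // only ask for the next row after a successful insert
    })
    .on('error', (err) => {
      console.error('CSV processing aborted:', err);
    })
    .on('end', () => {
      console.log('CSV file successfully processed');
    });
}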

Sekofski answered 1/2, 2020 at 19:48 Comment(4)
Thanks @Sekofski (Profound)
Stream pause/resume is very problematic and rarely works. (Unsure)
@Unsure then what is the right solution? (Spinks)
@HungTran I haven't found a reliable way to stream with pause; nothing I tried worked. (Unsure)

From Node 10+, Readable streams expose Symbol.asyncIterator, which allows processing the stream with for await...of:

async function startRead() {
  const readStream = fs.createReadStream('./data/inside/6.csv');

  // csv-parser's output stream is async-iterable, so rows arrive one at a time
  for await (const row of readStream.pipe(csv())) {
    await insertRec(row);          // the next row is not read until this insert finishes
  }

  console.log('CSV file successfully processed');
}
Deme answered 28/10, 2022 at 12:57 Comment(0)

That is expected behavior in this case, because your data listener triggers insertRec asynchronously whenever data is available in the stream. That is why the first line of your insert method gets executed, effectively in parallel, for many rows. If you want to control this behavior you can use the highWaterMark property (https://nodejs.org/api/stream.html#stream_readable_readablehighwatermark) while creating the read stream. This way you will get one record at a time, but I am not sure what your use case is.

Something like this:

fs.createReadStream(`somefile.csv`, {
  "highWaterMark": 1
})

Also, you are not awaiting your startRead method. I would wrap it in a promise and resolve it in the end listener, otherwise you will not know when the processing has finished. Something like:

function startRead() {
  return new Promise((resolve, reject) => {
    fs.createReadStream(`somepath`)
      .pipe(csv())
      .on("data", async row => {
        await insertRec(row);
      })
      .on("error", err => {
        reject(err);
      })
      .on("end", () => {
        console.log("CSV file successfully processed");
        resolve();
      });
  });

}
Crannog answered 1/2, 2020 at 15:26 Comment(5)
Setting the highWaterMark does not let you throttle the rate of data events. The OP should instead implement a stream Writable that can be configured to write document-by-document, or writev a bulk of documents. The highWaterMark lets you control memory pressure; see the sketch after these comments. (Overmuch)
@Overmuch that is true. Thanks for clarifying. (Crannog)
@Overmuch - "For streams operating in object mode, the highWaterMark specifies a total number of objects" - nodejs.org/api/stream.html#stream_buffering (Crannog)
Yes - the number of objects that will be buffered in the (read/write) stream's internal buffer. Objects will always be processed one at a time with write. The highWaterMark indicates how many objects can be buffered for the stream instance. (Overmuch)
Also, in this example the last result is not awaited, as the "end" event will fire during the execution of the "data" event handler. If the process were to exit at that point, for example, there would be data loss in that last record. (Spermatogonium)
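
Following up on the Writable suggestion in the comments: a minimal sketch of that approach, reusing the insertRec() helper from the question (the objectMode highWaterMark of 16 and the use of stream.pipeline are illustrative choices, not part of the original answers):

const { Writable, pipeline } = require('stream');

function startRead() {
  return new Promise((resolve, reject) => {
    const mongoWriter = new Writable({
      objectMode: true,        // each chunk is one parsed CSV row
      highWaterMark: 16,       // how many rows may sit in the buffer, not the write rate
      write(row, _encoding, callback) {
        // write() is not called again until callback() runs, so rows are inserted one at a time
        insertRec(row)
          .then(() => callback())
          .catch(callback);    // passing an error here aborts the whole pipeline
      }
    });

    pipeline(
      fs.createReadStream('./data/inside/6.csv'),
      csv(),
      mongoWriter,
      (err) => (err ? reject(err) : resolve())
    );
  });
}

Because pipeline waits for the Writable side to finish, the last row is fully inserted before the promise resolves, which also addresses the data-loss concern raised in the comments above.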
