createReadStream end fires before data finished processing
I am trying to do the following:

  1. Stream a csv file in line by line.
  2. Modify the data contained in each line.
  3. Once all lines are streamed and processed, finish and move on to next task.

The problem is that .on("end") fires before the asynchronous work started in each .on("data") handler has finished. How can I make the .on("end") logic run only after every line has been fully processed?

Below is a simple example of what I am talking about:

import fs from 'fs';
import parse from 'csv-parse';

var parser = parse({});

fs.createReadStream(this.upload.location)
.pipe(parser)
.on("data", line => {
  var num = Math.floor((Math.random() * 100) + 1);
  num = num % 3;
  num = num * 1000;
  setTimeout( () => { 
    console.log('data process complete');
  }, num);
})
.on("end", () => {
   console.log('Done: parseFile');
   next(null);
});

Thanks in advance.

Newfeld answered 31/3, 2016 at 23:28 Comment(3)
From the Node docs: "Note that the 'end' event will not fire unless the data is completely consumed. " I can only imagine something bizarre is happening.Swetlana
Could it be that pipe and data are not meant to be used together? Maybe add a data event handler to whatever parser is rather than here?Swetlana
That is a smart idea. Parser is referencing import parse from 'csv-parse'; I'll give it a tryNewfeld
I think the issue is the setTimeout (or any other async task) inside the data event listener. end does fire after the last data event, but the async tasks scheduled with setTimeout keep running, so their messages are logged after the stream has already ended.

If you take out the setTimeout, you'll see that all the data messages are logged before end. You can still perform async tasks, but any that are still pending will run after the stream has ended.

This code helps explain what is going on:

const fs = require('fs')

const testFileName = 'testfile.txt'

fs.writeFileSync(testFileName, '123456789')

let count = 0
const readStream = fs.createReadStream(testFileName, {
  encoding: 'utf8',
  highWaterMark: 1  // low highWaterMark so we can have more chunks to observe
})
readStream.on('data', (data) => {
  console.log('+++++++++++processing sync+++++++++++++')
  console.log(data)
  console.log('+++++++++++end processing sync+++++++++++++')
  setTimeout(() => {
    console.log('-----------processing async-------------')
    console.log(data)
    console.log('-----------end processing async-------------')
  }, ++count * 1000)
})
readStream.on('end', () => {
  console.log('stream ended but still have async tasks doing their thing')
  fs.unlinkSync(testFileName)
})
Dislodge answered 7/4, 2022 at 19:51 Comment(0)
