NodeJS, promises, streams - processing large CSV files

I need to build a function for processing large CSV files for use in a bluebird.map() call. Given the potential sizes of the files, I'd like to use streaming.

This function should accept a stream (a CSV file) and a function (that processes the chunks from the stream), and return a promise that resolves when the file has been read to the end, or rejects on error.

So, I start with:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  // use readable or data event?
  parser.on('readable', function() {
    // call processor, which may be async
    // how do I throttle the amount of promises generated
  });

  var db = pgp(api.config.mailroom.fileMakerDbConfig);

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });

}

Now, I have two inter-related issues:

  1. I need to throttle the actual amount of data being processed, so as not to create memory pressure.
  2. The function passed as the processor param is going to often be async, such as saving the contents of the file to the db via a library that is promise-based (right now: pg-promise). As such, it will create a promise in memory and move on, repeatedly.

The pg-promise library has functions to manage this, like page(), but I'm not able to wrap my head around how to mix stream event handlers with these promise methods. Right now, I return a promise in the readable handler after each read(), which means I create a huge number of promised database operations and eventually fault out because I hit a process memory limit.
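
For illustration, this is roughly the unbounded pattern I'm trying to avoid (just a sketch; insertRow stands in for a hypothetical promise-returning db call):

// Anti-pattern sketch: every readable event drains the parser and fires off
// an insert per row, so promises pile up faster than they can settle.
parser.on('readable', function() {
  var row;
  while ((row = parser.read()) !== null) {
    insertRow(row); // hypothetical promise-returning db call, never awaited
  }
});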

Does anyone have a working example of this that I can use as a jumping point?

UPDATE: Probably more than one way to skin the cat, but this works:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  // some checks trimmed out for example

  var db = pgp(api.config.mailroom.fileMakerDbConfig);
  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  var readDataFromStream = function(index, data, delay) {
    var records = [];
    var record;
    do {
      record = parser.read();
      if(record != null)
        records.push(record);
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
    parser.pause();

    if(records.length)
      return records;
  };

  var processData = function(index, data, delay) {
    console.log('processData(' + index + ') > data: ', data);
    parser.resume();
  };

  parser.on('readable', function() {
    db.task(function(tsk) {
      this.page(readDataFromStream, processData);
    });
  });

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });
}

Does anyone see a potential problem with this approach?

Kilogram answered 14/10, 2015 at 15:35 Comment(5)
Looks neat, and if this works, then great job! I'm glad that the most recent addition of page into pg-promise wasn't in vain ;)Lialiabilities
Just simplified the end of readDataFromStream ;) You do not need to return undefined, that's what happens when you return nothing anyway ;)Lialiabilities
Actually, there may be a problem with this... when you call db.task, you do not handle the result from it, so if it rejects, the promise library will throw an error about an unhandled rejection.Lialiabilities
Should I do a return this.page() with a catch() on task()?Kilogram
I have updated my answer - it gives you the whole picture of how to solve your problem.Lialiabilities

You might want to look at promise-streams:

var ps = require('promise-streams');
passedStream
  .pipe(csv.parse({trim: true}))
  .pipe(ps.map({concurrent: 4}, row => processRowDataWhichMightBeAsyncAndReturnPromise(row)))
  .wait().then(_ => {
    console.log("All done!");
  });

Works with backpressure and everything.
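
To keep the original processCsvStream(passedStream, processor) signature from the question, a sketch along these lines might work (assuming .wait() also rejects when the stream errors, which is worth verifying):

var ps = require('promise-streams');
var csv = require('csv');

api.parsers.processCsvStream = function (passedStream, processor) {
  return passedStream
    .pipe(csv.parse({trim: true}))
    .pipe(ps.map({concurrent: 4}, function (row) {
      return processor(row);  // processor may return a promise
    }))
    .wait();                  // resolves on end, should reject on stream error
};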

Ginnygino answered 27/10, 2015 at 4:39 Comment(0)

Below is a complete application that correctly executes the same kind of task as you want: it reads a file as a stream, parses it as CSV, and inserts each row into the database.

const fs = require('fs');
const promise = require('bluebird');
const csv = require('csv-parse');
const pgp = require('pg-promise')({promiseLib: promise});

const cn = "postgres://postgres:password@localhost:5432/test_db";
const rs = fs.createReadStream('primes.csv');

const db = pgp(cn);

function receiver(_, data) {
    function source(index) {
        if (index < data.length) {
            // here we insert just the first column value that contains a prime number;
            return this.none('insert into primes values($1)', data[index][0]);
        }
    }

    return this.sequence(source);
}

db.task(t => {
    return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver);
})
    .then(data => {
        console.log('DATA:', data);
    })
    .catch(error => {
        console.log('ERROR:', error);
    });

Note that the only thing I changed was to use the csv-parse library instead of csv, as a better alternative.

I also added use of the stream.read method from the spex library, which properly serves a Readable stream for use with promises.
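
To fit the original processCsvStream(passedStream, processor) signature, the same approach could be wrapped roughly like this (a sketch, assuming processor may return a promise for each row):

api.parsers.processCsvStream = function (passedStream, processor) {
    const db = pgp(api.config.mailroom.fileMakerDbConfig);
    const parser = passedStream.pipe(csv({trim: true}));

    function receiver(_, data) {
        function source(index) {
            if (index < data.length) {
                return processor(data[index]); // may return a promise
            }
        }
        return this.sequence(source);
    }

    // resolves when the stream has been fully read, rejects on error
    return db.task(t => pgp.spex.stream.read.call(t, parser, receiver));
};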

Lialiabilities answered 15/10, 2015 at 2:47 Comment(13)
Wouldn't this try to read the next item from the parser after query("INSERT…") is done, regardless of whether the next item is readable already? Or does parser.read() return a promise?Nunley
Also, what happened to the promise-returning processor callback function that OP was looking for?Nunley
@Nunley My understanding was that parser.read() is synchronous, the way it was shown. And if it turns out not to be, then it will need to be wrapped in a promise, obviously. And readable is fired once, not for each read operation, that's my understanding. As for the promise-returning processor, he was simply looking for a resolve when the data processing has finished, and a reject in case it failed, which my example provides, i.e. the task will resolve/reject accordingly.Lialiabilities
Hm, I'd need to read the stream docs again, but I don't think that's how it worksNunley
Yeah, I'm not quite sure about the streams part myself, and I wrote the example based on the code provided with the question. If that code is wrong, then so would be mine. But still, it does show the general approach.Lialiabilities
The answer has been rewritten completely, based on a fully tested piece of code.Lialiabilities
@vitaly-t: I looked through your addition to spex for read() and it's quite beautiful and tidy. ;) My code above was just my experimentation; hadn't refactored it and cleaned it up yet. Thank you. BTW, csv-parse is part of the whole csv module, or is the smiley face acknowledgement of that?Kilogram
@vitaly-t: Oh, and, in an earlier comment, you state that the readable event is only fired once. But, in fact, it can be fired multiple times. Does this affect your code?Kilogram
@alphadogg, about readable being fired once - that's no longer relevant, please ignore it; it was for the old code sample I provided. The choice of CSV library isn't really important here.Lialiabilities
I get a weird result where the number of lines read is actually much higher than the number of operations done: DATA: { calls: 36005, reads: 4129369, length: 0, duration: 169852 } - is that intended?Insufferable
@QiongWu this usually means that your code is taking a long time to process the data received.Lialiabilities
But how can I ensure that every read results in a call in the end, even if the code takes a long time to process the data? Right now it seems that a good portion of the lines read are getting lost.Insufferable
@QiongWu nothing should be lost, you will get your data in chunks.Lialiabilities

I found a slightly better way of doing the same thing, with more control. This is a minimal skeleton with precise parallelism control. With parallel set to 1, all records are processed in sequence without holding the entire file in memory; increasing parallel speeds up processing.

const csv = require('csv');
const csvParser = require('csv-parser');
const fs = require('fs');

const readStream = fs.createReadStream('IN');
const writeStream = fs.createWriteStream('OUT');

const transform = csv.transform({ parallel: 1 }, (record, done) => {
  asyncTask(...) // return Promise
    .then(result => {
      // ... do something when success
      return done(null, record);
    }, (err) => {
      // ... do something when error
      return done(null, record);
    });
});

readStream
  .pipe(csvParser())
  .pipe(transform)
  .pipe(csv.stringify())
  .pipe(writeStream);

This allows doing an async task for each record.

To return a promise instead, we can create a promise up front and resolve it when the stream finishes, for example from an 'end' handler:

    .on('end', function() {
      // do something with csvData
      console.log(csvData);
    });
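
A more complete sketch of that wrapping idea (processCsvFile, inPath and outPath are hypothetical names; csv, csvParser, fs and asyncTask are the same as above):

// Sketch: resolve when the write stream finishes, reject if any stage emits
// an error (here a failed asyncTask fails the pipeline via done(err);
// call done(null, record) instead to skip over failures as above).
function processCsvFile(inPath, outPath, asyncTask) {
  return new Promise((resolve, reject) => {
    const transform = csv.transform({ parallel: 1 }, (record, done) => {
      asyncTask(record).then(
        result => done(null, record),
        err => done(err)
      );
    });

    fs.createReadStream(inPath)
      .on('error', reject)
      .pipe(csvParser()).on('error', reject)
      .pipe(transform).on('error', reject)
      .pipe(csv.stringify()).on('error', reject)
      .pipe(fs.createWriteStream(outPath))
      .on('finish', resolve)
      .on('error', reject);
  });
}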
Hullabaloo answered 19/2, 2020 at 10:14 Comment(0)

So, in other words, you don't really want streaming, but rather some kind of data chunks? ;-)

Do you know https://github.com/substack/stream-handbook?

I think the simplest approach without changing your architecture would be some kind of promise pool. e.g. https://github.com/timdp/es6-promise-pool
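
For example, a minimal sketch of the promise-pool idea (nextRecord and processor are hypothetical placeholders; es6-promise-pool treats a null return from the producer as the end of the work):

var PromisePool = require('es6-promise-pool');

// The producer returns the next promise to run, or null when there is nothing left.
function processAll(nextRecord, processor, concurrency) {
  var producer = function () {
    var record = nextRecord();  // hypothetical: next parsed CSV row, or null when done
    return record === null ? null : processor(record);
  };
  var pool = new PromisePool(producer, concurrency);
  return pool.start();          // resolves when all records are processed
}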

Valedictory answered 14/10, 2015 at 17:51 Comment(1)
Well, I have thought of using async.queue in the function, returning a promise of eventually finishing the file (or not). However, I was wondering how one ties a promise library like Bluebird to the typical stream-based processing of large files. (`pg-promise` includes spex, which provides higher-level promise functions.)Kilogram
