Async queue and file stream "end": how to know when both have finished

I am having a slight issue when using async.queue with a file stream:

  1. I have a scenario where my file stream finishes.
  2. I set fileRead to true.
  3. However, by then the queue is already empty and has already called drain.
  4. As a result, my "done" is never called.

What is the proper way to "end the queue" once my file stream has emitted "end" and the queue is empty?

var fs = require('fs')
    , util = require('util')
    , stream = require('stream')
    , es = require('event-stream');

var async = require('async');

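var fileRead = false;
var lineNr = 0;
// Collects the result of each insertIntoDb call (done and insertIntoDb are defined elsewhere).
var responseLines = [];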

var q = async.queue(function(task, callback) {
    task(function(err, lineData){
        responseLines.push(lineData);
        callback();
      });
  }, 5);

var q.drain = function() {
  if(fileRead){
    done(null, responseLines);
  }
}

var s = fs.createReadStream('very-large-file.csv')
    .pipe(es.split())
    .pipe(es.mapSync(function(line){
        s.pause();
        q.push(async.apply(insertIntoDb, line))
        s.resume();
    })
    .on('error', function(err){
       done(err);
    })
    .on('end', function(){
      fileRead = true;
    })
);
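The ordering problem from the numbered list can be reproduced without async or event-stream. This is a minimal sketch under stated assumptions: TinyQueue is a hypothetical stand-in for async.queue (not its real implementation), and a plain EventEmitter plays the part of the line stream, emitting every line immediately and "end" 20 ms later, after all inserts have finished. Because drain runs while fileRead is still false and never runs again, done is never called.

```javascript
const EventEmitter = require('events');

// Hypothetical stand-in for async.queue (NOT the library's implementation):
// runs up to `concurrency` items at once and calls `drain` whenever the
// last running item finishes and nothing is left waiting.
class TinyQueue {
  constructor(worker, concurrency) {
    this.worker = worker;           // worker(item, callback)
    this.concurrency = concurrency;
    this.waiting = [];              // items not started yet
    this.active = 0;                // items currently inside the worker
    this.drain = null;              // called each time the queue becomes idle
  }

  push(item) {
    this.waiting.push(item);
    this._next();
  }

  idle() {
    return this.waiting.length === 0 && this.active === 0;
  }

  _next() {
    while (this.active < this.concurrency && this.waiting.length > 0) {
      const item = this.waiting.shift();
      this.active++;
      this.worker(item, () => {
        this.active--;
        this._next();
        if (this.idle() && this.drain) this.drain();
      });
    }
  }
}

const responseLines = [];
let fileRead = false;
let doneCalls = 0;

function done(err, lines) {
  doneCalls++;
  console.log('done called with', lines);
}

// Hypothetical insert: finishes on the next turn of the event loop.
const q = new TinyQueue((line, callback) => {
  setImmediate(() => {
    responseLines.push(line);
    callback();
  });
}, 5);

// Same logic as the question: only finish once the file has been fully read.
q.drain = () => {
  if (fileRead) done(null, responseLines);
};

// Fake line stream: all lines arrive at once, 'end' arrives later.
const lines = new EventEmitter();
lines.on('data', (line) => q.push(line));
lines.on('end', () => {
  fileRead = true; // too late: drain has already run and will not run again
});

['a', 'b', 'c'].forEach((line) => lines.emit('data', line));
setTimeout(() => lines.emit('end'), 20);

setTimeout(() => {
  console.log(`done was called ${doneCalls} time(s)`);
}, 50);
```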

Or is there a better way to use async for this? I want to process the file asynchronously, line by line, with the ability to exit early if one of the lines produces an error.

Aperiodic answered 9/2, 2017 at 19:46
You could just add another task immediately after you set fileRead to true. I think your problem is that the task function you're calling for each queue item is called and finishes before the end event is emitted on your stream. – Alpinist
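The comment's suggestion can be sketched as follows, assuming queue items are task functions as in the question (where each item is async.apply(insertIntoDb, line)). TinyQueue, insertLine and the fake EventEmitter stream are hypothetical stand-ins, not async or event-stream APIs. The "end" handler sets fileRead and then pushes one no-op task, so drain runs once more with fileRead already true.

```javascript
const EventEmitter = require('events');

// Hypothetical stand-in for async.queue (NOT the library's implementation):
// runs up to `concurrency` items at once and calls `drain` whenever the
// last running item finishes and nothing is left waiting.
class TinyQueue {
  constructor(worker, concurrency) {
    this.worker = worker;
    this.concurrency = concurrency;
    this.waiting = [];
    this.active = 0;
    this.drain = null;
  }

  push(item) {
    this.waiting.push(item);
    this._next();
  }

  idle() {
    return this.waiting.length === 0 && this.active === 0;
  }

  _next() {
    while (this.active < this.concurrency && this.waiting.length > 0) {
      const item = this.waiting.shift();
      this.active++;
      this.worker(item, () => {
        this.active--;
        this._next();
        if (this.idle() && this.drain) this.drain();
      });
    }
  }
}

const responseLines = [];
let fileRead = false;
let doneCalls = 0;

function done(err, lines) {
  doneCalls++;
  console.log('done:', err, lines);
}

// Each queue item is a task function that takes a callback.
const q = new TinyQueue((task, callback) => task(callback), 5);

q.drain = () => {
  if (fileRead) done(null, responseLines);
};

// Hypothetical stand-in for insertIntoDb: records the line asynchronously.
function insertLine(line) {
  return (callback) => setImmediate(() => {
    responseLines.push(line);
    callback();
  });
}

const lines = new EventEmitter();
lines.on('data', (line) => q.push(insertLine(line)));
lines.on('end', () => {
  fileRead = true;
  // Extra no-op task: guarantees drain fires again now that fileRead is true,
  // even if the queue had already emptied before 'end' arrived.
  q.push((callback) => setImmediate(callback));
});

['a', 'b', 'c'].forEach((line) => lines.emit('data', line));
setTimeout(() => lines.emit('end'), 20); // 'end' arrives after the queue drained
```

If "end" arrives while inserts are still running, the no-op task simply joins them and drain still fires once, after everything has finished.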

Firstly, I'm not sure how much of your snippet is pseudocode, but var q.drain = ... is not valid JavaScript and should throw a SyntaxError. It should just be q.drain = ..., because you're assigning a property on an existing object, not declaring a new variable. If this isn't pseudocode, it could be why your drain function isn't firing.
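A quick way to confirm that this is a parse error rather than a runtime problem: new Function compiles a string of source code without running it, and throws a SyntaxError for the var q.drain form. Because the error happens at parse time, a script containing that line would fail before any stream or queue code runs. The parses helper is hypothetical, written only for this check.

```javascript
// Hypothetical helper: returns true if `source` compiles, false on a SyntaxError.
function parses(source) {
  try {
    new Function(source); // compiles the source without executing it
    return true;
  } catch (err) {
    if (err instanceof SyntaxError) return false;
    throw err;
  }
}

// `var` declares a new binding, so its target must be a plain identifier.
const invalid = parses('var q = {}; var q.drain = function () {};');
// Plain assignment to a property of an existing object is fine.
const valid = parses('var q = {}; q.drain = function () {};');

console.log(invalid); // false
console.log(valid);   // true
```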

There are a few ways you could achieve what I think you're trying to do. One would be to check in your end handler whether the queue still has work to do, and set the drain function only if it does.

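.on('end', function(){
  // q.length is a method in async (q.length()) and counts only waiting tasks;
  // q.idle() is true only when no tasks are waiting or still running.
  if(q.idle()){
    callDone();
  }
  else {
    q.drain = callDone;
  }
});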

function callDone(){
  done(null, responseLines);
}

This effectively says: "if the queue has already been processed, call done; if not, call done when it has!" I'm sure there are plenty of ways to tidy up your code, but hopefully this solves your specific problem.
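A self-contained sketch of the same end-handler pattern, runnable without async or event-stream. TinyQueue, runScenario and the timings are hypothetical; TinyQueue.idle() is meant to mirror what async's q.idle() reports (nothing waiting and nothing running). It runs both orderings: "end" arriving while inserts are still in flight, and "end" arriving after the queue has already emptied. In each case done should be called exactly once.

```javascript
const EventEmitter = require('events');

// Hypothetical stand-in for async.queue (NOT the library's implementation).
class TinyQueue {
  constructor(worker, concurrency) {
    this.worker = worker;
    this.concurrency = concurrency;
    this.waiting = [];
    this.active = 0;
    this.drain = null;
  }

  push(item) {
    this.waiting.push(item);
    this._next();
  }

  // True only when nothing is waiting AND nothing is still running.
  idle() {
    return this.waiting.length === 0 && this.active === 0;
  }

  _next() {
    while (this.active < this.concurrency && this.waiting.length > 0) {
      const item = this.waiting.shift();
      this.active++;
      this.worker(item, () => {
        this.active--;
        this._next();
        if (this.idle() && this.drain) this.drain();
      });
    }
  }
}

// workMs: how long each hypothetical insert takes.
// endMs: when the fake line stream emits 'end'.
function runScenario(workMs, endMs, done) {
  const responseLines = [];

  const q = new TinyQueue((line, callback) => {
    setTimeout(() => {
      responseLines.push(line);
      callback();
    }, workMs);
  }, 5);

  function callDone() {
    done(null, responseLines);
  }

  const lines = new EventEmitter();
  lines.on('data', (line) => q.push(line));
  lines.on('end', () => {
    if (q.idle()) {
      callDone();          // everything has already finished
    } else {
      q.drain = callDone;  // finish when the remaining work completes
    }
  });

  ['a', 'b', 'c'].forEach((line) => lines.emit('data', line));
  setTimeout(() => lines.emit('end'), endMs);
}

const calls = { endFirst: 0, drainFirst: 0 };
runScenario(10, 0, () => { calls.endFirst++; });   // 'end' before inserts finish
runScenario(0, 30, () => { calls.drainFirst++; }); // inserts finish before 'end'

setTimeout(() => console.log(calls), 100);
```

Checking idle() rather than the number of waiting tasks matters here: with a concurrency of 5, the queue can have nothing waiting while several inserts are still running, and calling done at that point would be premature.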

Shaina answered 12/2, 2017 at 18:20
