Iterating over a mongodb cursor serially (waiting for callbacks before moving to next document)

Using mongoskin, I can do a query like this, which will return a cursor:

myCollection.find({}, function(err, resultCursor) {
      resultCursor.each(function(err, result) {

      });
});

However, I'd like to call an async function for each document and only move on to the next item on the cursor after it has called back (similar to eachSeries in the async.js module). E.g.:

myCollection.find({}, function(err, resultCursor) {
      resultCursor.each(function(err, result) {

            externalAsyncFunction(result, function(err) {
               //externalAsyncFunction completed - now want to move to next doc
            });

      });
});

How could I do this?

Thanks

UPDATE:

I don't want to use toArray() as this is a large batch operation, and the results might not fit in memory in one go.

Tainataint answered 8/8, 2013 at 6:36 Comment(4)
If you're blocking and waiting for the async function to complete before moving on, what's the point of calling it asynchronously? – Tertiary
@RotemHermon I don't have any choice! It's not my function and it's async. (Will rename myAsyncFunction to externalAsyncFunction...) – Tainataint
Why are you not using toArray() and then a recursive function to iterate over the result? – Stanislaw
@Салман - good question - I'm not using toArray as it's a large batch operation and the full result might not fit in memory. (I'll update question) – Tainataint

If you don't want to load all of the results into memory using toArray, you can iterate using the cursor with something like the following.

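myCollection.find({}, function(err, resultCursor) {
  if (err) return console.error(err);

  function processItem(err, item) {
    if (err) return console.error(err);
    if (item === null) {
      return; // All done!
    }

    externalAsyncFunction(item, function(err) {
      if (err) return console.error(err);
      // Only ask for the next document once the async work has called back
      resultCursor.nextObject(processItem);
    });
  }

  resultCursor.nextObject(processItem);
});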
Hemstitch answered 8/8, 2013 at 7:0 Comment(9)
This method didn't work for me for a large dataset. I get "RangeError: Maximum call stack size exceeded". – Congruence
@SoichiHayashi There are a number of things which will cause a RangeError, but the example above shouldn't throw it. Maybe if you provide more details as a separate question I can help you figure out where it is going wrong. – Hemstitch
@SoichiHayashi wrap the async function or callback in a process.nextTick! – Impermanent
@SoichiHayashi to follow up on @Impermanent - the reason your stack is overflowing with the example above is that every time you process an item, you run another callback to process the next item within the processing function of the current one. As your result set grows, you loop through more function calls and each one creates a new stack frame on top of the previous one. Wrapping the async callback in process.nextTick, setImmediate or setTimeout causes it to run in the next loop, 'outside' of the call stack we've created to process each document. – Colossal
What about cursor.forEach()? – Leatherback
@Leatherback - cursor.forEach() provides no async way of signalling to move to the next item. – Tainataint
For clarity, I think adding the externalAsyncFunction example would help. I didn't understand at first. function externalAsyncFunction(doc, next) { // process here next(); } – Bandstand
@Bandstand That is the example function used in the original question. It makes more sense when you start there and then read the answer! – Hemstitch
Call stack overflow and process.nextTick should be avoided. – Deyoung
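The comments above suggest deferring each step with setImmediate (or process.nextTick / setTimeout) so the stack cannot keep growing. Below is a minimal sketch of that idea, assuming a driver-style cursor whose nextObject(callback) passes null once it is exhausted. The names eachSeries and makeFakeCursor are hypothetical, used only for illustration, and the in-memory cursor deliberately calls back synchronously to reproduce the worst case for stack growth.

```javascript
// Hypothetical in-memory stand-in for a driver cursor: nextObject(cb) calls
// back *synchronously* with the next document, or null when exhausted.
// A synchronous callback is the worst case for stack growth.
function makeFakeCursor(docs) {
  let i = 0;
  return {
    nextObject(cb) {
      cb(null, i < docs.length ? docs[i++] : null);
    },
  };
}

// Hypothetical helper: visit every document in order, waiting for
// worker(doc, done) to call done() before asking for the next document.
// onEnd(err, count) is called exactly once at the end.
function eachSeries(cursor, worker, onEnd) {
  let count = 0;
  function processItem(err, item) {
    if (err) return onEnd(err, count);
    if (item === null) return onEnd(null, count); // cursor exhausted
    worker(item, function (workerErr) {
      if (workerErr) return onEnd(workerErr, count);
      count++;
      // Defer to a later turn of the event loop so the next document is
      // handled on a fresh call stack, not nested inside this one.
      setImmediate(function () {
        cursor.nextObject(processItem);
      });
    });
  }
  cursor.nextObject(processItem);
}
```

Without the setImmediate wrapper, a cursor and a worker that both call back synchronously nest several stack frames per document, which is how the RangeError reported above can arise on large result sets.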

A more modern approach that uses async/await:

const cursor = db.collection("foo").find({});
while(await cursor.hasNext()) {
  const doc = await cursor.next();
  // process doc here
}

Notes:

  • This may become even simpler once async iterators arrive.
  • You'll probably want to add try/catch for error checking.
  • The containing function should be async or the code should be wrapped in (async function() { ... })() since it uses await.
  • If you want, add await new Promise(resolve => setTimeout(resolve, 1000)); (pause for 1 second) at the end of the while loop to show that it does process docs one after the other.
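Putting the notes together with the question's callback-style externalAsyncFunction gives the sketch below. It assumes a cursor with promise-returning hasNext(), next() and close(); the fake cursor, the processed log and the doc.ms delay field are hypothetical, for illustration only. util.promisify adapts the callback function so it can be awaited, and try/finally closes the cursor even if processing throws.

```javascript
const { promisify } = require('node:util');

// Hypothetical stand-in for the question's externalAsyncFunction:
// callback-style, finishes after doc.ms milliseconds and logs doc.n.
const processed = [];
function externalAsyncFunction(doc, callback) {
  setTimeout(() => {
    processed.push(doc.n);
    callback(null);
  }, doc.ms);
}
const externalAsync = promisify(externalAsyncFunction);

// Hypothetical in-memory cursor with the same method names as the answer.
function makeFakeCursor(docs) {
  let i = 0;
  return {
    closed: false,
    async hasNext() { return i < docs.length; },
    async next() { return i < docs.length ? docs[i++] : null; },
    async close() { this.closed = true; },
  };
}

async function processAll(cursor) {
  let count = 0;
  try {
    while (await cursor.hasNext()) {
      const doc = await cursor.next();
      await externalAsync(doc); // the next doc is fetched only after this resolves
      count++;
    }
  } finally {
    await cursor.close(); // runs on success and on error
  }
  return count;
}
```

Because each document waits for the previous one, a slow first document still finishes before a fast later one.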
Potentate answered 5/7, 2017 at 6:3 Comment(4)
Worked perfectly, thank you. Any idea if there are any pitfalls with large datasets? – Wealth
Great, this is the best solution, unlike the chosen one that will just crash. – Milieu
How do you use this in Node? I get this error: "SyntaxError: Unexpected identifier" at the "cursor.hasNext()". – Turbulence
@Nico, Sorry for the late reply, but see point number 3 in the notes ;) – Potentate

Since Node.js v10.3 you can use an async iterator:

const cursor = db.collection('foo').find({});
for await (const doc of cursor) {
  // do your thing
  // you can even use `await myAsyncOperation()` here
}

Jake Archibald wrote a great blog post about async iterators, which I came across after reading @user993683's answer.
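for await...of works with any object that implements Symbol.asyncIterator, so the pattern can be tried without a database. In this sketch an async generator stands in for the cursor, and myAsyncOperation is a hypothetical placeholder. The loop body finishes each await before the next document is requested.

```javascript
// Hypothetical stand-in for a cursor: an async generator that yields
// documents one at a time, as an async-iterable cursor would.
async function* fakeCursor(docs) {
  for (const doc of docs) {
    await new Promise((resolve) => setImmediate(resolve)); // simulate a fetch
    yield doc;
  }
}

// Hypothetical async operation performed on each document.
function myAsyncOperation(doc) {
  return new Promise((resolve) => setTimeout(() => resolve(doc.n * 2), 1));
}

async function run(cursor) {
  const results = [];
  for await (const doc of cursor) {
    // The next document is not pulled until this await settles.
    results.push(await myAsyncOperation(doc));
  }
  return results;
}
```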

Wholesome answered 16/6, 2019 at 14:51 Comment(1)
Was looking for this solution, thanks! – Hillie

This works with large datasets by using setImmediate:

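var cursor = collection.find({ /* your filter */ }).cursor();
var arg1, arg2; // placeholders for any extra arguments fnAction needs

cursor.nextObject(function fn(err, item) {
    if (err || !item) return; // stop on error or when the cursor is exhausted

    // setImmediate defers fnAction to the next turn of the event loop,
    // so the call stack does not grow with every document.
    setImmediate(fnAction, item, arg1, arg2, function() {
        cursor.nextObject(fn);
    });
});

function fnAction(item, arg1, arg2, callback) {
    // Here you can do whatever you want to do with your item.
    return callback();
}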
Glycerite answered 17/8, 2016 at 13:16 Comment(2)
This is great, but you don't need the ".cursor()" on the first line (I got an error). – Turbulence
It depends on the version of Mongoose used. It was for an older one. – Glycerite

If you are looking for a Promise-based way of doing this (as opposed to using nextObject callbacks), here it is. I am using Node v4.2.2 and MongoDB driver v2.1.7. This is essentially an async-series version of Cursor.forEach():

function forEachSeries(cursor, iterator) {
  return new Promise(function(resolve, reject) {
    var count = 0;
    function processDoc(doc) {
      if (doc != null) {
        count++;
        // Wait for the iterator's promise before fetching the next document
        return iterator(doc).then(function() {
          return cursor.next().then(processDoc);
        });
      } else {
        resolve(count);
      }
    }
    // Forward any error from cursor.next() or the iterator to the caller
    cursor.next().then(processDoc).catch(reject);
  });
}

To use this, pass the cursor and an iterator that operates on each document asynchronously (like you would for Cursor.forEach). The iterator needs to return a promise, like most mongodb native driver functions do.

Say you want to update all documents in the collection test. This is how you would do it:

var MongoClient = require('mongodb').MongoClient;
var dbUrl = 'mongodb://localhost:27017/test'; // example URL, adjust for your deployment

var theDb;
MongoClient.connect(dbUrl).then(function(db) {
  theDb = db;     // save it, we'll need to close the connection when done.
  var cur = db.collection('test').find();

  return forEachSeries(cur, function(doc) {    // this is the iterator
    // updateOne returns a promise if not supplied a callback. Just return it.
    return db.collection('test').updateOne(
      {_id: doc._id},
      {$set: {updated: true}}       // or whatever else you need to change
    );
  });
})
.then(function(count) {
  console.log("All Done. Processed", count, "records");
  theDb.close();
})
.catch(function(err) {
  console.error(err);
  if (theDb) theDb.close();
});
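The same helper can be written with async/await, assuming a runtime newer than the Node v4 used here. This sketch keeps the answer's contract (the iterator returns a promise, the result is the document count, cursor.next() resolves to null when exhausted) but loops instead of chaining promises recursively, so a rejection from the cursor or the iterator simply propagates to the caller. The fake cursor is hypothetical, for illustration only.

```javascript
// Sketch: async/await version of forEachSeries with the same contract.
async function forEachSeries(cursor, iterator) {
  let count = 0;
  let doc;
  // cursor.next() is assumed to resolve to null once the cursor is exhausted
  while ((doc = await cursor.next()) != null) {
    await iterator(doc); // wait for this document before fetching the next
    count++;
  }
  return count;
}

// Hypothetical in-memory cursor exposing a promise-returning next().
function makeFakeCursor(docs) {
  let i = 0;
  return { next: async () => (i < docs.length ? docs[i++] : null) };
}
```

It takes the same arguments as the original, so the updateOne example above can call it unchanged.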
Deplorable answered 18/6, 2016 at 15:50 Comment(2)
I don't see where forEachSeries is being called. – Deyoung
Call stack overflow. – Deyoung

You can do something like this using the async library. The key point here is to check whether the current doc is null; if it is, the cursor is exhausted and you are finished.

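var async = require('async');
var array = [];

async.series([
        function (cb) {
            cursor.each(function (err, doc) {
                if (err) {
                    cb(err);
                } else if (doc === null) {
                    cb(); // a null doc means the cursor is exhausted
                } else {
                    console.log(doc);
                    array.push(doc);
                }
            });
        }
    ], function (err) {
        callback(err, array); // `callback` is the enclosing function's callback
    });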
Rishi answered 8/1, 2014 at 22:56 Comment(1)
Hi Antoine - the problem I had with this approach is that if you need to do something for each record asynchronously, then there's no way for the cursor loop to wait until that's done. (cursor.each doesn't provide a 'next' callback, so only sync operations are possible within it.) – Tainataint

You can get the results as an array and iterate over them with a recursive function, something like this:

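myCollection.find({}).toArray(function (err, items) {
    if (err) return console.error(err);
    var index = 0;
    var fn = function () {
        if (index >= items.length) return; // all items processed
        externalAsyncFunction(items[index], function () {
            index += 1;
            fn(); // move on only after the current item has called back
        });
    };

    fn();
});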

Edit:

This is only suitable for small datasets; for larger ones you should use cursors as described in the other answers.

Stanislaw answered 8/8, 2013 at 6:57 Comment(3)
Sorry, I was too slow to answer your question in the comments - I can't use toArray as the result set is too big. – Tainataint
Oh OK. Then the other answer is suitable for you. – Stanislaw
While true, it's better to avoid this pattern as it can break as data grows. – Carmacarmack

You could use a Future (from the fibers package, in Meteor server code):

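var Future = Npm.require('fibers/future'); // Meteor server code

myCollection.find({}, function(err, resultCursor) {
    resultCursor.count(Meteor.bindEnvironment(function(err, count) {
        for (var i = 0; i < count; i++) {
            var itemFuture = new Future();

            resultCursor.nextObject(function(err, item) {
                if (err) itemFuture.throw(err);
                else itemFuture.return(item);
            });

            // wait() suspends this fiber (not the whole process) until the item arrives
            var item = itemFuture.wait();
            //do what you want with the item,
            //and continue with the loop if so
        }
    }));
});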
Rotation answered 23/12, 2014 at 12:4 Comment(0)

A more modern approach that uses for await:

const cursor = db.collection("foo").find({});

for await(const doc of cursor) {
  // process doc here with await
  await processDoc(doc);
}
Rabon answered 14/2, 2023 at 9:40 Comment(0)

You could use simple setTimeout calls. This is an example in TypeScript running on Node.js (I am using promises via the 'when' module, but it can be done without them as well):

        import mongodb = require("mongodb");

        var dbServer = new mongodb.Server('localhost', 27017, {auto_reconnect: true}, {});
        var db =  new mongodb.Db('myDb', dbServer);

        var util = require('util');
        var when = require('when'); //npm install when

        var dbDefer = when.defer();
        db.open(function() {
            console.log('db opened...');
            dbDefer.resolve(db);
        });

        dbDefer.promise.then(function(db : mongodb.Db){
            db.collection('myCollection', function (error, dataCol){
                if(error) {
                    console.error(error); return;
                }

                var doneReading = when.defer();

                var processOneRecordAsync = function(record) : When.Promise{
                    var result = when.defer();

                    setTimeout (function() {
                        //simulate a variable-length operation
                        console.log(util.inspect(record));
                        result.resolve('record processed');
                    }, Math.random()*5);

                    return result.promise;
                }

                var runCursor = function (cursor : MongoCursor){
                    cursor.next(function(error : any, record : any){
                        if (error){
                            console.log('an error occurred: ' + error);
                            return;
                        }
                        if (record){
                            processOneRecordAsync(record).then(function(r){
                                setTimeout(function() {runCursor(cursor)}, 1);
                            });
                        }
                        else{
                            //cursor up
                            doneReading.resolve('done reading data.');
                        }
                    });
                }

                dataCol.find({}, function(error, cursor : MongoCursor){
                    if (!error)
                    {
                        setTimeout(function() {runCursor(cursor)}, 1);
                    }
                });

                doneReading.promise.then(function(message : string){
                    //message='done reading data'
                    console.log(message);
                });
            });
        });
Cloudscape answered 19/3, 2014 at 22:51 Comment(0)
