How to buffer MongoDB inserts during disconnect in node.js?

We read an XML file (using xml-stream) with about 500k elements and insert them into MongoDB like this:

xml.on(`endElement: product`, writeDataToDb.bind(this, "product"));

The insert in writeDataToDb(type, obj) looks like this:

collection.insertOne(obj, {w: 1, wtimeout: 15000}).catch((e) => { });

Now when the Mongo connection gets disconnected, the XML stream keeps reading and the console gets flooded with error messages (can't insert, disconnected, EPIPE broken pipe, ...).

In the docs it says:

When you shut down the mongod process, the driver stops processing operations and keeps buffering them due to bufferMaxEntries being -1 by default meaning buffer all operations.

What does this buffer actually do?

We noticed that when we insert data and shut down the mongo server, the operations get buffered; when we bring the mongo server back up, the native driver successfully reconnects and node resumes inserting data, but the documents buffered while mongo was offline do not get inserted again.

So I question this buffer and its use.

Goal:

We are looking for the best way to keep the inserts in a buffer until mongo comes back (within 15000 ms according to wtimeout) and then insert the buffered documents, or to make use of xml.pause() and xml.resume(), which we tried without success.
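
The xml.pause() / xml.resume() attempt looked roughly like this (simplified sketch with a single naive retry, using the same xml and collection objects as above; not our actual code):

// simplified: pause the stream when an insert fails,
// retry once after a delay, then resume
xml.on(`endElement: product`, (product) => {
  collection.insertOne(product, {w: 1, wtimeout: 15000}).catch((e) => {
    xml.pause();
    setTimeout(() => {
      collection.insertOne(product, {w: 1, wtimeout: 15000})
        .then(() => xml.resume())
        .catch((err) => console.log('still failing: ' + err));
    }, 1000);
  });
});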

Basically we need a little help with handling disconnects without data loss or interruptions.

Bollinger answered 17/2, 2017 at 16:35 Comment(2)
Can't replicate this, both the example in the docs and tests using xml-stream insert the buffered objects once the mongo server is back up... maybe you can post more code / give some more information about your setup?Ordeal
@Ordeal I can't share my scripts as they are company related, but would you mind sending me the script you tried to replicate this? Gist/pastebin would be ok.Bollinger

Inserting 500K elements with insertOne() is a very bad idea. You should instead use bulk operations, which allow you to insert many documents in a single request (here, for example, 10000 at a time, so the whole import can be done in 50 requests). To avoid the buffering issue, you can handle it manually:

  1. Disable buffering with bufferMaxEntries: 0
  2. Set the reconnect properties: reconnectTries: 30, reconnectInterval: 1000
  3. Create a bulk operation and feed it with 10000 items
  4. Pause the xml reader, try to insert the 10000 items, and if it fails retry every 3000 ms until it succeeds
  5. You may face duplicate key issues if the bulk operation was interrupted during execution, so ignore them (error code 11000)

Here is a sample script:

var fs = require('fs')
var Xml = require('xml-stream')

var MongoClient = require('mongodb').MongoClient
var url = 'mongodb://localhost:27017/test'

MongoClient.connect(url, {
  reconnectTries: 30,
  reconnectInterval: 1000,
  bufferMaxEntries: 0
}, function (err, db) {
  if (err != null) {
    console.log('connect error: ' + err)
  } else {
    var collection = db.collection('product')
    var bulk = collection.initializeUnorderedBulkOp()
    var totalSize = 500001
    var size = 0

    var fileStream = fs.createReadStream('data.xml')
    var xml = new Xml(fileStream)
    xml.on('endElement: product', function (product) {
      bulk.insert(product)
      size++
      // if we have enough products, save them using a bulk insert
      if (size % 10000 == 0) {
        xml.pause()
        bulk.execute(function (err, result) {
          if (err == null) {
            bulk = collection.initializeUnorderedBulkOp()
            console.log('doc ' + (size - 10000) + ' : ' + size + ' saved on first try')
            xml.resume()
          } else {
            console.log('bulk insert failed: ' + err)
            var counter = 0
            var retryInsert = setInterval(function () {
              counter++
              bulk.execute(function (err, result) {
                if (err == null) {
                  clearInterval(retryInsert)
                  bulk = collection.initializeUnorderedBulkOp()
                  console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries')
                  xml.resume()
                } else if (err.code === 11000) { // ignore duplicate ID error
                  clearInterval(retryInsert)
                  bulk = collection.initializeUnorderedBulkOp()
                  console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries')
                  xml.resume()
                } else {
                  console.log('failed after first try: ' + counter, 'error: ' + err)
                }
              })
            }, 3000) // retry every 3000ms until success
          }
        })
      } else if (size === totalSize) {
        bulk.execute(function (err, result) {
          if (err == null) {
            db.close()
          } else {
            console.log('bulk insert failed: ' + err)
          }
        })
      }
    })
  }
})

Sample log output:

doc 0 : 10000 saved on first try
doc 10000 : 20000 saved on first try
doc 20000 : 30000 saved on first try
[...]
bulk insert failed: MongoError: interrupted at shutdown // mongodb server shutdown
failed after first try: 1 error: MongoError: no connection available for operation and number of stored operation > 0
failed after first try: 2 error: MongoError: no connection available for operation and number of stored operation > 0
failed after first try: 3 error: MongoError: no connection available for operation and number of stored operation > 0
doc 130000 : 140000 saved after 4 tries
doc 140000 : 150000 saved on first try
[...]
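
Note that in the script above the final batch (the size === totalSize branch) is not retried. If you need that, the retry logic can be factored into a helper and reused in both places. A minimal sketch, assuming (as in the retry loop above) that the bulk operation can simply be re-executed after a failed attempt:

// hypothetical helper: execute a bulk operation, retrying every 3000ms,
// treating duplicate key errors (11000) as success like in the script above
function executeBulkWithRetry (bulk, onSuccess) {
  bulk.execute(function (err, result) {
    if (err == null || err.code === 11000) {
      onSuccess(result)
    } else {
      console.log('bulk insert failed, retrying in 3000ms: ' + err)
      setTimeout(function () {
        executeBulkWithRetry(bulk, onSuccess)
      }, 3000)
    }
  })
}

The size === totalSize branch could then simply call executeBulkWithRetry(bulk, function () { db.close() }).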
Posey answered 22/2, 2017 at 7:30 Comment(7)
Your answer does not provide information about the mongo write buffer and no solution for having all documents inserted even during a master change in the replica set or a disconnect. The information about bulk inserts is interesting and I will have a look into it, thanks!Bollinger
@DanFromGermany yes, because to me it looks like you're trying to solve the wrong problem: the real problem is that your apps get disconnected from the database. With fewer calls to the database, it would be easier to auto-reconnect, hence no need for write bufferingPosey
My apps don't get disconnected from the database. I want to write apps that, in case of disconnects or master switches in the replica set, make sure to reconnect and write all of the data.Bollinger
@DanFromGermany my bad, I misunderstood the question, see the updated answer!Posey
I see you ignore the duplicate id error. We have experienced this: after a reconnect, the mongo driver prints those errors. Do you know why they come up?Bollinger
@DanFromGermany in my case it is because the disconnection occurs during the bulk insert, i.e. mongodb inserted several documents into the database, but not all the documents that were supposed to be saved during the operation. When you retry the bulk insert, mongodb tries to insert all documents contained in the bulkOperation, even if some of them have already been saved. But in your case it's hard to tell as we don't know your configuration and the code you're using...Posey
@DanFromGermany it might also be related to journalingPosey

I don't know specifically about the MongoDB driver and this buffer of entries. Maybe it only keeps data in specific scenarios.

So I will answer this question with a more general approach that can work with any database.

To summarize, you have two problems:

  1. You are not recovering from failed attempts
  2. The XML stream sends data too fast

To handle the first issue, you need to implement a retry algorithm that will ensure that many attempts are made before giving up.

To handle the second issue, you need to implement back pressure on the xml stream. You can do that using the pause method, the resume method and an input buffer.

var Promise = require('bluebird');
var fs = require('fs');
var Xml = require('xml-stream');

var fileStream = fs.createReadStream('myFile.xml');
var xml = new Xml(fileStream);

// simple exponential retry algorithm based on promises
function exponentialRetry(task, initialDelay, maxDelay, maxRetry) {
    var delay = initialDelay;
    var retry = 0;
    var closure = function() {
        return task().catch(function(error) {
            retry++;
            if (retry > maxRetry) {
                throw error
            }
            var promise = Promise.delay(delay).then(closure);
            delay = Math.min(delay * 2, maxDelay);
            return promise;
        })
    };
    return closure();
}

var maxPressure = 100;
var currentPressure = 0;
var suspended = false;
var stopped = false;
var buffer = [];

// handle back pressure by storing incoming tasks in the buffer
// pause the xml stream as soon as we have enough tasks to work on
// resume it when the buffer is empty
function writeXmlDataWithBackPressure(product) {
    // closure used to try to start a task
    var tryStartTask = function() {
        // if we have enough tasks running, pause the xml stream
        if (!stopped && !suspended && currentPressure >= maxPressure) {
            xml.pause();
            suspended = true;
            console.log("stream paused");
        }
        // if we have room to run tasks
        if (currentPressure < maxPressure) {
            // if we have a buffered task, start it
            // if not, resume the xml stream
            if (buffer.length > 0) {
                buffer.shift()();
            } else if (!stopped) {
                try {
                    xml.resume();
                    suspended = false;
                    console.log("stream resumed");
                } catch (e) {
                    // the only way to know if you've reached the end of the stream
                    // xml.on('end') can be triggered BEFORE all handlers are called
                    // probably a bug of xml-stream
                    stopped = true;
                    console.log("stream end");
                }
            }
        }
    };

    // push the task to the buffer
    buffer.push(function() {
        currentPressure++;
        // use exponential retry to ensure we will try this operation 100 times before giving up
        exponentialRetry(function() {
            return writeDataToDb(product)
        }, 100, 2000, 100).finally(function() {
            currentPressure--;
            // a task has just finished, let's try to run a new one
            tryStartTask();
        });
    });

    // we've just buffered a task, let's try to run it
    tryStartTask();
}

// write the product to database here :)
function writeDataToDb(product) {
    // the following code is here to create random delays and random failures (just for testing)
    var timeToWrite = Math.random() * 100;
    var failure = Math.random() > 0.5;
    return Promise.delay(timeToWrite).then(function() {
        if (failure) {
            throw new Error();
        }
        return null;
    })
}

xml.on('endElement: product', writeXmlDataWithBackPressure);

Play with it, add some console.log calls to understand how it behaves. I hope this will help you solve your issue :)

Skippy answered 21/2, 2017 at 22:12 Comment(1)
That's basically a good implementation but I hoped to be able to make use of the internal write concern / write buffer of mongo - please have a look into this page and the keyword bufferMaxEntries.Bollinger
