如何在node.js中在断开连接期间缓冲MongoDB插入操作?

13

我们使用xml-stream读取一个包含大约500k个元素的XML文件,并以以下方式将它们插入到MongoDB中:

xml.on(`endElement: product`, writeDataToDb.bind(this, "product"));

writeDataToDb(type, obj)中的插入操作应该如下:

collection.insertOne(obj, {w: 1, wtimeout: 15000}).catch((e) => { });

当Mongo连接断开时,XML流仍在读取,控制台会不断地显示错误消息(无法插入、已断开连接、EPIPE broken等)。

文档中提到:

当你关闭mongod进程时,驱动程序停止处理操作,并将它们缓冲,因为默认情况下bufferMaxEntries为-1,意味着缓冲所有操作。

这个缓冲区实际上是做什么的?

我们注意到当我们插入数据并关闭mongo服务器时,事情被缓冲了,然后我们重新启动mongo服务器,原生驱动程序成功地重新连接,节点继续插入数据,但是在mongo掉线期间缓冲的文档(不再被插入)不再被插入。

所以我对这个缓冲区及其使用表示怀疑。

目标:

我们正在寻找在Mongo返回之前(根据wtimeout为15000毫秒),最好的方法是将插入保留在缓冲区中,然后插入缓冲的文档,或者利用xml.pause();xml.resume(),我们尝试过没有成功。

基本上,我们需要一点帮助来处理断开连接而不会丢失数据或中断。


无法复制此问题,文档中的示例和使用xml-stream进行的测试都会在Mongo服务器恢复后插入缓冲区对象。也许您可以发布更多代码/提供有关设置的更多信息? - cviejo
@cviejo,我不能分享我的脚本,因为它们与公司有关,但您是否介意向我发送您尝试复制此脚本的脚本?Gist/pastebin可以。 - Daniel W.
2个回答

2
使用insertOne()插入500K个元素是一个非常糟糕的想法。相反,您应该使用允许您在单个请求中插入多个文档的批量操作。(这里例如10000个,所以可以在50个单个请求中完成)为了避免缓冲问题,您可以手动处理它:
  1. 使用bufferMaxEntries: 0禁用缓冲
  2. 设置重新连接属性:reconnectTries:30,reconnectInterval:1000
  3. 创建bulkOperation并将其用10000个项目进行填充
  4. 暂停xml阅读器。尝试插入10000个项目。如果失败,请每3000ms重试,直到成功。
  5. 如果批量操作在执行期间被中断,则可能会遇到一些重复ID问题,因此请忽略它们(错误代码:11000)
以下是一个样本脚本:
var fs = require('fs')
var Xml = require('xml-stream')

var MongoClient = require('mongodb').MongoClient
var url = 'mongodb://localhost:27017/test'

MongoClient.connect(url, {
  reconnectTries: 30,
  reconnectInterval: 1000,
  bufferMaxEntries: 0
}, function (err, db) {
  if (err != null) {
    console.log('connect error: ' + err)
  } else {
    var collection = db.collection('product')
    var bulk = collection.initializeUnorderedBulkOp()
    var totalSize = 500001
    var size = 0

    var fileStream = fs.createReadStream('data.xml')
    var xml = new Xml(fileStream)
    xml.on('endElement: product', function (product) {
      bulk.insert(product)
      size++
      // if we have enough product, save them using bulk insert
      if (size % 10000 == 0) {
        xml.pause()
        bulk.execute(function (err, result) {
          if (err == null) {
            bulk = collection.initializeUnorderedBulkOp()
            console.log('doc ' + (size - 10000) + ' : ' + size + ' saved on first try')
            xml.resume()
          } else {
            console.log('bulk insert failed: ' + err)
            counter = 0
            var retryInsert = setInterval(function () {
              counter++
              bulk.execute(function (err, result) {
                if (err == null) {
                  clearInterval(retryInsert)
                  bulk = collection.initializeUnorderedBulkOp()
                  console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries')
                  xml.resume()
                } else if (err.code === 11000) { // ignore duplicate ID error
                  clearInterval(retryInsert)
                  bulk = collection.initializeUnorderedBulkOp()
                  console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries')
                  xml.resume()
                } else {
                  console.log('failed after first try: ' + counter, 'error: ' + err)
                }
              })
            }, 3000) // retry every 3000ms until success
          }
        })
      } else if (size === totalSize) {
        bulk.execute(function (err, result) {
          if (err == null) {
            db.close()
          } else {
            console.log('bulk insert failed: ' + err)
          }
        })
      }
    })
  }
})

示例日志输出:

doc 0 : 10000 saved on first try
doc 10000 : 20000 saved on first try
doc 20000 : 30000 saved on first try
[...]
bulk insert failed: MongoError: interrupted at shutdown // mongodb server shutdown
failed after first try: 1 error: MongoError: no connection available for operation and number of stored operation > 0
failed after first try: 2 error: MongoError: no connection available for operation and number of stored operation > 0
failed after first try: 3 error: MongoError: no connection available for operation and number of stored operation > 0
doc 130000 : 140000 saved after 4 tries
doc 140000 : 150000 saved on first try
[...]

你的回答没有提供有关Mongo写缓冲区的信息,也没有解决在副本集主节点更改或断开连接时如何插入所有文档的问题。关于批量插入的信息很有趣,我会去看一下,谢谢! - Daniel W.
@DanFromGermany 是的,因为在我看来,你试图解决错误的问题:真正的问题是你的应用程序与数据库断开连接。如果减少对数据库的调用,自动重新连接将更容易实现,因此无需写入缓冲。 - felix
我的应用程序不会与数据库断开连接。我想编写这样的应用程序,即在断开连接或副本集中的主开关情况下,断言重新连接并写入所有数据。 - Daniel W.
@DanFromGermany 我错了,我误解了问题,请查看更新后的答案! - felix
我看到你忽略了重复的ID错误。我们曾经遇到过这种情况,在重新连接后,Mongo驱动程序会打印出这些错误,你知道它们为什么会出现吗? - Daniel W.
显示剩余2条评论

1
我不了解具体的Mongodb驱动程序和这个条目缓冲区。也许它只在特定情况下保留数据。
因此,我将用更通用的方法回答这个问题,适用于任何数据库。
总之,您有两个问题:
1. 您无法从失败的尝试中恢复 2. XML流发送数据过快
为了处理第一个问题,您需要实现一个重试算法,以确保在放弃之前进行多次尝试。
为了处理第二个问题,您需要在xml流上实施反压。您可以使用“暂停”方法、“恢复”方法和输入缓冲区来实现。
var Promise = require('bluebird');
var fs = require('fs');
var Xml = require('xml-stream');

var fileStream = fs.createReadStream('myFile.xml');
var xml = new Xml(fileStream);

// simple exponential retry algorithm based on promises
function exponentialRetry(task, initialDelay, maxDelay, maxRetry) {
    var delay = initialDelay;
    var retry = 0;
    var closure = function() {
        return task().catch(function(error) {
            retry++;
            if (retry > maxRetry) {
                throw error
            }
            var promise = Promise.delay(delay).then(closure);
            delay = Math.min(delay * 2, maxDelay);
            return promise;
        })
    };
    return closure();
}

var maxPressure = 100;
var currentPressure = 0;
var suspended = false;
var stopped = false;
var buffer = [];

// handle back pressure by storing incoming tasks in the buffer
// pause the xml stream as soon as we have enough tasks to work on
// resume it when the buffer is empty
function writeXmlDataWithBackPressure(product) {
    // closure used to try to start a task
    var tryStartTask = function() {
        // if we have enough tasks running, pause the xml stream
        if (!stopped && !suspended && currentPressure >= maxPressure) {
            xml.pause();
            suspended = true;
            console.log("stream paused");
        }
        // if we have room to run tasks
        if (currentPressure < maxPressure) {
            // if we have a buffered task, start it
            // if not, resume the xml stream
            if (buffer.length > 0) {
                buffer.shift()();
            } else if (!stopped) {
                try {
                    xml.resume();
                    suspended = false;
                    console.log("stream resumed");
                } catch (e) {
                    // the only way to know if you've reached the end of the stream
                    // xml.on('end') can be triggered BEFORE all handlers are called
                    // probably a bug of xml-stream
                    stopped = true;
                    console.log("stream end");
                }
            }
        }
    };

    // push the task to the buffer
    buffer.push(function() {
        currentPressure++;
        // use exponential retry to ensure we will try this operation 100 times before giving up
        exponentialRetry(function() {
            return writeDataToDb(product)
        }, 100, 2000, 100).finally(function() {
            currentPressure--;
            // a task has just finished, let's try to run a new one
            tryStartTask();
        });
    });

    // we've just buffered a task, let's try to run it
    tryStartTask();
}

// write the product to database here :)
function writeDataToDb(product) {
    // the following code is here to create random delays and random failures (just for testing)
    var timeToWrite = Math.random() * 100;
    var failure = Math.random() > 0.5;
    return Promise.delay(timeToWrite).then(function() {
        if (failure) {
            throw new Error();
        }
        return null;
    })
}

xml.on('endElement: product', writeXmlDataWithBackPressure);

玩一下它,加些console.log来了解它的行为。希望这能帮助您解决问题 :)


这基本上是一个不错的实现,但我希望能够利用Mongo的内部写入关注点/写入缓冲区 - 请查看此页面和关键字bufferMaxEntries - Daniel W.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接