NodeJS异步队列太快(减缓异步队列方法)

5
我有一个HTTP Get请求,希望解析响应并将其保存到我的数据库中。
如果我独立调用crawl(i),我可以获得良好的结果。但是我必须从1到2000调用crawl()。我得到了很好的结果,但是有些响应似乎丢失了,而且有些响应是重复的。我不认为我知道如何调用数千个异步函数。我正在使用async模块队列函数,但到目前为止,我仍然缺少一些数据,并且仍然存在一些重复项。我在这里做错了什么?谢谢你的帮助。 我正在爬取的内容 我的节点函数:
 function getOptions(i) {
    return {
        host: 'magicseaweed.com',
        path: '/syndicate/rss/index.php?id='+i+'&unit=uk',
        method: 'GET'
    }
};

function crawl(i){
var req = http.request(getOptions(i), function(res) {
    res.on('data', function (body) {
        parseLocation(body);
    });
});
req.end();

}

function parseLocation(body){
    parser.parseString(body, function(err, result) {
        if(result && typeof result.rss != 'undefined') {
            var locationTitle = result.rss.channel[0].title;
            var locationString = result.rss.channel[0].item[0].link[0];
            var location = new Location({
                id: locationString.split('/')[2],
                name: locationTitle
            });
            location.save();
        }
    });
  }

N = 2 //# of simultaneous tasks
var q = async.queue(function (task, callback) {
        crawl(task.url);
        callback();
}, N);


q.drain = function() {
    console.log('Crawling done.');
}

for(var i = 0; i < 100; i++){
   q.push({url: 'http://magicseaweed.com/syndicate/rss/index.php?id='+i+'&unit=uk'});
}

[编辑] 经过大量测试,似乎我正在爬取的服务无法处理那么快的许多请求。因为当我按顺序执行每个请求时,我可以得到所有良好的响应。

有没有一种方法可以减缓异步队列方法的速度?


现在似乎有很多请求失败了...我该如何确保请求不会失败? - William Fortin
дҪ е°қиҜ•дҪҝз”ЁsetTimeOutжқҘ延иҝҹйҳҹеҲ—е·ҘдҪңеҮҪж•°дёӯзҡ„callback()и°ғз”ЁдәҶеҗ—пјҹиҝҷж ·еҸҜд»ҘеҮҸзј“йҳҹеҲ—дёӯд»»еҠЎзҡ„жү§иЎҢйҖҹеәҰгҖӮ - max
5个回答

18
你应该看一下这个很棒的模块 async,它可以简化异步任务。你可以使用队列,下面是一个简单的例子:
N = # of simultaneous tasks
var q = async.queue(function (task, callback) {
    somehttprequestfunction(task.url, function(){
    callback();
    } 
}, N);


q.drain = function() {
    console.log('all items have been processed');
}

for(var i = 0; i < 2000; i++){
   q.push({url:"http://somewebsite.com/"+i+"/feed/"});
}

它将具有进行中操作的窗口,如果您仅调用回调函数,则任务房间将可用于将来的任务。不同之处在于,您的代码现在立即打开2000个连接,显然故障率很高。将其限制为合理值,例如5、10、20(取决于网站和连接)将导致更好的成功率。如果请求失败,您可以随时尝试重新请求,或者将任务推到另一个异步队列进行另一次尝试。关键点是在队列函数中调用callback(),以便在完成后为房间腾出空间。


我尝试了你的解决方案,但仍然存在重复和许多缺失的响应。我已经更新了我的问题和代码,请问你能否再解释一下?谢谢。 - William Fortin
由于您的请求很简单,您可以使用 request 模块。目前我看不出您的代码有什么问题。 - Mustafa
有没有办法限制每秒从队列中弹出的项目数量? - Michael
你没有考虑到 Node.js 没有类似于 C++ 或 VB 中正在处理的函数标准阻塞效应,因此,一旦调用被执行,它就会继续进行下一个调用,而不等待响应。为了解决这个问题,您应该使用 promises(搜索 Node.js http 使用 promises)。使用 async 和 promises,可以在服务器上创建任务缓冲区,因此您的代码不会受到大量连接和响应延迟的影响。简而言之,如果您的爬行函数只有顺序操作(如数学),那么一切都将完美运行。 - Thiago Conrado
由于某些原因,q.drain 没有被触发。将 q.drain = function() {} 更改为箭头函数 q.drain(() => {}) 解决了我的问题。 - Jiří Zahálka
感谢您提供的解决方案,#N选项是关键,因为我使用的API每秒只能调用10次。 - Jiří Zahálka

10
var q = async.queue(function (task, callback) {
    crawl(task.url);
    callback();
}, N);

在启动前一个任务后立即执行下一个任务,这样队列就没有意义了。你应该像这样修改你的代码:

// first, modify your 'crawl' function to take a callback argument, and call this callback after the job is done.

// then
var q = async.queue(function (task, next/* name this argument as 'next' is more meaningful */) {
    crawl(task.url, function () {
        // after this one is done, start next one.
        next();
    });     
    // or, more simple way, crawl(task.url, next);
}, N);

1

如果您想要的话,另一种选择是使用纯粹的JavaScript而不是花哨的库。

var incrementer = 0;
var resultsArray = [];

var myInterval = setInterval(function() {
    incrementer++
    if(incrementer == 100){
        clearInterval(myInterval)
        //when done parse results array
    }
    //make request here
    //push request result to array here

}, 500);

每半秒调用该函数。强制同步并在x个请求后退出的简单方法。

我们曾经使用过这种方法,但它确实存在问题:像任何与HTTP相关的服务一样,都有限制。虽然您可以了解最大请求速率并调整间隔以适应它,但如果服务器出现故障,之后的所有请求都将收到503(请求太多)错误,或者您必须将限制保持较低以弥补此问题。 - Thiago Conrado

0

我知道我有点晚回答这个问题,但是这里是我写的一个解决方案,用于在测试API端点时减缓请求数量,使用Node 4或Node 5:

var fs = require('fs');
var supertest = require('supertest');
var request = supertest("http://sometesturl.com/api/test/v1/")
var Helper = require('./check.helper');
var basicAuth = Helper.basicAuth;
var options = Helper.options;

fs.readFile('test.txt', function(err, data){
  var parsedItems = JSON.parse(data);
  var urlparts = []
  // create a queue
  for (let year of range(1975, 2016)) {
    for (var make in parsedItems[year]){
      console.log(year, make, '/models/' + year + '/' + make)
      urlparts.push({urlpart:'/models/' + year + '/' + make, year: year, make: make})
    }
  }
  // start dequeue
  waitDequeue();

  // This function calls itself after the makeRequest promise completes
  function waitDequeue(){
    var item = urlparts.pop()
    if (item){
      makeRequest(item)
        .then(function(){
          // wait this time before next dequeue
          setTimeout(function() {
            waitDequeue();
          }, 3000);
        })
    } else {
      write(parsedItems)
    }
  }

  // make a request, mutate parsedItems then resolve
  function makeRequest(item){
    return new Promise((resolve, reject)=>{
      request
        .get(item.urlpart)
        .set(options.auth[0], options.auth[1])
        .set(options.type[0], options.type[1])
        .end(function(err, res) {
          if (err) return done1(err);
          console.log(res.body)
          res.body.forEach(function(model){
            parsedItems[item.year][item.make][model] = {}
          });
          resolve()
        })
      })
  }

  // write the results back to the file
  function write(parsedItems){
    fs.writeFile('test.txt', JSON.stringify(parsedItems, null, 4), function(err){
      console.log(err)
    })
  }

})

0
有点晚了,但我发现这个方法可行! 使用异步函数,你可以在任务处理程序中使用 whilst 函数来减缓队列的速度,例如:
var q = async.priorityQueue(function(task, callback) {
// your code process here for each task
//when ready to complete the task delay it by calling
async.whilst( //wait 6 seconds
   function() {         
        return count < 10;      
   },
   function(callback) {
      count++;
      setTimeout(function() {
        callback(null, count);
      }, 1000);
   },
   function (err, n) {
    // n seconds have passed  
    callback(); //callback to q handler 
   }
 ); //whilst
} , 5);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接