异步并行请求正在依次运行。

4

我正在使用Node.js运行服务器,并需要从另一个我正在运行的服务器(localhost:3001)请求数据。我需要向数据服务器发出许多请求(约200个),并收集数据(响应大小从约20Kb到约20Mb不等)。每个请求都是独立的,我希望将响应保存为一个形式为以下的巨大数组:

[{"urlAAA": responseAAA}, {"urlCCC": responseCCC}, {"urlBBB": responseBBB}, etc ]

请注意,项目的顺序不重要,理想情况下,它们应该按照数据可用的顺序填充数组。

var express = require('express');
var router = express.Router();
var async = require("async");
var papa = require("papaparse");
var sync_request = require('sync-request');
var request = require("request");

var pinnacle_data = {};
var lookup_list = [];
for (var i = 0; i < 20; i++) {
    lookup_list.push(i);
}

function write_delayed_files(object, key, value) {
    object[key] = value;
    return;
}

var show_file = function (file_number) {
    var file_index = Math.round(Math.random() * 495) + 1;
    var pinnacle_file_index = 'http://localhost:3001/generate?file=' + file_index.toString();
    var response_json = sync_request('GET', pinnacle_file_index);
    var pinnacle_json = JSON.parse(response_json.getBody('utf8'));
    var object_key = "file_" + file_number.toString();
    pinnacle_data[object_key] = pinnacle_json;
    console.log("We've handled file:    " + file_number);
    return;
};

async.each(lookup_list, show_file, function (err) {});



console.log(pinnacle_data);

/* GET contact us page. */
router.get('/', function (req, res, next) {
    res.render('predictionsWtaLinks', {title: 'Async Trial'});
});

module.exports = router;

现在当运行这个程序时,它会显示:
We've handled file:    0
We've handled file:    1
We've handled file:    2
We've handled file:    3
We've handled file:    4
We've handled file:    5
etc

现在由于文件大小不同,我原本期望这个程序可以“并行”执行请求,但实际上它似乎是按顺序执行的,这正是我试图通过使用async.each()来避免的。目前连接到数据服务器大约需要1-2秒,因此如果对许多文件执行此操作,时间会太长。
我意识到我正在使用同步请求,因此最好替换为:
var response_json = sync_request('GET', pinnacle_file_index);

使用类似于某物的东西

request(pinnacle_file_index, function (error, response, body) {
    if (!error && response.statusCode == 200) {
        pinnacle_data[object_key] = JSON.parse(body);
    }
});

非常感谢您的帮助。
此外,我已经尝试过:
将URL列表转换为匿名函数列表,并使用async.parallel(function_list,function(err,results){ //将结果添加到pinnacle_data []})。 (尝试为数组中的每个元素定义唯一函数时遇到问题)。
同样,我还查看了其他相关主题:
我已经尝试模仿Asynchronous http calls with nodeJS中的建议解决方案,但没有取得进展。 Node.js - Async.js: how does parallel execution work?How to do parallel async multiple requests at once with Promises in Node 编辑 - 工作解决方案
以下代码现在完成任务(每个请求需要约80ms,包括使用npm requestretry进行重复请求)。 同样,这很好地扩展,平均请求时间为~ 80ms,可在总共发出5个请求至1000个请求之间进行。
var performance = require("performance-now");
var time_start = performance();
var async = require("async");
var request_retry = require('requestretry');

var lookup_list = [];
var total_requests = 50;
for (var i = 0; i < total_requests; i++) {
    lookup_list.push(i);
}

var pinnacle_data = {};
async.map(lookup_list, function (item, callback) {
        var file_index = Math.round(Math.random() * 495) + 1;
        var pinnacle_file_index = 'http://localhost:3001/generate?file=' + file_index;
        request_retry({
                url: pinnacle_file_index,
                maxAttempts: 20,
                retryDelay: 20,
                retryStrategy: request_retry.RetryStrategies.HTTPOrNetworkError
            },
            function (error, response, body) {
                if (!error && response.statusCode == 200) {
                    body = JSON.parse(body);
                    var data_array = {};
                    data_array[file_index.toString()] = body;
                    callback(null, data_array);
                } else {
                    console.log(error);
                    callback(error || response.statusCode);
                }
            });
    },
    function (err, results) {
        var time_finish = performance();
        console.log("It took " + (time_finish - time_start).toFixed(3) + "ms to complete " + total_requests + " requests.");
        console.log("This gives an average rate of " + ((time_finish - time_start) / total_requests).toFixed(3) + " ms/request");
        if (!err) {
            for (var i = 0; i < results.length; i++) {
                for (key in results[i]) {
                    pinnacle_data[key] = results[i][key];
                }
            }
            var length_array = Object.keys(pinnacle_data).length.toString();
            console.log("We've got all the data, totalling " + length_array + " unique entries.");
        } else {
            console.log("We had an error somewhere.");
        }
    });

感谢您的帮助。
3个回答

5
正如您所发现的那样,async.parallel()只能并行处理本身是异步操作的任务。如果这些操作是同步的,由于node.js的单线程特性,这些操作将一个接一个地运行,而不是并行运行。但是,如果这些操作本身是异步的,那么async.parallel()(或其他异步方法)将同时启动它们,并为您协调结果。
以下是使用async.map()的一般思路。我使用async.map(),因为其思路是以数组作为输入,并以与原始顺序相同的顺序生成结果数组,但会并行运行所有请求,这似乎符合您的要求:
var async = require("async");
var request = require("request");

// create list of URLs
var lookup_list = [];
for (var i = 0; i < 20; i++) {
    var index = Math.round(Math.random() * 495) + 1;
    var url = 'http://localhost:3001/generate?file=' + index;
    lookup_list.push(url);
}

async.map(lookup_list, function(url, callback) {
    // iterator function
    request(url, function (error, response, body) {
        if (!error && response.statusCode == 200) {
            var body = JSON.parse(body);
            // do any further processing of the data here
            callback(null, body);
        } else {
            callback(error || response.statusCode);
        }
    });
}, function(err, results) {
    // completion function
    if (!err) {
        // process all results in the array here
        console.log(results);
        for (var i = 0; i < results.length; i++) {
            // do something with results[i]
        }
    } else {
        // handle error here
    }
});

而且,这里有一个使用Bluebird promises的版本,并且类似地使用Promise.map()迭代初始数组:
var Promise = require("bluebird");
var request = Promise.promisifyAll(require("request"), {multiArgs: true});

// create list of URLs
var lookup_list = [];
for (var i = 0; i < 20; i++) {
    var index = Math.round(Math.random() * 495) + 1;
    var url = 'http://localhost:3001/generate?file=' + index;
    lookup_list.push(url);
}

Promise.map(lookup_list, function(url) {
    return request.getAsync(url).spread(function(response, body) {
        if response.statusCode !== 200) {
            throw response.statusCode;
        }
        return JSON.parse(body);
    });
}).then(function(results) {
    console.log(results);
    for (var i = 0; i < results.length; i++) {
        // process results[i] here
    }
}, function(err) {
    // process error here
});

你在第一段代码块中所制作的解决方案模板效果非常好,使性能得以显著提升。但是我对其进行了修改,使用了requestretry而非request,因为我的另一个服务器在请求数量过多时会崩溃或打开太多文件,尽管在稍微延迟一会儿后重新尝试请求,大部分这些错误都会消失。感谢@jfriend00。 - oliversm
1
@oliversm - 如果您正在进行大量并行请求,以至于您不断地冲垮服务器,那么您可能需要使用 async.mapLimit(),其中您可以指定允许同时进行多少个请求。这对于仍然并行运行请求但又保护服务器免受洪水侵袭非常有用,并且.mapLimit()会为您完成所有工作。您只需传递一个额外的参数来指定允许的同时请求数量就可以了。这比淹没服务器、导致错误,然后重试要好得多。 - jfriend00
谢谢你的建议,目前我正在自己托管数据服务器,它并不是非常“重型”,因此这个限制减少了数据服务器生成的错误数量。然而,当我到达完整的生产版本时,理想情况下将会轰炸别人的服务器,这应该会更加“重型”,并且希望能够轻松处理200个同时请求。 - oliversm
@oliversm - 你真的不应该使用200个同时请求攻击其他服务器。许多为大规模设计的服务器将通过速率限制来保护其他人的服务质量,以防止单个端点这样做。由于实际上任何单个服务器都无法真正处理200个请求,因此以这种方式轰炸服务器几乎没有意义,因为大多数请求都将排队等待或者您可能会触发速率限制。我建议你将其降低到每次10-20个请求。 - jfriend00
谢谢信息,我会做的。 - oliversm
@ningappa - 我采纳了你的 multiArgs 建议。Bluebird 在他们的某个版本中更改了默认设置,这可能导致了你看到的问题。 - jfriend00

4
听起来你只是想同时下载一堆URL。这个代码可以做到:
var request = require('request');
var async = require('async');

var urls = ['http://microsoft.com', 'http://yahoo.com', 'http://google.com', 'http://amazon.com'];

var loaders = urls.map( function(url) {
  return function(callback) {
        request(url, callback);
  }
});

async.parallel(loaders, function(err, results) {
        if (err) throw(err); // ... handle appropriately
        // results will be an array of the results, in 
        // the same order as 'urls', even thought the operation
        // was done in parallel
        console.log(results.length); // == urls.length
});

甚至更简单,使用async.map:
var request = require('request');
var async = require('async');

var urls = ['http://microsoft.com', 'http://yahoo.com', 'http://google.com', 'http://amazon.com'];

async.map(urls, request, function(err, results) {
        if (err) throw(err);          // handle error 
        console.log(results.length);  // == urls.length
});

在传递到async.map函数之后,我们可以更改“urls”数组中推送的URL吗? - Udit Kumawat
你只需要从一个新数组中映射你的新URL,async.map会创建一系列异步执行的函数。如果你想添加更多的URL,只需重复这个过程即可。所以,即使你修改了urls数组,已经使用async.map创建的函数也不会受到影响。 - caasjj
我可以使用第二种方式将响应与其相应的响应进行映射吗? - TGW

0

试试这个:

var async = require("async");
var request = require("request");
var show_file = function (file_number,cb) {
    //..Sync ops
     var file_index = Math.round(Math.random() * 495) + 1;
     var pinnacle_file_index = 'http://localhost:3001/generate?file='+file_index.toString();
    //request instance from Request npm Module
    //..Async op --> this should make async.each asynchronous
    request(pinnacle_file_index, function (error, response, body) {
       if(error)
           return cb(error);
       var object_key = "file_" + file_number.toString();
      pinnacle_data[object_key] = JSON.parse(body);
      return cb();
    });
};

async.each(
  lookup_list, 
  show_file,
  function (err) {
    if(err){
       console.log("Error",err);
    }else{
       console.log("Its ok");
       console.log(pinnacle_data);
   }
});

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接