由于每秒限制,需要对API请求进行节流和排队处理。

76

我使用mikeal/request进行API调用。其中最常用的API之一(Shopify API)最近发布了新的调用限制,我看到了这样的错误:

Exceeded 6.0 calls per second for api client. Slow your requests or contact support for higher limits.

我已经升级过了,但无论我得到多少带宽,我都必须考虑这个问题。Shopify API的大部分请求都在async.map()函数中,该函数循环进行异步请求并收集响应体。

我正在寻找任何帮助,例如一个已经存在的库,可以包装请求模块并实际地阻塞、休眠、限制、管理许多同时发生的异步请求,并将它们限制为每次6个请求。如果这样的项目不存在,我也没有问题去解决它。我只是不知道如何处理这种情况,希望能有一些标准可依。

我向mikeal/request提交了一个工单。


不开玩笑,我终于受够了ElasticTranscoder的用户界面,并编写了代码通过JS SDK使用API,但立即遇到了这些限制。 - rainabba
1
在2018年,有一个名为rate-limiter-flexible的软件包可以完成这项工作。 - Animir
有人能提供Java解决方案吗? - nil96
10个回答

40

为了寻求替代方案,我使用node-rate-limiter来包裹请求函数,代码如下:

var request = require('request');
var RateLimiter = require('limiter').RateLimiter;

var limiter = new RateLimiter(1, 100); // at most 1 request every 100 ms
var throttledRequest = function() {
    var requestArgs = arguments;
    limiter.removeTokens(1, function() {
        request.apply(this, requestArgs);
    });
};

我会调查一下!非常感谢! - ThomasReggi
22
我是node-rate-limiter的作者。对于所述问题,这个库可能更适合,因为async.queue()仅限制并发,没有时间概念。API速率限制通常基于时间(例如每秒最多6次调用),可以表示为var limiter = new RateLimiter(6, 'second');。它与oibackoff这样的解决方案是互补的,后者会在达到速率限制后改变行为。 - jhurliman
我能否将其作为整体来处理所有请求,还是需要逐个处理?我的意思是,我能否将其放在中间件中?如果可以,它将如何应用于所有端点或每个端点? - Kamalakannan J
2
这只是限制调用还是也作为排队机制工作?意思是,如果我超过了限制,它会将请求排队,并在限制刷新后重新开始调用吗? - ChickenWing24
是的,它会排队。我认为发生的情况是node-rate-limiter在令牌可用时回调您。 - Dmitry Chornyi

24

npmsimple-rate-limiter似乎是解决这个问题的非常好的解决方案。

此外,它比node-rate-limiterasync.queue更易于使用。

这里是一个片段,展示了如何将所有请求限制为每秒十个。

var limit = require("simple-rate-limiter");
var request = limit(require("request")).to(10).per(1000);

非常好用且简单的建议。谢谢! - Paula Fleck

21

我也曾在各种API中遇到了同样的问题,AWS也因此而闻名。

有几种方法可以解决这个问题。您提到了async.map()函数。您尝试过async.queue()吗?队列方法应该允许您设置一个固定的限制(例如6),任何超过这个数量的内容都会被放入队列中。

另一个有用的工具是oibackoff。该库将允许您在从服务器收到错误消息时备份您的请求并重试。

将这两个库包装起来可能很有用,以确保您的两个基本需求都得到满足:使用async.queue确保不超出限制,使用oibackoff确保如果服务器告诉您发生错误,则可以再次尝试获取您的请求。


1
我将深入研究这两个建议。我唯一的问题是我的async.maps分散并嵌套在彼此之中。因此,我不能仅用async.queue替换它们,因为我仍然无法保证对API的请求每次只有6个。它们将是6 * 每个async.queue。但我认为球已经滚动了? - ThomasReggi
3
https://caolan.github.io/async/docs.html#queue 不限制速率(每秒/每分钟)。它仅限于异步操作数量。 - Manohar Reddy Poreddy

11

我的解决方案使用现代纯 JavaScript:

function throttleAsync(fn, wait) {
  let lastRun = 0;

  async function throttled(...args) {
    const currentWait = lastRun + wait - Date.now();
    const shouldRun = currentWait <= 0;

    if (shouldRun) {
      lastRun = Date.now();
      
      return await fn(...args);
    } else {
      return await new Promise(function(resolve) {
        setTimeout(function() {
          resolve(throttled(...args));
        }, currentWait);
      });
    }
  }

  return throttled;
}

// Usage:

const run = console.log.bind(console);
const throttledRun = throttleAsync(run, 1000);

throttledRun(1); // Will execute immediately.
throttledRun(2); // Will be delayed by 1 second.
throttledRun(3); // Will be delayed by 2 second.


8

在async模块中,此请求的功能被关闭为“不会修复”。

有一种使用LeakyBucket或Token Bucket模型的解决方案,它作为RateLimiter实施在“limiter” npm模块中。

RateLimiter,请参见此处的示例:https://github.com/caolan/async/issues/1314#issuecomment-263715550

另一种方法是使用PromiseThrottle,我使用了这个,下面是工作示例:

var PromiseThrottle = require('promise-throttle');
let RATE_PER_SECOND = 5; // 5 = 5 per second, 0.5 = 1 per every 2 seconds

var pto = new PromiseThrottle({
    requestsPerSecond: RATE_PER_SECOND, // up to 1 request per second
    promiseImplementation: Promise  // the Promise library you are using
});

let timeStart = Date.now();
var myPromiseFunction = function (arg) {
    return new Promise(function (resolve, reject) {
        console.log("myPromiseFunction: " + arg + ", " + (Date.now() - timeStart) / 1000);
        let response = arg;
        return resolve(response);
    });
};

let NUMBER_OF_REQUESTS = 15;
let promiseArray = [];
for (let i = 1; i <= NUMBER_OF_REQUESTS; i++) {
    promiseArray.push(
            pto
            .add(myPromiseFunction.bind(this, i)) // passing am argument using bind()
            );
}

Promise
        .all(promiseArray)
        .then(function (allResponsesArray) { // [1 .. 100]
            console.log("All results: " + allResponsesArray);
        });

输出:

myPromiseFunction: 1, 0.031
myPromiseFunction: 2, 0.201
myPromiseFunction: 3, 0.401
myPromiseFunction: 4, 0.602
myPromiseFunction: 5, 0.803
myPromiseFunction: 6, 1.003
myPromiseFunction: 7, 1.204
myPromiseFunction: 8, 1.404
myPromiseFunction: 9, 1.605
myPromiseFunction: 10, 1.806
myPromiseFunction: 11, 2.007
myPromiseFunction: 12, 2.208
myPromiseFunction: 13, 2.409
myPromiseFunction: 14, 2.61
myPromiseFunction: 15, 2.811
All results: 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15

我们可以从输出中清楚地看到速率,即每秒5个调用。

6

其他解决方案不符合我的口味。进一步研究后,我发现了promise-ratelimit,它提供了一个可以简单地await的API:

var rate = 2000 // in milliseconds
var throttle = require('promise-ratelimit')(rate)

async function queryExampleApi () {
  await throttle()
  var response = await get('https://api.example.com/stuff')
  return response.body.things
}

上述示例将确保您最多每2000毫秒向api.example.com发出一次查询。换句话说,第一个请求不会等待2000毫秒。

2

这是我的解决方案,使用一个库request-promise或者axios并将调用包装在这个Promise中。

var Promise = require("bluebird")

// http://stackoverflow.com/questions/28459812/way-to-provide-this-to-the-global-scope#28459875
// https://dev59.com/mYbca4cB1Zd3GeqPSzek

module.exports = promiseDebounce

function promiseDebounce(fn, delay, count) {
  var working = 0, queue = [];
  function work() {
    if ((queue.length === 0) || (working === count)) return;
    working++;
    Promise.delay(delay).tap(function () { working--; }).then(work);
    var next = queue.shift();
    next[2](fn.apply(next[0], next[1]));
  }
  return function debounced() {
    var args = arguments;
    return new Promise(function(resolve){
      queue.push([this, args, resolve]);
      if (working < count) work();
    }.bind(this));
  }

1

这里有很多很好的选项,以下是我在一个项目中使用的选项。

axios-request-throttle

用法:

import axios from 'axios';
import axiosThrottle from 'axios-request-throttle';

axiosThrottle.use(axios, { requestsPerSecond: 5 });

1
我使用async-sema模块来处理HTTP请求的限流。这意味着它允许您以一定的速率发送HTTP请求。
以下是一个示例:
在Node.js服务器上,将express-rate-limit中间件添加到API中,以使API具有速率限制功能。假设这是您的Shopify API。

server.ts:

import express from 'express';
import rateLimit from 'express-rate-limit';
import http from 'http';

const port = 3000;
const limiter = new rateLimit({
  windowMs: 1000,
  max: 3,
  message: 'Max RPS = 3',
});

async function createServer(): Promise<http.Server> {
  const app = express();

  app.get('/place', limiter, (req, res) => {
    res.end('Query place success.');
  });

  return app.listen(port, () => {
    console.log(`Server is listening on http://localhost:${port}`);
  });
}

if (require.main === module) {
  createServer();
}

export { createServer };

在客户端,我们希望使用并发 = 3 和每秒限制之间的HTTP请求发送。我将客户端代码放在测试用例中,所以不要感到奇怪。

server.test.ts:

import { RateLimit } from 'async-sema';
import rp from 'request-promise';
import { expect } from 'chai';
import { createServer } from './server';
import http from 'http';

describe('20253425', () => {
  let server: http.Server;
  beforeEach(async () => {
    server = await createServer();
  });
  afterEach((done) => {
    server.close(done);
  });
  it('should throttle http request per second', async () => {
    const url = 'http://localhost:3000/place';
    const n = 10;
    const lim = RateLimit(3, { timeUnit: 1000 });

    const resArr: string[] = [];
    for (let i = 0; i < n; i++) {
      await lim();
      const res = await rp(url);
      resArr.push(res);
      console.log(`[${new Date().toLocaleTimeString()}] request ${i + 1}, response: ${res}`);
    }

    expect(resArr).to.have.lengthOf(n);
    resArr.forEach((res) => {
      expect(res).to.be.eq('Query place success.');
    });
  });
});

测试结果,请注意请求时间

  20253425
Server is listening on http://localhost:3000
[8:08:17 PM] request 1, response: Query place success.
[8:08:17 PM] request 2, response: Query place success.
[8:08:17 PM] request 3, response: Query place success.
[8:08:18 PM] request 4, response: Query place success.
[8:08:18 PM] request 5, response: Query place success.
[8:08:18 PM] request 6, response: Query place success.
[8:08:19 PM] request 7, response: Query place success.
[8:08:19 PM] request 8, response: Query place success.
[8:08:19 PM] request 9, response: Query place success.
[8:08:20 PM] request 10, response: Query place success.
    ✓ should throttle http request per second (3017ms)


  1 passing (3s)

0

我在寻找有关异步函数节流的片段,但没有发现能够考虑调用状态的相关内容。

如果您不想允许并行函数调用,并且不允许在函数完成后的一段时间内再次调用该函数,则以下是我想到的代码。

/**
 * Throttles async function. Takes into account the function call duration and waits
 * extra wait milliseconds after the function call is done.
 * If the throttled function is called during the execution or wait state the call
 * arguments are stored and the last ones are used to call the function at the end
 * of the waiting state.
 *
 * Throttling example of function having one number argument and wait = 5000ms:
 * <pre>
 *   Time                       0s         3s            8s             15s   20s
 *   Outside call (argument)    1               2   3  4    5
 *   Inside call (argument)     1 -------> OK            4 -----------> OK    5 ------> OK
 * </pre>
 *
 * @param func function to throttle
 * @param wait waiting duration after the function call is finished
 */
export function throttleAsync<A extends unknown[], R>(func: (...args: A) => Promise<R>, wait: number) {
  // currently invoked function promise
  let promise: Promise<R> | undefined;

  // last call arguments during the function invocation or waiting state
  let lastDefferedArgs: A | undefined;

  function throttled(...args: A) {
    // function is not running and we are not waiting
    if (!promise) {
      // invoke the function
      promise = func(...args).finally(() => {
        // invocation is done, now wait extra 'wait' milliseconds
        window.setTimeout(() => {
          // then set the promise to undefined allowing subsequent invocations
          promise = undefined;
          const defferedArgs = lastDefferedArgs;
          lastDefferedArgs = undefined;

          // there was some deffered invocation - invoke now with latest arguments
          if (defferedArgs) {
            throttled(...defferedArgs);
          }
        }, wait);
      });
    } else {
      // function is running or we are waiting - store arguments to deffered invocation
      lastDefferedArgs = args;
    }
  }

  return throttled;
}

Codepen

程序设计相关的内容

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接