限制同时进行的 Promise 数量

14
以下TypeScript逐个调用doSomething(action),一次只处理一个。(也就是说在第一个项目完成之前不会进行第二个项目的调用)。
async performActionsOneAtATime() {
    for (let action of listOfActions) {
        const actionResult = await doSomethingOnServer(action);
        console.log(`Action Done: ${actionResult}`);
    }
 }

这个会立即将所有请求发送到服务器(不等待任何响应):

async performActionsInParallel() {
    for (let action of listOfActions) {
        const actionResultPromise = doSomething(action);
        actionResultPromise.then((actionResult) => {
            console.log(`Action Done: ${actionResult}`);
        });
    }
}

但我真正需要的是一种限制它们速度的方法,也许同时打开10或20个调用。(一次一个太慢了,但全部600个将使服务器超载。)

但我很难弄清楚这一点。

你有什么建议可以控制每次X个调用吗?

(这个问题使用TypeScript,但我也可以接受 ES6 JavaScript 的答案。)


我使用underscore.js(我的首选js库)来处理这种事情。看看防抖和节流。 - pm100
@pm100 - 我看到的去抖动和节流实现都是基于时间的。(在调用之间等待x时间。)下划线是基于队列的吗?(同时进行X个调用。) - Vaccano
你可能想看一下我的答案。它有点基于队列(假设列表是一个具有pop()方法的数组)。一旦一个Promise返回,它就会通知列表开始获取下一个。 - kennasoft
7个回答

19
你可以用一个简短的函数来实现这个功能。(按照Mulan的建议按顺序返回值。谢谢!)

/**
 * Performs a list of callable actions (promise factories) so
 * that only a limited number of promises are pending at any
 * given time.
 *
 * @param listOfCallableActions An array of callable functions,
 *     which should return promises.
 * @param limit The maximum number of promises to have pending
 *     at once.
 * @returns A Promise that resolves to the full list of values
 *     when everything is done.
 */
function throttleActions(listOfCallableActions, limit) {
  // We'll need to store which is the next promise in the list.
  let i = 0;
  let resultArray = new Array(listOfCallableActions.length);

  // Now define what happens when any of the actions completes.
  // Javascript is (mostly) single-threaded, so only one
  // completion handler will call at a given time. Because we
  // return doNextAction, the Promise chain continues as long as
  // there's an action left in the list.
  function doNextAction() {
    if (i < listOfCallableActions.length) {
      // Save the current value of i, so we can put the result
      // in the right place
      let actionIndex = i++;
      let nextAction = listOfCallableActions[actionIndex];
      return Promise.resolve(nextAction()).then(result => {
        // Save results to the correct array index.
        resultArray[actionIndex] = result;
      }).then(doNextAction);
    }
  }

  // Now start up the original <limit> number of promises.
  // i advances in calls to doNextAction.
  let listOfPromises = [];
  while (i < limit && i < listOfCallableActions.length) {
    listOfPromises.push(doNextAction());
  }
  return Promise.all(listOfPromises).then(() => resultArray);
}

// Test harness:

function delay(name, ms) {
  return new Promise((resolve, reject) => setTimeout(() => {
    console.log(name);
    resolve(name);
  }, ms));
}

var ps = [];
for (let i = 0; i < 10; i++) {
  ps.push(() => {
    console.log(`begin ${i}`);
    return delay(`complete ${i}`, Math.random() * 3000);
  });
}

throttleActions(ps, 3).then(result => console.log(result));


非常感谢!限制承诺的唯一解决方案。 - Hamid Tavakoli

3

编辑

杰夫·鲍曼大大改进了他的答案以解决有意义的值。欢迎查看此答案的历史记录,了解为什么解决后的值如此重要/有用。


节流p

这个解决方案密切模仿了本地Promise.all

它是如何相同的...

  • 尽可能快地解析承诺
  • 按照相同的顺序解决一系列值
  • 在遇到一个拒绝时就拒绝

它的不同之处在于...

  • 号码参数限制同时运行的承诺数量
  • 数组输入接受承诺创作者(函数);而不是实际的承诺

// throttlep :: Number -> [(* -> Promise)]
const throttlep = n=> Ps=>
  new Promise ((pass, fail)=> {
    // r is the number of promises, xs is final resolved value
    let r = Ps.length, xs = []
    // decrement r, save the resolved value in position i, run the next promise
    let next = i=> x=> (r--, xs[i] = x, run(Ps[n], n++))
    // if r is 0, we can resolve the final value xs, otherwise chain next
    let run = (P,i)=> r === 0 ? pass(xs) : P().then(next(i), fail)
    // initialize by running the first n promises
    Ps.slice(0,n).forEach(run)
  })

// -----------------------------------------------------
// make sure it works

// delay :: (String, Number) -> (* -> Promise)
const delay = (id, ms)=>
  new Promise (pass=> {
    console.log (`running: ${id}`)
    setTimeout(pass, ms, id)
  })

// ps :: [(* -> Promise)]
let ps = new Array(10)
for (let i = 0; i < 10; i++) {
  ps[i] = () => delay(i, Math.random() * 3000)
}

// run a limit of 3 promises in parallel
// the first error will reject the entire pool
throttlep (3) (ps) .then (
  xs => console.log ('result:', xs),
  err=> console.log ('error:', err.message)
)

控制台输出

输入按顺序运行;解析的结果与输入顺序相同。

running: 0
running: 1
running: 2
=> Promise {}
running: 3
running: 4
running: 5
running: 6
running: 7
running: 8
running: 9
result: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

实际应用

让我们看一个更实际的代码示例。此代码的任务是从服务器获取一组图像。这是我们如何使用 throttlep 将同时请求的数量限制为每次最多 3 个。

// getImage :: String -> Promise<base64>
let getImage = url=> makeRequest(url).then(data => data.base64, reqErrorHandler)

// actions :: [(* -> Promise<base64>)]
let actions = [
  ()=> getImage('one.jpg'),
  ()=> getImage('two.jpg'),
  ()=> getImage('three.jpg'),
  ()=> getImage('four.jpg'),
  ()=> getImage('five.jpg')
]

// throttle the actions then do something...
throttlep (3) (actions) .then(results => {
  // results are guaranteed to be ordered the same as the input array
  console.log(results)
  // [<base64>, <base64>, <base64>, <base64>, <base64>]
})

谢谢您的反馈!我认为像Promise.all一样提供答案会很有用,但是在第一次提交中没有机会。我添加了6行来更新我的答案,并给予了您的功劳。 - Jeff Bowman
@JeffBowman 很高兴能与您合作。我也编辑了我的答案,以防止人们产生困惑 ^_^ - Mulan
你的测试风格提供了更干净的输出,所以我也从你那里借鉴了一下 ^_^ - Mulan
你的代码中有一个错误(在完成之前会引发多次异常)。可以通过更改run函数来解决: let run = (P,i)=> { if(r === 0) { pass(xs); } else if(P){ P().then(next(i), fail); } }; 感谢你和@JeffBowmansupportsMonica提供的解决方案! - A-S

2

这方面没有内置的功能,所以您需要自己构建。据我所知,目前还没有相关的库。

首先,从“延迟”开始——一种允许外部代码解决它的承诺:

class Deferral<T> {
    constructor() {
        this.promise = new Promise<T>((resolve, reject) => {
            this.resolve = resolve;
            this.reject = reject;
        });
    }

    promise: Promise<T>;
    resolve: (thenableOrResult?: T | PromiseLike<T>) => void;
    reject: (error: any) => void;
}

然后,您可以定义一个“等待队列”,它代表所有正在等待进入临界区的代码块:
class WaitQueue<T> {
    private deferrals: Deferral<T>[];

    constructor() {
        this.deferrals = [];
    }

    get isEmpty(): boolean {
        return this.deferrals.length === 0;
    }

    enqueue(): Promise<T> {
        const deferral = new Deferral<T>();
        this.deferrals.push(deferral);
        return deferral.promise;
    }

    dequeue(result?: T) {
        const deferral = this.deferrals.shift();
        deferral.resolve(result);
    }
}

最终,你可以定义一个异步信号量,如下所示:

export class AsyncSemaphore {
    private queue: WaitQueue<void>;
    private _count: number;

    constructor(count: number = 0) {
        this.queue = new WaitQueue<void>();
        this._count = count;
    }

    get count(): number { return this._count; }

    waitAsync(): Promise<void> {
        if (this._count !== 0) {
            --this._count;
            return Promise.resolve();
        }
        return this.queue.enqueue();
    }

    release(value: number = 1) {
        while (value !== 0 && !this.queue.isEmpty) {
            this.queue.dequeue();
            --value;
        }
        this._count += value;
    }
}

示例用法:

async function performActionsInParallel() {
    const semaphore = new AsyncSemaphore(10);
    const listOfActions = [...];
    const promises = listOfActions.map(async (action) => {
        await semaphore.waitAsync();
        try {
            await doSomething(action);
        }
        finally {
            semaphore.release();
        }
    });
    const results = await Promise.all(promises);
}

这个方法首先创建一个节流器,然后立即启动所有异步操作。每个异步操作都会先(异步地)等待信号量可用,然后执行操作,并最终释放信号量(允许其他操作进入)。当所有异步操作完成后,将检索所有结果。

警告:此代码100%未经测试。我甚至没有尝试过一次。


0
这是一个使用 async await 语法的节流函数版本:

async function throttle(tasks, max) {
    async function run(_, i) {
        values[i] = await tasks[i]();
        if (max < tasks.length) return run(_, max++);
    };
    const values = [];
    try {
        await Promise.all(tasks.slice(0, max).map(run));
    } catch (error) {
        max = tasks.length; // don't allow new tasks to start
        throw error;
    }
    return values;
}

// Demo
const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

const tasks = Array.from({length: 10}, (_, i) => 
    async () => {
        console.log(`task ${i} starts`);
        await delay((1 + i % 3)*1000);
        console.log(`task ${i} ends with ${i*10}`);
        return i*10;
    }
);

throttle(tasks, 4).then(console.log);


0

您可以使用发布-订阅模式来实现此功能。我也不熟悉TypeScript,并且我不知道这是在浏览器中还是后端中发生的。我将为此编写伪代码(假设它是后端):

//I'm assuming required packages are included e.g. events = require("events");
let limit = 10;
let emitter = new events.EventEmitter();

for(let i=0; i<limit; i++){
    fetchNext(listOfActions.pop());
}

function fetchNext(action){
    const actionResultPromise = doSomething(action);
    actionResultPromise.then((actionResult) => {
        console.log(`Action Done: ${actionResult}`);
        emitter.emit('grabTheNextOne', listOfActions.pop());
    });
}

emitter.on('grabTheNextOne', fetchNext);

EventEmitter 是 NodeJS 的一部分,如果你在 Node 中工作的话。如果在浏览器中,你可以使用普通的事件模型。这里的关键思想是发布-订阅模式。


0

以下是我使用TypeScript的看法:

function ParallelMap<T, U>(array: U[], callbackFn: (element: U, index?: number, array?: U[]) => Promise<T>, maxDegreeOfParallelism: number = -1) {
    if (maxDegreeOfParallelism < -1 || maxDegreeOfParallelism == 0) return Promise.reject(`'maxDegreeOfParallelism' must be either -1 or greater than 0`);

    return new Promise<T[]>((resolve, reject) => {
        const inputArraySize = array.length;

        let indexTracker = 0;
        let completedTracker = 0;

        const output = new Array<T>(inputArraySize);
        const errors = new Array<{ index: number, error: any }>();

        const processNext = () => {
            const elementIndex = indexTracker++;
            const element = array[elementIndex];

            callbackFn(element, elementIndex, array).then(
                value => output[elementIndex] = value,
                reason => errors.push({ index: elementIndex, error: reason })
            ).finally(() => {
                ++completedTracker;

                if (completedTracker == inputArraySize) {
                    if (errors.length > 0) reject(errors);

                    else resolve(output);
                }

                else if (indexTracker < inputArraySize) processNext();
            });
        };

        for (let index = 0, count = maxDegreeOfParallelism < 0 ? inputArraySize : Math.min(maxDegreeOfParallelism, inputArraySize); index < count; ++index) {
            processNext();
        }
    });
}

使用方法:

const maxDegreeOfParallelism = 3; // Number of concurrent tasks
const result = await ParallelMap(
    inputArray,
    async (value, index, array) => { /* Do something */ }, // Some async function to process each element
    maxDegreeOfParallelism
);

同样适用于JavaScript:

function ParallelMap(array, callbackFn, maxDegreeOfParallelism = -1) {
  if (maxDegreeOfParallelism < -1 || maxDegreeOfParallelism == 0) return Promise.reject(`'maxDegreeOfParallelism' must be either -1 or greater than 0`);

  return new Promise((resolve, reject) => {
    const inputArraySize = array.length;

    let indexTracker = 0;
    let completedTracker = 0;

    const output = new Array(inputArraySize);
    const errors = new Array();

    const processNext = () => {
      const elementIndex = indexTracker++;
      const element = array[elementIndex];

      callbackFn(element, elementIndex, array).then(
        value => output[elementIndex] = value,
        reason => errors.push({
          index: elementIndex,
          error: reason
        })
      ).finally(() => {
        ++completedTracker;

        if (completedTracker == inputArraySize) {
          if (errors.length > 0) reject(errors);

          else resolve(output);
        } else if (indexTracker < inputArraySize) processNext();
      });
    };

    for (let index = 0, count = maxDegreeOfParallelism < 0 ? inputArraySize : Math.min(maxDegreeOfParallelism, inputArraySize); index < count; ++index) {
      processNext();
    }
  });
}



// Usage

(async() => {
  const input = new Array(10).fill(1); // Array containing 10 '1' values

  const oneSecondTask = (value, index) => {
    return new Promise(resolve => {
      setTimeout(() => {
        resolve(value + index); // Extremely complex calculation of adding index to value 1
      }, 1000);
    });
  };

  console.log(`const input = [${input.join(', ')}];`);
  console.log(`---------------------------------------------`);
  console.log(`... wait for 10s ...`);
  console.log(`---------------------------------------------`);

  let start = Date.now();
  let maxDegreeOfParallelism = 1;
  let result = await ParallelMap(input, oneSecondTask, maxDegreeOfParallelism);
  console.log(`const result = [${result.join(', ')}];`);
  console.log(`${(Date.now() - start) / 1000}s to process ${input.length} items (taking 1s each) one at a time`);

  console.log(`---------------------------------------------`);

  start = Date.now();
  maxDegreeOfParallelism = 2;
  result = await ParallelMap(input, oneSecondTask, maxDegreeOfParallelism);
  console.log(`const result = [${result.join(', ')}];`);
  console.log(`${(Date.now() - start) / 1000}s to process ${input.length} items (taking 1s each) in parallel using ${maxDegreeOfParallelism} concurrent tasks`);

  console.log(`---------------------------------------------`);

  start = Date.now();
  maxDegreeOfParallelism = 5;
  result = await ParallelMap(input, oneSecondTask, maxDegreeOfParallelism);
  console.log(`const result = [${result.join(', ')}];`);
  console.log(`${(Date.now() - start) / 1000}s to process ${input.length} items (taking 1s each) in parallel using ${maxDegreeOfParallelism} concurrent tasks`);

  console.log(`---------------------------------------------`);

  start = Date.now();
  maxDegreeOfParallelism = 10;
  result = await ParallelMap(input, oneSecondTask, maxDegreeOfParallelism);
  console.log(`const result = [${result.join(', ')}];`);
  console.log(`${(Date.now() - start) / 1000}s to process ${input.length} items (taking 1s each) in parallel using ${maxDegreeOfParallelism} concurrent tasks`);
})();


0

可以通过生成器来限制 Promise 的速率。在下面的示例中,我们将它们限制在某个速度。

function asyncTask(duration = 1000) {
  return new Promise(resolve => {
    setTimeout(resolve, duration, duration)
  })
}


async function main() {
  const items = Array(10).fill(() => asyncTask()) {
    const generator = batchThrottle(3, ...items)
    console.log('batch', (await generator.next()).value)
    for await (let result of generator) {
      console.log('remaining batch', result)
    }
  }

  {
    const generator = streamThrottle(3, ...items)
    console.log('stream', await generator.next())
    for await (let result of generator) {
      console.log('remaining stream', result)
    }
  }

}

async function* batchThrottle(n = 5, ...items) {
  while (items.length) {
    const tasks = items.splice(0, n).map(fn => fn())
    yield Promise.all(tasks)
  }
}

async function* streamThrottle(n = 5, ...items) {
  while (items.length) {
    const tasks = items.splice(0, n).map(fn => fn())
    yield* await Promise.all(tasks)
  }
}
main().catch()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接