JavaScript数组.reduce与async/await

162

似乎在将async/await与.reduce()结合时遇到了一些问题,如下所示:

const data = await bodies.reduce(async(accum, current, index) => {
  const methodName = methods[index]
  const method = this[methodName]
  if (methodName == 'foo') {
    current.cover = await this.store(current.cover, id)
    console.log(current)
    return {
      ...accum,
      ...current
    }
  }
  return {
    ...accum,
    ...method(current.data)
  }
}, {})
console.log(data)

this.store 完成之前,data 对象被记录下来...

我知道你可以在异步循环中使用 Promise.all,但这是否也适用于 .reduce()

11个回答

265

问题在于你的累加器值是 promises - 它们是 async function 的返回值。要实现顺序求值(并且使得除了最后一次迭代之外的所有迭代都被等待),你需要使用

const data = await array.reduce(async (accumP, current, index) => {
  const accum = await accumP;
  …
}, Promise.resolve(…));

然而,对于 async/await,我普遍建议使用普通循环而不是数组迭代方法,它们更高效并且通常更简单。


6
谢谢您最后的建议。我最终使用了简单的for循环完成了我的任务,代码行数相同,但更易于阅读... - cs_pupil
5
reduceinitialValue 不需要是一个 Promise,但在大多数情况下,使用 Promise 可以更清晰地表达意图。 - EECOLOR
3
在使用TypeScript时,初始值需要是一个Promise,因为回调函数的返回类型必须始终匹配累加器的类型。 - jessedvrs
在同步的 reduce 中,你可以将整个调用包装在 try/catch 中,异常会按预期传播。你不需要单独包装每个可能抛出异常的代码部分 - 这也可能是处理它们的错误位置不正确。但是对于异步的 reducer,只有在你正确(立即)await 处理的每个 promise - 包括累加器 promise 时,编写 try { await array.reduce(…) } catch … 才能正常工作。 - Bergi
让我们在聊天中继续这个讨论 - inwerpsel
显示剩余16条评论

10
当前被接受的答案建议使用Promise.all()而不是asyncreduce。然而,这与asyncreduce的行为不同,仅适用于您希望异常立即停止所有迭代的情况,这并不总是适用的。
此外,在该答案的评论中建议始终将累加器作为reducer中的第一条语句进行等待,否则可能会出现未处理的promise拒绝风险。发帖者还说这就是OP所要求的,但事实并非如此。相反,他只想知道何时完成所有操作。为了确切知道这一点,您确实需要在reducer的任何时间点上做await acc
const reducer = async(acc, key) => {
  const response = await api(item);

  return {
    ...await acc, // <-- this would work just as well for OP
    [key]: response,
  }
}
const result = await ['a', 'b', 'c', 'd'].reduce(reducer, {});
console.log(result); // <-- Will be the final result

如何安全地使用async reduce

尽管使用reducer这种方式很方便,但是你必须保证它不会抛出异常,否则你将会得到"未处理的promise拒绝"。可以使用try-catch来保证这一点,如果出现异常,catch块应该返回累加器(可以附带一个记录失败的API调用的记录)。

const reducer = async (acc, key) => {
    try {
        data = await doSlowTask(key);
        return {...await acc, [key]: data};
    } catch (error) {
        return {...await acc, [key]: {error}};
    };
}
const result = await ['a', 'b', 'c','d'].reduce(reducer, {});

Promise.allSettled与之间的区别 您可以使用Promise.allSettled接近具有错误捕获功能的async reduce行为。但是,这种使用方式很笨拙:如果要缩小到对象,则需要在其后添加另一个同步缩小。

理论时间复杂度对于Promise.allSettled+常规reduce也更高,尽管可能有非常少的用例会产生影响。 async reduce可以从第一个项目完成时开始累加,而在Promise.allSettled之后的reduce被阻塞,直到所有承诺都已实现。当循环遍历大量元素时,这可能会产生影响。

const responseTime = 200; //ms
function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

const api = async (key) => {
    console.log(`Calling API for ${ key }`);
    // Boz is a slow endpoint.
    await sleep(key === 'boz' ? 800 : responseTime);
    console.log(`Got response for ${ key }`);

    if (key === 'bar') throw new Error(`It doesn't work for ${ key }`);

    return {
        [key]: `API says ${ key }`,
    };
};

const keys = ['foo', 'bar', 'baz', 'buz', 'boz'];

const reducer = async (acc, key) => {
    let data;
    try {
        const response = await api(key);
        data = {
            apiData: response
        };
    } catch (e) {
        data = {
            error: e.message
        };
    }

    // OP doesn't care how this works, he only wants to know when the whole thing is ready.
    const previous = await acc;
    console.log(`Got previous for ${ key }`);

    return {
        ...previous,
        [key]: {
            ...data
        },
    };
};
(async () => {
    const start = performance.now();
    const result = await keys.reduce(reducer, {});
    console.log(`After ${ performance.now() - start }ms`, result); // <-- OP wants to execute things when it's ready.
})();

使用Promise.allSettled检查执行顺序:

const responseTime = 200; //ms
function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

const api = async (key) => {
    console.log(`Calling API for ${ key }`);
    // Boz is a slow endpoint.
    await sleep(key === 'boz' ? 800 : responseTime);
    console.log(`Got response for ${ key }`);

    if (key === 'bar') throw new Error(`It doesn't work for ${ key }`);

    return {
        key,
        data: `API says ${ key }`,
    };
};

const keys = ['foo', 'bar', 'baz', 'buz', 'boz'];

(async () => {
    const start = performance.now();
    const apiResponses = await Promise.allSettled(keys.map(api));
    const result = apiResponses.reduce((acc, {status, reason, value}) => {
        const {key, data} = value || {};
        console.log(`Got previous for ${ key }`);
        return {
            ...acc,
            [key]: status === 'fulfilled' ? {apiData: data} : {error: reason.message},
        };
    }, {});
    console.log(`After ${ performance.now() - start }ms`, result); // <-- OP wants to execute things when it's ready.
})();


9

我喜欢Bergi的回答,我认为这是正确的做法。

我还想提到我的一个库,叫做Awaity.js

它让你轻松使用async/awaitreducemapfilter等函数:

import reduce from 'awaity/reduce';

const posts = await reduce([1,2,3], async (posts, id) => {

  const res = await fetch('/api/posts/' + id);
  const post = await res.json();

  return {
    ...posts,
    [id]: post
  };
}, {})

posts // { 1: { ... }, 2: { ... }, 3: { ... } }

每次传递都会是顺序的吗?还是批量调用所有等待函数? - wle8300
5
每个迭代都依赖于前一个返回值,因此是顺序的。 - Asaf Katz

9

[未解决OP的确切问题,专注于其他人来到此处的情况。]

当你需要前面步骤的结果以便在处理下一步时,通常会使用“Reduce”。在这种情况下,你可以像下面这样串联承诺(promises):

promise = elts.reduce(
    async (promise, elt) => {
        return promise.then(async last => {
            return await f(last, elt)
        })
    }, Promise.resolve(0)) // or "" or [] or ...

这里举一个使用 fs.promise.mkdir() 的例子(当然,使用 mkdirSync 更简单,但在我的情况下,需要跨网络使用):

const Path = require('path')
const Fs = require('fs')

async function mkdirs (path) {
    return path.split(/\//).filter(d => !!d).reduce(
        async (promise, dir) => {
            return promise.then(async parent => {
                const ret = Path.join(parent, dir);
                try {
                    await Fs.promises.lstat(ret)
                } catch (e) {
                    console.log(`mkdir(${ret})`)
                    await Fs.promises.mkdir(ret)
                }
                return ret
            })
        }, Promise.resolve(""))
}

mkdirs('dir1/dir2/dir3')

以下是另一个例子,它将100 + 200 ... 500相加并稍微等待一下:

async function slowCounter () {
    const ret = await ([100, 200, 300, 400, 500]).reduce(
        async (promise, wait, idx) => {
            return promise.then(async last => {
                const ret = last + wait
                console.log(`${idx}: waiting ${wait}ms to return ${ret}`)
                await new Promise((res, rej) => setTimeout(res, wait))
                return ret
            })
        }, Promise.resolve(0))
    console.log(ret)
}

slowCounter ()


4
有时候最好的做法就是将同步和异步代码版本并列,在它们之间进行比较:
同步版本:
const arr = [1, 2, 3, 4, 5];

const syncRev = arr.reduce((acc, i) => [i, ...acc], []); // [5, 4, 3, 2, 1] 

异步一:

(async () => { 
   const asyncRev = await arr.reduce(async (promisedAcc, i) => {
      const id = await asyncIdentity(i); // could be id = i, just stubbing async op.
      const acc = await promisedAcc;
      return [id, ...acc];
   }, Promise.resolve([]));   // [5, 4, 3, 2, 1] 
})();

//async stuff
async function asyncIdentity(id) {
   return Promise.resolve(id);
}

const arr = [1, 2, 3, 4, 5];
(async () => {
    const asyncRev = await arr.reduce(async (promisedAcc, i) => {
        const id = await asyncIdentity(i);
        const acc = await promisedAcc;
        return [id, ...acc];
    }, Promise.resolve([]));

    console.log('asyncRev :>> ', asyncRev);
})();

const syncRev = arr.reduce((acc, i) => [i, ...acc], []);

console.log('syncRev :>> ', syncRev);

async function asyncIdentity(id) {
    return Promise.resolve(id);
}


这种方式无法正确处理错误,请参见 https://dev59.com/HlYN5IYBdhLWcg3wzK3f 和 https://dev59.com/HFcO5IYBdhLWcg3wvUQp。绝对不要使用这种模式! - Bergi
2
如果您使用try catch块包装reducer主体,以便它始终能够返回累积值,则可以绝对使用此模式并正确处理错误。 - inwerpsel
我在reduce函数的第二个参数中没有使用"Promise.resolve",但似乎也能正常运行。您能否解释一下添加Promise.resolve的目的以及即使没有它为什么也能正常工作? - Waleed Ahmad
我实际上使用TypeScript编程,因此不使用Promise.resolve(...)作为初始值是不可能的,因为acc的类型(或任何异步函数返回的内容)都是一个promise(而Promise.resolve是一种“装箱”初始值的方法)。关于第二个问题,我想它(在js中)可以工作是因为函数内部的await是用于“拆箱”promise的。结果发现它(await)也适用于“已经拆箱”的值。 - l30.4l3x

2

对于 TypeScript,先前值和初始值需要相同。

const data = await array.reduce(async (accumP: Promise<Tout>, curr<Tin>) => {
    const accum: Tout = await accumP;
    
    doSomeStuff...

    return accum;

}, Promise<Tout>.resolve({} as Tout);

1
你可以将整个 map/reduce 迭代器块包装在自己的 Promise.resolve 中,并等待其完成。然而,问题在于累加器不包含你在每次迭代中预期的结果数据/对象。由于内部的 async/await/Promise 链,累加器本身将是实际的 Promise,它们很可能尚未解决,尽管在调用存储之前使用了 await 关键字(这可能会让您认为迭代直到该调用完成并更新累加器才会返回)。
虽然这不是最优雅的解决方案,但你有一个选择,那就是将你的 data 对象变量移出作用域,并将其分配为 let,以便可以进行适当的绑定和突变。然后从你的迭代器内部更新此数据对象,因为异步/await/Promise 调用解析。
/* allow the result object to be initialized outside of scope 
   rather than trying to spread results into your accumulator on iterations, 
   else your results will not be maintained as expected within the 
   internal async/await/Promise chain.
*/    
let data = {}; 

await Promise.resolve(bodies.reduce(async(accum, current, index) => {
  const methodName = methods[index]
  const method = this[methodName];
  if (methodName == 'foo') {
    // note: this extra Promise.resolve may not be entirely necessary
    const cover = await Promise.resolve(this.store(current.cover, id));
    current.cover = cover;
    console.log(current);
    data = {
      ...data,
      ...current,
    };
    return data;
  }
  data = {
    ...data,
    ...method(current.data)
  };
  return data;
}, {});
console.log(data);

“累加器将是实际的 Promise 本身”- 是的,但您的解决方案从不等待它们。它只等待最后一次迭代返回的 promise,但如果该解析速度比之前的解析更快,那么您的 console.log(data) 将不完整。此解决方案无效。 您应该使用Promise.all - Bergi

0

export const addMultiTextData = async(data) => {
  const textData = await data.reduce(async(a, {
    currentObject,
    selectedValue
  }) => {
    const {
      error,
      errorMessage
    } = await validate(selectedValue, currentObject);
    return {
      ...await a,
      [currentObject.id]: {
        text: selectedValue,
        error,
        errorMessage
      }
    };
  }, {});
};


3
虽然这段代码可能解决了问题,但是包括解释会有助于提高您的帖子质量。请记住,您是为未来的读者回答问题,这些人可能不知道您建议的代码建议的原因。 - 4b0
不是说我不推荐这种方法,因为在循环中使用扩展运算符会对性能造成很大的影响。 - rmolinamir
这种方式无法正确处理错误,请参见 https://dev59.com/HlYN5IYBdhLWcg3wzK3f 和 https://dev59.com/HFcO5IYBdhLWcg3wvUQp。绝对不要使用这种模式! - Bergi

0

以下是如何创建异步 reduce:

async function asyncReduce(arr, fn, initialValue) {
  let temp = initialValue;

  for (let idx = 0; idx < arr.length; idx += 1) {
    const cur = arr[idx];

    temp = await fn(temp, cur, idx);
  }

  return temp;
}

0

我的typescript中.reduce的解决方案

感谢这个人 https://dev.to/arnaudcourtecuisse/comment/1el22

const userOrders = await existUsersWithName.reduce(
      async (promise, existUserAndName) => {
        const acc = await promise;

        const {user, name} = existUserAndName;

        // My async function
        acc[user] = await this.users.getOrders(name);

        return promise;
      },
      <Promise<Record<string, string[] | undefined>>>{}
    );

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接