Node.js非阻塞for循环

3
请检查我对以下for循环的理解是否正确。
for(let i=0; i<1000; i){
  sample_function(i, function(result){});
}

一旦调用for循环,1000个sample_function事件会在event loop中排队。大约5秒后,用户发出一个http请求,该请求排队在这些“1000个事件”之后。 通常,这不是问题,因为循环是异步的。 但是假设这个sample_function是一个占用CPU资源的函数。 因此,“1000个事件”将依次完成,并且每个事件需要约1秒钟。 结果,for循环将阻塞约1000秒。
是否有解决此类问题的方法? 例如,是否可以让线程每10次循环“休息”一下? 并允许其他新队列在其中弹出? 如果可以,我该怎么做呢?
2个回答

3

试试这个:

 for(let i=0; i<1000; i++)
 {
    setTimeout(sample_function, 0, i, function(result){});
 }

或者
function sample_function(elem, index){..}

var arr = Array(1000);
arr.forEach(sample_function);

只是为了确保我理解正确——如果我使用setTimeout而不是一次性排队整个for loop,那么每次它都会排队1000个中的一个,同时在它们之间允许其他请求进来。这样理解对吗? - Sihoon Kim
是的。有关更详细的说明,请参见此处:https://developer.mozilla.org/en-US/docs/Web/JavaScript/EventLoop#Adding_messages是的 - Eriks Klotins
我认为第二个例子,生成一个空数组的方法是不可行的。如果你使用for循环生成一个包含某些值的索引数组,那么你可以按照这里展示的方式使用forEach。 - Matt Korostoff
据我所知并测试,提供的两个示例都是错误的,在循环中运行setTimeout将通过将它们放入事件循环的“timers”阶段队列中来安排setTimeout的回调函数(在您的示例中,1000个回调将被放置在“timer”的队列上,当事件循环迭代并进入“timers”阶段时,它尝试通过从“timer”的队列中逐个出队回调并将它们放在“call stack”上以执行,这意味着在每次迭代之间没有空闲时间(意味着事件循环不会在每个任务之间迭代)。 - Gandalf
1
修复:let process = (i) => new Promise((resolve) => { setTimeout(sample_function, 0, i, function(result){ resolve(result) }); }) for(let i=0; i<1000; i++) { await process(i); } 这样我们确保循环在 process 返回的 promise 被解决之前不会继续进行,这个 promise 将在 setTimeout 的回调被调用时解决,这意味着在每次迭代后事件循环必须继续迭代,以便它可以进入计时器阶段来执行当前迭代的 setTimeout 回调,这就是为什么在每次迭代之间会有一个“breathing space”的原因。 - Gandalf
@Gandalf 是的,你说得对。我也应该在这里提到。setTimeout是我需要的提示。但是,是的,需要用Promise来包装它。 - Sihoon Kim

1

有一种技术叫做分区,你可以在NodeJs文档中了解它。但正如文档所述:

如果您需要执行更复杂的操作,则分区不是一个好选择。这是因为分区仅使用事件循环,而您几乎肯定无法从计算机上可用的多个核心中获得任何好处。

所以,您还可以使用另一种技术称为卸载,例如使用工作线程子进程,但这也有一些缺点,例如必须对您希望在事件循环(当前线程)和工作线程或子进程之间共享的任何对象进行序列化和反序列化。

以下是我想出来的一个分区示例,它适用于express应用程序的上下文。

const express = require('express');
const crypto = require('crypto');
const randomstring = require('randomstring');

const app = express();
const port = 80;

app.get('/', async (req, res) => {
    res.send('ok');
})

app.get('/block', async (req, res) => {
    let result = [];
    for (let i = 0; i < 10; ++i) {
        result.push(await block());
    }
    res.send({result});
})

app.listen(port, () => {
    console.log(`Listening on port ${port}`);
    console.log(`http://localhost:${port}`);
})

/* takes around 5 seconds to run(varies depending on your processor) */
const block = () => {
    //promisifying just to get the result back to the caller in an async way, this is not part of the partitioning technique
    return new Promise((resolve, reject) => {
        /**
         * https://nodejs.org/en/docs/guides/dont-block-the-event-loop/#partitioning
         * using partitioning techinique(using setImmediate/setTimeout) to prevent a long running operation
         * to block the eventloop completely
         * there will be a breathing period between each time block is called
         */
        setImmediate(() => {
            let hash = crypto.createHash("sha256");
            const numberOfHasUpdates = 10e5;
            for (let iter = 0; iter < numberOfHasUpdates; iter++) {
                hash.update(randomstring.generate());
            }
            resolve(hash);
        })
    });
}

有两个端点//block,如果你访问了/block,然后再访问/端点,会发生这样一件事情,即/端点需要大约5秒钟才能返回响应(在“breathing space”期间(你称之为“break”))。
如果没有使用setImmediate,那么/端点将在for循环中调用block函数的次数乘以5后,约需等待10 * 5秒才能响应请求。
此外,你可以使用递归方法进行分区,像这样:
/**
 * 
 * @param items array we need to process
 * @param chunk a number indicating number of items to be processed on each iteration of event loop before the breathing space
 */
function processItems(items, chunk) {
    let i = 0;
    const process = (done) => {
        let currentChunk = chunk;
        while (currentChunk > 0 && i < items?.length) {
            --currentChunk;
            syncBlock();
            ++i;
        }

        if (i < items?.length) {
            setImmediate(process);//the key is to schedule the next recursive call (by passing the function to setImmediate) instead of doing a recursive call (by simply invoking the process function)
        }
    }
    process();
}

如果您需要获取处理后的数据,可以像这样将其转换为promise:
function processItems(items, chunk) {
    let i = 0;
    let result = [];
    const process = (done) => {
        let currentChunk = chunk;
        while (currentChunk > 0 && i < items?.length) {
            --currentChunk;
            const returnedValue = syncBlock();
            result.push(returnedValue);
            ++i;
        }

        if (i < items?.length) {
            setImmediate(() => process(done));
        } else {
            done && done(result);
        }
    }
    const promisified = () => new Promise((resolve) => process(resolve));
    return promisified();
}

你可以通过添加此路由处理程序来测试它,该处理程序与上面提供的其他处理程序一起使用:
app.get('/block2', async (req, res) => {
    let result = [];

    let arr = [];
    for (let i = 0; i < 10; ++i) {
        arr.push(i);
    }
    result = await processItems(arr, 1);
    res.send({ result });
})

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接