如何在Node.js中创建两个工作线程之间的直接通信渠道?

9
有没有一种方法可以使用new MessageChannel在两个工作线程之间创建直接通信通道?例如:我使用worker_thread API创建了一个主线程P,它创建了两个工作线程W1W2
P -> W1
  -> W2

我想要使用 parentPort 直接启用 W1W2 之间的通信,而不是通过 P

你正在使用 worker_threads 模块吗? - lx1412
是的,我正在使用 worker_threads 模块。 - tusharmath
2个回答

11

使用new MessageChannel()生成一个双向通信通道。

index.js

const { Worker } = require('worker_threads');

const path = require('path'); 

const w1 = new Worker(path.join(__dirname,'./worker1.js'));
const w2 = new Worker(path.join(__dirname,'./worker2.js'));

w1.once('message', value => {
    w2.postMessage({
        port: value.port
    }, [value.port]);
});

w2.once('message', value => {
    w1.postMessage({
        port: value.port
    }, [value.port]);
});

worker1.js

const { MessageChannel, parentPort,  } = require('worker_threads');

let woker2Port;
console.log('worker1 started');

const { port1, port2 } = new MessageChannel();
port1.on('message', (value) => {
    console.log(value);
});

parentPort.postMessage({
    port: port2,
}, [port2]);

parentPort.on('message', value => {
    if (value.port) {
        woker2Port = value.port;
        woker2Port.postMessage({msg:'i am worker1!'});// send msg to worker2
        return;
    }
});

worker2.js

const { MessageChannel, parentPort,  } = require('worker_threads');

let woker1Port;
console.log('worker2 started');

const { port1, port2 } = new MessageChannel();
port1.on('message', (value) => {
    console.log(value);
});

parentPort.postMessage({
    port: port2,
}, [port2]);

parentPort.on('message', value => {
    if (value.port) {
        woker1Port = value.port;
        woker1Port.postMessage({msg:'i am worker2!'});// send msg to worker1
    }
});

注意:如果你在 VSCode 中调试此代码,你将看不到在 worker1.jsworker2.js 中的日志打印。直接运行 node index 或者使用 ndb 调试都可以正常工作!


起初我不太清楚的是,你需要使用“new MessageChannel()”创建两个端口,并将一个端口保留在一个worker中,将另一个端口发送到另一个worker。我曾试图在我发送给另一个worker的同一端口上发送和接收信息,但这样做是行不通的。通信无法进行。你需要在每个worker中都使用来自同一通道的一个端口。希望这样更清晰明了。 - Jonathan

1

也许你可以尝试这个:

const {Worker, isMainThread, parentPort, MessageChannel } = require('node:worker_threads');
    
    if(isMainThread)
    {
        let worker1 = new Worker(__filename,{argv:['worker1']})
        let worker2 = new Worker(__filename,{argv:['worker2']})
    
    const {port1, port2} = new MessageChannel

    worker1.postMessage({port:port1},[port1])
    worker2.postMessage({port:port2},[port2])

}
else
{
    //use process.argv to differentiate between worker1 and worker2
    let workerName = process.argv[2]

    parentPort.once('message',value=>{
        let myport = value.port

        myport.on('message',(data)=>{
            console.log(`${workerName} got:${data}`)
            //every time the data was increacsed by 1
            setTimeout(()=>{myport.postMessage(data+1)},1000)
        })

        if(workerName === 'worker1')
        {
            //let worker1 to send the intial message
            myport.postMessage(1)
        }
    })
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接