How do I create a direct communication channel between two worker threads in Node.js
Is there a way to create a direct communication channel between two worker threads using new MessageChannel()? For example, a main thread P uses the worker_threads API to create two worker threads, W1 and W2:

P -> W1
  -> W2

I want to enable communication between W1 and W2 directly instead of going via P using parentPort.

Astrology answered 12/9, 2019 at 2:14 Comment(2)
You are using the worker_threads module ?Killam
Yes. I am using the worker_threads moduleAstrology
K
11

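Use new MessageChannel() to create a two-way communication channel. In the example below, each worker creates its own channel, keeps port1 to listen on, and sends port2 to the main thread, which forwards it to the other worker.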

index.js

const { Worker } = require('worker_threads');

const path = require('path'); 

const w1 = new Worker(path.join(__dirname,'./worker1.js'));
const w2 = new Worker(path.join(__dirname,'./worker2.js'));

w1.once('message', value => {
    w2.postMessage({
        port: value.port
    }, [value.port]);
});

w2.once('message', value => {
    w1.postMessage({
        port: value.port
    }, [value.port]);
});

worker1.js

const { MessageChannel, parentPort } = require('worker_threads');

let worker2Port;
console.log('worker1 started');

// Keep port1 to receive messages; port2 goes to worker2 via the main thread.
const { port1, port2 } = new MessageChannel();
port1.on('message', (value) => {
    console.log(value);
});

parentPort.postMessage({
    port: port2,
}, [port2]);

// The main thread forwards worker2's port; use it to send to worker2.
parentPort.on('message', value => {
    if (value.port) {
        worker2Port = value.port;
        worker2Port.postMessage({ msg: 'i am worker1!' }); // send msg to worker2
    }
});

worker2.js

const { MessageChannel, parentPort } = require('worker_threads');

let worker1Port;
console.log('worker2 started');

// Keep port1 to receive messages; port2 goes to worker1 via the main thread.
const { port1, port2 } = new MessageChannel();
port1.on('message', (value) => {
    console.log(value);
});

parentPort.postMessage({
    port: port2,
}, [port2]);

// The main thread forwards worker1's port; use it to send to worker1.
parentPort.on('message', value => {
    if (value.port) {
        worker1Port = value.port;
        worker1Port.postMessage({ msg: 'i am worker2!' }); // send msg to worker1
    }
});

Note: if you debug this code in VSCode, you won't see the log output from worker1.js and worker2.js. Running node index directly, or debugging with ndb, works fine.

Killam answered 12/9, 2019 at 3:59 Comment(1)
What wasn't clear to me at first is that new MessageChannel() creates two ports: you keep one port in one worker and send the other port to the other worker. I was trying to send and receive on the same port that I had sent to the other worker, and that does not work; the messages never arrive. Each worker needs one port from the same channel. I hope that makes it clearer.Pussy
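To illustrate the point in the comment above, here is a minimal single-thread sketch (not taken from the answers). It uses receiveMessageOnPort from worker_threads to read each port's queue synchronously, and shows that a message posted on port1 is delivered to port2 and never to port1 itself. The variable names and the message text are just for illustration.

```javascript
const { MessageChannel, receiveMessageOnPort } = require('worker_threads');

const { port1, port2 } = new MessageChannel();

// Post on one end of the channel.
port1.postMessage({ msg: 'hello from port1' });

// The message is queued on the opposite end, not on the sending port.
const onSender = receiveMessageOnPort(port1);   // nothing queued here
const onReceiver = receiveMessageOnPort(port2); // the message sits here

console.log(onSender);               // undefined
console.log(onReceiver.message.msg); // hello from port1

// Close the channel so nothing keeps the ports alive.
port1.close();
```

The same applies across threads: whichever worker holds port1 only ever receives what was posted on port2, and vice versa.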
D
1

Perhaps you can try this:

const { Worker, isMainThread, parentPort, MessageChannel } = require('node:worker_threads');

if (isMainThread) {
    const worker1 = new Worker(__filename, { argv: ['worker1'] });
    const worker2 = new Worker(__filename, { argv: ['worker2'] });

    // One channel; each worker gets one of its two ports
    const { port1, port2 } = new MessageChannel();

    worker1.postMessage({ port: port1 }, [port1]);
    worker2.postMessage({ port: port2 }, [port2]);
} else {
    // Use process.argv to differentiate between worker1 and worker2
    const workerName = process.argv[2];

    parentPort.once('message', (value) => {
        const myPort = value.port;

        myPort.on('message', (data) => {
            console.log(`${workerName} got:${data}`);
            // Reply one second later with the value increased by 1
            setTimeout(() => { myPort.postMessage(data + 1); }, 1000);
        });

        if (workerName === 'worker1') {
            // Let worker1 send the initial message
            myPort.postMessage(1);
        }
    });
}
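
The example above keeps exchanging messages indefinitely. As a possible variation (a sketch, not part of the original answer), the ports can be handed over at construction time through workerData with the transferList option, and one side can close its port once a limit is reached so that both workers exit. This assumes a Node.js version whose Worker constructor supports transferList; the startPair helper, the limit value, and the inlined worker source (run with eval: true to keep everything in one file) are all hypothetical names chosen for illustration.

```javascript
const { Worker, MessageChannel } = require('worker_threads');

// Hypothetical worker body, inlined via `eval: true` so the sketch is one file.
const workerSource = `
  const { workerData } = require('worker_threads');
  const { name, port, limit } = workerData;

  port.on('message', (n) => {
    console.log(name + ' got: ' + n);
    if (n >= limit) {
      port.close();          // closing one end also closes the other end
      return;
    }
    port.postMessage(n + 1);
  });

  if (name === 'worker1') port.postMessage(1); // worker1 starts the exchange
`;

// Creates one channel, gives each worker one port, and resolves with both
// exit codes once the workers have finished.
function startPair(limit) {
  const { port1, port2 } = new MessageChannel();
  const spawn = (name, port) => new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, {
      eval: true,
      workerData: { name, port, limit },
      transferList: [port],  // ports inside workerData must be listed here
    });
    worker.once('error', reject);
    worker.once('exit', resolve);
  });
  return Promise.all([spawn('worker1', port1), spawn('worker2', port2)]);
}

const done = startPair(4);
done.then((codes) => console.log('exit codes:', codes));
```

Because the ports arrive in workerData, the workers never need parentPort at all, and closing either port ends the conversation for both sides.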
Degression answered 1/12, 2022 at 14:0 Comment(0)
