Nodejs and Express, use res.send() from a worker thread
Asked Answered
B

2

7

In Nodejs, using Express as a server, I offload a heavy computation onto a worker thread.

From the main application, I call the worker thread like this:

// file: main.js

const { Worker } = require("worker_threads");

/**
 * Runs a script in a worker thread and resolves with the first message it posts.
 *
 * @param {*} fileName - Worker script location, handed unchanged to `new Worker()`.
 * @param {*} workerData - Value made available to the worker as its `workerData`.
 * @returns {Promise<*>} Resolves with the first message posted by the worker.
 *   Rejects when the worker emits "error", exits with a non-zero code, or
 *   exits cleanly without ever having posted a message.
 */
function runService(fileName, workerData) {
    return new Promise((resolve, reject) => {
        const worker = new Worker(fileName, { workerData });
        worker.on("message", resolve);
        worker.on("error", reject);
        worker.on("exit", code => {
            // A promise settles only once, so both branches are no-ops when a
            // message or an error has already settled it.
            if (code !== 0) {
                reject(new Error(`Worker stopped with exit code ${code}`));
            } else {
                // Without this, a worker that exits cleanly but never posts a
                // message would leave the caller awaiting forever.
                reject(new Error("Worker exited without sending a result"));
            }
        });
    });
}

// Runs the heavy computation in a worker and relays its result to the client.
router.get("/some_url", async function(req, res) {
    try {
        const result = await runService(
            "./path/to/worker.js",
            { query: req.query, user: req.user } // using { req } causes an error
        );
        // Without a response the request would stay open until the client gives up.
        res.send(result);
    } catch (err) {
        // NOTE(review): Express 4 does not catch rejections from async handlers,
        // so answer explicitly instead of leaving an unhandled rejection.
        console.error(err);
        res.status(500).send("Worker failed");
    }
});

The worker looks like this:

// file: worker.js

const { workerData, parentPort } = require('worker_threads');
const { query, user } = workerData;

/**
 * Generates the data for the `query` and `user` taken from `workerData`
 * and posts it back to the parent thread.
 *
 * @returns {Promise<void>}
 */
async function run() {
    const payload = await generateLotsOfData(query, user);

    // Hand the generated data to the main thread.
    parentPort.postMessage(payload);

    // What I would like to do here (doesn't work): res.send(payload);
}

The worker generates a huge amount of data and "postMessage" causes a server error.

Is there a way to send this data from the worker thread directly to the client, using res.send() or something similar,
instead of using postMessage and then sending it from the main thread?

Battik answered 30/10, 2019 at 14:57 Comment(5)
Would streams help? – Orthodontics
Not sure about your exact use case, but you should be able to stream the values using res.write() as shown here: gist.github.com/montanaflynn/6a438f0be606daede899 – Orthodontics
I would still have to send data from the worker thread to the main thread before I can use res.write(). Streams might still be useful because I would have to send less data at a time. – Battik
Did you find a solution @HendrikJan? I'm facing a similar issue.Chaw
@Chaw I did not find a way to send directly from the worker-thread. See my answer for how I finally resolved my problem.Battik
D
1

This code sample works:

  1. create worker
  2. create event handler
  3. sendMessage --> wrapperWorkerThreadBigComputing
  4. sendBack message on our main thread
  5. emit server response

Api routes :

// Starts a worker for the heavy computation and answers the request once the
// worker reports its result (or fails).
app.get('/bigComputingWithWorker', (req, res) => {
    const worker = new Worker(`${__dirname}/src/wrapperWorkerThreadBigComputing.js`);
    res.set('Content-Type', 'text/html');
    worker.once("message", count => {
        res.status(200).send(`The final count :${count}`);
    });
    worker.once("error", err => {
        console.error(err);
        // A crashed worker is a server-side failure, not a bad request (was 400).
        res.status(500).send(`error worker thread`);
    });
    // The worker waits for one message before it starts computing.
    worker.postMessage({coucou : 'john'});
});

wrapperWorkerThreadBigComputing

const { parentPort } = require('node:worker_threads');

// Logged once, when the worker script starts executing.
console.log("* worker created");

// Answer the first message from the parent with the computed count; the
// content of that message is not used.
parentPort.once('message', () => {
    const count = bigComputing();
    parentPort.postMessage(count);
});

/**
 * Simulates CPU-bound work by counting up in an empty loop.
 *
 * @param {number} [iterations=300000000] - Upper bound of the counting loop.
 * @returns {number} The counter value after the loop has finished.
 */
function bigComputing(iterations = 300000000) {
    let i = 0;
    console.log("* start big computing");
    // Intentionally empty body: the loop exists only to burn CPU time.
    for (i = 0; i < iterations; i++) {
    }
    return i;
}
Dorladorlisa answered 29/7, 2022 at 9:32 Comment(0)
B
0

It seems it is not possible to send directly from the worker thread to the client.
In the end, I used a child process (instead of worker threads), sent the result back to the main thread, and then on to the client.

// file: main.js

var child_process = require('child_process');

/**
 * Forks `fileName` as a child process, sends it `workerData` once the child
 * reports `{ready: true}`, and resolves with the next message the child sends.
 *
 * The returned promise never rejects: a process error or a non-zero exit
 * resolves it with `{status: 500}` instead.
 *
 * @param {*} fileName - Module handed to `child_process.fork()`.
 * @param {*} workerData - Payload sent to the child after its ready signal.
 * @returns {Promise<*>} The child's result message, or `{status: 500}`.
 */
function runService(fileName, workerData) {
    const failure = () => ({status: 500});

    return new Promise((resolve) => {
        const child = child_process.fork(fileName);

        child.on('message', (msg) => {
            if (!msg.ready) {
                // Anything other than the ready signal is the finished result.
                resolve(msg);
                return;
            }
            // The child is listening now, so hand over its input.
            child.send(workerData);
        });

        child.on("error", (err) => {
            console.log('error', err);
            resolve(failure());
        });

        child.on("close", (closeCode) => {
            console.log('close', closeCode);
        });

        child.on("exit", (exitCode) => {
            if (exitCode === 0) {
                console.log('report ran successfully');
                return;
            }
            console.log('report FAILED');
            // No effect if the result message has already resolved the promise.
            resolve(failure());
        });
    });
}

And in the worker:

// Register the handler before announcing readiness; presumably so that the
// input the parent sends in response to the ready signal is not missed.
process.on('message', run);
// Tell the parent process that this child is ready to receive its input.
process.send({ready: true});

/**
 * Message handler of the child process: performs the calculations and sends
 * the JSON-serialized result to the parent, then exits.
 *
 * @param {*} workerData - Message received from the parent (not read here).
 * @returns {Promise<void>}
 */
async function run(workerData) {
    // Exit only from the send callback, i.e. after `process.send` reports back.
    const exitCleanly = () => {
        process.exit(0);
    };

    try {
        const serialized = JSON.stringify(doSomeCalculations());
        process.send({ data: serialized }, null, {}, exitCleanly);
    } catch (failure) {
        console.error(failure);
        process.exit(1); // exit the process with a failure
    }
}

// Safety net so this child process cannot linger in memory: if it is still
// running after 10 minutes, log and exit with a failure code.
// (The author was not sure this is necessary; it is kept just to be sure.)
setTimeout(() => {
    console.log('Timeout in child-process');
    process.exit(1);
}, 10 * 60 * 1000);

This actually works quite well.

Battik answered 2/1, 2020 at 10:50 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.