Parallel processing in Node.js
I'm trying to write Node.js code that reads multiple JSON files from a directory and inserts the mapped data into a MongoDB collection. To speed up the process, I followed these steps:

  • Put all filenames into an array.
  • Split the array into segments based on the CPU count.
  • Create worker threads that read these files and store the data into a MongoDB collection.
  • Return the combined result as JSON.

Code sample below:

readFiles.js

const {Worker} = require('worker_threads');
const os = require('os');
const cpuCount = os.cpus().length;
const path = require('path');
const workerPath = path.resolve('./worker.js');
const fileArray = ['D:/jsonFiles/abc_1310202220102022.json', 'D:/jsonFiles/def_1310202220102022.json', 'D:/jsonFiles/ghi_1310202220102022.json'];
let failureCount = 0;
let successCount = 0;
let failureMessage = [];
const splitRecords = (files, done) => {
  const segmentSize = Math.ceil(files.length / cpuCount);
  const segments = [];
  for (let segmentIndex = 0; segmentIndex < cpuCount; segmentIndex++) {
    const start = segmentIndex * segmentSize; // start index of this segment
    const end = start + segmentSize;          // end index (exclusive)
    const segment = files.slice(start, end);
    segments.push(segment);
  }
  // drop the empty segments produced when cpuCount > files.length
  const finalSegments = [];
  segments.forEach((seg) => {
    if (seg.length > 0) {
      finalSegments.push(seg);
    }
  });
  done(finalSegments);
};

const createWorker = (records) => {
  return new Promise((resolve, reject) => {
    const workerData = { records: records };
    const worker = new Worker(workerPath, { workerData });
    worker.on('message', function (data) {
      failureCount += data.failureCount;
      successCount += data.successCount;
      failureMessage.push(data.detailError);
      resolve({
        "failureCount": failureCount, "successCount": successCount,
        "failureMessage": failureMessage
      });
    });
    worker.on('error', function (dataErr) {
      reject(dataErr); // reject so callers can detect worker failures
    });
  });
};

(async () => {
  if (fileArray.length > 0) {
    let segmentArray = [];
    splitRecords(fileArray, (data) => {
      segmentArray = data;
    });
    if (segmentArray.length > 0) {
      segmentArray.forEach(async (list) => {
        createWorker(list);
      });
    }
  }
})();
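Note that the driver above fires the workers without awaiting them, so the combined counts are never collected. A minimal sketch of a driver that waits for every worker, reusing the createWorker and splitRecords functions above (splitRecords invokes its callback synchronously, so capturing the segments this way is safe):

(async () => {
  if (fileArray.length === 0) return;
  let segments = [];
  splitRecords(fileArray, (data) => { segments = data; }); // callback runs synchronously
  // one worker per segment; Promise.all gathers every worker's result
  const results = await Promise.all(segments.map((list) => createWorker(list)));
  console.log(results);
})();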

worker.js

const mongoose = require('mongoose');
require('../config/mongoconfig')(mongoose);
const { workerData, isMainThread, parentPort } = require('worker_threads');
const axios = require('axios');
const Blog = require('../models/blogs');
let failureCount = 0;
let successCount = 0;
let detailsFailure = [];
var fs = require('fs');

const getFeatured = async function (imageURL) {
  return new Promise((resolve, reject) => {
    const options = {
      method: 'post',
      url: 'http://10.65.32.90:2045/v1/imageupload',
      headers: {},
      data: { imageUrl: imageURL }
    };
    axios(options)
      .then(function (response) {
        if (typeof response.data.url !== 'undefined') {
          resolve(response.data.url);
        } else {
          resolve();
        }
      })
      .catch(function (error) {
        resolve(); // swallow upload errors; featured_image stays undefined
      });
  });
};
const mappingData = async (jsonPayload) => {
  return new Promise(async(resolve, reject) => {
      await Promise.all([jsonPayload["data"].forEach(async function (item) {
          let blogPayload = {};
          blogPayload["post_title"] = item.post_title;
          blogPayload["category"] = item.category;
          blogPayload["sub_category"] = item.sub_category;
          blogPayload["contents"] = item.contents;
          blogPayload["featured_image"] = await getFeatured(item.featured_image); // uploading file into AWS cloud and return url
          blogPayload["author"] = item.author;
          const newBlog = new Blog(blogPayload);
          await newBlog.save((idbErr) => {
              if (idbErr) {
                 failureCount += 1;
                 detailsFailure.push(idbErr);
              } else {
                 successCount += 1;
              }
          });
      })]);
  resolve({"successCount": successCount, "failureCount": failureCount, "detailError": detailsFailure});
  });
};
(async () => {
  console.log("Worker started");
  if (!isMainThread) {
    console.log("Inside worker thread");
    const items = workerData.records;
    console.log("items length: " + items.length);
    for (const dbData of items) {
      fs.readFile(dbData, async function (err, fileData) {
        if (err) throw err;
        const payload = JSON.parse(fileData.toString());
        const mappingResult = await mappingData(payload);
        parentPort.postMessage(mappingResult); // fires once per file
      });
    }
  } else {
    console.log("Running on the main thread");
  }
})();
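Two things to watch here: fs.readFile's callback is not awaited by the for...of loop, so every file in the segment is read and mapped concurrently, and parentPort.postMessage fires once per file while createWorker resolves on the first message it receives. A sketch using fs.promises so the files are processed strictly one after another and a single message is posted per worker (assuming the same mappingData as above; its counters are module-level, so the last result carries the totals):

const fsp = require('fs').promises;

(async () => {
  if (!isMainThread) {
    let lastResult;
    for (const dbData of workerData.records) {
      const fileData = await fsp.readFile(dbData, 'utf8'); // awaited, so files are sequential
      lastResult = await mappingData(JSON.parse(fileData));
    }
    parentPort.postMessage(lastResult); // one aggregate message per worker
  }
})();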

Problems:

  1. CPU utilization is too high when I run the code.
  2. The number of MongoDB connections is also too high.
  3. It throws multiple MongoDB connection errors.
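On problems 2 and 3: every worker requires ../config/mongoconfig, so each worker opens its own mongoose connection, and the un-awaited .forEach inside mappingData (see the comments below) can queue thousands of concurrent save() calls on top of it. Capping the driver's connection pool per worker helps; the real mongoconfig isn't shown, so the following is a hypothetical sketch with an assumed connection string:

// hypothetical ../config/mongoconfig.js (the actual file is not shown)
module.exports = (mongoose) => {
  mongoose.connect('mongodb://localhost:27017/blogs', { // connection string assumed
    maxPoolSize: 5 // limit concurrent sockets opened by each worker
  });
};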
Mufinella asked 3/1, 2023 at 12:38. Comments (5):
Evangelia: I'm sorry, but I can't find a question in this question. What are you having trouble with?
Mufinella: @Evangelia I have added both the code sample and the problems.
Mufinella: @robertklep could you please help me solve this problem?
Dewdrop: .forEach() doesn't work with promises, so you're basically starting jsonPayload["data"].length concurrent asynchronous operations in each worker (see the sketch after these comments).
Mufinella: @Dewdrop but I am able to ingest some of the data, around 20k records. Can you respond with a solution?
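Following up on Dewdrop's comment: Array.prototype.forEach ignores the promises returned by an async callback, so every record in the file is processed at once and mappingData can resolve before a single save has finished, which is also what drives the connection spikes. A sketch of mappingData rewritten with for...of so each record is awaited in turn, reusing the getFeatured function and Blog model above:

const mappingData = async (jsonPayload) => {
  let successCount = 0;
  let failureCount = 0;
  const detailsFailure = [];
  for (const item of jsonPayload.data) {
    // each iteration completes fully before the next record starts
    const blogPayload = {
      post_title: item.post_title,
      category: item.category,
      sub_category: item.sub_category,
      contents: item.contents,
      featured_image: await getFeatured(item.featured_image), // uploads and returns a URL
      author: item.author
    };
    try {
      await new Blog(blogPayload).save(); // promise form instead of the callback form
      successCount += 1;
    } catch (idbErr) {
      failureCount += 1;
      detailsFailure.push(idbErr);
    }
  }
  return { successCount, failureCount, detailError: detailsFailure };
};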
