I'm trying to write a Node.js script that reads multiple JSON files from a directory and inserts the mapped data into a MongoDB collection. To speed up the process, I followed these steps:
- Put all file names into an array.
- Split the array into segments based on the CPU count.
- Create a worker thread per segment that reads its files and stores the data in the MongoDB collection.
- Return the combined result as a single JSON result (a sketch of this step follows the readFiles.js code below).
Code samples below.
readFiles.js
const {Worker} = require('worker_threads');
const os = require('os');
const cpuCount = os.cpus().length;
const path = require('path');
const workerPath = path.resolve('./worker.js');
const fileArray = ['D:/jsonFiles/abc_1310202220102022.json', 'D:/jsonFiles/def_1310202220102022.json', 'D:/jsonFiles/ghi_1310202220102022.json'];
let failureCount = 0;
let successCount = 0;
let failureMessage = [];
// Split the file list into roughly equal segments, one per CPU core.
const splitRecords = (files, done) => {
  const segmentSize = Math.ceil(files.length / cpuCount);
  const segments = [];
  for (let segmentIndex = 0; segmentIndex < cpuCount; segmentIndex++) {
    const start = segmentIndex * segmentSize; // 0 * 60
    const end = start + segmentSize; // 0 + 60
    const segment = files.slice(start, end);
    segments.push(segment);
  }
  // Drop empty segments (when there are fewer files than CPU cores).
  const finalSegments = [];
  segments.forEach((seg) => {
    if (seg.length > 0) {
      finalSegments.push(seg);
    }
  });
  done(finalSegments);
};
// Spawn one worker for a segment and resolve with the accumulated counts
// once the worker posts a message.
const createWorker = (records) => {
  return new Promise((resolve, reject) => {
    let workerData = { 'records': records };
    const worker = new Worker(workerPath, { workerData });
    worker.on('message', function (data) {
      failureCount += data.failureCount;
      successCount += data.successCount;
      failureMessage.push(data.detailError);
      resolve({
        "failureCount": failureCount, "successCount": successCount,
        "failureMessage": failureMessage
      });
    });
    worker.on('error', function (dataErr) {
      resolve(dataErr);
    });
  });
};
(async () => {
  if (fileArray.length > 0) {
    let segmentArray = [];
    splitRecords(fileArray, (data) => {
      segmentArray = data;
    });
    if (segmentArray.length > 0) {
      segmentArray.forEach(async (list) => {
        createWorker(list);
      });
    }
  }
})();
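For reference, the loop above fires createWorker for each segment without awaiting the returned promises. Below is a minimal sketch of how the combined JSON result could be collected with Promise.all, reusing splitRecords and createWorker from above; it assumes each worker posts a single combined message, which is not what worker.js currently does.
// Sketch only: wait for every worker, then print the combined totals.
(async () => {
  if (fileArray.length === 0) return;
  let segments = [];
  splitRecords(fileArray, (data) => { segments = data; }); // callback runs synchronously
  // One promise per segment/worker; Promise.all waits for all of them.
  await Promise.all(segments.map((list) => createWorker(list)));
  console.log(JSON.stringify({ successCount, failureCount, failureMessage }));
})();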
worker.js
const mongoose = require('mongoose');
require('../config/mongoconfig')(mongoose);
const { workerData, isMainThread, parentPort } = require('worker_threads');
const axios = require('axios');
const Blog = require('../models/blogs');
let failureCount = 0;
let successCount = 0;
let detailsFailure = [];
var fs = require('fs');
// Upload the featured image and resolve with the returned URL (or undefined on failure).
const getFeatured = async function (imageURL) {
  return new Promise(async (resolve, reject) => {
    let options = {
      method: 'post',
      url: 'http://10.65.32.90:2045/v1/imageupload',
      headers: {},
      data: { imageUrl: imageURL }
    };
    await axios(options)
      .then(async function (response) {
        if (typeof response.data.url !== 'undefined') {
          resolve(response.data.url);
        } else {
          resolve();
        }
      })
      .catch(function (error) {
        resolve();
      });
  });
};
const mappingData = async (jsonPayload) => {
  return new Promise(async (resolve, reject) => {
    await Promise.all([jsonPayload["data"].forEach(async function (item) {
      let blogPayload = {};
      blogPayload["post_title"] = item.post_title;
      blogPayload["category"] = item.category;
      blogPayload["sub_category"] = item.sub_category;
      blogPayload["contents"] = item.contents;
      blogPayload["featured_image"] = await getFeatured(item.featured_image); // uploading file into AWS cloud and return url
      blogPayload["author"] = item.author;
      const newBlog = new Blog(blogPayload);
      await newBlog.save((idbErr) => {
        if (idbErr) {
          failureCount += 1;
          detailsFailure.push(idbErr);
        } else {
          successCount += 1;
        }
      });
    })]);
    resolve({ "successCount": successCount, "failureCount": failureCount, "detailError": detailsFailure });
  });
};
(async () => {
  console.log("Worker started");
  if (!isMainThread) {
    console.log("Inside not equal MainThread");
    const items = workerData.records;
    console.log("items length " + items.length);
    for (const dbData of items) {
      fs.readFile(dbData, async function (err, fileData) {
        if (err) throw err;
        let payload = JSON.parse(fileData.toString());
        let mappingResult = await mappingData(payload);
        parentPort.postMessage(mappingResult);
      });
    }
  } else {
    console.log("Outside MainThread");
  }
})();
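In the worker above, the for...of loop starts an fs.readFile callback per file without waiting for it, so the worker posts one message per file. Here is a minimal sketch of the same loop using fs.promises.readFile so each file is processed in turn and a single combined message is posted; it reuses mappingData, workerData, isMainThread and parentPort from worker.js above.
// Sketch only: sequential reads inside the worker, one postMessage at the end.
const fsPromises = require('fs').promises;
(async () => {
  if (!isMainThread) {
    let lastResult = { successCount: 0, failureCount: 0, detailError: [] };
    for (const dbData of workerData.records) {
      const fileData = await fsPromises.readFile(dbData); // actually awaited
      lastResult = await mappingData(JSON.parse(fileData.toString()));
    }
    parentPort.postMessage(lastResult); // counters already accumulate across files
  }
})();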
Problems:
- When I run the code, CPU utilization is too high.
- The number of MongoDB connections is also too high.
- It throws multiple MongoDB connection errors.
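The actual ../config/mongoconfig module isn't shown here. For context, a hypothetical sketch of a config that connects once per thread and caps the pool size; the URI and options are placeholders, not the real config, and each worker thread still loads its own copy of the module, so N workers means at least N connections.
// Hypothetical mongoconfig sketch (placeholder URI, not the real config).
module.exports = function (mongoose) {
  // readyState 0 means no connection has been opened in this thread yet.
  if (mongoose.connection.readyState === 0) {
    mongoose.connect('mongodb://localhost:27017/blogdb', {
      maxPoolSize: 5 // keep the per-worker connection pool small
    });
  }
  return mongoose;
};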
.forEach() doesn't work with promises, so you're basically starting jsonPayload["data"].length concurrent asynchronous operations in each worker. – Dewdrop
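To illustrate the comment above, a minimal sketch of mappingData rewritten with a for...of loop so every item is actually awaited before the next one starts; it assumes the same Blog model, getFeatured helper and counters as in worker.js.
// Sketch only: sequential mapping, one item at a time.
const mappingData = async (jsonPayload) => {
  for (const item of jsonPayload["data"]) {
    const blogPayload = {
      post_title: item.post_title,
      category: item.category,
      sub_category: item.sub_category,
      contents: item.contents,
      featured_image: await getFeatured(item.featured_image),
      author: item.author
    };
    try {
      await new Blog(blogPayload).save(); // promise-based save, no callback
      successCount += 1;
    } catch (idbErr) {
      failureCount += 1;
      detailsFailure.push(idbErr);
    }
  }
  return { "successCount": successCount, "failureCount": failureCount, "detailError": detailsFailure };
};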