I'm trying to parse a large CSV file and insert its rows into MongoDB, but once the file exceeds 100,000 rows I get a bad response from the server. The files I need to insert are usually above 200,000 rows.
I've tried both a bulk insert (insertMany) and a Babyparse (Papa Parse) streaming approach that inserts the file row by row, both with poor results.
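For reference, the bulk-insert attempt looked roughly like this inside the same route handler (a simplified sketch from memory; Entry is the same Mongoose model used below, and parsing the whole file into memory before inserting may itself be part of the problem):

// Bulk-insert attempt (sketch): parse the entire file at once, then insertMany
var parsed = baby.parseFiles(csv, { header: true, skipEmptyLines: true });
var rows = parsed.data.map(function(row) {
    row.id = fileId;
    return row;
});
Entry.insertMany(rows, function(err) {
    if (err) { return res.status(500).send(err); }
    res.send("Completed!");
});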
Node API:
router.post('/csv-upload/:id', multipartMiddleware, function(req, res) {
    // POST variables
    var fileId = req.params.id;
    var csv = req.files.files.path;

    // Create a queue object with concurrency 5; each task saves one row
    var q = async.queue(function(row, callback) {
        var entry = new Entry(row);
        entry.save();
        callback();
    }, 5);

    baby.parseFiles(csv, {
        header: true, // Includes header keys in the row JSON
        skipEmptyLines: true,
        fastMode: true,
        step: function(results, parser) {
            // Tag each parsed row with the file id and queue it for saving
            results.data[0].id = fileId;
            q.push(results.data[0], function(err) {
                if (err) { throw err; }
            });
        },
        complete: function(results, file) {
            console.log("Parsing complete:", results, file);
            q.drain = function() {
                console.log('All items have been processed');
                res.send("Completed!");
            };
        }
    });
});
This streaming approach fails with the browser error: POST SERVER net::ERR_EMPTY_RESPONSE
I'm not sure whether I'm using async.queue correctly, though.
Is there a better, more efficient way to do this, or am I doing something wrong?
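One idea I've been considering but haven't verified: buffer the parsed rows and flush them to Mongo in chunks with insertMany, so the queue does one bulk write per chunk instead of one save per row. A rough sketch (the chunk size of 1000 is arbitrary, and the variables are the same as in the route above):

// Batched-insert idea (untested sketch)
var q = async.queue(function(task, callback) {
    // One bulk write per chunk; the callback fires once Mongo acknowledges
    Entry.insertMany(task.rows, callback);
}, 1); // concurrency 1 keeps the inserts sequential

var batch = [];
baby.parseFiles(csv, {
    header: true,
    skipEmptyLines: true,
    step: function(results) {
        results.data[0].id = fileId;
        batch.push(results.data[0]);
        if (batch.length >= 1000) {
            // splice empties the buffer and hands the chunk to the queue as one task
            q.push({ rows: batch.splice(0, batch.length) });
        }
    },
    complete: function() {
        if (batch.length) { q.push({ rows: batch.splice(0, batch.length) }); }
        q.drain = function() { res.send("Completed!"); };
    }
});

Would something along those lines behave better, or is the row-by-row queue fine and the problem elsewhere?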
Express Server:
// Dependencies
var express = require('express');
var path = require('path');
var bodyParser = require('body-parser');
var routes = require('./server/routes');
var mongoose = require("mongoose");
var babel = require("babel-core/register");
var compression = require('compression');
var PORT = process.env.PORT || 3000;

// Include the cluster module
var cluster = require('cluster');

mongoose.connect(process.env.MONGOLAB_URI || 'mongodb://localhost/routes');

// Code to run if we're in the master process
if (cluster.isMaster) {
    // Count the machine's CPUs
    var cpuCount = require('os').cpus().length;

    // Create a worker for each CPU
    for (var i = 0; i < cpuCount; i += 1) {
        cluster.fork();
    }

// Code to run if we're in a worker process
} else {
    // Express
    var app = express();
    app.use(bodyParser.json({limit: '50mb'}));
    app.use(bodyParser.urlencoded({limit: '50mb', extended: true}));

    // Compress responses
    app.use(compression());

    // Used for production build
    app.use(express.static(path.join(__dirname, 'public')));

    routes(app);

    // Routes
    app.use('/api', require('./server/routes/api'));
    app.all('/*', function(req, res) {
        res.sendFile(path.join(__dirname, 'public/index.html'));
    });

    // Start server
    app.listen(PORT, function() {
        console.log('Server ' + cluster.worker.id + ' running on ' + PORT);
    });
}
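For completeness, the client sends the file as multipart form data, roughly like this (a sketch; the files field name matches req.files.files in the route, and I'm assuming the router is the one mounted under /api):

var form = new FormData();
form.append('files', fileInput.files[0]); // fileInput is the <input type="file"> element
fetch('/api/csv-upload/' + fileId, { method: 'POST', body: form });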