Challenge:
Downloading a CSV export that contains more than 30 million records.
Solution:
Use Laravel job chains to process the 30 million records in sequential chunks. The data is written to 30 CSV files (1 million rows each), which are then bundled into a single ZIP file for download.
In the Controller
/**
 * Queue the generation of a recharge / transaction CSV report.
 *
 * Counts the rows matching the request filters, derives the target file
 * locations, and hands the work to the report helper, which dispatches a
 * chain of chunk jobs. Exports above one million rows are delegated to the
 * "more than 1m" helper (which returns a .zip URL); everything else goes to
 * the single-CSV helper.
 *
 * @param Request $request Reads `report_type`; the remaining filters are
 *                         consumed by TransactionReportFacade.
 * @return mixed Whatever `set_response()` returns; its payload carries the
 *               `download` URL produced by the helper.
 */
public function trans_recharge_rep_dl_custm_csv_chunk_queue(Request $request)
{
    // 25k rows per chunk for safety; the original author notes the server
    // coped with up to ~70k rows per chunk at ~134 MB of running processes.
    $chunkSize = 25000;

    $recharge_archives = TransactionReportFacade::trans_recharge_rep_process($request);
    $totalCount = $recharge_archives->count();

    // ceil() returns a float; cast so the helpers receive an integer count.
    $numberOfChunks = (int) ceil($totalCount / $chunkSize);

    $isRechargeReport = $request->report_type == 'recharge_report';
    $report_type = $isRechargeReport ? 'recharge' : 'transactions';
    $report_type_name = $isRechargeReport ? 'Recharge Report' : 'Transaction Report';

    // NOTE(review): uniqueness relies on getNow() plus rand(1,999); two
    // requests in the same instant could collide - confirm this is acceptable.
    $fileName = "file_".Str::slug(getNow()."-".rand(1,999)).".csv";

    // Derive the absolute directory from the relative one so the two can
    // never disagree (previously each path called getToday() separately).
    $file_base_url = "/uploads/reports/".$report_type."/".getToday()."/csv";
    $file_full_base_url = $_SERVER['DOCUMENT_ROOT'].$file_base_url;
    $file_url = $file_base_url."/".$fileName;
    $file_full_url = $file_full_base_url."/".$fileName;

    if ($totalCount > 1000000) {
        $data = TransactionReportFacade::transaction_rep_download_more_than_1m($request, $totalCount, $chunkSize, $numberOfChunks, $fileName, $file_base_url, $file_full_base_url, $file_url, $file_full_url, $report_type_name);
    } else {
        $data = TransactionReportFacade::transaction_rep_download_less_or_eq_1m($request, $totalCount, $chunkSize, $numberOfChunks, $fileName, $file_base_url, $file_full_base_url, $file_url, $file_full_url, $report_type_name);
    }

    return $this->set_response($data, 200,'success', ['Transaction report file is being generated. Please allow us some time. You may download the csv file in notification section.'], $request->merge(['log_type_id' => 3]));
}
In the Facade / Helper
<?php
namespace App\Facades\Modules\Report\TransactionReport;
use ZipArchive;
use App\Models\RechargeArchives;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Bus;
use App\Jobs\TransactionReportCSVChunk;
use Illuminate\Support\Facades\File;
/**
 * Builds the report query and dispatches the chained chunk jobs that write
 * the recharge / transaction CSV export.
 */
class TransactionReportHelper
{
    /**
     * Number of rows stored in one CSV file when an export is split into
     * several files.
     */
    private const ROWS_PER_FILE = 1000000;

    /**
     * Build the (unexecuted) query for the report, filtered by date range.
     *
     * Recharge reports filter on `recharge_date`; every other report type
     * filters on `request_date`.
     *
     * NOTE(review): no ORDER BY is applied, yet the chunk jobs page through
     * this query with skip()/take(). Without a deterministic order the
     * database is free to return rows in a different order per chunk -
     * consider ordering by the primary key in the job.
     *
     * @param object $request Request-like object (HTTP request or the plain
     *                        object rebuilt inside the job) exposing
     *                        `report_type`, `request_date_from`, `request_date_to`.
     * @return \Illuminate\Database\Eloquent\Builder
     */
    public function trans_recharge_rep_process($request)
    {
        $query = RechargeArchives::with('reduction_list:id,ruid');

        // `?? null` keeps a missing report_type from raising an
        // undefined-property error when $request is a plain object.
        $dateColumn = ($request->report_type ?? null) == 'recharge_report'
            ? 'recharge_archives.recharge_date'
            : 'recharge_archives.request_date';

        if (isset($request->request_date_from)) {
            $query = $query->where($dateColumn, '>=', $request->request_date_from);
        }
        if (isset($request->request_date_to)) {
            $query = $query->where($dateColumn, '<=', $request->request_date_to);
        }

        return $query;
    }

    /**
     * Dispatch the job chain for a report of at most one million rows,
     * written to a single CSV file.
     *
     * @return array{download: string} Relative URL of the CSV being generated.
     */
    public function transaction_rep_download_less_or_eq_1m($request, $totalCount, $chunkSize, $numberOfChunks, $fileName, $file_base_url, $file_full_base_url, $file_url, $file_full_url, $report_type_name)
    {
        // Values shared by every chunk are resolved once, not per iteration.
        $sharedPayload = [
            'request' => $request->all(),
            'totalCount' => $totalCount,
            'chunkSize' => $chunkSize,
            'numberOfChunks' => $numberOfChunks,
            'fileName' => $fileName,
            'file_base_url' => $file_base_url,
            'file_full_base_url' => $file_full_base_url,
            'file_url' => $file_url,
            'file_full_url' => $file_full_url,
            'user_id' => auth()->user()->id,
            'report_type_name' => $report_type_name,
        ];

        // Always queue at least one chunk: with zero matching rows the job
        // list would be empty and the chain would have no first job to
        // dispatch. A single chunk at offset 0 still writes the header row
        // and raises the completion notification.
        $chunksToQueue = max(1, (int) $numberOfChunks);

        $jobs = [];
        for ($chunk = 0; $chunk < $chunksToQueue; $chunk++) {
            $jobs[] = new TransactionReportCSVChunk(
                ['offset' => $chunk * $chunkSize] + $sharedPayload
            );
        }

        Bus::chain($jobs)->dispatch();

        return [
            'download' => $file_url,
        ];
    }

    /**
     * Dispatch the job chain for a report of more than one million rows.
     *
     * The export is split into files of ROWS_PER_FILE rows each, named
     * "<index>-<fileName>". Each payload carries the full file list and the
     * zip destination so the job can archive the files.
     *
     * NOTE(review): a chunk is assigned to a file by its starting offset, so
     * files hold exactly ROWS_PER_FILE rows only when $chunkSize divides
     * ROWS_PER_FILE evenly (true for the 25000 used by the controller).
     *
     * @return array{download: string} Relative URL of the zip being generated.
     */
    public function transaction_rep_download_more_than_1m($request, $totalCount, $chunkSize, $numberOfChunks, $fileName, $file_base_url, $file_full_base_url, $file_url, $file_full_url, $report_type_name)
    {
        // $fileName is the master name; each part file is "<index>-<fileName>".
        $files_count = (int) ceil($totalCount / self::ROWS_PER_FILE);
        $files = [];
        for ($part = 0; $part < $files_count; $part++) {
            $files[] = $part.'-'.$fileName;
        }

        // Swap the trailing "csv" extension for "zip".
        $zip_file_name = substr($fileName, 0, -3).'zip';
        $zip_file_url = substr($file_url, 0, -3).'zip';
        $zip_file_full_base_url = $file_full_base_url;
        $zip_file_full_url = substr($file_full_url, 0, -3).'zip';

        // Values shared by every chunk are resolved once, not per iteration.
        $sharedPayload = [
            'request' => $request->all(),
            'totalCount' => $totalCount,
            'chunkSize' => $chunkSize,
            'numberOfChunks' => $numberOfChunks,
            'file_base_url' => $file_base_url,
            'files_count' => $files_count,
            'file_full_base_url' => $file_full_base_url,
            'user_id' => auth()->user()->id,
            'report_type_name' => $report_type_name,
            'files' => $files,
            'zip_file_name' => $zip_file_name,
            'zip_file_url' => $zip_file_url,
            'zip_file_full_base_url' => $zip_file_full_base_url,
            'zip_file_full_url' => $zip_file_full_url,
        ];

        $jobs = [];
        for ($chunk = 0; $chunk < $numberOfChunks; $chunk++) {
            $offset = $chunk * $chunkSize;

            // Integer division picks the part file for this offset; a plain
            // "/" yields a fractional float, which is deprecated as an array
            // key since PHP 8.1.
            $partName = $files[intdiv((int) $offset, self::ROWS_PER_FILE)];

            $jobs[] = new TransactionReportCSVChunk(
                [
                    'offset' => $offset,
                    'fileName' => $partName,
                    'file_url' => $file_base_url.'/'.$partName,
                    'file_full_url' => $file_full_base_url.'/'.$partName,
                ] + $sharedPayload
            );
        }

        Bus::chain($jobs)->dispatch();

        return [
            'download' => $zip_file_url,
        ];
    }
}
In the Queued Job
<?php
namespace App\Jobs;
use ZipArchive;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\File;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use App\Facades\Modules\Report\TransactionReport\TransactionReportFacade;
/**
 * Queued job that appends one chunk of report rows to a CSV file.
 *
 * Jobs are dispatched as a chain, one per chunk. The job handling the final
 * chunk also zips the part files (for exports above one million rows),
 * writes the report log entry and raises the user notification.
 */
class TransactionReportCSVChunk implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /** @var array Chunk payload built by TransactionReportHelper. */
    protected $data;

    /**
     * @param array $data Chunk payload: request filters, offset, chunk size,
     *                    total row count, target file paths, user id and, for
     *                    zipped exports, the file list and zip paths.
     */
    public function __construct($data = [])
    {
        $this->data = $data;
    }

    /**
     * Write this chunk to its CSV file and, on the last chunk, finalise the report.
     *
     * Any failure is reported through report() and the method returns false
     * without rethrowing; the job is therefore not marked as failed.
     *
     * @return false|null false when an error was caught, null otherwise.
     */
    public function handle()
    {
        $process_start_time = getTodayDateTime();
        $request = (object) $this->data['request'];
        $startOffset = $this->data['offset'];
        $totalCount = $this->data['totalCount'];
        $chunkSize = $this->data['chunkSize'];
        $fileName = $this->data['fileName'];
        $file_full_base_url = $this->data['file_full_base_url'];
        $file_url = $this->data['file_url'];
        $file_full_url = $this->data['file_full_url'];
        $user_id = $this->data['user_id'];
        $report_type_name = $this->data['report_type_name'];
        $files = $this->data['files'] ?? [];
        $zip_file_url = $this->data['zip_file_url'] ?? '';
        $zip_file_full_url = $this->data['zip_file_full_url'] ?? '';

        // $request is a plain object here, so a missing report_type must not
        // be read directly (that would raise an undefined-property error).
        $isRechargeReport = ($request->report_type ?? null) == 'recharge_report';

        $recharge_archives = TransactionReportFacade::trans_recharge_rep_process($request);
        writeToLog('======Queue started========'.'$offset='.$startOffset, 'debug');

        try {
            File::ensureDirectoryExists($file_full_base_url);

            // Append mode: every chunk of the same file adds to what earlier
            // chunks in the chain have already written.
            $fp = fopen($file_full_url, 'a');
            if ($fp === false) {
                throw new \RuntimeException('Unable to open report file for writing: '.$file_full_url);
            }

            try {
                $offset = $this->writeChunk($fp, $recharge_archives, $startOffset, $chunkSize, $isRechargeReport);
            } finally {
                // Release the handle even when writing fails part-way.
                fclose($fp);
            }

            $process_end_time = getTodayDateTime();

            // The last chunk is identified by its position, not by how many
            // rows were actually written: if rows disappeared after the count
            // was taken, the running offset would never reach $totalCount and
            // the report would never be finalised.
            if ($startOffset + $chunkSize >= $totalCount) {
                $file_type = 'text/csv'; // default csv format

                if ($totalCount > 1000000) { // zipping only applies to 1M+ datasets
                    $file_url = $zip_file_url;
                    $file_type = 'application/zip';
                    $this->buildZip($zip_file_full_url, $file_full_base_url, $files);
                }

                // NOTE(review): for zipped exports $fileName is still the name
                // of the last CSV part while $file_url points at the zip; the
                // payload's 'zip_file_name' is never used - confirm which name
                // reportlogsGenerate() expects.
                reportlogsGenerate($fileName, $file_url, $user_id);

                $notification_response = [
                    'is_report' => 1,
                    'report_name' => $report_type_name,
                    'process_start_time' => $process_start_time,
                    'process_end_time' => $process_end_time,
                    'execution_time' => getTimeDuration($process_start_time, $process_end_time),
                    'no_of_records' => $offset,
                    'filePath' => $file_url,
                    "type" => $file_type,
                ];

                // NOTE(review): the last argument stays 'text/csv' even when
                // $file_type is 'application/zip' - confirm this is intended.
                notificationsGenerate($notification_response, $user_id, $report_type_name.' is generated.', 'text/csv');
            }

            writeToLog('======Queue ended========'.'$offset='.$offset, 'debug');
        } catch (\Throwable $e) {
            report($e);
            return false;
        }
    }

    /**
     * Append the header (first chunk only) and this chunk's rows to the CSV.
     *
     * NOTE(review): the header is written only when the offset is 0, so in a
     * multi-file export only the first part file gets one - confirm intent.
     * NOTE(review): the query has no ORDER BY visible here; skip()/take()
     * paging is only stable if the database returns rows in a consistent order.
     *
     * @param resource $fp               Open, writable file handle.
     * @param mixed    $query            Query builder returned by the report facade.
     * @param int      $offset           Zero-based index of the chunk's first row.
     * @param int      $chunkSize        Maximum number of rows to write.
     * @param bool     $isRechargeReport Selects the date column and its label.
     * @return int Starting offset plus the number of rows written.
     */
    private function writeChunk($fp, $query, $offset, $chunkSize, $isRechargeReport)
    {
        $dateLabel = $isRechargeReport ? 'Recharge Date' : 'Request Date';

        if ($offset == 0) {
            $header = [
                "Serial",
                "Operator",
                "Client",
                $dateLabel,
                "Sender",
                "Recipient",
                "Amount",
                "Status",
                "Connection Type",
            ];
            $this->writeRow($fp, $header);
        }

        // Build id => name maps once so each row resolves its labels with an
        // array lookup rather than scanning three collections per row.
        $operators = $this->lookupMap('operator_info', 'operator_id', 'operator_name');
        $clients = $this->lookupMap('client_info', 'client_id', 'client_name');
        $statuses = $this->lookupMap('recharge_status_codes', 'status_code', 'status_name');

        $cursor = $query
            ->skip($offset)
            ->take($chunkSize)
            ->cursor();

        foreach ($cursor as $item) {
            $this->writeRow($fp, [
                "Serial" => $offset + 1, // 1-based serial that continues across chunks
                "Operator" => $this->lookup($operators, $item->operator_id),
                "Client" => $this->lookup($clients, $item->client_id),
                $dateLabel => $isRechargeReport ? $item->recharge_date : $item->request_date,
                "Sender" => $item->request_sender_id,
                "Recipient" => $item->recharge_recipient_msisdn,
                "Amount" => $item->amount,
                "Status" => $this->lookup($statuses, $item->status_code),
                "Connection Type" => $item->connection_type,
            ]);
            $offset++;
        }

        return $offset;
    }

    /**
     * Write one CSV row, failing loudly instead of silently dropping it.
     *
     * @param resource $fp
     * @param array    $row
     * @throws \RuntimeException when fputcsv() reports a write failure.
     */
    private function writeRow($fp, array $row)
    {
        if (fputcsv($fp, $row) === false) {
            throw new \RuntimeException('Failed to write a row to the report CSV.');
        }
    }

    /**
     * Load a lookup table as a key => value array; the first row wins when a
     * key occurs more than once. Rows with a null key are skipped.
     *
     * @param string $table
     * @param string $keyColumn
     * @param string $valueColumn
     * @return array
     */
    private function lookupMap($table, $keyColumn, $valueColumn)
    {
        $map = [];
        foreach (DB::table($table)->select($keyColumn, $valueColumn)->get() as $row) {
            $key = $row->{$keyColumn};
            if ($key !== null && !array_key_exists($key, $map)) {
                $map[$key] = $row->{$valueColumn};
            }
        }

        return $map;
    }

    /**
     * Resolve a label from a lookup map, or null when the key is null/unknown.
     *
     * @param array $map
     * @param mixed $key
     * @return mixed
     */
    private function lookup(array $map, $key)
    {
        return $key === null ? null : ($map[$key] ?? null);
    }

    /**
     * Bundle the CSV part files into a single zip archive.
     *
     * A part file that cannot be added is logged and skipped so the user
     * still receives the remaining files.
     *
     * @param string $zip_file_full_url  Absolute path of the archive to create.
     * @param string $file_full_base_url Directory holding the part files.
     * @param array  $files              Part file names, in order.
     * @throws \RuntimeException when the archive cannot be opened or written.
     */
    private function buildZip($zip_file_full_url, $file_full_base_url, array $files)
    {
        $zip = new ZipArchive();
        writeToLog('$zip_file_full_url = '.$zip_file_full_url, 'debug');

        // open() returns true on success and an integer error code otherwise.
        $opened = $zip->open($zip_file_full_url, ZipArchive::CREATE);
        if ($opened !== true) {
            throw new \RuntimeException('Unable to create zip archive '.$zip_file_full_url.' (ZipArchive error code '.$opened.')');
        }

        foreach ($files as $csvfilename) {
            writeToLog('$file_full_base_url $csvfilename '.$file_full_base_url.'/'. $csvfilename, 'debug');
            if (!$zip->addFile($file_full_base_url.'/'. $csvfilename, $csvfilename)) {
                writeToLog('Could not add file to zip: '.$file_full_base_url.'/'.$csvfilename, 'debug');
            }
        }

        if (!$zip->close()) {
            throw new \RuntimeException('Unable to write zip archive '.$zip_file_full_url);
        }
    }
}