PHP/Beanstalkd: Spawning Multiple workers in parallel
I've got a script which checks which MX record belongs to an email address. I've got about 300k emails to check, so a single-threaded process is going to take a long time.

I've got beanstalkd running with a queue, and PHP pushes the emails into it from a file. However, only one worker ever executes jobs from the queue. I'm currently at a loss as to how to spawn, say, 10+ workers for the process.

I run do_job_mx.php, which opens a file containing one email per line and pushes each onto the queue.

The PHP code that takes emails from the file and puts them into the queue - do_job_mx.php:

require_once('pheanstalk_init.php');

$pheanstalk = new Pheanstalk_Pheanstalk('127.0.0.1:11300');

$filename = '_blank.txt';
$filename = dirname(__FILE__) . '/in/' . $filename;

foreach (file($filename, FILE_SKIP_EMPTY_LINES) as $line) 
{
    $json = json_encode(array("email" => trim($line)));

    $pheanstalk
        ->useTube('process_mx')
        ->put($json);
}

The PHP code for the worker - do_worker_process_mx.php:

class Worker 
{
    public function __construct() 
    {
        $this->log('worker process - starting');

        require_once('pheanstalk_init.php');
        $this->pheanstalk = new Pheanstalk_Pheanstalk('127.0.0.1:11300');
    }

    public function __destruct() 
    {
        $this->log('worker process - ending');
    }

    public function run() 
    {
        $this->log('worker process - starting to run');

        // watch/ignore stick to the connection, so set them up once
        // instead of re-sending them on every iteration
        $this->pheanstalk
            ->watch('process_mx')
            ->ignore('default');

        while (1) 
        {
            // blocks until a job is available
            $job = $this->pheanstalk->reserve();

            $data = json_decode($job->getData(), true);

            $this->process_mx($data);

            $this->pheanstalk->delete($job);
        }
    }

    private function process_mx($data)
    {
        $domain = explode("@", $data['email']);

        // dns_get_mx() fills $mx_records by reference and returns false on failure;
        // skip malformed addresses and domains without MX records
        if (!isset($domain[1]) || !dns_get_mx($domain[1], $mx_records) || empty($mx_records)) {
            return;
        }

        // keep only the last two labels of the first MX host,
        // e.g. "aspmx.l.google.com" -> "google.com"
        $mx_array = explode(".", strtolower($mx_records[0]));
        $mx_domain = implode(".", array_slice($mx_array, -2));

        echo $data['email'] . "\n";

        $this->write_file($mx_domain, $data['email']);
    }

    private function write_file($mx, $email)
    {
        // append-only; $handle is a file resource, not a name
        $handle = fopen(dirname(__FILE__) . "/out/" . $mx . ".txt", 'ab');

        fwrite($handle, $email . "\n");

        fclose($handle);
    }

    private function log($txt) 
    {
        echo $txt . "\n";
    }
}

$worker = new Worker();
$worker->run();
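The domain-trimming step inside process_mx can be pulled out into a small helper, which makes the heuristic easier to see and test. A minimal sketch (mx_base_domain is a hypothetical name, not part of the original script; note that keeping only the last two labels misfires on multi-label public suffixes such as co.uk):

```php
<?php
// Hypothetical helper mirroring the worker's process_mx logic:
// reduce an MX host name to its last two labels.
function mx_base_domain(string $mxHost): string
{
    $labels = explode('.', strtolower($mxHost));
    return implode('.', array_slice($labels, -2));
}

echo mx_base_domain('ASPMX.L.GOOGLE.COM') . "\n"; // prints "google.com"
```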

Supervisord conf:

[program:do_worker_process]
command=/usr/bin/php /srv/www/mydev/public_html/esp/do_worker_process_mx.php
numprocs=10
numprocs_start=10
autostart=true
autorestart=true
stopsignal=QUIT
log_stdout=true
logfile=/var/log/supervisor/worker_process_mx.log


Number of processes running:

# supervisorctl status

do_worker_process RUNNING    pid 44343, uptime 1:46:11
Notarize answered 1/5, 2013 at 16:36 Comment(0)

CentOS 6 comes with:

- beanstalkd 1.4.6
- supervisor 2.1.8

I just needed to upgrade to supervisor 3.0.

Now the multiple workers spawn as expected.
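Two details of the original conf are worth checking under supervisor 3: when numprocs is greater than 1, process_name must contain %(process_num)s so each process gets a unique name, and numprocs_start is an offset into the numbering (normally 0), not a process count. The logging options were also renamed (log_stdout/logfile became stdout_logfile). A sketch of the adjusted section, assuming supervisor 3.x option names:

```ini
[program:do_worker_process]
command=/usr/bin/php /srv/www/mydev/public_html/esp/do_worker_process_mx.php
; with numprocs > 1, each process needs a unique name
process_name=%(program_name)s_%(process_num)02d
numprocs=10
; numprocs_start is the starting offset for %(process_num)s, not a count
numprocs_start=0
autostart=true
autorestart=true
stopsignal=QUIT
; supervisor 3 uses stdout_logfile instead of log_stdout/logfile
stdout_logfile=/var/log/supervisor/worker_process_mx.log
```

After editing the conf, `supervisorctl reread` followed by `supervisorctl update` picks up the changes.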

Notarize answered 2/5, 2013 at 6:3 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.