PHP Daemon/worker environment
Asked Answered
A

5

15

Problem: I want to implement several php-worker processes who are listening on a MQ-server queue for asynchronous jobs. The problem now is that simply running this processes as daemons on a server doesn't really give me any level of control over the instances (Load, Status, locked up)...except maybe for dumping ps -aux. Because of that I'm looking for a runtime environment of some kind that lets me monitor and control the instances, either on system (process) level or on a higher layer (some kind of Java-style appserver)

Any pointers?

Alveta answered 15/4, 2009 at 15:22 Comment(1)
Also see: symfony.com/doc/master/components/process.htmlStatic
S
12

Here's some code that may be useful.

<?
define('WANT_PROCESSORS', 5);
define('PROCESSOR_EXECUTABLE', '/path/to/your/processor');
set_time_limit(0);
$cycles = 0;
$run = true;
$reload = false;
declare(ticks = 30);

function signal_handler($signal) {
    switch($signal) {
    case SIGTERM :
        global $run;
        $run = false;
        break;
    case SIGHUP  :
        global $reload;
        $reload = true;
        break;
    }   
}

pcntl_signal(SIGTERM, 'signal_handler');
pcntl_signal(SIGHUP, 'signal_handler');

function spawn_processor() {
    $pid = pcntl_fork();
    if($pid) {
        global $processors;
        $processors[] = $pid;
    } else {
        if(posix_setsid() == -1)
            die("Forked process could not detach from terminal\n");
        fclose(stdin);
        fclose(stdout);
        fclose(stderr);
        pcntl_exec(PROCESSOR_EXECUTABLE);
        die('Failed to fork ' . PROCESSOR_EXECUTABLE . "\n");
    }
}

function spawn_processors() {
    global $processors;
    if($processors)
        kill_processors();
    $processors = array();
    for($ix = 0; $ix < WANT_PROCESSORS; $ix++)
        spawn_processor();
}

function kill_processors() {
    global $processors;
    foreach($processors as $processor)
        posix_kill($processor, SIGTERM);
    foreach($processors as $processor)
        pcntl_waitpid($processor);
    unset($processors);
}

function check_processors() {
    global $processors;
    $valid = array();
    foreach($processors as $processor) {
        pcntl_waitpid($processor, $status, WNOHANG);
        if(posix_getsid($processor))
            $valid[] = $processor;
    }
    $processors = $valid;
    if(count($processors) > WANT_PROCESSORS) {
        for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
            posix_kill($processors[$ix], SIGTERM);
        for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
            pcntl_waitpid($processors[$ix]);
    } elseif(count($processors) < WANT_PROCESSORS) {
        for($ix = count($processors); $ix < WANT_PROCESSORS; $ix++)
            spawn_processor();
    }
}

spawn_processors();

while($run) {
    $cycles++;
    if($reload) {
        $reload = false;
        kill_processors();
        spawn_processors();
    } else {
        check_processors();
    }
    usleep(150000);
}
kill_processors();
pcntl_wait();
?>
Samovar answered 15/4, 2009 at 15:31 Comment(2)
Where did you get this? Open source project or your own code? Any documentation or explanation of what exactly is going on here?Effie
@gAMBOOKa: You should write that as a separate answer rather than a comment. :)Samovar
L
4

It sounds like you already have a MQ up and running on a *nix system and just want a way to manage workers.

A very simple way to do so is to use GNU screen. To start 10 workers you can use:

#!/bin/sh
for x in `seq 1 10` ; do
screen -dmS worker_$x php /path/to/script.php worker$x
end

This will start 10 workers in the background using screens named worker_1,2,3 and so on.

You can reattach to the screens by running screen -r worker_ and list the running workers by using screen -list.

For more info this guide may be of help: http://www.kuro5hin.org/story/2004/3/9/16838/14935

Also try:

  • screen --help
  • man screen
  • or google.

For production servers I would normally recommend using the normal system startup scripts, but I have been running screen commands from the startup scripts for years with no problems.

Lindsey answered 15/4, 2009 at 18:56 Comment(0)
F
1

Do you actually need it to be continuously running?

If you only want to spawn new process on request, you can register it as a service in xinetd.

Festivity answered 15/4, 2009 at 15:33 Comment(2)
The spawning-aspect isn't a big issue imho because the number of workers is depending on the system performance which is usually constant. More important would be the monitoring aspect of the individual worker status (crashed, whatever). One tool I just discovered for this might be DJBs deamontoolsAlveta
That's one option. For monitoring you could also use flock()-ed PID files. Upon crash all locks are released.Festivity
A
1

a pcntl plugin type server daemon for PHP

http://dev.pedemont.com/sonic/

Abjure answered 5/6, 2009 at 3:55 Comment(0)
T
0

Bellow is our working implementation of @chaos answer. Code to handle signals was removed as this script lives usually just few milliseconds.

Also, in code we added 2 functions to save pids between calls: restore_processors_state() and save_processors_state(). We've used redis there, but you can decide to use implementation on files.

We run this script every minute using cron. Cron checks if all processes alive. If not - it re-run them and then dies. If we want to kill existing processes then we simply run this script with argument kill: php script.php kill.

Very handy way of running workers without injecting scripts into init.d.

<?php

include_once dirname( __FILE__ ) . '/path/to/bootstrap.php';

define('WANT_PROCESSORS', 5);
define('PROCESSOR_EXECUTABLE', '' . dirname(__FILE__) . '/path/to/worker.php');
set_time_limit(0);

$run = true;
$reload = false;
declare(ticks = 30);

function restore_processors_state()
{
    global $processors;

    $redis = Zend_Registry::get('redis');
    $pids = $redis->hget('worker_procs', 'pids');

    if( !$pids )
    {
        $processors = array();
    }
    else
    {
        $processors = json_decode($pids, true);
    }
}

function save_processors_state()
{
    global $processors;

    $redis = Zend_Registry::get('redis');
    $redis->hset('worker_procs', 'pids', json_encode($processors));
}

function spawn_processor() {
    $pid = pcntl_fork();
    if($pid) {
        global $processors;
        $processors[] = $pid;
    } else {
        if(posix_setsid() == -1)
            die("Forked process could not detach from terminal\n");
        fclose(STDIN);
        fclose(STDOUT);
        fclose(STDERR);
        pcntl_exec('/usr/bin/php', array(PROCESSOR_EXECUTABLE));
        die('Failed to fork ' . PROCESSOR_EXECUTABLE . "\n");
    }
}

function spawn_processors() {
    restore_processors_state();

    check_processors();

    save_processors_state();
}

function kill_processors() {
    global $processors;
    foreach($processors as $processor)
        posix_kill($processor, SIGTERM);
    foreach($processors as $processor)
        pcntl_waitpid($processor, $trash);
    unset($processors);
}

function check_processors() {
    global $processors;
    $valid = array();
    foreach($processors as $processor) {
        pcntl_waitpid($processor, $status, WNOHANG);
        if(posix_getsid($processor))
            $valid[] = $processor;
    }
    $processors = $valid;
    if(count($processors) > WANT_PROCESSORS) {
        for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
            posix_kill($processors[$ix], SIGTERM);
        for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
            pcntl_waitpid($processors[$ix], $trash);
    }
    elseif(count($processors) < WANT_PROCESSORS) {
        for($ix = count($processors); $ix < WANT_PROCESSORS; $ix++)
            spawn_processor();
    }
}

if( isset($argv) && count($argv) > 1 ) {
    if( $argv[1] == 'kill' ) {
        restore_processors_state();
        kill_processors();
        save_processors_state();

        exit(0);
    }
}

spawn_processors();
Tuyere answered 18/11, 2013 at 8:19 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.