PHP Daemon/worker environment

2019-01-16 07:42发布

问题:

Problem: I want to implement several php-worker processes who are listening on a MQ-server queue for asynchronous jobs. The problem now is that simply running this processes as daemons on a server doesn't really give me any level of control over the instances (Load, Status, locked up)...except maybe for dumping ps -aux. Because of that I'm looking for a runtime environment of some kind that lets me monitor and control the instances, either on system (process) level or on a higher layer (some kind of Java-style appserver)

Any pointers?

回答1:

Here's some code that may be useful.

<?
define('WANT_PROCESSORS', 5);
define('PROCESSOR_EXECUTABLE', '/path/to/your/processor');
set_time_limit(0);
$cycles = 0;
$run = true;
$reload = false;
declare(ticks = 30);

function signal_handler($signal) {
    switch($signal) {
    case SIGTERM :
        global $run;
        $run = false;
        break;
    case SIGHUP  :
        global $reload;
        $reload = true;
        break;
    }   
}

pcntl_signal(SIGTERM, 'signal_handler');
pcntl_signal(SIGHUP, 'signal_handler');

function spawn_processor() {
    $pid = pcntl_fork();
    if($pid) {
        global $processors;
        $processors[] = $pid;
    } else {
        if(posix_setsid() == -1)
            die("Forked process could not detach from terminal\n");
        fclose(stdin);
        fclose(stdout);
        fclose(stderr);
        pcntl_exec(PROCESSOR_EXECUTABLE);
        die('Failed to fork ' . PROCESSOR_EXECUTABLE . "\n");
    }
}

function spawn_processors() {
    global $processors;
    if($processors)
        kill_processors();
    $processors = array();
    for($ix = 0; $ix < WANT_PROCESSORS; $ix++)
        spawn_processor();
}

function kill_processors() {
    global $processors;
    foreach($processors as $processor)
        posix_kill($processor, SIGTERM);
    foreach($processors as $processor)
        pcntl_waitpid($processor);
    unset($processors);
}

function check_processors() {
    global $processors;
    $valid = array();
    foreach($processors as $processor) {
        pcntl_waitpid($processor, $status, WNOHANG);
        if(posix_getsid($processor))
            $valid[] = $processor;
    }
    $processors = $valid;
    if(count($processors) > WANT_PROCESSORS) {
        for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
            posix_kill($processors[$ix], SIGTERM);
        for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
            pcntl_waitpid($processors[$ix]);
    } elseif(count($processors) < WANT_PROCESSORS) {
        for($ix = count($processors); $ix < WANT_PROCESSORS; $ix++)
            spawn_processor();
    }
}

spawn_processors();

while($run) {
    $cycles++;
    if($reload) {
        $reload = false;
        kill_processors();
        spawn_processors();
    } else {
        check_processors();
    }
    usleep(150000);
}
kill_processors();
pcntl_wait();
?>


回答2:

It sounds like you already have a MQ up and running on a *nix system and just want a way to manage workers.

A very simple way to do so is to use GNU screen. To start 10 workers you can use:

#!/bin/sh
for x in `seq 1 10` ; do
screen -dmS worker_$x php /path/to/script.php worker$x
end

This will start 10 workers in the background using screens named worker_1,2,3 and so on.

You can reattach to the screens by running screen -r worker_ and list the running workers by using screen -list.

For more info this guide may be of help: http://www.kuro5hin.org/story/2004/3/9/16838/14935

Also try:

  • screen --help
  • man screen
  • or google.

For production servers I would normally recommend using the normal system startup scripts, but I have been running screen commands from the startup scripts for years with no problems.



回答3:

Do you actually need it to be continuously running?

If you only want to spawn new process on request, you can register it as a service in xinetd.



回答4:

a pcntl plugin type server daemon for PHP

http://dev.pedemont.com/sonic/



回答5:

Bellow is our working implementation of @chaos answer. Code to handle signals was removed as this script lives usually just few milliseconds.

Also, in code we added 2 functions to save pids between calls: restore_processors_state() and save_processors_state(). We've used redis there, but you can decide to use implementation on files.

We run this script every minute using cron. Cron checks if all processes alive. If not - it re-run them and then dies. If we want to kill existing processes then we simply run this script with argument kill: php script.php kill.

Very handy way of running workers without injecting scripts into init.d.

<?php

include_once dirname( __FILE__ ) . '/path/to/bootstrap.php';

define('WANT_PROCESSORS', 5);
define('PROCESSOR_EXECUTABLE', '' . dirname(__FILE__) . '/path/to/worker.php');
set_time_limit(0);

$run = true;
$reload = false;
declare(ticks = 30);

function restore_processors_state()
{
    global $processors;

    $redis = Zend_Registry::get('redis');
    $pids = $redis->hget('worker_procs', 'pids');

    if( !$pids )
    {
        $processors = array();
    }
    else
    {
        $processors = json_decode($pids, true);
    }
}

function save_processors_state()
{
    global $processors;

    $redis = Zend_Registry::get('redis');
    $redis->hset('worker_procs', 'pids', json_encode($processors));
}

function spawn_processor() {
    $pid = pcntl_fork();
    if($pid) {
        global $processors;
        $processors[] = $pid;
    } else {
        if(posix_setsid() == -1)
            die("Forked process could not detach from terminal\n");
        fclose(STDIN);
        fclose(STDOUT);
        fclose(STDERR);
        pcntl_exec('/usr/bin/php', array(PROCESSOR_EXECUTABLE));
        die('Failed to fork ' . PROCESSOR_EXECUTABLE . "\n");
    }
}

function spawn_processors() {
    restore_processors_state();

    check_processors();

    save_processors_state();
}

function kill_processors() {
    global $processors;
    foreach($processors as $processor)
        posix_kill($processor, SIGTERM);
    foreach($processors as $processor)
        pcntl_waitpid($processor, $trash);
    unset($processors);
}

function check_processors() {
    global $processors;
    $valid = array();
    foreach($processors as $processor) {
        pcntl_waitpid($processor, $status, WNOHANG);
        if(posix_getsid($processor))
            $valid[] = $processor;
    }
    $processors = $valid;
    if(count($processors) > WANT_PROCESSORS) {
        for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
            posix_kill($processors[$ix], SIGTERM);
        for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
            pcntl_waitpid($processors[$ix], $trash);
    }
    elseif(count($processors) < WANT_PROCESSORS) {
        for($ix = count($processors); $ix < WANT_PROCESSORS; $ix++)
            spawn_processor();
    }
}

if( isset($argv) && count($argv) > 1 ) {
    if( $argv[1] == 'kill' ) {
        restore_processors_state();
        kill_processors();
        save_processors_state();

        exit(0);
    }
}

spawn_processors();