PHP守护进程/工作进程环境

15
问题:我想实现几个 PHP Worker 进程,监听一个 MQ 服务器队列以执行异步作业。问题在于,仅仅将这些进程作为守护进程在服务器上运行,并不能给我任何控制实例的级别(负载、状态、锁定)...除非是通过 ps -aux 命令来查询。因此,我正在寻找一种运行时环境,可以让我监控和控制这些实例,无论是在系统(进程)级别还是在更高层次(某种 Java 样式的应用服务器)。
任何指针?

另请参见:http://symfony.com/doc/master/components/process.html - salmatron
5个回答

12

这里有一些可能有用的代码。

<?
define('WANT_PROCESSORS', 5);
define('PROCESSOR_EXECUTABLE', '/path/to/your/processor');
set_time_limit(0);
$cycles = 0;
$run = true;
$reload = false;
declare(ticks = 30);

function signal_handler($signal) {
    switch($signal) {
    case SIGTERM :
        global $run;
        $run = false;
        break;
    case SIGHUP  :
        global $reload;
        $reload = true;
        break;
    }   
}

pcntl_signal(SIGTERM, 'signal_handler');
pcntl_signal(SIGHUP, 'signal_handler');

function spawn_processor() {
    $pid = pcntl_fork();
    if($pid) {
        global $processors;
        $processors[] = $pid;
    } else {
        if(posix_setsid() == -1)
            die("Forked process could not detach from terminal\n");
        fclose(stdin);
        fclose(stdout);
        fclose(stderr);
        pcntl_exec(PROCESSOR_EXECUTABLE);
        die('Failed to fork ' . PROCESSOR_EXECUTABLE . "\n");
    }
}

function spawn_processors() {
    global $processors;
    if($processors)
        kill_processors();
    $processors = array();
    for($ix = 0; $ix < WANT_PROCESSORS; $ix++)
        spawn_processor();
}

function kill_processors() {
    global $processors;
    foreach($processors as $processor)
        posix_kill($processor, SIGTERM);
    foreach($processors as $processor)
        pcntl_waitpid($processor);
    unset($processors);
}

function check_processors() {
    global $processors;
    $valid = array();
    foreach($processors as $processor) {
        pcntl_waitpid($processor, $status, WNOHANG);
        if(posix_getsid($processor))
            $valid[] = $processor;
    }
    $processors = $valid;
    if(count($processors) > WANT_PROCESSORS) {
        for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
            posix_kill($processors[$ix], SIGTERM);
        for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
            pcntl_waitpid($processors[$ix]);
    } elseif(count($processors) < WANT_PROCESSORS) {
        for($ix = count($processors); $ix < WANT_PROCESSORS; $ix++)
            spawn_processor();
    }
}

spawn_processors();

while($run) {
    $cycles++;
    if($reload) {
        $reload = false;
        kill_processors();
        spawn_processors();
    } else {
        check_processors();
    }
    usleep(150000);
}
kill_processors();
pcntl_wait();
?>

你是从哪里得到这个代码的?开源项目还是自己写的?这里具体发生了什么,有任何文档或解释吗? - leek
@gAMBOOKa:你应该把它写成一个单独的答案,而不是一个评论。 :) - chaos

4

听起来您已经在*nix系统上运行了一个MQ,只是想找一种管理worker的方法。

一个非常简单的方法是使用GNU screen。要启动10个workers,您可以使用以下命令:

#!/bin/sh
for x in `seq 1 10` ; do
screen -dmS worker_$x php /path/to/script.php worker$x
end

这将在后台启动10个工作进程,使用命名为worker_1、2、3等的屏幕。您可以通过运行screen -r worker_重新连接到屏幕,并使用screen -list列出正在运行的工作进程。有关更多信息,请参阅以下指南:http://www.kuro5hin.org/story/2004/3/9/16838/14935。此外,您还可以尝试以下操作:
- screen --help - man screen - 或 google
对于生产服务器,我通常建议使用正常的系统启动脚本,但我多年来一直在从启动脚本中运行screen命令,没有任何问题。

1

1
你真的需要它一直运行吗?
如果你只想在需要时生成新进程,你可以将其注册为xinetd中的服务。

生成方面不是我认为的一个大问题,因为工作人员的数量取决于通常是恒定的系统性能。更重要的是监控每个工作人员状态(崩溃等)的方面。我刚刚发现的一个工具是DJB的deamontools。 - Sebastian
这是一个选项。为了监控,您还可以使用flock()-ed PID文件。在崩溃时,所有锁都会被释放。 - vartec

0

以下是我们对 @chaos 答案的工作实现。由于这个脚本通常只存在几毫秒,因此去除了处理信号的代码。

此外,在代码中我们添加了两个函数来保存调用之间的 pids:restore_processors_state() 和 save_processors_state()。我们在这里使用了 redis ,但您可以决定使用文件实现。

我们使用 cron 每分钟运行此脚本。Cron 检查所有进程是否存活。如果没有,则重新运行它们,然后退出。如果要杀死现有进程,则只需带上参数 kill 运行此脚本:php script.php kill

这种无需注入脚本到 init.d 中就能运行 worker 的方法非常方便。

<?php

include_once dirname( __FILE__ ) . '/path/to/bootstrap.php';

define('WANT_PROCESSORS', 5);
define('PROCESSOR_EXECUTABLE', '' . dirname(__FILE__) . '/path/to/worker.php');
set_time_limit(0);

$run = true;
$reload = false;
declare(ticks = 30);

function restore_processors_state()
{
    global $processors;

    $redis = Zend_Registry::get('redis');
    $pids = $redis->hget('worker_procs', 'pids');

    if( !$pids )
    {
        $processors = array();
    }
    else
    {
        $processors = json_decode($pids, true);
    }
}

function save_processors_state()
{
    global $processors;

    $redis = Zend_Registry::get('redis');
    $redis->hset('worker_procs', 'pids', json_encode($processors));
}

function spawn_processor() {
    $pid = pcntl_fork();
    if($pid) {
        global $processors;
        $processors[] = $pid;
    } else {
        if(posix_setsid() == -1)
            die("Forked process could not detach from terminal\n");
        fclose(STDIN);
        fclose(STDOUT);
        fclose(STDERR);
        pcntl_exec('/usr/bin/php', array(PROCESSOR_EXECUTABLE));
        die('Failed to fork ' . PROCESSOR_EXECUTABLE . "\n");
    }
}

function spawn_processors() {
    restore_processors_state();

    check_processors();

    save_processors_state();
}

function kill_processors() {
    global $processors;
    foreach($processors as $processor)
        posix_kill($processor, SIGTERM);
    foreach($processors as $processor)
        pcntl_waitpid($processor, $trash);
    unset($processors);
}

function check_processors() {
    global $processors;
    $valid = array();
    foreach($processors as $processor) {
        pcntl_waitpid($processor, $status, WNOHANG);
        if(posix_getsid($processor))
            $valid[] = $processor;
    }
    $processors = $valid;
    if(count($processors) > WANT_PROCESSORS) {
        for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
            posix_kill($processors[$ix], SIGTERM);
        for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
            pcntl_waitpid($processors[$ix], $trash);
    }
    elseif(count($processors) < WANT_PROCESSORS) {
        for($ix = count($processors); $ix < WANT_PROCESSORS; $ix++)
            spawn_processor();
    }
}

if( isset($argv) && count($argv) > 1 ) {
    if( $argv[1] == 'kill' ) {
        restore_processors_state();
        kill_processors();
        save_processors_state();

        exit(0);
    }
}

spawn_processors();

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接