优雅停止Gearman工作进程

43

我有许多Gearman工人在不断地运行, 保存诸如记录用户页面浏览等内容。偶尔, 我会更新Gearman工人使用的PHP代码。为了使工人切换到新代码,我需要杀死并重新启动工人的PHP进程。

有更好的方法吗?大概有时候我会丢失一些数据(虽然不是非常重要的数据)当我杀死其中一个工人进程的时候。

编辑:我已经找到了一个适合我的答案,并在下面发布了它。


52
现在我本来想回复“请停下!” :-) - cletus
16
那些德国工人对你做了什么? :) - Justin Ethier
2
这是Gearman,不是德语(German)。虽然我在前两次阅读时也认为它是德语。 - Dylan
2
我不知道他们存在?(一个不断运行的德国工人)... - Rob Vermeulen
12个回答

12

解决方案1


通常我会使用Unix守护程序工具来运行我的工作进程,并带上-r标志,让它们在完成一次任务后自动终止。您的脚本将在每次迭代后优雅地结束,然后守护程序将自动重新启动。

您的工作进程将过时一次,但这可能对您而言并不像丢失数据那么重要。

此解决方案还具有释放内存的优点。如果您正在执行大型任务,则可能会遇到内存问题,因为PHP 5.3之前的GC效果非常糟糕。

解决方案2


您还可以向所有工作进程添加退出脚本的函数。当您想重新启动时,只需给Gearman调用一个高优先级的退出信号即可。


3
如果在函数内部退出脚本,解决方案2将无法起作用,任务将不会完成并且下一次会重新启动:kill、respawn、kill、respawn、kill... - Fabian Schmengler

8
function AutoRestart() {
   static $startTime = time();

   if (filemtime(__FILE__) > $startTime) {
      exit();
   }
}

AutoRestart();  

7

我发布了这个问题,现在我认为我已经找到了一个好的答案。

如果您查看Net_Gearman_Worker的代码,您会发现在工作循环中,监视stopWork函数,如果它返回true,则退出该函数。

我做了以下几点:
使用memcache,我创建了一个缓存值gearman_restarttime,并使用单独的脚本将其设置为当前时间戳,每当我更新网站时。(我使用了Memcache,但这可以存储在任何地方-数据库、文件或其他地方)。

我扩展了Worker类,使其成为Net_Gearman_Worker_Foo,并让我的所有worker实例化它。在Foo类中,我重写了stopWork函数,首先检查gearman_restarttime;第一次通过时,它将该值保存在全局变量中。从那时起,每次通过时,它将缓存值与全局值进行比较。如果它已更改,则stopWork返回true,worker退出。一个cron每分钟检查每个worker是否仍在运行,并重新启动任何已退出的worker。

在stopWork中放置计时器并每x分钟只检查一次缓存也是值得的。在我们的情况下,Memcache足够快,每次检查值似乎不是问题,但如果您使用其他系统来存储当前时间戳,则较少地进行检查会更好。


仅仅使用memcached并不能保证数据在过期时间之前一直存在。 - Simon Bennett

1
我遇到了同样的问题,并想出了一个适用于python 2.7的解决方案。
我正在编写一个使用gearman与系统中其他组件通信的python脚本。该脚本将有多个工作者,我让每个工作者在单独的线程中运行。工作者们都接收gearman数据,处理并将该数据存储在消息队列中,主线程可以根据需要从队列中取出数据。
我的解决方法是继承gearman.GearmanWorker并重写work()函数,以清晰地关闭每个工作者。
from gearman import GearmanWorker
POLL_TIMEOUT_IN_SECONDS = 60.0
class StoppableWorker(GearmanWorker):
    def __init__(self, host_list=None):
        super(StoppableWorker,self).__init__(host_list=host_list)
        self._exit_runloop = False


    # OVERRIDDEN
    def work(self, poll_timeout=POLL_TIMEOUT_IN_SECONDS):
        worker_connections = []
        continue_working = True

        def continue_while_connections_alive(any_activity):
            return self.after_poll(any_activity)

        while continue_working and not self._exit_runloop:
            worker_connections = self.establish_worker_connections()
            continue_working = self.poll_connections_until_stopped(
                worker_connections,
                continue_while_connections_alive,
                timeout=poll_timeout)

        for current_connection in worker_connections:
            current_connection.close()

        self.shutdown()


    def stopwork(self):
        self._exit_runloop = True

使用它就像使用GearmanWorker一样。当脚本退出时,调用stopwork()函数。它不会立即停止 - 在退出运行循环之前,可能需要多达poll_timeout秒。

可能有多种聪明的方法来调用stopwork()函数。在我的情况下,在主线程中创建一个临时的gearman客户端。对于我要关闭的worker,我通过gearman服务器发送一个特殊的STOP命令。当worker收到此消息时,它知道要关闭自己。

希望这可以帮助!


1

嗯,您可以在工作进程中实现一段代码,定期检查源代码是否已修改,如果是,则在适当时候自行终止。也就是说,在它们执行任务的过程中进行检查,如果任务非常大。

另一种方法是通过网络实现某种类型的中断,以便在有机会时停止并重新启动。

最后一个解决方案是帮助修改Gearman的源代码以包括此功能。


将自己杀死 => 可能只需使用相同的参数在自己身上调用exec。 - mjy

1

我最近也在研究这个问题(不过是用Gearman::XS的Perl)。我的使用场景和你一样——允许长时间运行的Gearman worker 定期检查自身是否有新版本并重新加载。

我的第一次尝试只是让worker跟踪上次检查worker脚本版本的时间(md5sum也可以工作)。然后,在两个任务之间,一旦经过了N秒,它就会检查是否有新版本可用,并重新启动自己(fork()/exec())。这确实可以正常工作,但是注册了罕见任务的worker可能需要等待几个小时才能返回work(),因此需要检查当前时间。

所以现在我在等待任务时设置了相当短的超时时间,以便更经常地检查时间。PHP接口建议您在注册任务时设置此超时值。我使用SIGALRM触发新版本检查。Perl接口在work()上阻塞,因此最初未触发警报。将超时设置为60秒使SIGALRM起作用。


Perl 只会在 work() 上使用安全信号阻塞。使用 Perl::Unsafe::Signals,您就不必阻塞了。请参考 GearmanX::Starter 的示例。 - runrig
不是说你必须使用Perl :: Unsafe :: Signals,但在我看来,这是相对安全地使用不安全信号的简单方法。 - runrig

1

考虑到工人们是用PHP编写的,定期回收他们是一个好主意。这可以是一个固定的时间段,也可以在尝试了一定数量的作业后完成。

这实际上一举两得。您不仅减少了内存泄漏的可能性,还有一种一致的方法来确定您的工人何时会接收任何潜在的新代码。

我通常编写工人,使它们将其间隔报告给stdout和/或日志记录设施,以便轻松检查工人在进程中的位置。


1

1
如果有人正在寻找运行 Perl 的工作程序的答案,那么 GearmanX::Starter 库就是其中一部分。您可以通过两种不同的方式停止当前作业后的工作程序:通过向工作进程发送 SIGTERM 信号进行外部停止,或通过设置全局变量进行编程式停止。

1
我使用以下代码,支持 Ctrl-Ckill -TERM 两种方式。默认情况下,如果没有修改 signal= 设置,supervisor 将发送 TERM 信号。在 PHP 5.3+ 中,declare(ticks = 1) 已被弃用,请改用 pcntl_signal_dispatch()
$terminate = false;
pcntl_signal(SIGINT, function() use (&$terminate)
{
    $terminate = true;
});
pcntl_signal(SIGTERM, function() use (&$terminate)
{
    $terminate = true;
});

$worker = new GearmanWorker();
$worker->addOptions(GEARMAN_WORKER_NON_BLOCKING);
$worker->setTimeout(1000);
$worker->addServer('127.0.0.1', 4730);
$worker->addFunction('reverse', function(GearmanJob $job)
{
    return strrev($job->workload());
});

$count = 500 + rand(0, 100); // rand to prevent multple workers restart at same time
for($i = 0; $i < $count; $i++)
{
    if ( $terminate )
    {
        break;
    }
    else
    {
        pcntl_signal_dispatch();
    }

    $worker->work();

    if ( $terminate )
    {
        break;
    }
    else
    {
        pcntl_signal_dispatch();
    }

    if ( GEARMAN_SUCCESS == $worker->returnCode() )
    {
        continue;
    }

    if ( GEARMAN_IO_WAIT != $worker->returnCode() && GEARMAN_NO_JOBS != $worker->returnCode() )
    {
        $e = new ErrorException($worker->error(), $worker->returnCode());
        // log exception
        break;
    }

    $worker->wait();
}

$worker->unregisterAll();

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接