我有许多Gearman工人在不断地运行, 保存诸如记录用户页面浏览等内容。偶尔, 我会更新Gearman工人使用的PHP代码。为了使工人切换到新代码,我需要杀死并重新启动工人的PHP进程。
有更好的方法吗?大概有时候我会丢失一些数据(虽然不是非常重要的数据)当我杀死其中一个工人进程的时候。
编辑:我已经找到了一个适合我的答案,并在下面发布了它。
我有许多Gearman工人在不断地运行, 保存诸如记录用户页面浏览等内容。偶尔, 我会更新Gearman工人使用的PHP代码。为了使工人切换到新代码,我需要杀死并重新启动工人的PHP进程。
有更好的方法吗?大概有时候我会丢失一些数据(虽然不是非常重要的数据)当我杀死其中一个工人进程的时候。
编辑:我已经找到了一个适合我的答案,并在下面发布了它。
通常我会使用Unix守护程序工具来运行我的工作进程,并带上-r标志,让它们在完成一次任务后自动终止。您的脚本将在每次迭代后优雅地结束,然后守护程序将自动重新启动。
您的工作进程将过时一次,但这可能对您而言并不像丢失数据那么重要。
此解决方案还具有释放内存的优点。如果您正在执行大型任务,则可能会遇到内存问题,因为PHP 5.3之前的GC效果非常糟糕。
您还可以向所有工作进程添加退出脚本的函数。当您想重新启动时,只需给Gearman调用一个高优先级的退出信号即可。
function AutoRestart() {
static $startTime = time();
if (filemtime(__FILE__) > $startTime) {
exit();
}
}
AutoRestart();
我发布了这个问题,现在我认为我已经找到了一个好的答案。
如果您查看Net_Gearman_Worker的代码,您会发现在工作循环中,监视stopWork函数,如果它返回true,则退出该函数。
我做了以下几点:
使用memcache,我创建了一个缓存值gearman_restarttime,并使用单独的脚本将其设置为当前时间戳,每当我更新网站时。(我使用了Memcache,但这可以存储在任何地方-数据库、文件或其他地方)。
我扩展了Worker类,使其成为Net_Gearman_Worker_Foo,并让我的所有worker实例化它。在Foo类中,我重写了stopWork函数,首先检查gearman_restarttime;第一次通过时,它将该值保存在全局变量中。从那时起,每次通过时,它将缓存值与全局值进行比较。如果它已更改,则stopWork返回true,worker退出。一个cron每分钟检查每个worker是否仍在运行,并重新启动任何已退出的worker。
在stopWork中放置计时器并每x分钟只检查一次缓存也是值得的。在我们的情况下,Memcache足够快,每次检查值似乎不是问题,但如果您使用其他系统来存储当前时间戳,则较少地进行检查会更好。
gearman.GearmanWorker
并重写work()
函数,以清晰地关闭每个工作者。from gearman import GearmanWorker
POLL_TIMEOUT_IN_SECONDS = 60.0
class StoppableWorker(GearmanWorker):
def __init__(self, host_list=None):
super(StoppableWorker,self).__init__(host_list=host_list)
self._exit_runloop = False
# OVERRIDDEN
def work(self, poll_timeout=POLL_TIMEOUT_IN_SECONDS):
worker_connections = []
continue_working = True
def continue_while_connections_alive(any_activity):
return self.after_poll(any_activity)
while continue_working and not self._exit_runloop:
worker_connections = self.establish_worker_connections()
continue_working = self.poll_connections_until_stopped(
worker_connections,
continue_while_connections_alive,
timeout=poll_timeout)
for current_connection in worker_connections:
current_connection.close()
self.shutdown()
def stopwork(self):
self._exit_runloop = True
使用它就像使用GearmanWorker一样。当脚本退出时,调用stopwork()
函数。它不会立即停止 - 在退出运行循环之前,可能需要多达poll_timeout
秒。
可能有多种聪明的方法来调用stopwork()
函数。在我的情况下,在主线程中创建一个临时的gearman客户端。对于我要关闭的worker,我通过gearman服务器发送一个特殊的STOP命令。当worker收到此消息时,它知道要关闭自己。
希望这可以帮助!
嗯,您可以在工作进程中实现一段代码,定期检查源代码是否已修改,如果是,则在适当时候自行终止。也就是说,在它们执行任务的过程中进行检查,如果任务非常大。
另一种方法是通过网络实现某种类型的中断,以便在有机会时停止并重新启动。
最后一个解决方案是帮助修改Gearman的源代码以包括此功能。
我最近也在研究这个问题(不过是用Gearman::XS的Perl)。我的使用场景和你一样——允许长时间运行的Gearman worker 定期检查自身是否有新版本并重新加载。
我的第一次尝试只是让worker跟踪上次检查worker脚本版本的时间(md5sum也可以工作)。然后,在两个任务之间,一旦经过了N秒,它就会检查是否有新版本可用,并重新启动自己(fork()/exec())。这确实可以正常工作,但是注册了罕见任务的worker可能需要等待几个小时才能返回work(),因此需要检查当前时间。
所以现在我在等待任务时设置了相当短的超时时间,以便更经常地检查时间。PHP接口建议您在注册任务时设置此超时值。我使用SIGALRM触发新版本检查。Perl接口在work()上阻塞,因此最初未触发警报。将超时设置为60秒使SIGALRM起作用。
考虑到工人们是用PHP编写的,定期回收他们是一个好主意。这可以是一个固定的时间段,也可以在尝试了一定数量的作业后完成。
这实际上一举两得。您不仅减少了内存泄漏的可能性,还有一种一致的方法来确定您的工人何时会接收任何潜在的新代码。
我通常编写工人,使它们将其间隔报告给stdout和/或日志记录设施,以便轻松检查工人在进程中的位置。
http://phpscaling.com/2009/06/23/doing-the-work-elsewhere-sidebar-running-the-worker/
就像上面的文章所展示的那样,我在BASH shell脚本中运行了一个工作进程,在任务之间偶尔退出以进行清理(或重新加载工作脚本)- 或者如果给定任务,则可以使用特定的退出代码退出并关闭。
Ctrl-C
和 kill -TERM
两种方式。默认情况下,如果没有修改 signal=
设置,supervisor
将发送 TERM
信号。在 PHP 5.3+ 中,declare(ticks = 1)
已被弃用,请改用 pcntl_signal_dispatch()
。$terminate = false;
pcntl_signal(SIGINT, function() use (&$terminate)
{
$terminate = true;
});
pcntl_signal(SIGTERM, function() use (&$terminate)
{
$terminate = true;
});
$worker = new GearmanWorker();
$worker->addOptions(GEARMAN_WORKER_NON_BLOCKING);
$worker->setTimeout(1000);
$worker->addServer('127.0.0.1', 4730);
$worker->addFunction('reverse', function(GearmanJob $job)
{
return strrev($job->workload());
});
$count = 500 + rand(0, 100); // rand to prevent multple workers restart at same time
for($i = 0; $i < $count; $i++)
{
if ( $terminate )
{
break;
}
else
{
pcntl_signal_dispatch();
}
$worker->work();
if ( $terminate )
{
break;
}
else
{
pcntl_signal_dispatch();
}
if ( GEARMAN_SUCCESS == $worker->returnCode() )
{
continue;
}
if ( GEARMAN_IO_WAIT != $worker->returnCode() && GEARMAN_NO_JOBS != $worker->returnCode() )
{
$e = new ErrorException($worker->error(), $worker->returnCode());
// log exception
break;
}
$worker->wait();
}
$worker->unregisterAll();