如何在Perl中实现信号量线程通信?

6

我的Perl脚本需要同时运行多个线程...

use threads ('yield', 'exit' => 'threads_only');
use threads::shared;
use strict;
use warnings;
 no warnings 'threads';
use LWP::UserAgent;
use HTTP::Request;
use HTTP::Async;
use ...

...这些线程需要从网络获取一些信息,因此使用了HTTP::Async

my $request = HTTP::Request->new;
   $request->protocol('HTTP/1.1');
   $request->method('GET');
   $request->header('User-Agent' => '...');

my $async = HTTP::Async->new( slots            => 100,
                              timeout          => REQUEST_TIMEOUT,
                              max_request_time => REQUEST_TIMEOUT );

但有些线程需要在其他线程发出信号时才能访问网络。
my $start = [Time::HiRes::gettimeofday()];
my @threads = ();
foreach ... {
  $thread = threads->create(
    sub {
           local $SIG{KILL} = sub { threads->exit };
           my $url = shift;
           if ($url ... ) {
             # wait for "go" signal from other threads
           }
           my ($response, $data);
           $request->url($url);
           $data = '';
           $async->add($request);
           while ($response = $async->wait_for_next_response) {
             threads->yield();
             $data .= $response->as_string;
           }
           if ($data ... ) {
             # send "go" signal to waiting threads
           }
         }
       }, $_);

  if (defined $thread) {
    $thread->detach;
    push (@threads, $thread);
  }
}

可能会有一个或多个线程等待“go”信号,并且可能有一个或多个线程可以发送此类“go”信号。初始时,信号量的状态为“wait”,一旦变为“go”,则会保持不变。

最后,应用程序检查最大运行时间。如果线程运行时间过长,则发送自我终止信号。

my $running;
do {
  $running = 0;
  foreach my $thread (@threads) {
    $running++ if $thread->is_running();
  }
  threads->yield();
} until (($running == 0) || 
         (Time::HiRes::tv_interval($start) > MAX_RUN_TIME));
$running = 0;
foreach my $thread (@threads) {
  if ($thread->is_running()) {
    $thread->kill('KILL');
    $running++;
  }
}
threads->yield();

现在重点来了,我的问题是:

  1. 我如何在脚本中最有效地编写等待“信号量”代码(请参见上面脚本中的注释)?我应该只使用带有一些虚拟sleep循环的共享变量吗?

  2. 我是否需要在应用程序结尾添加一些sleep循环,以便线程有时间自我销毁?


我理解的是否正确,您正在使用单独的HTTP :: Async对象(由新线程复制而不是共享)来每次每个线程最多获取一个URL? - pilcrow
@pilcrow - 是的,看起来是这样。这会浪费资源吗? - Ωmega
它可能不太节省内存或时间,但真正的问题在于它会消耗程序员的时间。 :) 这个设计很难理解,因此也许很难安全地进行更改/扩展,因为组件似乎不太合适。 - pilcrow
@pilcrow - 那么您建议在线程内分配HTTP::Async吗?我相信没有必要共享这个对象。除非采用这种方法可以更加优化。 - Ωmega
我不知道足够的信息来发表评论。你向我们展示了你的实现摘录,你如何去做某事,但并没有说明你想要做什么。我只能观察到HTTP::Async是一个用于单线程并行网络请求的工具,这似乎与我们所看到的设计部分不符。 - pilcrow
2个回答

3
你可以考虑使用Thread::Queue来完成这项工作。你可以设置一个队列来处理等待“go”信号的线程和发送“go”信号的线程之间的信号传递。这是一个快速的模拟,我还没有测试过:
...
use Thread::Queue;
...
# In main body
my $q = Thread::Queue->new();
...
$thread = threads->create(
    sub {
           local $SIG{KILL} = sub { threads->exit };
           my $url = shift;
           if ($url ... ) {
             # wait for "go" signal from other threads
             my $mesg = $q->dequeue();
             # you could put in some termination code if the $mesg isn't 'go'
             if ($mesg ne 'go') { ... }
           }
           ...
           if ($data ... ) {
             # send "go" signal to waiting threads
             $q->enqueue('go');
           }
         }
       }, $_);
...

需要等待“go”信号的线程将在dequeue方法上等待,直到有东西进入队列。一旦消息进入队列,一个线程且仅一个线程将获取该消息并处理它。
如果您希望停止线程以使其不运行,可以将停止消息插入队列的头部。
$q->insert(0, 'stop') foreach (@threads);

在Thread::Queue和threads CPAN分发中有更详细的示例。

针对您的第二个问题,答案很遗憾,这取决于情况。当您终止线程时,需要进行哪种清理以进行干净的关闭?如果从线程下面拔出地毯会发生最坏的情况是什么?您将希望计划为清理腾出时间。您可以选择的另一个选项是等待每个线程实际完成。

我询问您是否可以删除detach调用的原因是,此方法允许主线程退出而不关心任何子线程正在发生的事情。相反,如果您删除此调用并添加:

$_->join() foreach threads->list();

将此代码添加到您的主块的末尾,这将要求主应用程序等待每个线程实际完成。
如果您保留detach方法,则需要在代码结尾处进行睡眠,如果需要线程执行任何清理操作。当您在线程上调用detach时,您告诉Perl,您不关心主线程退出时该线程正在做什么。如果主线程退出并且仍在运行已分离的线程,则程序将以无警告方式完成。但是,如果您不需要任何清理工作,并且仍然调用detach,则可以随时退出。

这个问题有一个价值+50声望的开放性赏金。请改进你的答案。我发现你的帖子很有趣,但是你没有回答我的帖子中的第二个子问题(如何/是否等待线程自毁)。 - Ωmega
@user1215106 我注意到你的代码中使用了 $thread->detach;。通常这是用来忽略线程,不必担心它是否完成的。你加入这个代码的原因是什么,还是可以将其删除? - Joel
我相信它可以被移除。 - Ωmega
我无法无限制地等待线程完成,因为在某个时刻我必须释放资源给其他应用程序。如果线程没有按时完成,它就会在$response = $async->wait_for_next_response这一行上几乎停滞不前。 - Ωmega
@user1215106 请看最后一段,如果那能更好地回答你的第二个问题,请告诉我。 - Joel
这是最好的答案,即使我期望更多。谢谢。 - Ωmega

-1

尝试类似这样的东西...

#!/usr/bin/perl

use threads;
use threads::shared;

$|=1;

my ($global):shared;
my (@threads);

push(@threads, threads->new(\&mySub,1));
push(@threads, threads->new(\&mySub,2));
push(@threads, threads->new(\&mySub,3));

$i = 0;

foreach my $myThread(@threads)

{
    my @ReturnData = $myTread->join ;
    print "Thread $i returned: @ReturnData\n";
    $i++;
}

sub mySub
{
    my ($threadID) = @_;

    for(0..1000)
    {
        $global++;
        print "Thread ID: $threadID >> $_ >> GLB: $global\n";
        sleep(1);
    }   
    return( $id );
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接