使用Parallel::ForkManager限制进程数

3
我正在尝试使用Parallel::ForkManager来控制一些子进程。我希望将同时运行的进程数量限制为10个。总共需要运行20个进程。
我知道我可以在对象声明的第一行设置进程限制为10,但我也使用$pm对象来运行执行不同操作的子进程(当前函数需要更多的内存,因此需要限制)。
我目前的代码无法正常工作,run on finish调用从未被执行,因此剩余的10个子进程从未被分叉。我不明白为什么会出现这种情况——我认为子进程仍然会在退出时调用完成代码并减少计数,但“if”语句似乎阻止了这一点。有人能解释一下为什么吗?
感谢任何帮助!
# Parallel declarations
my $pm = Parallel::ForkManager->new(30);

$pm->run_on_finish(sub {
    my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_str_ref) = @_; 
    --$active_jobs;
    })

my $total_jobs = 0;
my $active_jobs = 0;
while( $total_jobs < 20) {
    sleep 300 and next if $active_jobs > 10; 

    my $pid = $pm->start and ++$active_p1_jobs and ++$total_p1_jobs and next;

    my $return = module::function(%args);

    $pm->finish(0, { index => $total_jobs, return => $return }); 
    }

print STDERR "Submitted all jobs, now waiting for children to exit.\n";
$pm->wait_all_children();
1个回答

3
我将称限制为10的工作为“类型2”。
以下是我使用P::FM的方式:
use strict;
use warnings;

use List::Util            qw( shuffle );
use Parallel::ForkManager qw( );
use POSIX                 qw( WNOHANG );
use Time::HiRes           qw( sleep );

use constant MAX_WORKERS       => 30;
use constant MAX_TYPE2_WORKERS => 10;

sub is_type2_job { $_[0]{type} == 2 }

my @jobs = shuffle(
   ( map { { type => 1, data => $_ } } 0..19 ),
   ( map { { type => 2, data => $_ } } 0..19 ),
);

my $pm = Parallel::ForkManager->new(MAX_WORKERS);

my $type2_count = 0;
$pm->run_on_finish(sub {
   my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $job) = @_;
   --$type2_count if is_type2_job($job);
   print "Finished: $pid, $job->{type}, $job->{data}, $job->{result}\n";
});

my @postponed_jobs;
while (@postponed_jobs || @jobs) {
   my $job;
   if (@postponed_jobs && $type2_count < MAX_TYPE2_WORKERS) {
      $job = shift(@postponed_jobs);
   }
   elsif (@jobs) {
      $job = shift(@jobs);
      if ($type2_count >= MAX_TYPE2_WORKERS && is_type2_job($job)) {
         push @postponed_jobs, $job;
         redo;
      }
   }
   # elsif (@postponed_jobs) {
   #     # Already max type 2 jobs being processed,
   #     # but there are idle workers.
   #     $job = shift(@postponed_jobs);
   # }
   else {
      local $SIG{CHLD} = sub { };
      select(undef, undef, undef, 0.300);
      $pm->wait_one_child(WNOHANG);
      redo;
   }

   ++$type2_count if is_type2_job($job);

   my $pid = $pm->start and next;
   $job->{result} = $job->{data} + 100;  # Or whatever.
   $pm->finish(0, $job);
}

$pm->wait_all_children();

但这里有一个问题。选择下一个任务的代码应该在start中间完成(即在等待子进程完成但在fork之前),而不是在start之前。这可能会导致任务的执行顺序出现错误。我已经不是第一次希望P::FM有一个pre-fork回调了。也许你可以向维护者提出这个建议。


对我来说可以。在我的节点上添加了缺失的代码部分。你得到了多少行输出?(perl script.pl | wc -l - ikegami
我得到了40行输出,正如预期的那样。然而,您似乎没有增加$type2_count。如果我添加my $pid = $pm->start and ++$type2_count and next;,我会陷入无限循环中。 - distracted-biologist
修复了。这正是我添加的内容。 - ikegami
1
我看到问题了。如果我们在应该选择作业的时候进行了选择(但不能选择),我们就不会有问题,因为我们已经等待一个子进程结束,但现在我们正在睡眠而没有等待子进程结束。我们将不得不从wait_all_children中复制一些代码。 - ikegami
让我们在聊天中继续这个讨论 - distracted-biologist
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接