Mojolicious:限制Promise数量/ IOLoop-> subprocess

4

我正在使用Mojolicious非阻塞方法(Promises)从外部系统请求数据。 1)我希望立即通知用户进程已经开始;2)我想扩展该程序。

下面的代码适用于一小组数字(几百个),但是对于更多数字,我会收到错误信息 [error] Can't create pipe: Too many open files at /path/lib/perl5/Mojo/IOLoop.pm line 156. 问题1)如何限制我生成的Promise数量(我在下面的代码中使用了map):

#!/usr/bin/env perl

use Mojolicious::Lite;
use Mojolicious::Plugin::TtRenderer;

sub isPrime
{
    my ($n) = @_;
    my $e = sqrt($n);
    for (my $i=2; $i<$e; $i++) {
        return 0 if $n%$i==0;
    }
    return 1;
}

sub makeApromise
{
    my ($number) = @_;

    my $promise = Mojo::Promise->new;
    Mojo::IOLoop->subprocess(
    sub {  # first callback is executed in subprocess
        my %response;
        # Simulate a long computational process
        $response{'number'}  = $number;
        $response{'isPrime'} = isPrime($number);
        return \%response;
    },
        sub {  # second callback resolves promise with subprocess result
            my ($self, $err, @result) = @_;
            return $promise->reject($err) if $err;
            $promise->resolve(@result);
        },
    );
    return $promise;
}

plugin 'tt_renderer'; # automatically render *.html.tt templates

any '/' => sub {
    my ($self) = @_;
    my $lines = $self->param( 'textarea' );

    if ($lines) {
    my @numbers;
    foreach my $number (split(/\r?\n/, $lines)) {
        push(@numbers, $number) if $number =~ /^\d+$/;
    }
    if (@numbers) {
        ####################################
        ### This is the problem below... ###
        my @promises = map { makeApromise($_) } @numbers;
        ####################################
        # MojoPromise Wait
        Mojo::Promise->all(@promises)
        ->then(sub {
            my @values = map { $_->[0] } @_;
            foreach my $response (@values) {
            #print STDERR $response->{'number'}, " => ", $response->{'isPrime'}, "\n";
            # Prepare email...
            }
            # Send an email...
               })
        #->wait # Don't wait? I want to tell the user to wait for an email as quickly as possible...
        if @promises;
    }
    $self->stash(done => "1",);
    }
    $self->render(template => 'index', format => 'html', handler => 'tt');
};

app->start;
__DATA__

@@ index.html.tt
<!DOCTYPE html>
<html lang="en">
  <head>
    <title>Make A Promise</title>
  </head>
  <body>
    [% IF done %]
    <h3>Thank you! You will receive an email shortly with the results.</h3>
    [% ELSE %]
    <h3>Enter numbers...</h3>
    <form role="form" action="/" method="post">
      <textarea name="textarea" rows="5" autofocus required></textarea>
      <button type="submit">Submit</button>
    </form>
    [% END %]
  </body>
</html>

我注释掉了wait代码;但似乎代码仍在阻塞。 问题2) 当我存储done变量时,如何立即通知用户进程已经开始?


我可能在这里理解有些问题——为什么不能通过限制提交到map@numbers来进行节流(管理它们的提交方式)?此外,你是如何运行所有这些代码的?对我来说它总是崩溃。 - zdim
@zdim,你需要像这样运行它:[02:54:26 abhasker@wsl -> temp$ morbo bug.pl 输出:Server available at http://127.0.0.1:3000 - AbhiNickz
@h-q,我用大约5000个输入数字运行了这个程序,虽然花费了一些时间,但没有发现任何问题,用户收到了这条消息:谢谢!您将很快收到一封带有结果的电子邮件。 - AbhiNickz
@AbhiNickz,我怎样才能立即收到消息“**谢谢!您将立即收到电子邮件...**”,而不是等待一段时间?另外为了测试,我使用了11位数字,例如使用'99999999977' 10,000次。 - h q
@zdim,您能否提供一个关于如何对提交给map@numbers进行节流的示例? - h q
1个回答

2
问题不在于承诺的数量,而在于子进程的数量。一种限制方法是在程序逻辑中限制同时创建的数量。不要在地图中一次性生成所有内容,而是设置一个限制并从 @numbers 中检索相应数量的内容(可以使用 splice),然后生成这些子进程;创建一个 ->all 承诺等待它们,并附加一个 ->then 承诺以检索下一组数字,以此类推。
另一种选择是使用 Future::Utils fmap_concat,它可以通过您提供的最大未完成 Futures 数量来处理速率限制代码。您的承诺返回函数可以应用 Mojo::Promise::Role::Futurify 将后续 Future 链接在一起以便以这种方式使用。
#!/usr/bin/env perl

use Mojolicious::Lite;
use Mojo::File 'path';
use Mojo::IOLoop;
use Mojo::Promise;
use Future::Utils 'fmap_concat';

get '/' => sub {
  my $c = shift;
  my $count = $c->param('count') // 0;
  my @numbers = 1..$count;

  if (@numbers) {
    my $result_f = fmap_concat {
      my $number = shift;
      my $p = Mojo::Promise->new;
      Mojo::IOLoop->subprocess(sub {
        sleep 2;
        return $number+1;
      }, sub {
        my ($subprocess, $err, @result) = @_;
        return $p->reject($err) if $err;
        $p->resolve(@result);
      });
      return $p->with_roles('Mojo::Promise::Role::Futurify')->futurify;
    } foreach => \@numbers, concurrent => 20;

    $result_f->on_done(sub {
      my @values = @_;
      foreach my $response (@values) {
        $c->app->log->info($response);
      }
    })->on_fail(sub {
      my $error = shift;
      $c->app->log->fatal($error);
    })->retain;

    $c->stash(done => 1);
  }
  $c->render(text => "Processing $count numbers\n");
};

app->start;

关于wait方法,当事件循环已经运行时,它不会执行任何操作,在webapp响应处理程序中,如果您在Mojolicious守护进程中启动了应用程序(而不是支持异步响应的PSGI或CGI服务器),则会出现这种情况。在回调之外的->stash和->render调用将在设置子进程后立即运行。然后,响应处理程序将完成,事件循环将再次控制,并在承诺解决后触发相应的->then回调。自设置子进程以来,渲染不应等待任何内容;由于您说可能有数百个子进程,这可能是您遇到的减速问题。请确保使用Mojolicious 7.86或更新版本,因为Subprocess已更改,因此fork将在事件循环的下一个刻度之后发生(在响应处理程序完成后)。
我还要指出,子进程并不真正为此设计;它们旨在执行仍然返回响应中的最终结果的缓慢代码(Mojolicious::Plugin::Subprocess非常适合此用例)。我可以看到的一个问题是,如果重新启动应用程序,则任何仍未完成的子进程将被忽略。对于您想要启动并忘记的作业,您可以考虑像Minion这样的作业队列,它与Mojolicious应用程序具有很好的集成,并通过单独的工作进程运行。

感谢您的见解。我之前使用的是 Mojolicious 7.61 和 7.75,现在已经升级到了 8.0 版本。虽然不再崩溃,但仍然会超时,并且我仍然无法处理大型数据集。请问您能否提供一个使用 Future::Utils fmap_concat 和 Mojo::Promise::Role::Futurify 的工作示例?非常感谢! - h q
我已经添加了一个示例,并提到了Minion作为此用例的替代方案。 - Grinnz
哇,谢谢@Grinnz,写得很好。我会把小黄人的研究留到另一天。现在真正的挑战是尝试找到理想的“并发”值来使用 :-) - h q

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接