在Perl 6中运行自供频道

4

我想在一个通道上同时设置多个线程,并且每个线程也应该在通道上进行输入。其中一个线程将决定何时停止。但是,这是我最接近实现它的方法:

use Algorithm::Evolutionary::Simple;

my $length = 32;
my $supplier = Supplier.new;
my $supply   = $supplier.Supply;
my $channel-one = $supply.Channel;
my $pairs-supply = $supply.batch( elems => 2 );
my $channel-two = $pairs-supply.Channel;

my $single = start {
    react  {
        whenever $channel-one -> $item {
            say "via Channel 1:", max-ones($item);
        }
    }
}

my $pairs = start {
    react  {
        whenever $channel-two -> @pair {
        my @new-chromosome = crossover( @pair[0], @pair[1] );
        say "In Channel 2: ", @new-chromosome;
        $supplier.emit( @new-chromosome[0]);
        $supplier.emit( @new-chromosome[1]);
        }
    }
}

await (^10).map: -> $r {
    start {
    sleep $r/100.0;
        $supplier.emit( random-chromosome($length) );
    }
}

$supplier.done;

这将在一定数量的发射后停止。而且它可能不会同时运行。我使用通道而不是供应和插头,因为这些不是并发运行,而是异步运行。我需要供应,因为我想要一个伪通道,以成对地获取元素,就像上面所做的那样;我没有看到使用纯通道的方法来实现这一点。如果我将供应的emit更改为通道的send,则上述内容没有区别。
因此,这里有几个问题:
1.这些react块是否在不同的线程中运行?如果不是,有什么方法可以做到这一点?
2.即使它们不是,为什么它会停止,即使$pairs一直向通道发出信号?
3.我能否从单项通道自动创建“批量”通道?
更新1:如果我从末尾删除$supplier.done,它将只会阻塞。如果我在每次读取时创建一个承诺,它只会阻塞并且什么也不做。

3
你可以像这样做:start react whenever ... { ... },而不是 start { react { whenever ... { ... } } } - Brad Gilbert
3
这是如何从另一个通道获取成对发射的通道:my Channel $c .= new; $c.send($_) for ^20; $c.close; my Channel $c2 .= new; my $work = start { $c2.send: $_ for $c.List.rotor(2); $c2.close; CATCH { default { $c2.fail($_) } } }; .say for $c2.List; await $work - 检查一下你是否也想在rotor调用中使用 :partial。 - timotimo
@BradGilbert 没有区别。为什么那样会更好? - jjmerelo
@timotimo 这并不是我正在寻找的。如果我这样修改它 if ( $count++ < 100 ) { $c.send( $count ); } else { $c.close; } 以便它继续向第一个频道提供数据,然后再传递给第二个频道,在最初的20个数字之后它就会挂起。在此之前,$count 将被初始化为0。 - jjmerelo
这个链接更接近我想要的,但仍然不够。它只是丢掉了一些对并在处理前40个@timotimo后停止。 - jjmerelo
1个回答

1
答案在这里,已经削减到最小必要内容。
my Channel $c .= new;
my Channel $c2 = $c.Supply.batch( elems => 2).Channel;
my Channel $output .= new;
my $count = 0;
$c.send(1) for ^2;

my $more-work = start react whenever $c2 -> @item {
    if ( $count++ < 32 ) {
        $c.send( @item[1]);
    my $sum = sum @item;
    $c.send( $sum );
    $output.send( $sum ); 
    } else {
    $c.close;
    }

}
await $more-work;
loop {
    if my $item = $output.poll {
    $item.say
    } else {
    $output.close;
    }
    if $output.closed  { last };
}

创建第二个通道,每两个元素批处理第一个通道,通过从通道($c.Supply)创建供应品,将该供应品分批处理成两个元素(batch(elems => 2)),并将其转换回通道。创建第三个通道用于输出。 为了不耗尽供应并挂起通道,从第一个(实际上是唯一的)通道中读取的每个第二个元素都被放回那里。因此,以二进制方式读取的第二个通道永远不会挂起或等待新元素。 为每个新元素创建一个输出通道,并创建一个外部计数器以在需要时完成操作;以非阻塞方式读取该输出通道,并在最后一行没有可读内容时关闭它。 为了精确回答我的原始问题:
  1. 是的,只是它们互相窃取元素。
  2. 因为这两个线程都从同一个通道中读取。先遇到元素的线程先读取它。
  3. 是的,通过将通道转换为供应品、进行分批处理并将其转换回通道。只要记住它们不是副本,它们将共享相同的元素。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接