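My original solution (https://superuser.com/questions/482953/read-non-blocking-from-multiple-fifos-in-parallel?answertab=oldest#tab-top) stored the data on disk.
I have now made a second version that buffers one line in memory.
It works, but it requires all the FIFOs to be connected before it starts. This works: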
window1$ mkfifo {1..100}
window1$ parcat {1..100} | pv >/dev/null
window2$ parallel -j0 'cat bigfile > ' ::: *
This produces no output (because FIFO 100 is never connected):
window1$ mkfifo {1..100}
window1$ parcat {1..100} | pv >/dev/null
window2$ parallel -j0 'cat bigfile > ' ::: {1..99}
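For context: a plain open() of a FIFO for reading blocks until some process opens it for writing, so the open loop in the script never gets past FIFO 100. The following is only a sketch of one way to avoid blocking in open, assuming POSIX FIFO semantics; the scratch directory and the name demo.fifo are hypothetical and exist only for the demonstration.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use Fcntl qw(O_RDONLY O_NONBLOCK);
use POSIX qw(mkfifo);
use File::Temp qw(tempdir);

# Hypothetical scratch FIFO, only for this demonstration
my $dir  = tempdir(CLEANUP => 1);
my $fifo = "$dir/demo.fifo";
mkfifo($fifo, 0600) or die "mkfifo: $!";

# open($fh, "<", $fifo) would block here until a writer connects.
# With O_NONBLOCK the open returns at once, writer or not.
sysopen(my $fh, $fifo, O_RDONLY | O_NONBLOCK) or die "sysopen: $!";

# No writer is connected, so the read reports 0 bytes,
# exactly as it would at a real end of file.
my $rv = sysread($fh, my $buf, 4096);
print defined $rv ? "read returned $rv\n" : "read failed: $!\n";
```

The catch is visible in the last step: a FIFO that no writer has connected to yet cannot be told apart from one whose writers have all finished, so this alone does not say when to stop.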
I tried using open '+<'. This solves the problem above, but now it does not stop at EOF. What can I do?
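A likely explanation, sketched here under the assumption of Linux behaviour (POSIX leaves read-write opens of a FIFO undefined): a handle opened with '+<' counts as a writer too, so the kernel never sees the last writer leave and never reports EOF. The FIFO name below is hypothetical.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Errno qw(EAGAIN);
use POSIX qw(mkfifo);
use File::Temp qw(tempdir);

# Hypothetical scratch FIFO, only for this demonstration
my $dir  = tempdir(CLEANUP => 1);
my $fifo = "$dir/demo.fifo";
mkfifo($fifo, 0600) or die "mkfifo: $!";

# Read-write open does not block: this process is its own writer
open(my $rw, "+<", $fifo) or die "open +<: $!";
my $flags = fcntl($rw, F_GETFL, 0) or die "F_GETFL: $!";
fcntl($rw, F_SETFL, $flags | O_NONBLOCK) or die "F_SETFL: $!";

# A separate writer connects, sends one line and disconnects
open(my $w, ">", $fifo) or die "open >: $!";
print $w "hello\n";
close $w or die "close: $!";

my $n1 = sysread($rw, my $buf, 4096);   # the 6 bytes written above
my $n2 = sysread($rw, $buf, 4096);      # pipe is empty now
my $eagain = !defined $n2 && $! == EAGAIN;
print "first read: $n1 bytes\n";
print $eagain ? "second read: would block, no EOF\n"
              : "second read: something else happened\n";
```

With a plain read-only handle the second read would return 0 once the writer has closed; with '+<' it keeps reporting EAGAIN.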
Minimal version (no support for long lines and no back-off while waiting):
#!/usr/bin/perl
use Symbol qw(gensym);
use IPC::Open3;
use POSIX qw(:errno_h);
use Fcntl qw(:DEFAULT :flock);
for (@ARGV) {
    open($fh{$_},"<",$_) || die;
    # Set the filehandle non-blocking
    # Get the current flags on the filehandle (F_GETFL returns them)
    my $flags = fcntl($fh{$_}, &F_GETFL, 0) || die $!;
    $flags |= &O_NONBLOCK; # Add non-blocking to the flags
    fcntl($fh{$_}, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
}
while(keys %fh) {
    for(keys %fh) {
        my($string,$something_read) = non_blocking_read($_);
        print $string;
    }
    # Sleep 1 ms
    select(undef, undef, undef, 1/1000);
}
{
    my %buffer;
    sub non_blocking_read {
        my $file = shift;
        my $in = $fh{$file};
        my $rv = sysread($in, substr($buffer{$file},length $buffer{$file}), 327680);
        if (!$rv) {
            # Only trust $! if sysread failed (undef); on EOF (0) it may be stale
            if(!defined $rv && $! == EAGAIN) {
                # Would block: Nothing read
                return(undef,undef);
            } else {
                # This file is done
                close $in;
                delete $fh{$file};
                my $buf = $buffer{$file};
                delete $buffer{$file};
                return ($buf,1);
            }
        }
        # Find \n for full line
        my $i = (rindex($buffer{$file},"\n")+1);
        if($i) {
            # Return full line
            # Remove full line from $buffer
            return(substr($buffer{$file},0,$i),
                   1,substr($buffer{$file},0,$i) = "");
        } else {
            # Something read, but not a full line
            return("",1);
        }
    }
}
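non_blocking_read has to tell "nothing to read yet" apart from "this file is done". As a sketch of the outcomes sysread can report on a non-blocking handle: undef with EAGAIN means wait, 0 means EOF, anything else is data. The helper name read_status is hypothetical, and an anonymous pipe stands in for a FIFO to keep the example self-contained.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use Errno qw(EAGAIN EWOULDBLOCK);
use IO::Handle;

# Classify one non-blocking read: "data", "wait", "eof" or "error".
# New bytes are appended to the buffer that $bufref points to.
sub read_status {
    my ($fh, $bufref) = @_;
    my $rv = sysread($fh, $$bufref, 65536, length $$bufref);
    if (!defined $rv) {
        # sysread failed: only now is $! meaningful
        return ($! == EAGAIN || $! == EWOULDBLOCK) ? "wait" : "error";
    }
    return $rv == 0 ? "eof" : "data";
}

pipe(my $r, my $w) or die "pipe: $!";
$r->blocking(0);

my $buf = "";
my @seen;
push @seen, read_status($r, \$buf);   # writer open, nothing written yet
syswrite($w, "a\n") // die "syswrite: $!";
push @seen, read_status($r, \$buf);   # one line is available
close $w;
push @seen, read_status($r, \$buf);   # all writers are gone
print "@seen\n";
```

Checking defined-ness before looking at $! matters because a successful call leaves $! untouched, so an old EAGAIN can still be sitting there when sysread returns 0.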
Full version: the important code is in the first 40 lines; the rest is well-tested code.
#!/usr/bin/perl
use Symbol qw(gensym);
use IPC::Open3;
for (@ARGV) {
    open($fh{$_},"<",$_) || die;
    set_fh_non_blocking($fh{$_});
}
$ms = 1;
while(keys %fh) {
    for(keys %fh) {
        my($string,$something_read) = non_blocking_read($_);
        if($something_read) {
            $ms = 0.1;
            print $string;
        }
    }
    $ms = exp_usleep($ms);
}
{
    my %buffer;
    my $ms;
    sub non_blocking_read {
        use POSIX qw(:errno_h);
        my $file = shift;
        my $in = $fh{$file};
        my $rv = read($in, substr($buffer{$file},length $buffer{$file}), 327680);
        if (!$rv) {
            # Only trust $! if read failed (undef); on EOF (0) it may be stale
            if(!defined $rv && $! == EAGAIN) {
                # Would block: Nothing read
                return(undef,undef);
            } else {
                # This file is done
                close $in;
                delete $fh{$file};
                my $buf = $buffer{$file};
                delete $buffer{$file};
                return ($buf,1);
            }
        }
        #### Well-tested code below
        # Find \n or \r for full line
        my $i = (::rindex64(\$buffer{$file},"\n")+1) ||
            (::rindex64(\$buffer{$file},"\r")+1);
        if($i) {
            # Return full line
            # Remove full line from $buffer
            return(substr($buffer{$file},0,$i),
                   1,substr($buffer{$file},0,$i) = "");
        } else {
            # Something read, but not a full line
            return("",1);
        }
    }
}
sub rindex64 {
    # Do rindex on strings > 2GB.
    # rindex in Perl < v5.22 does not work for > 2GB
    # Input:
    #   as rindex except STR which must be passed as a reference
    # Output:
    #   as rindex
    my $ref = shift;
    my $match = shift;
    my $pos = shift;
    my $block_size = 2**31-1;
    my $strlen = length($$ref);
    # Default: search from end
    $pos = defined $pos ? $pos : $strlen;
    # No point in doing extra work if we don't need to.
    if($strlen < $block_size) {
        return rindex($$ref, $match, $pos);
    }
    my $matchlen = length($match);
    my $ret;
    my $offset = $pos - $block_size + $matchlen;
    if($offset < 0) {
        # The offset is less than a $block_size
        # Set the $offset to 0 and
        # Adjust block_size accordingly
        $block_size = $block_size + $offset;
        $offset = 0;
    }
    while($offset >= 0) {
        $ret = rindex(
            substr($$ref, $offset, $block_size),
            $match);
        if($ret != -1) {
            return $ret + $offset;
        }
        $offset -= ($block_size - $matchlen - 1);
    }
    return -1;
}
sub exp_usleep {
    # Sleep this many milliseconds.
    # Input:
    #   $ms = milliseconds to sleep
    # Returns:
    #   $ms + 10%
    my $ms = shift;
    select(undef, undef, undef, $ms/1000);
    return (($ms < 1000) ? ($ms * 1.1) : ($ms));
}
sub set_fh_non_blocking {
    # Set filehandle as non-blocking
    # Inputs:
    #   $fh = filehandle to be made non-blocking
    # Returns:
    #   N/A
    my $fh = shift;
    $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
    # Get the current flags on the filehandle (F_GETFL returns them)
    my $flags = fcntl($fh, &F_GETFL, 0) || die $!;
    $flags |= &O_NONBLOCK; # Add non-blocking to the flags
    fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
}
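To give a feel for the back-off in exp_usleep, the sketch below replays its growth rule without actually sleeping and counts how many idle rounds pass before the delay stops growing. The helper name next_delay is hypothetical; the starting value 0.1 is the one the main loop resets to after a successful read.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Same growth rule as exp_usleep, minus the actual sleep
sub next_delay {
    my $ms = shift;
    return (($ms < 1000) ? ($ms * 1.1) : ($ms));
}

my $ms     = 0.1;   # delay right after something was read
my $rounds = 0;     # idle rounds until the delay stops growing
my $slept  = 0;     # total time that would have been slept, in ms
while ($ms < 1000) {
    $slept += $ms;
    $ms = next_delay($ms);
    $rounds++;
}
printf "delay stops growing after %d idle rounds at about %.0f ms (%.1f s slept so far)\n",
    $rounds, $ms, $slept / 1000;
```

The point of the design is that a busy FIFO is polled almost continuously, while a set of idle FIFOs costs roughly one wake-up per second once the delay has levelled off.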
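eval "use Fcntl qw(:DEFAULT :flock); 1;" makes no sense outside of a BEGIN block. Replace $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;"; with require Fcntl;. [This won't solve your problem.] - ikegami
sysread makes no difference. Does it work on your system? (I spent a whole day trying to get select to work and failed, so expect a follow-up question once the EOF problem is solved.) - Ole Tange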
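For reference, here is a rough sketch of what a select-based loop can look like with the core IO::Select module. It is not the code from the question: anonymous pipes whose writers have already finished stand in for FIFOs, which sidesteps the question of writers that have not connected yet.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use IO::Select;

# Two anonymous pipes stand in for FIFOs (an assumption for this demo)
my $sel = IO::Select->new;
for my $name (qw(a b)) {
    pipe(my $r, my $w) or die "pipe: $!";
    syswrite($w, "line from $name\n") // die "syswrite: $!";
    close $w;          # writer is done: the reader sees data, then EOF
    $sel->add($r);
}

my $out = "";
while ($sel->count) {
    # Blocks until at least one handle has data or has reached EOF
    for my $fh ($sel->can_read) {
        my $rv = sysread($fh, my $chunk, 65536);
        die "sysread: $!" unless defined $rv;
        if ($rv == 0) {
            # EOF: stop watching this handle
            $sel->remove($fh);
            close $fh;
            next;
        }
        $out .= $chunk;
    }
}
print $out;
```

Compared with the polling loop, nothing is slept away here: the process wakes up only when a handle becomes readable, and a handle leaves the set as soon as it returns 0.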