Perl: 非阻塞读取FIFO

5

我最初的解决方案(https://superuser.com/questions/482953/read-non-blocking-from-multiple-fifos-in-parallel?answertab=oldest#tab-top)会将数据缓存到磁盘上。

现在我已经制作了第二个版本,它在内存中缓冲一行。

它能够工作,但需要在启动之前连接所有的FIFO。以下是可以正常工作的代码:

window1$ mkfifo {1..100}
window1$ parcat {1..100} | pv >/dev/null

window2$ parallel -j0 'cat bigfile > ' ::: *

这不会产生任何输出(因为FIFO 100没有连接任何写入者,以只读方式打开它的open会一直阻塞):
window1$ mkfifo {1..100}
window1$ parcat {1..100} | pv >/dev/null

window2$ parallel -j0 'cat bigfile > ' ::: {1..99}

我尝试使用open '+<'。这解决了上述问题,但现在它不会在EOF处停止。
我该怎么做?
最小版本(不支持超过2GB的行,也没有指数退避等待):
#!/usr/bin/perl

use Symbol qw(gensym);
use IPC::Open3;
use POSIX qw(:errno_h);
use Fcntl qw(:DEFAULT :flock);

# Open every FIFO named on the command line and switch it to non-blocking
# mode, so that sysread() returns EAGAIN instead of waiting for data.
# Note: the read-only open() itself still waits until a writer connects.
for my $file (@ARGV) {
    open($fh{$file}, "<", $file) or die "Cannot open $file: $!";
    # fcntl(F_GETFL) delivers the current status flags as its RETURN value
    # ("0 but true" when none are set); the third argument is not filled in.
    my $flags = fcntl($fh{$file}, F_GETFL, 0) or die $!;
    # Add O_NONBLOCK while keeping every other status flag.
    fcntl($fh{$file}, F_SETFL, $flags | O_NONBLOCK) or die $!;
}

# Poll all FIFOs until non_blocking_read() has removed each one from %fh
# (at EOF), printing whatever complete lines have arrived.
while (keys %fh) {
    for my $file (keys %fh) {
        my ($string, $something_read) = non_blocking_read($file);
        print $string if defined $string;
    }
    # Sleep 1 ms between polling rounds
    select(undef, undef, undef, 1/1000);
}

{
    # Per-FIFO buffer of bytes already read but not yet returned (an
    # incomplete trailing line), keyed by the same name as %fh.
    my %buffer;

    # Read whatever is currently available from FIFO $file without blocking
    # and hand back only complete lines.
    # Input:
    #   $file = key of a non-blocking filehandle in the global %fh
    # Returns:
    #   (undef, undef) = nothing available right now (EAGAIN or EINTR)
    #   ($lines, 1)    = complete lines; "" if only a partial line arrived
    #   ($rest, 1)     = EOF or read error: the remaining buffered data;
    #                    the handle is closed and removed from %fh
    sub non_blocking_read {
        my $file = shift;
        my $in = $fh{$file};
        $buffer{$file} = "" unless defined $buffer{$file};

        # The OFFSET argument makes sysread append to the existing buffer.
        my $rv = sysread($in, $buffer{$file}, 327680, length $buffer{$file});
        if (!defined($rv) && ($! == EAGAIN || $! == EINTR)) {
            # Would block (or interrupted): nothing read
            return (undef, undef);
        }
        if (!$rv) {
            # EOF (0) or a real read error (undef): this file is done.
            # $! is consulted only when sysread returned undef; after EOF
            # it holds a stale value (possibly EAGAIN from another FIFO).
            close $in;
            delete $fh{$file};
            my $rest = delete $buffer{$file};
            return ($rest, 1);
        }

        # Find the last \n: everything up to it is a run of full lines.
        my $i = rindex($buffer{$file}, "\n") + 1;
        if ($i) {
            # 4-arg substr returns the full lines and removes them from
            # the buffer in one step.
            return (substr($buffer{$file}, 0, $i, ""), 1);
        }
        # Something read, but not a full line yet
        return ("", 1);
    }
}

完整版本:重要的代码在前40行,其余是经过充分测试的代码。
#!/usr/bin/perl

use Symbol qw(gensym);
use IPC::Open3;

# Open every FIFO given on the command line and make it non-blocking.
# Note: the read-only open() still waits until a writer connects.
for my $file (@ARGV) {
    open($fh{$file}, "<", $file) or die "Cannot open $file: $!";
    set_fh_non_blocking($fh{$file});
}

# Poll all FIFOs until non_blocking_read() has removed each one from %fh.
# $ms is the pause between polling rounds: reset to 0.1 ms whenever some
# FIFO produced data, otherwise backed off via exp_usleep().
$ms = 1;
while (keys %fh) {
    for my $file (keys %fh) {
        my ($string, $something_read) = non_blocking_read($file);
        if ($something_read) {
            $ms = 0.1;
            print $string;
        }
    }
    $ms = exp_usleep($ms);
}

{
    # Per-FIFO buffer of bytes already read but not yet returned (an
    # incomplete trailing line), keyed by the same name as %fh.
    my %buffer;

    # Read whatever is currently available from FIFO $file without blocking
    # and hand back only complete lines (ending in \n or \r).
    # Input:
    #   $file = key of a non-blocking filehandle in the global %fh
    # Returns:
    #   (undef, undef) = nothing available right now (EAGAIN or EINTR)
    #   ($lines, 1)    = complete lines; "" if only a partial line arrived
    #   ($rest, 1)     = EOF or read error: the remaining buffered data;
    #                    the handle is closed and removed from %fh
    sub non_blocking_read {
        use POSIX qw(:errno_h);

        my $file = shift;
        my $in = $fh{$file};
        $buffer{$file} = "" unless defined $buffer{$file};

        # sysread, not read: read() is PerlIO-buffered and tries to fill the
        # whole length, which does not mix with a non-blocking handle.
        # The OFFSET argument appends the new data to the buffer.
        my $rv = sysread($in, $buffer{$file}, 327680, length $buffer{$file});
        if (!defined($rv) && ($! == EAGAIN || $! == EINTR)) {
            # Would block (or interrupted): nothing read
            return (undef, undef);
        }
        if (!$rv) {
            # EOF (0) or a real read error (undef): this file is done.
            # $! is consulted only when sysread returned undef; after EOF
            # it holds a stale value (possibly EAGAIN from another FIFO).
            close $in;
            delete $fh{$file};
            my $rest = delete $buffer{$file};
            return ($rest, 1);
        }

        #### Well-tested code below

        # Find \n or \r for full line
        my $i = (::rindex64(\$buffer{$file}, "\n") + 1) ||
            (::rindex64(\$buffer{$file}, "\r") + 1);
        if ($i) {
            # Return the full lines; 4-arg substr also removes them
            # from the buffer.
            return (substr($buffer{$file}, 0, $i, ""), 1);
        }
        # Something read, but not a full line
        return ("", 1);
    }
}

sub rindex64 {
    # Do rindex on strings > 2GB.
    # rindex in Perl < v5.22 does not work for > 2GB, so long strings are
    # searched backwards in overlapping windows of at most $block_size bytes.
    # Input:
    #   $ref        = reference to the string to search (STR of rindex)
    #   $match      = substring to look for
    #   $pos        = optional: last start position to consider (as rindex);
    #                 defaults to the end of the string
    #   $block_size = optional: window size in bytes (default 2**31-1)
    # Output:
    #   as rindex: start of the last match at or before $pos, or -1
    my ($ref, $match, $pos, $block_size) = @_;
    my $strlen = length($$ref);
    my $matchlen = length($match);
    # Clamp $pos to [0, length] the same way the built-in rindex does.
    $pos = $strlen if !defined($pos) || $pos > $strlen;
    $pos = 0 if $pos < 0;
    $block_size = 2**31 - 1 unless defined $block_size;
    # A window must hold a whole match plus one byte so every step moves
    # backwards by at least one byte.
    $block_size = $matchlen + 2 if $block_size < $matchlen + 2;
    # No point in doing extra work if we don't need to.
    if ($strlen < $block_size) {
        return rindex($$ref, $match, $pos);
    }

    # Consecutive windows overlap by $matchlen + 1 bytes so a match that
    # straddles a window boundary is never missed.
    my $step = $block_size - $matchlen - 1;
    # The first window ends just after a match starting at $pos.
    my $offset = $pos + $matchlen - $block_size;
    my $len = $block_size;
    while (1) {
        if ($offset < 0) {
            # Final window: start at 0 but keep its right edge in place,
            # so the head of the string is searched as well.
            $len += $offset;
            $offset = 0;
        }
        my $ret = rindex(substr($$ref, $offset, $len), $match);
        return $ret + $offset if $ret != -1;
        return -1 if $offset == 0;
        $offset -= $step;
    }
}

sub exp_usleep {
    # Pause for the given number of milliseconds and compute the next,
    # backed-off delay.
    # Input:
    #   $ms = milliseconds to sleep (fractions allowed)
    # Returns:
    #   $ms * 1.1 while $ms is below 1000, otherwise $ms unchanged
    my ($delay_ms) = @_;
    select(undef, undef, undef, $delay_ms / 1000);
    return $delay_ms if $delay_ms >= 1000;
    return $delay_ms * 1.1;
}

sub set_fh_non_blocking {
    # Set filehandle as non-blocking, keeping its other status flags.
    # Inputs:
    #   $fh = filehandle to be made non-blocking
    # Returns:
    #   N/A (dies with $! if the flags cannot be read or set)
    my $fh = shift;
    # Load Fcntl at run time and call its constants fully qualified; a
    # string eval of "use Fcntl" outside a BEGIN block buys nothing.
    require Fcntl;
    # F_GETFL delivers the current flags as fcntl's RETURN value ("0 but
    # true" when none are set); the third argument is not filled in.
    my $flags = fcntl($fh, Fcntl::F_GETFL(), 0) or die $!;
    fcntl($fh, Fcntl::F_SETFL(), $flags | Fcntl::O_NONBLOCK()) or die $!;
}

3
请问有人可以解释一下“关闭”投票吗?我不太明白为什么这个不符合要求... - stevieb
1
请尝试使用sysread而不是read。sysread实现非缓冲读取(即类似于系统调用中的read),而read实现缓冲读取并尝试读取整个缓冲区(类似于fread)。我还建议使用select来查找哪个文件描述符有数据。 - Steffen Ullrich
`eval "use Fcntl qw(:DEFAULT :flock); 1;"` 在BEGIN块之外没有意义。请用`require Fcntl;`替换`$Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";`。[这不会解决你的问题。] - ikegami
2
@stevieb,我想这是因为他们认为问题的代码远非问题的最小演示。当OP不愿意时,我知道我不会花时间在这上面。 - ikegami
@SteffenUllrich 在我的系统上,sysread 没有任何区别。它在你的系统上工作了吗?(我花了一整天的时间来让 select 工作,但失败了,所以当 EOF 问题解决时,请期待后续问题)。 - Ole Tange
1
[此条评论的原文在抓取时丢失,被替换成了与本问题无关的网站限流提示] - Steffen Ullrich
1个回答

3

该解决方案为每个FIFO额外打开一个从不写入的"假"写入端,一旦从该FIFO收到任何数据就将其关闭。它能正确工作,但如果某个输入为空,程序就不会退出:

mkfifo {1..100}
parcat {1..100} &
parallel -j2 echo works '>' {} ::: {1..100}

parcat {1..100} &
# Fails (parcat does not exit)
parallel -j2 cat /dev/null '>' {} ::: {1..100}

代码:

#!/usr/bin/perl
    
use Symbol qw(gensym);
use IPC::Open3;
use POSIX qw(:errno_h);
use IO::Select;
use strict;

my $s = IO::Select->new();
# Read handles still being watched (keyed by stringified handle).
my %fhr;
# Fake writer belonging to each read handle, keyed by the read handle.
my %fhw;

for my $name (@ARGV) {
    # Open the FIFO read-write first. This "fake writer" never writes; it
    # lets the read-only open below return without waiting for a real
    # writer, and keeps the reader from seeing EOF before a real writer
    # has sent anything. (Opening a FIFO read-write works on Linux; POSIX
    # leaves it undefined.)
    open(my $fhw, "+<", $name) or die "Cannot open $name read-write: $!";
    # Open the file for real
    open(my $fhr, "<", $name) or die "Cannot open $name: $!";
    set_fh_non_blocking($fhr);
    $s->add($fhr);
    $fhr{$fhr}++;
    $fhw{$fhr} = $fhw;
}

# Not-yet-printed partial-line data per read handle.
my %buffer;
while (keys %fhr) {
    for my $file ($s->can_read(undef)) {
        $buffer{$file} = "" unless defined $buffer{$file};
        # Append newly available data to the end of the buffer.
        my $rv = sysread($file, $buffer{$file}, 327680, length $buffer{$file});
        if (!defined($rv) && ($! == EAGAIN || $! == EINTR)) {
            # Would block / interrupted: nothing read
            next;
        }
        if (!$rv) {
            # EOF (0) or a real read error (undef): this file is done.
            # $! is checked only when sysread returned undef; after EOF it
            # holds a stale value and must not be trusted.
            $s->remove($file);
            delete $fhr{$file};
            print $buffer{$file};
            delete $buffer{$file};
            # Release the fake writer if it is still open
            # (e.g. after a read error before any data arrived).
            if (my $writer = delete $fhw{$file}) {
                close $writer;
            }
            # Closing the $file causes it to block
            # close $file;
            next;
        }
        if ($fhw{$file}) {
            # We have received data from $file, so a real writer is
            # connected: close the fake writer so EOF can arrive later.
            close $fhw{$file};
            delete $fhw{$file};
        }

        # Print everything up to the last \n (or, if none, the last \r)
        # and keep the incomplete tail in the buffer.
        my $i = (::rindex64(\$buffer{$file}, "\n") + 1) ||
            (::rindex64(\$buffer{$file}, "\r") + 1);
        if ($i) {
            print substr($buffer{$file}, 0, $i, "");
        }
    }
}

sub rindex64 {
    # Do rindex on strings > 2GB.
    # rindex in Perl < v5.22 does not work for > 2GB, so long strings are
    # searched backwards in overlapping windows of at most $block_size bytes.
    # Input:
    #   $ref        = reference to the string to search (STR of rindex)
    #   $match      = substring to look for
    #   $pos        = optional: last start position to consider (as rindex);
    #                 defaults to the end of the string
    #   $block_size = optional: window size in bytes (default 2**31-1)
    # Output:
    #   as rindex: start of the last match at or before $pos, or -1
    my ($ref, $match, $pos, $block_size) = @_;
    my $strlen = length($$ref);
    my $matchlen = length($match);
    # Clamp $pos to [0, length] the same way the built-in rindex does.
    $pos = $strlen if !defined($pos) || $pos > $strlen;
    $pos = 0 if $pos < 0;
    $block_size = 2**31 - 1 unless defined $block_size;
    # A window must hold a whole match plus one byte so every step moves
    # backwards by at least one byte.
    $block_size = $matchlen + 2 if $block_size < $matchlen + 2;
    # No point in doing extra work if we don't need to.
    if ($strlen < $block_size) {
        return rindex($$ref, $match, $pos);
    }

    # Consecutive windows overlap by $matchlen + 1 bytes so a match that
    # straddles a window boundary is never missed.
    my $step = $block_size - $matchlen - 1;
    # The first window ends just after a match starting at $pos.
    my $offset = $pos + $matchlen - $block_size;
    my $len = $block_size;
    while (1) {
        if ($offset < 0) {
            # Final window: start at 0 but keep its right edge in place,
            # so the head of the string is searched as well.
            $len += $offset;
            $offset = 0;
        }
        my $ret = rindex(substr($$ref, $offset, $len), $match);
        return $ret + $offset if $ret != -1;
        return -1 if $offset == 0;
        $offset -= $step;
    }
}

sub set_fh_non_blocking {
    # Set filehandle as non-blocking, keeping its other status flags.
    # Inputs:
    #   $fh = filehandle to be made non-blocking
    # Returns:
    #   N/A (dies with $! if the flags cannot be read or set)
    my $fh = shift;
    # Load Fcntl at run time and call its constants fully qualified; a
    # string eval of "use Fcntl" outside a BEGIN block buys nothing.
    require Fcntl;
    # F_GETFL delivers the current flags as fcntl's RETURN value ("0 but
    # true" when none are set); the third argument is not filled in.
    my $flags = fcntl($fh, Fcntl::F_GETFL(), 0) or die $!;
    fcntl($fh, Fcntl::F_SETFL(), $flags | Fcntl::O_NONBLOCK()) or die $!;
}

现在,GNU Parallel 已经随附了一个更好的版本,名为 parcat。

我正在搜索“parcat”! - F. Hauri - Give Up GitHub

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接