使用命名管道与bash - 数据丢失问题

16

在网上做了一些搜索,找到了使用命名管道的简单“教程”。但是当我使用后台作业时,似乎会丢失很多数据。

[[编辑:找到了一个更简单的解决方案,请看帖子的回复。因此我提出的问题现在是学术性的 - 万一有人想要一个工作服务器]]

我正在使用Ubuntu 10.04,Linux 2.6.32-25-generic #45-Ubuntu SMP Sat Oct 16 19:52:42 UTC 2010 x86_64 GNU/Linux

GNU bash,版本4.1.5(1)-release(x86_64-pc-linux-gnu)。

我的bash函数是:

function jqs
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT SIGKILL

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    if read txt <"$pipe"
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      fi
    fi
  done
}

我在后台运行这个程序:

> jqs&
[1] 5336

现在我要把它喂:

for i in 1 2 3 4 5 6 7 8
do
  (echo aaa$i > /tmp/__job_control_manager__ && echo success$i &)
done
输出结果不一致。我经常无法得到所有的成功回声。我最多只能获得与成功回声相同数量的新文本回声,有时更少。
如果我将“feed”中的“&”删除,它似乎可以工作,但我会一直被阻塞直到输出被读取。因此,我希望让子进程被阻塞,但不是主进程。
目标是编写一个简单的作业控制脚本,以便我最多可以并行运行10个作业,并将其余作业排队进行后续处理,但可靠地知道它们确实在运行。
下面是完整的作业管理器:
function jq_manage
{
  export __gn__="$1"

  pipe=/tmp/__job_control_manager_"$__gn__"__
  trap "rm -f $pipe"    EXIT
  trap "break"      SIGKILL

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    date
    jobs
    if (($(jobs | egrep "Running.*echo '%#_Group_#%_$__gn__'" | wc -l) < $__jN__))
    then
      echo "Waiting for new job"
      if read new_job <"$pipe"
      then
    echo "new job is [[$new_job]]"

    if [[ "$new_job" == 'quit' ]]
    then
      break
    fi

    echo "In group $__gn__, starting job $new_job"
    eval "(echo '%#_Group_#%_$__gn__' > /dev/null; $new_job) &"
      fi
    else
      sleep 3
    fi
  done
}

function jq
{
  # __gn__ = first parameter to this function, the job group name (the pool within which to allocate __jN__ jobs)
  # __jN__ = second parameter to this function, the maximum of job numbers to run concurrently

  export __gn__="$1"
  shift
  export __jN__="$1"
  shift

  export __jq__=$(jobs | egrep "Running.*echo '%#_GroupQueue_#%_$__gn__'" | wc -l)
  if (($__jq__ '<' 1))
  then
    eval "(echo '%#_GroupQueue_#%_$__gn__' > /dev/null; jq_manage $__gn__) &"
  fi

  pipe=/tmp/__job_control_manager_"$__gn__"__

  echo $@ >$pipe
}

调用

jq <name> <max processes> <command>
jq abc 2 sleep 20

我将启动一个进程。那部分工作正常。 启动第二个也没问题。一个一个手动启动似乎也很好。但在循环中启动10个进程似乎会失去系统,就像上面更简单的例子一样。

有什么提示可以解决这种显然的IPC数据丢失将不胜感激。

谢谢, Alain。


请查看第二次2018年编辑的如何将变量设置为命令输出GitHub.com: Connector-bash,其中我将工具子进程连接到我的当前shell会话。 - F. Hauri - Give Up GitHub
6个回答

30

你的问题是以下 if 语句:

while true
do
    if read txt <"$pipe"
    ....
done

你的任务队列服务器在每次循环时都打开和关闭管道。这意味着当客户端尝试写入管道时,一些客户端会收到“ broken pipe”错误 - 也就是说,在写入者打开管道后,读取器就消失了。

要解决此问题,请更改服务器中的循环,仅一次打开整个循环的管道:

while true
do
    if read txt
    ....
done < "$pipe"

按照这种方式,管道只需打开一次并保持打开状态。

需要注意,在循环内部运行的所有处理都将stdin附加到命名管道。您需要确保在循环内的所有进程中重定向stdin,否则它们可能会消耗来自管道的数据。

编辑:现在问题是当最后一个客户端关闭管道时,您的读取操作遇到了EOF。您可以使用jilles的fd复制方法,或者您也可以确保自己也是客户端,并保持管道的写入端口开放:

while true
do
    if read txt
    ....
done < "$pipe" 3> "$pipe"

这将在fd 3上持有管道的写入端口。与stdin一样,对于此文件描述符也适用相同的注意事项。您需要关闭它,以便任何子进程不会继承它。与stdin相比,这似乎并不那么重要,但这样做会更加清晰。


哇,太棒了。讲得很清楚明白。谢谢。我会立刻尝试的。 - asoundmove
好的,现在你解决了关键问题,我还有一个问题:如何让读取等待输入?我将在下面进一步回复自己并附上示例代码。 - asoundmove
1
@asoundmove:我已经更新了答案,并提供了读取EOF的解决方案。 - camh
这种方法避免了依赖于非POSIX行为,但在某些情况下可能存在不好的原因。 - jilles

8

正如其他答案所述,您需要始终保持fifo处于打开状态以避免数据丢失。

然而,一旦所有的写入者在fifo被打开后离开(所以有一个写入者),读取将立即返回(并且poll()返回POLLHUP)。清除这种状态的唯一方法是重新打开fifo。

POSIX没有提供解决方案,但至少Linux和FreeBSD提供了:如果读取开始失败,请同时打开原始描述符并再次打开fifo。这是有效的,因为在Linux和FreeBSD中,“挂起”状态是特定打开文件描述符本地的,而在POSIX中它是全局的。

可以像这样在shell脚本中完成:

while :; do
    exec 3<tmp/testfifo
    exec 4<&-
    while read x; do
        echo "input: $x"
    done <&3
    exec 4<&3
    exec 3<&-
done

1
在Bash中,你可以使用read -u 3从指定的文件描述符号读取数据,而不是使用{...read...} <&3 - ephemient
@ephemient read -u 3 x 相较于 read x <&3 有什么优势? - jilles
哇,这个可行!你能解释一下为什么我不能使用fd 1而不是3吗?第一次运行它可以工作,但之后就失败了。我会单独发布一个评论来展示最新的完整脚本。 - asoundmove
@jilles:据我了解,使用“read x <&3”会在读取完成后关闭管道,而使用“read -u 3 x”则会将管道保持打开状态以供下一次读取。 - asoundmove
@jilles:优点在于很明显(不需要查找与while ...; do匹配的done),即read正在从FD 3读取,并且do ...; done的其余部分没有重定向FD 0。 - ephemient
@ephemient:使用while read x <&3; do ...; done也可以得到相同的结果,那么为什么需要read -u呢? - jilles

2

针对可能感兴趣的人,这里有两个新版本的测试服务器脚本,经过camh和jilles的评论后进行了重新编辑。

两个版本现在都能够完美地实现预期功能。

camh版本用于管道管理:

function jqs    # Job queue manager
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT TERM

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    if read -u 3 txt
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      else
        sleep 1
        # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
      fi
    fi
  done 3< "$pipe" 4> "$pipe"    # 4 is just to keep the pipe opened so any real client does not end up causing read to return EOF
}

jille的通管管理版本:

function jqs    # Job queue manager
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT TERM

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  exec 3< "$pipe"
  exec 4<&-

  while true
  do
    if read -u 3 txt
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      else
        sleep 1
        # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
      fi
    else
      # Close the pipe and reconnect it so that the next read does not end up returning EOF
      exec 4<&3
      exec 3<&-
      exec 3< "$pipe"
      exec 4<&-
    fi
  done
}

感谢大家的帮助。

2
你无法捕获SIGKILL信号,试图这样做毫无意义。另外,请查看我的最后一次编辑,以获取不需要复制文件描述符的更简单方法。 - camh
好的。已测试,当然你是对的。谢谢你的指引,camh。 - asoundmove

1

就像Camh和Dennis Williamson所说的,不要破坏管道。

现在我有更小的例子,可以直接在命令行中执行:

服务器:

(
  for i in {0,1,2,3,4}{0,1,2,3,4,5,6,7,8,9};
  do
    if read s;
      then echo ">>$i--$s//";
    else
      echo "<<$i";
    fi;
  done < tst-fifo
)&

客户:

(
  for i in {%a,#b}{1,2}{0,1};
  do
    echo "Test-$i" > tst-fifo;
  done
)&

可以用以下代码替换关键行:

    (echo "Test-$i" > tst-fifo&);

所有发送到管道的客户端数据都会被读取,但是使用客户端的第二个选项时,可能需要启动服务器几次才能读取所有数据。

但是尽管读取一开始等待管道中的数据,一旦数据被推送,它就会永远读取空字符串。

有什么方法可以停止这种情况吗?

再次感谢任何见解。


0
并行运行最多10个作业,并将其余的排队以供后续处理,但可靠地知道它们确实在运行。您可以使用GNU Parallel来完成此操作,无需编写脚本。

http://www.gnu.org/software/parallel/man.html#options

您可以设置最大进程数 "任务槽数。并行运行多达N个作业。" 有一个选项可以设置要使用的CPU核心数。您可以将已执行的作业列表保存到日志文件中,但这是一个 beta 功能。


0
一方面,问题比我想象的更严重: 现在,在我更复杂的示例(jq_manage)中似乎存在这样一个情况,即从管道中反复读取相同的数据(即使没有新数据被写入)。
另一方面,我找到了一个简单的解决方案(根据Dennis的评论进行了编辑):
function jqn    # compute the number of jobs running in that group
{
  __jqty__=$(jobs | egrep "Running.*echo '%#_Group_#%_$__groupn__'" | wc -l)
}

function jq
{
  __groupn__="$1";  shift   # job group name (the pool within which to allocate $__jmax__ jobs)
  __jmax__="$1";    shift   # maximum of job numbers to run concurrently

  jqn
  while (($__jqty__ '>=' $__jmax__))
  do
    sleep 1
    jqn
  done

  eval "(echo '%#_Group_#%_$__groupn__' > /dev/null; $@) &"
}

非常好用。 没有涉及到套接字或管道。 简单易懂。


1
没有理由导出__jqty__(或原始导出中的任何内容)。为什么要直接将某些东西回显到/dev/null?为什么使用eval?为什么不只是做$@&?引用>=是不必要的。我同意camh的答案。 - Dennis Williamson
最终归结为读取和过滤ps的输出。将echo输出到/dev/null,因为我实际上不需要输出,我只需要在“ps”的输出中找到正确的字符串。eval也是如此,否则ps会显示变量名称,而不是扩展变量,eval进行扩展。我以前从未使用过((...)),所以感谢您指出我不需要引号,我只是根据我在某个地方读到的示例进行操作,并且还要感谢导出,这是先前更复杂的脚本的剩余部分,其中包含子进程并需要导出。 - asoundmove
抱歉,我的意思是“作业”,而不是“ps”。 - asoundmove

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接