在C语言中实现流水线技术。最佳方法是什么?

3

我无法想出任何一种在C语言中实现管道的方法,这也是我决定在这里写下来的原因。我必须承认,我知道管道/分叉/命名管道的工作原理。我已经看到了很多实现2-3个管道的例子,这很容易。但问题出现在我需要实现一个shell,并且管道数量是未知的时候。

目前我所拥有的:

ls -al | tr a-z A-Z | tr A-Z a-z | tr a-z A-Z

我将这行文字转化为如下格式:
array[0] = {"ls", "-al", NULL"}
array[1] = {"tr", "a-z", "A-Z", NULL"}
array[2] = {"tr", "A-Z", "a-z", NULL"}
array[3] = {"tr", "a-z", "A-Z", NULL"}

因此,我可以使用。
execvp(array[0],array)

稍后再说。

直到现在,我相信一切都很好。问题出现在我试图将这些功能的输入/输出互相重定向时。

以下是我的做法:

    mkfifo("queue", 0777);

    for (i = 0; i<= pipelines_count; i++) // eg. if there's 3 pipelines, there's 4 functions to execvp
    {
    int b = fork();             
    if (b == 0) // child
        {           
        int c = fork();

        if (c == 0) 
        // baby (younger than child) 
        // I use c process, to unblock desc_read and desc_writ for b process only
        // nothing executes in here
            {       
            if (i == 0) // 1st pipeline
                {
                int desc_read = open("queue", O_RDONLY);
                // dup2 here, so after closing there's still something that can read from 
                // from desc_read
                dup2(desc_read, 0); 
                close(desc_read);           
                }

            if (i == pipelines_count) // last pipeline
                {
                int desc_write = open("queue", O_WRONLY);
                dup2(desc_write, 0);
                close(desc_write);                              
                }

            if (i > 0 && i < pipelines_count) // pipeline somewhere inside
                {
                int desc_read = open("queue", O_RDONLY);
                int desc_write = open("queue", O_WRONLY);
                dup2(desc_write, 1);
                dup2(desc_read, 0);
                close(desc_write);
                close(desc_read);
                }               
            exit(0); // closing every connection between process c and pipeline             
            }
        else
        // b process here
        // in b process, i execvp commands
        {                       
        if (i == 0) // 1st pipeline (changing stdout only)
            {   
            int desc_write = open("queue", O_WRONLY);               
            dup2(desc_write, 1); // changing stdout -> pdesc[1]
            close(desc_write);                  
            }

        if (i == pipelines_count) // last pipeline (changing stdin only)
            {   
            int desc_read = open("queue", O_RDONLY);                                    
            dup2(desc_read, 0); // changing stdin -> pdesc[0]   
            close(desc_read);           
            }

        if (i > 0 && i < pipelines_count) // pipeline somewhere inside
            {               
            int desc_write = open("queue", O_WRONLY);       
            dup2(desc_write, 1); // changing stdout -> pdesc[1]
            int desc_read = open("queue", O_RDONLY);                            
            dup2(desc_read, 0); // changing stdin -> pdesc[0]
            close(desc_write);
            close(desc_read);                               
            }

        wait(NULL); // it wait's until, process c is death                      
        execvp(array[0],array);         
        }
        }
    else // parent (waits for 1 sub command to be finished)
        {       
        wait(NULL);
        }       
    }

感谢您的选择。

2
现在每个人都在写自己的shell吗?这周已经有第三个问题是关于“我的自己的shell”了。 - user529758
1
@H2CO3 这可能与操作系统课程作业有关。至少在我的第一个操作系统课程中,我不得不编写一个 shell。 - mah
4个回答

7

Patryk,为什么你要使用一个FIFO,而且对于管道的每个阶段都使用同一个FIFO?

在我看来,你需要在每个阶段之间使用一个管道。因此流程应该是这样的:

Shell             ls               tr                tr
-----             ----             ----              ----
pipe(fds);
fork();  
close(fds[0]);    close(fds[1]);
                  dup2(fds[0],0); 
                  pipe(fds);
                  fork();         
                  close(fds[0]);   close(fds[1]);  
                  dup2(fds[1],1);  dup2(fds[0],0);
                  exex(...);       pipe(fds);
                                   fork();     
                                   close(fds[0]);     etc
                                   dup2(fds[1],1);
                                   exex(...);  

在每个分支 shell 中运行的序列(close、dup2、pipe 等)看起来像一个函数(以所需进程的名称和参数形式)。请注意,在每个 shell 中的 exec 调用之前,都会运行 fork 的 shell 的副本。
编辑:
Patryk:
Also, is my thinking correct? Shall it work like that? (pseudocode): 
start_fork(ls) -> end_fork(ls) -> start_fork(tr) -> end_fork(tr) -> 
start_fork(tr) -> end_fork(tr) 

我不确定您所说的start_fork和end_fork是什么意思。您是否意味着在tr开始之前,ls运行完成?这并不是上面图表中的意思。您的shell在开始所有管道进程之前不会等待ls完成。它按顺序启动管道中的所有进程,并为每个进程设置stdin和stdout,以便进程链接在一起,即将ls的stdout连接到tr的stdin;tr的stdout连接到下一个tr的stdin。这就是dup2调用的作用。
进程运行的顺序由操作系统(调度程序)决定,但显然,如果tr从空stdin读取,则必须等待(阻塞),直到前面的进程向管道写入内容。很可能ls在tr甚至从其stdin中读取任何内容之前就运行完成了,但同样有可能不会。例如,如果链中的第一个命令是连续运行并途中产生输出的东西,则管道中的第二个命令将被定时调度,以处理第一个命令沿管道发送的任何内容。
希望这能稍微澄清一些问题 :-)

1
不要覆盖已经在int fds [2]中的描述符,而不是管道(fds)?这样我不会因此丢失数据吗?另外,我的想法正确吗?它应该像这样工作吗?(伪代码):start_fork(ls) - > end_fork(ls) - > start_fork(tr) - > end_fork(tr) - > start_fork(tr) - > end_fork(tr) - Patryk
1
它应该这样工作吗?(伪代码):start_fork(ls) -> end_fork(ls) -> start_fork(tr) -> end_fork(tr) -> start_fork(tr) -> end_fork(tr) - Patryk
1
喜欢操作的表格表示! - user4815162342

2

你可能值得使用libpipeline。 它会代替你完成所有的工作,而且你甚至可以将功能包含在你的管道中。


1
问题在于你试图一次性完成所有事情。相反,将其分解为较小的步骤。
1)解析输入以从中获取ls -al |。 1a)从此,您知道需要创建一个管道,将其移动到stdout,并启动ls -al。然后将管道移动到stdin。当然还有更多内容,但您现在不必在代码中担心它。
2)解析下一个段以获取tr a-z A-Z |。只要您要生成的下一个命令的输出被传输到某个地方,就返回步骤1a。

这就是我正在做的事情。我在每次迭代中执行一个函数(并将输入从stdin更改为pdesc[0],输出从stdout更改为pdesc[1])。 - Patryk
2
这绝对不是你正在做的事情。在你的代码中,你已经知道有多少个管道了-- for (i = 0; i<= pipelines_count; i++)。此外,你只创建了一个队列,而且是命名队列--这两者都不合适。使用pipe()来创建一个未命名的队列,并为每个命令创建一个新的队列。最后--在fork()返回非0的情况下,你不应该调用wait()。相反,你应该先让你的整个管道中的所有进程运行起来,只有当你没有更多的进程要启动时才应该调用wait。否则你就打败了管道的目的。 - mah

0
在C中实现流水线。最好的方法是什么?
这个问题有点老了,但是这里提供一个从未被提供过的答案。使用libpipeline。libpipeline是一个管道操作库。使用案例是man页面维护者,他们经常需要使用类似以下命令(并解决相关操作系统错误):
zsoelim < input-file | tbl | nroff -mandoc -Tutf8

这是使用libpipeline的方法:

pipeline *p;
int status;

p = pipeline_new ();
pipeline_want_infile (p, "input-file");
pipeline_command_args (p, "zsoelim", NULL);
pipeline_command_args (p, "tbl", NULL);
pipeline_command_args (p, "nroff", "-mandoc", "-Tutf8", NULL);
status = pipeline_run (p);

libpipeline 的主页有更多的例子。该库也包含在许多发行版中,包括 Arch、Debian、Fedora、Linux from Scratch 和 Ubuntu。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接