在C语言中实现多个管道

16

我正在尝试在C中实现多个管道。我在这个网站上找到了一个教程,我的函数是基于这个示例的。以下是我的函数:

void executePipes(cmdLine* command, char* userInput) {
    int numPipes = 2 * countPipes(userInput);
    int status;
    int i = 0, j = 0;
    int pipefds[numPipes];

    for(i = 0; i < (numPipes); i += 2)
        pipe(pipefds + i);

    while(command != NULL) {
        if(fork() == 0){

            if(j != 0){
                dup2(pipefds[j - 2], 0);
            }

            if(command->next != NULL){
                dup2(pipefds[j + 1], 1);
            }    

            for(i = 0; i < (numPipes); i++){
                close(pipefds[i]);
            }
            if( execvp(*command->arguments, command->arguments) < 0 ){
                perror(*command->arguments);
                exit(EXIT_FAILURE);
            }
        }

        else{
                if(command != NULL)
                    command = command->next;

                j += 2;
                for(i = 0; i < (numPipes ); i++){
                   close(pipefds[i]);
                }
               while(waitpid(0,0,0) < 0);
        }
    }

}
执行后,当我输入像ls | grep bin这样的命令时,终端挂起并没有输出任何结果。我确定已关闭所有管道,但它仍然挂起。我认为问题出在waitpid上,于是我删除了waitpid,但执行后仍无结果。我哪里做错了吗?谢谢。
void runPipedCommands(cmdLine* command, char* userInput) {
    int numPipes = countPipes(userInput);

    int status;
    int i = 0, j = 0;

    pid_t pid;

    int pipefds[2*numPipes];

    for(i = 0; i < 2*(numPipes); i++){
        if(pipe(pipefds + i*2) < 0) {
            perror("pipe");
            exit(EXIT_FAILURE);
        }
    }

    while(command) {
        pid = fork();
        if(pid == 0) {

            //if not first command
            if(j != 0){
                if(dup2(pipefds[(j-1) * 2], 0) < 0){
                    perror(" dup2");///j-2 0 j+1 1
                    exit(EXIT_FAILURE);
                    //printf("j != 0  dup(pipefd[%d], 0])\n", j-2);
                }
            //if not last command
            if(command->next){
                if(dup2(pipefds[j * 2 + 1], 1) < 0){
                    perror("dup2");
                    exit(EXIT_FAILURE);
                }
            }

            for(i = 0; i < 2*numPipes; i++){
                    close(pipefds[i]);
            }

            if( execvp(*command->arguments, command->arguments) < 0 ){
                    perror(*command->arguments);
                    exit(EXIT_FAILURE);
            }
        } else if(pid < 0){
            perror("error");
            exit(EXIT_FAILURE);
        }

        command = command->next;
        j++;
    }
        for(i = 0; i < 2 * numPipes; i++){
            close(pipefds[i]);
            puts("closed pipe in parent");
        }

        while(waitpid(0,0,0) <= 0);

    }

}

1
帖子的风格提示:删除注释掉的代码,并删除任何多余的空格。 - tdenniston
你能否发布你的实现完整代码,以便说明为什么需要使用结构体命令? - ss321c
6个回答

21
我认为这里的问题在于你等待并关闭相同循环中创建子进程的过程。在第一次迭代中,子进程将被执行(这将销毁子进程程序,并用您的第一个命令覆盖它),然后父进程关闭其所有文件描述符并等待子进程完成,然后才迭代到创建下一个子进程。此时,由于父进程已经关闭了所有管道,因此任何后续子进程都无法写入或读取任何内容。由于您没有检查dup2调用的成功,因此这个问题未被注意到。
如果您想保持相同的循环结构,则需要确保父进程只关闭已使用过的文件描述符,而不是关闭未使用过的文件描述符。然后,在创建所有子进程之后,您的父进程可以等待。
编辑:我在答案中混淆了父/子进程,但推理仍然成立:继续fork的进程将关闭所有管道的副本,因此第一个fork之后的任何进程都将没有有效的文件描述符可读/写。
伪代码,使用预先创建的管道数组:
/* parent creates all needed pipes at the start */
for( i = 0; i < num-pipes; i++ ){
    if( pipe(pipefds + i*2) < 0 ){
        perror and exit
    }
}

commandc = 0
while( command ){
    pid = fork()
    if( pid == 0 ){
        /* child gets input from the previous command,
            if it's not the first command */
        if( not first command ){
            if( dup2(pipefds[(commandc-1)*2], 0) < ){
                perror and exit
            }
        }
        /* child outputs to next command, if it's not
            the last command */
        if( not last command ){
            if( dup2(pipefds[commandc*2+1], 1) < 0 ){
                perror and exit
            }
        }
        close all pipe-fds
        execvp
        perror and exit
    } else if( pid < 0 ){
        perror and exit
    }
    cmd = cmd->next
    commandc++
}

/* parent closes all of its copies at the end */
for( i = 0; i < 2 * num-pipes; i++ ){
    close( pipefds[i] );
}
在这段代码中,原始的父进程为每个命令创建一个子进程,因此它在整个过程中幸存下来。子进程检查是否应该从上一个命令获取输入并将其输出发送到下一个命令。然后关闭所有管道文件描述符的副本,接着进行exec操作。父进程仅仅是一直在fork直到为每个命令都创建了一个子进程。然后关闭所有描述符的副本并可以继续等待。
首先创建所需的所有管道,然后在循环中管理它们是棘手的,需要进行一些数组计算。目标看起来像这样:
cmd0    cmd1   cmd2   cmd3   cmd4
   pipe0   pipe1  pipe2  pipe3
   [0,1]   [2,3]  [4,5]  [6,7]

意识到,在任何给定的时间,你只需要两组管道(前一个命令的管道和后一个命令的管道)将简化你的代码并使其更加健壮。Ephemient 在这里给出了伪代码。他的代码更简洁,因为父进程和子进程不必执行不必要的循环以关闭不需要的文件描述符,并且父进程在 fork 后可以立即关闭它的文件描述符副本。

另一方面,你应该始终检查 pipe、dup2、fork 和 exec 的返回值。

编辑 2: 伪代码中有错别字。OP:num-pipes 应该是管道的数量。例如,“ls | grep foo | sort -r”有 2 个管道。


谢谢你的帮助。我可能听起来很绝望,但你能给我一个伪代码来模拟你认为的问题吗?我已经尽力了,但仍然有一个内存泄漏的问题。我只是看不到问题出在哪里。谢谢。 - mkab
我尝试实现您的代码。请检查我的修改后的问题。我添加了代码。 - mkab
我实现了它以确认它能正常工作。对于索引的拼写错误,我很抱歉。再次强调,在循环时最好始终保持两组管道并轮换使用。 - Christopher Neylan
没问题 :)。只是为了确认一下,在我的情况下,如果我想声明管道文件描述符,应该是 int pipefds[2*numPipes] 对吧? - mkab
@ChristopherNeylan,我不理解为什么所有的管道都在循环内关闭,而不是只复制一次并且为什么父进程还要额外关闭所有的管道。你能解释一下吗? - andrzej1_1
显示剩余10条评论

9
这里是正确运行的代码。
void runPipedCommands(cmdLine* command, char* userInput) {
    int numPipes = countPipes(userInput);


    int status;
    int i = 0;
    pid_t pid;

    int pipefds[2*numPipes];

    for(i = 0; i < (numPipes); i++){
        if(pipe(pipefds + i*2) < 0) {
            perror("couldn't pipe");
            exit(EXIT_FAILURE);
        }
    }


    int j = 0;
    while(command) {
        pid = fork();
        if(pid == 0) {

            //if not last command
            if(command->next){
                if(dup2(pipefds[j + 1], 1) < 0){
                    perror("dup2");
                    exit(EXIT_FAILURE);
                }
            }

            //if not first command&& j!= 2*numPipes
            if(j != 0 ){
                if(dup2(pipefds[j-2], 0) < 0){
                    perror(" dup2");///j-2 0 j+1 1
                    exit(EXIT_FAILURE);

                }
            }


            for(i = 0; i < 2*numPipes; i++){
                    close(pipefds[i]);
            }

            if( execvp(*command->arguments, command->arguments) < 0 ){
                    perror(*command->arguments);
                    exit(EXIT_FAILURE);
            }
        } else if(pid < 0){
            perror("error");
            exit(EXIT_FAILURE);
        }

        command = command->next;
        j+=2;
    }
    /**Parent closes the pipes and wait for children*/

    for(i = 0; i < 2 * numPipes; i++){
        close(pipefds[i]);
    }

    for(i = 0; i < numPipes + 1; i++)
        wait(&status);
}

这个条件中有什么缺失吗?if(j != 0 ){ - Knu
1
@Knu:可能有点晚了,但是不,那个条件中没有遗漏的东西。 - mkab
@mkab 你确定吗?在该语句上面的注释中有 j!= 2*numPipes - wizzwizz4

3
(简化后的)相关代码如下:

    if(fork() == 0){
            // do child stuff here
            ....
    }
    else{
            // do parent stuff here
            if(command != NULL)
                command = command->next;

            j += 2;
            for(i = 0; i < (numPipes ); i++){
               close(pipefds[i]);
            }
           while(waitpid(0,0,0) < 0);
    }

这意味着父(控制)进程会执行以下操作:
  • fork (复制)
  • 关闭所有管道
  • 等待子进程
  • 下一个循环 / 子进程
但它应该像这样:
  • fork (复制)
  • fork (复制)
  • fork (复制)
  • 关闭所有管道 (现在应该已经复制了所有内容)
  • 等待所有子进程

如果我理解正确的话,我应该创建另一个分支,然后如果这个分支为0,我检查while(command != NULL)。然后我将上面编写的整个代码保留在while命令中。我是对的吗? - mkab
我的答案中的代码并不是解决问题的建议。它只是对你的代码进行了总结,并稍微强调了实际发生的情况。 - A.H.
是的,我知道。这是和我的代码一样的。我只是想理解你所建议的内容。请检查我的修改后的问题。我添加了一些代码。 - mkab

1

您只需要像下面这样交替使用两个管道符:

typedef int io[2];

extern int I; //piped command current index
extern int pipe_count; //count of '|'

#define CURRENT 0
#define PREVIOUS 1
#define READ 0
#define WRITE 1
#define is_last_command (I == pipe_count)

bool connect(io pipes[2])
{
    if (pipe_count)
    {
        if (is_last_command || I != 0)
            dup2(pipes[PREVIOUS][READ], STDIN_FILENO);
        if (I == 0 || !is_last_command)
            dup2(pipes[CURRENT][WRITE], STDOUT_FILENO);
    }
    return (true);
}

void close_(io pipes[2])
{
    if (pipe_count)
    {
        if (is_last_command || I != 0)
            close(pipes[PREVIOUS][READ]);
        if (I == 0 || !is_last_command)
            close(pipes[CURRENT][WRITE]);
    }
}

void alternate(int **pipes)
{
    int *pipe_current;

    pipe_current = pipes[CURRENT];
    pipes[CURRENT] = pipes[PREVIOUS];
    pipes[PREVIOUS] = pipe_current;
}

使用示例:

#define ERROR -1
#define CHILD 0

void execute(char **command)
{
    static io pipes[2];

    if (pipe_count && pipe(pipes[CURRENT]) == ERROR)
        exit_error("pipe");
    if (fork()==CHILD && connect(pipes))
    {
        execvp(command[0], command);
        _exit(EXIT_FAILURE);
    }
    while (wait(NULL) >= 0);
    close_(pipes);
    alternate((int **)pipes);
}

static void run(char ***commands)
{
    for (I = 0; commands[I]; I++)
        if (*commands[I])
            execute(commands[I]);
}

我将为需要的人留下一个完整可用的代码链接

0

在Christopher Neylan提到的一次最多使用两个管道的想法基础上,我编写了n个管道的伪代码。args是大小为“args_size”的字符指针数组,它是一个全局变量。

// MULTIPLE PIPES
// Test case:   char *args[] = {"ls", "-l", "|", "head", "|", "tail", "-4", 
0};// "|", "grep", "Txt", 0};   
enum fileEnd{READ, WRITE};

void multiple pipes( char** args){
pid_t cpid;
// declare pipes
int pipeA[2]
int pipeB[2]
// I have done getNumberofpipes
int numPipes = getNumberOfPipes;
int command_num = numPipes+1;
// holds sub array of args 
// which is a statement to execute
// for example: cmd = {"ls", "-l", NULL}
char** cmd 
// iterate over args
for(i = 0; i < args_size; i++){
  // 
  // strip subarray from main array
  //  cmd 1 | cmd 2 | cmd3 => cmd
  // cmd = {"ls", "-l", NULL}
  //Open/reopen one pipe

  //if i is even open pipeB
    if(i % 2)  pipe(pipeB);
  //if i is odd open pipeA
    else       pipe(pipeA);


  switch(cpid = fork(){
      case -1: error forking
      case 0: // child process
            childprocess(i);
      default: // parent process
           parentprocess(i, cpid);
  }
}
}
// parent pipes must be closed in parent
void parentprocess(int i, pid_t cpid){

   // if first command
   if(i == 0)  
        close(pipeB[WRITE]);

   // if last command close WRITE
   else if (i == numPipes){
       // if i is even close pipeB[WRITE]
       // if i is odd close pipeA[WRITE]
   }

   // otherwise if in middle close READ and WRITE 
   // for appropriate pipes
      // if i is even
      close(pipeA[READ])
      close(pipeB[WRITE])
      // if i is odd
      close(pipeB[READ])
      close(pipeA[WRITE])
   }

   int returnvalue, status;
   waitpid(cpid, returnvalue, status);
}
void childprocess(int i){

    // if in first command
    if(i == 0)
        dup2(pipeB[WRITE], STDOUT_FILENO);
    //if in last command change stdin for
    // the necessary pipe. Don't touch stdout - 
    // stdout goes to shell
    else if( numPipes == i){
        // if i is even
        dup2(pipeB[READ], STDIN_FILENO)
        //if i is odd
        dup2(pipeA[READ], STDIN_FILENO);        
    }
    // otherwise, we are in middle command where
    // both pipes are used.
    else{
       // if i is even
       dup2(pipeA[READ], STDIN_FILENO)
       dupe(pipeB[WRITE], STDOUT_FILENO)
       // if i is odd
       dup2(pipeB[READ], STDIN_FILENO)
       dup2(pipeA[WRITE], STDOUT_FILENO)
    }

    // execute command for this iteration
    // check for errors!!
    // The exec() functions only return if an error has occurred. The return value is -1, and errno is set to indicate the error.
    if(exec(cmd, cmd) < 0)
        printf("Oh dear, something went wrong with read()! %s\n", strerror(errno));
    }   
}

-1

你想做的基本上是一个递归函数,其中子进程执行第一个命令,如果没有其他命令则父进程执行第二个命令或者再次调用该函数。


1
在这里发布您的电子邮件地址以供OP请求您的代码并不是回答问题的正确方式。但是请随意扩展您的答案。 - Kev
这更像是一条评论/高层建议,而不是一个完整的答案。 - ggorlen

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接