C语言中的多进程和管道

3

我正在学习如何使用fork()创建新进程以及使用pipes与每个进程进行通信。假设我有一个包含20个单词的列表,并且我创建了3个进程。现在,我需要使用管道将单词分发给各个进程,并让每个进程对其接收到的单词列表进行排序。我想要实现的方式如下:

Word1 => Process1
Word2 => Process2
Word3 => Process3
Word4 => Process1
Word5 => Process2
Word6 => Process3
.
.
.

所以每个进程都有一个待排序的单词列表,最终我将使用MergeSort来合并每个进程返回的已排序列表。我不确定如何使用管道与每个进程通信(例如向每个进程提供一个单词)。任何能让我走上正确轨道的帮助都将不胜感激。

我还没有开始编码,也不一定要找任何代码,我只是试图弄清楚系统的整体设计(也许是伪代码?) - Arian Motamedi
请查找http://stackoverflow.com/search?q=fork+pipe。这些通常是单个`fork()`和`pipe()`示例。但是您可以轻松地将它们适应于多个进程。 - Olaf Dietsche
3个回答

8

尝试使用此代码进行测试。它使用固定数量的子进程,但您可以通过调整枚举MAX_KIDS来更改该数字(大多数情况下测试时将其设置为3;稍后我将其更改为5以确保)。

#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>
#include <unistd.h>

typedef struct Child
{
    FILE *fp_to;
    FILE *fp_from;
    pid_t pid;
} Child;

enum { P_READ, P_WRITE };   /* Read, write descriptor of a pipe */
enum { MAX_LINE = 4096 };

static void be_childish(void);
static void distribute(size_t nkids, Child *kids);
static void err_exit(const char *fmt, ...);
static void merge(size_t nkids, Child *kids);
static void wait_for_kids(size_t nkids, Child *kids);

static int make_kid(Child *kid)
{
    int pipe1[2];   /* From parent to child */
    int pipe2[2];   /* From child to parent */
    if (pipe(pipe1) != 0)
        return -1;
    if (pipe(pipe2) != 0)
    {
        close(pipe1[P_READ]);
        close(pipe1[P_WRITE]);
        return -1;
    }
    if ((kid->pid = fork()) < 0)
    {
        close(pipe1[P_READ]);
        close(pipe1[P_WRITE]);
        close(pipe2[P_READ]);
        close(pipe2[P_WRITE]);
        return -1;
    }
    else if (kid->pid == 0)
    {
        dup2(pipe1[P_READ], STDIN_FILENO);
        dup2(pipe2[P_WRITE], STDOUT_FILENO);
        close(pipe1[P_READ]);
        close(pipe1[P_WRITE]);
        close(pipe2[P_READ]);
        close(pipe2[P_WRITE]);
        /* Reads standard input from parent; writes standard output to parent */
        be_childish();
        exit(0);
    }
    else
    {
        kid->fp_to   = fdopen(pipe1[P_WRITE], "w");
        kid->fp_from = fdopen(pipe2[P_READ], "r");
        close(pipe1[P_READ]);
        close(pipe2[P_WRITE]);
        return 0;
    }
}

int main(void)
{
    enum { NUM_KIDS = 5 };
    Child kids[NUM_KIDS];
    struct sigaction act;

    sigfillset(&act.sa_mask);
    act.sa_flags   = 0;
    act.sa_handler = SIG_DFL;
    sigaction(SIGCHLD, &act, 0);

    for (int i = 0; i < NUM_KIDS; i++)
    {
        if (make_kid(&kids[i]) != 0)
            err_exit("Fault starting child %d\n", i);
    }

    distribute(NUM_KIDS, kids);
    merge(NUM_KIDS, kids);

    wait_for_kids(NUM_KIDS, kids);
    return(0);
}

static void err_exit(const char *fmt, ...)
{
    va_list args;
    va_start(args, fmt);
    vfprintf(stderr, fmt, args);
    va_end(args);
    exit(1);
}

static int qs_compare(const void *v1, const void *v2)
{
    const char *s1 = *(char **)v1;
    const char *s2 = *(char **)v2;
    return(strcmp(s1, s2));
}

static char *estrdup(const char *str)
{
    size_t len = strlen(str) + 1;
    char *out = malloc(len);
    if (out == 0)
        err_exit("Out of memory!\n");
    memmove(out, str, len);
    return(out);
}

static void be_childish(void)
{
    char **lines = 0;
    size_t num_lines = 0;
    size_t max_lines = 0;
    char input[MAX_LINE];

    while (fgets(input, sizeof(input), stdin) != 0)
    {
        if (num_lines >= max_lines)
        {
            size_t n = (2 * max_lines + 2);
            void *space = realloc(lines, n * sizeof(char *));
            if (space == 0)
                err_exit("Out of memory!\n");
            lines = space;
            max_lines = n;
        }
        lines[num_lines++] = estrdup(input);
    }

    qsort(lines, num_lines, sizeof(char *), qs_compare);

    for (size_t i = 0; i < num_lines; i++)
    {
        if (fputs(lines[i], stdout) == EOF)
            err_exit("Short write to parent from %d\n", (int)getpid());
    }

    exit(0);
}

static void distribute(size_t nkids, Child *kids)
{
    char   input[MAX_LINE];
    size_t n = 0;

    while (fgets(input, sizeof(input), stdin) != 0)
    {
        if (fputs(input, kids[n].fp_to) == EOF)
            err_exit("Short write to child %d\n", (int)kids[n].pid);
        if (++n >= nkids)
            n = 0;
    }

    /* Close pipes to children - let's them get on with sorting */
    for (size_t i = 0; i < nkids; i++)
    {
        fclose(kids[i].fp_to);
        kids[i].fp_to = 0;
    }
}

static void read_line(Child *kid, char *buffer, size_t maxlen, int *length)
{
    if (fgets(buffer, maxlen, kid->fp_from) != 0)
    {
        *length = strlen(buffer);
        buffer[*length] = '\0';
    }
    else
    {
        buffer[0] = '\0';
        *length = -1;
        fclose(kid->fp_from);
        kid->fp_from = 0;
    }
}

static int not_all_done(size_t nkids, int *lengths)
{
    for (size_t i = 0; i < nkids; i++)
    {
        if (lengths[i] > 0)
            return 1;
    }
    return 0;
}

static void min_line(size_t nkids, int *len, char **lines, size_t maxlen,
                     Child *kids, char *output)
{
    size_t  min_kid = 0;
    char   *min_str = 0;
    for (size_t i = 0; i < nkids; i++)
    {
        if (len[i] <= 0)
            continue;
        if (min_str == 0 || strcmp(min_str, lines[i]) > 0)
        {
            min_str = lines[i];
            min_kid = i;
        }
    }
    strcpy(output, min_str);
    read_line(&kids[min_kid], lines[min_kid], maxlen, &len[min_kid]);
}

static void merge(size_t nkids, Child *kids)
{
    char line_data[nkids][MAX_LINE];
    char *lines[nkids];
    int  len[nkids];
    char output[MAX_LINE];

    for (size_t i = 0; i < nkids; i++)
        lines[i] = line_data[i];

    /* Preload first line from each kid */
    for (size_t i = 0; i < nkids; i++)
        read_line(&kids[i], lines[i], MAX_LINE, &len[i]);

    while (not_all_done(nkids, len))
    {
        min_line(nkids, len, lines, MAX_LINE, kids, output);
        fputs(output, stdout);
    }
}

static void wait_for_kids(size_t nkids, Child *kids)
{
    int pid;
    int status;

    while ((pid = waitpid(-1, &status, 0)) != -1)
    {
        for (size_t i = 0; i < nkids; i++)
        {
            if (pid == kids[i].pid)
                kids[i].pid = -1;
        }
    }

    /* This check loop is not really necessary */
    for (size_t i = 0; i < nkids; i++)
    {
        if (kids[i].pid != -1)
            err_exit("Child %d died without being tracked\n", (int)kids[i].pid);
    }
}

抱歉,该程序从标准输入读取。在distribute()中,它读取行(而不是单词),并将其发送给子进程。没有什么可以阻止您拆分行并将单词(每个后跟换行符)发送到子进程。最终输出也会发送到标准输出(或父进程); 这在merge()中实现。我选择使用fdopen()允许我在管道上使用标准I/O;早期版本主要或完全使用文件描述符与子进程通信。 - Jonathan Leffler
我正在查看代码。我该如何打开一个包含distribure()函数中所有单词的txt文件?现在我无法弄清楚我的stdin设置为什么... - Arian Motamedi
我使用的程序名称是mp; 我使用了mp < random.data。你可以很好地安排将参数传递给主程序int main(int argc, char **argv) { ... },然后将文件名(或名称)传递给distribute(int nfiles, char **fnames, size_t nkids, Child *kids) { for (int i = 0; i < nfiles; i++) { FILE *fp = fopen(fnames[i]); if (fp != 0) { if (fgets(input, sizeof(input), fp) != 0) { ...现在就像这样...但缩进... } fclose(fp); } } ...关闭循环... }。在最简单的情况下,您可以调用distibute(argc-1, argv+1, NUM_KIDS, kids) - Jonathan Leffler
好的,那很有道理。现在,在单词被排序和合并后,我该如何在主方法中打印它们呢?这是否可能? - Arian Motamedi
“merge()”方法当前是如何打印的?我看不到任何“printf()”语句...?! - Arian Motamedi
显示剩余3条评论

5

总体来说,通常情况如下:

pid_t pids[3];
int fd[3][2];

int i;
for (i = 0; i < 3; ++i) {
    /* create the pipe */
    if (pipe(fd[i]) < 0) {
            perror("pipe error");
            exit(1);
    }

   /* fork the child */
   pid[i] = fork();
   if (pid[i] < 0) {
       perror("fork error");
   } else if (pid[i] > 0) {
       /* in parent process */
       /* close reading end */
       close(fd[i][0]);
   } else {
       /* in child process */
       /* close writing end */
       close(fd[i][1]);
       /* read from parent */
       read(fd[i][0], line, max);
       ...
    }
}

/* in parent process */
char words[100][10] = {...};
int j, child = 0;
/* for all words */
for (j = 0; j < 100; ++j) {
    /* write to child */
    write(fd[child][1], words[j], strlen(words[j]));
    ...
    ++child;
    if (child >= 3)
        child = 0;
}

复制管道部分,以便从子进程向父进程进行双向通信。在父进程和子进程同时尝试进行双向通信时,要注意不产生死锁。

好的,这就是我困惑的地方。我知道如何处理单个fork()调用,但在我的情况下,我需要有3个fork()调用。问题是我不确定如何与每个进程交互。例如,我创建一个循环来三次fork(),并将Word1给Process1,Word2给Process2,Word3给Process3。现在,循环结束了,我有了3个进程,但我还有17个单词要分配给这三个进程:我怎样才能回到Process1并给它Word4呢? - Arian Motamedi
我会将pid存储在数组中并对其进行迭代。我已经修改了我的示例,并希望在匆忙中正确理解。当您完成发送单词后,请“close()”管道,以便子进程接收EOF并知道他们应该开始排序等操作。 - Olaf Dietsche
编辑:好的,我相信fd[3][2]表示一个包含3个fds的数组,每个fd都包含2个端点(读/写),对吗?另外一个问题。在我完成单词分配后,我需要父进程等待每个子进程完成排序过程。我相信waitpid()函数是我应该使用的,但我不确定如何正确使用它。有什么想法吗? - Arian Motamedi
是的,您需要保存每个子进程的管道文件描述符和进程ID。wait()waitpid()表示等待子进程或任何子进程(pid=-1)退出。如果它们不需要回报,那就没关系了。如果他们需要通过管道发送排序后的单词,则必须复制管道并从中读取,直到EOF。 - Olaf Dietsche
请查看我的答案中的wait_for_kids()函数。 - Jonathan Leffler

4

管道并没有什么神奇的地方-它们只是具有两个端点的通信介质。其逻辑大致如下:

创建3个管道,并保留其中一个端点。分别进行三次fork,让每个派生出来的子进程持有该管道的另一个端点。然后子进程进入读取循环,等待输入并写回输出。父进程可以轮流处理所有输出,然后轮流读取输入。这不是最好的策略,但绝对是最简单的方法。例如:

while there is work left to do:
   for i in 1..3
       write current work unit to pipe[i]

   for i in 1..3
       read back response from pipe[i]

一个给定的子元素看起来像这样:
while(input = read from pipe)
    result = do work on input
    write result to pipe

下一步将以异步、非阻塞的方式在父进程中执行读回操作(可能使用select,或者只是一个忙等待轮询循环)。这需要子进程报告他们返回结果的任务,因为顺序可能会变得混乱(即你不能再依靠发送的第一个工作单位是你收到的第一个响应)。欢迎来到并发错误的有趣世界。
考虑到你提问的内容有些模糊不清,希望这个翻译对你有所帮助。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接