多处理和管道用C(Multiprocessing and Pipes in C)

2019-08-02 07:10发布

我想学习如何一起工作fork()创建新的进程和pipes与每个进程进行通信。 比方说,我有一个包含20个字的清单,我创建3个过程。 现在,我需要使用管道的过程之间分配的话,以及每个进程将整理收到的单词列表。 我想实现这一目标的方法是这样的:

Word1 => Process1
Word2 => Process2
Word3 => Process3
Word4 => Process1
Word5 => Process2
Word6 => Process3

因此,每个过程都会有一个单词列表进行排序,最终我将使用归并合并所有每个进程所返回的有序列表。 我不知道如何使用管道与每个进程(如饲料用一个字每一个过程)进行通信。 任何帮助,将让我在正确的轨道上,将不胜感激。

Answer 1:

尝试为大小此代码。 它使用子进程的一个固定的数字,但可以通过调整枚举改变这个数字MAX_KIDS (它主要是在3与组测试,后来我改成了5,以确保公正)。

#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>
#include <unistd.h>

typedef struct Child
    FILE *fp_to;
    FILE *fp_from;
    pid_t pid;
} Child;

enum { P_READ, P_WRITE };   /* Read, write descriptor of a pipe */
enum { MAX_LINE = 4096 };

static void be_childish(void);
static void distribute(size_t nkids, Child *kids);
static void err_exit(const char *fmt, ...);
static void merge(size_t nkids, Child *kids);
static void wait_for_kids(size_t nkids, Child *kids);

static int make_kid(Child *kid)
    int pipe1[2];   /* From parent to child */
    int pipe2[2];   /* From child to parent */
    if (pipe(pipe1) != 0)
        return -1;
    if (pipe(pipe2) != 0)
        return -1;
    if ((kid->pid = fork()) < 0)
        return -1;
    else if (kid->pid == 0)
        dup2(pipe1[P_READ], STDIN_FILENO);
        dup2(pipe2[P_WRITE], STDOUT_FILENO);
        /* Reads standard input from parent; writes standard output to parent */
        kid->fp_to   = fdopen(pipe1[P_WRITE], "w");
        kid->fp_from = fdopen(pipe2[P_READ], "r");
        return 0;

int main(void)
    enum { NUM_KIDS = 5 };
    Child kids[NUM_KIDS];
    struct sigaction act;

    act.sa_flags   = 0;
    act.sa_handler = SIG_DFL;
    sigaction(SIGCHLD, &act, 0);

    for (int i = 0; i < NUM_KIDS; i++)
        if (make_kid(&kids[i]) != 0)
            err_exit("Fault starting child %d\n", i);

    distribute(NUM_KIDS, kids);
    merge(NUM_KIDS, kids);

    wait_for_kids(NUM_KIDS, kids);

static void err_exit(const char *fmt, ...)
    va_list args;
    va_start(args, fmt);
    vfprintf(stderr, fmt, args);

static int qs_compare(const void *v1, const void *v2)
    const char *s1 = *(char **)v1;
    const char *s2 = *(char **)v2;
    return(strcmp(s1, s2));

static char *estrdup(const char *str)
    size_t len = strlen(str) + 1;
    char *out = malloc(len);
    if (out == 0)
        err_exit("Out of memory!\n");
    memmove(out, str, len);

static void be_childish(void)
    char **lines = 0;
    size_t num_lines = 0;
    size_t max_lines = 0;
    char input[MAX_LINE];

    while (fgets(input, sizeof(input), stdin) != 0)
        if (num_lines >= max_lines)
            size_t n = (2 * max_lines + 2);
            void *space = realloc(lines, n * sizeof(char *));
            if (space == 0)
                err_exit("Out of memory!\n");
            lines = space;
            max_lines = n;
        lines[num_lines++] = estrdup(input);

    qsort(lines, num_lines, sizeof(char *), qs_compare);

    for (size_t i = 0; i < num_lines; i++)
        if (fputs(lines[i], stdout) == EOF)
            err_exit("Short write to parent from %d\n", (int)getpid());


static void distribute(size_t nkids, Child *kids)
    char   input[MAX_LINE];
    size_t n = 0;

    while (fgets(input, sizeof(input), stdin) != 0)
        if (fputs(input, kids[n].fp_to) == EOF)
            err_exit("Short write to child %d\n", (int)kids[n].pid);
        if (++n >= nkids)
            n = 0;

    /* Close pipes to children - let's them get on with sorting */
    for (size_t i = 0; i < nkids; i++)
        kids[i].fp_to = 0;

static void read_line(Child *kid, char *buffer, size_t maxlen, int *length)
    if (fgets(buffer, maxlen, kid->fp_from) != 0)
        *length = strlen(buffer);
        buffer[*length] = '\0';
        buffer[0] = '\0';
        *length = -1;
        kid->fp_from = 0;

static int not_all_done(size_t nkids, int *lengths)
    for (size_t i = 0; i < nkids; i++)
        if (lengths[i] > 0)
            return 1;
    return 0;

static void min_line(size_t nkids, int *len, char **lines, size_t maxlen,
                     Child *kids, char *output)
    size_t  min_kid = 0;
    char   *min_str = 0;
    for (size_t i = 0; i < nkids; i++)
        if (len[i] <= 0)
        if (min_str == 0 || strcmp(min_str, lines[i]) > 0)
            min_str = lines[i];
            min_kid = i;
    strcpy(output, min_str);
    read_line(&kids[min_kid], lines[min_kid], maxlen, &len[min_kid]);

static void merge(size_t nkids, Child *kids)
    char line_data[nkids][MAX_LINE];
    char *lines[nkids];
    int  len[nkids];
    char output[MAX_LINE];

    for (size_t i = 0; i < nkids; i++)
        lines[i] = line_data[i];

    /* Preload first line from each kid */
    for (size_t i = 0; i < nkids; i++)
        read_line(&kids[i], lines[i], MAX_LINE, &len[i]);

    while (not_all_done(nkids, len))
        min_line(nkids, len, lines, MAX_LINE, kids, output);
        fputs(output, stdout);

static void wait_for_kids(size_t nkids, Child *kids)
    int pid;
    int status;

    while ((pid = waitpid(-1, &status, 0)) != -1)
        for (size_t i = 0; i < nkids; i++)
            if (pid == kids[i].pid)
                kids[i].pid = -1;

    /* This check loop is not really necessary */
    for (size_t i = 0; i < nkids; i++)
        if (kids[i].pid != -1)
            err_exit("Child %d died without being tracked\n", (int)kids[i].pid);

Answer 2:


pid_t pids[3];
int fd[3][2];

int i;
for (i = 0; i < 3; ++i) {
    /* create the pipe */
    if (pipe(fd[i]) < 0) {
            perror("pipe error");

   /* fork the child */
   pid[i] = fork();
   if (pid[i] < 0) {
       perror("fork error");
   } else if (pid[i] > 0) {
       /* in parent process */
       /* close reading end */
   } else {
       /* in child process */
       /* close writing end */
       /* read from parent */
       read(fd[i][0], line, max);

/* in parent process */
char words[100][10] = {...};
int j, child = 0;
/* for all words */
for (j = 0; j < 100; ++j) {
    /* write to child */
    write(fd[child][1], words[j], strlen(words[j]));
    if (child >= 3)
        child = 0;

重复从子传送回母管部。 要小心,不要当家长和孩子都试图在两个方向上同时进行通信死锁。

Answer 3:

有什么魔力管道 - 他们只是两个端点通信介质。 该逻辑是大约:

创建3个管道和守住一个端点。 叉三次,并获得每个分叉那些孩子守住管道的另一端。 这个孩子将会进入读循环,等待输入和写回输出。 父进程可以循环所有输出,然后执行一循环读取的输入。 这是不是最好的策略,但它是迄今为止最简单的。 即

while there is work left to do:
   for i in 1..3
       write current work unit to pipe[i]

   for i in 1..3
       read back response from pipe[i]


while(input = read from pipe)
    result = do work on input
    write result to pipe

下一步将做异步的回读的父进程,非阻塞方式(可能使用select ,或者只是一个忙等轮询循环)。 这就要求孩子报到哪个任务它们返回结果为,因为排序可能会导致混乱(例如,你可以不再依赖你发是你的第一反应的第一项工作单位)。 欢迎到并发错误的乐趣的世界。


