我想学习如何一起工作fork()
创建新的进程和pipes
与每个进程进行通信。 比方说,我有一个包含20个字的清单,我创建3个过程。 现在,我需要使用管道的过程之间分配的话,以及每个进程将整理收到的单词列表。 我想实现这一目标的方法是这样的:
Word1 => Process1
Word2 => Process2
Word3 => Process3
Word4 => Process1
Word5 => Process2
Word6 => Process3
.
.
.
因此,每个过程都会有一个单词列表进行排序,最终我将使用归并合并所有每个进程所返回的有序列表。 我不知道如何使用管道与每个进程(如饲料用一个字每一个过程)进行通信。 任何帮助,将让我在正确的轨道上,将不胜感激。
尝试为大小此代码。 它使用子进程的一个固定的数字,但可以通过调整枚举改变这个数字MAX_KIDS
(它主要是在3与组测试,后来我改成了5,以确保公正)。
#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>
#include <unistd.h>
typedef struct Child
{
FILE *fp_to;
FILE *fp_from;
pid_t pid;
} Child;
enum { P_READ, P_WRITE }; /* Read, write descriptor of a pipe */
enum { MAX_LINE = 4096 };
static void be_childish(void);
static void distribute(size_t nkids, Child *kids);
static void err_exit(const char *fmt, ...);
static void merge(size_t nkids, Child *kids);
static void wait_for_kids(size_t nkids, Child *kids);
static int make_kid(Child *kid)
{
int pipe1[2]; /* From parent to child */
int pipe2[2]; /* From child to parent */
if (pipe(pipe1) != 0)
return -1;
if (pipe(pipe2) != 0)
{
close(pipe1[P_READ]);
close(pipe1[P_WRITE]);
return -1;
}
if ((kid->pid = fork()) < 0)
{
close(pipe1[P_READ]);
close(pipe1[P_WRITE]);
close(pipe2[P_READ]);
close(pipe2[P_WRITE]);
return -1;
}
else if (kid->pid == 0)
{
dup2(pipe1[P_READ], STDIN_FILENO);
dup2(pipe2[P_WRITE], STDOUT_FILENO);
close(pipe1[P_READ]);
close(pipe1[P_WRITE]);
close(pipe2[P_READ]);
close(pipe2[P_WRITE]);
/* Reads standard input from parent; writes standard output to parent */
be_childish();
exit(0);
}
else
{
kid->fp_to = fdopen(pipe1[P_WRITE], "w");
kid->fp_from = fdopen(pipe2[P_READ], "r");
close(pipe1[P_READ]);
close(pipe2[P_WRITE]);
return 0;
}
}
int main(void)
{
enum { NUM_KIDS = 5 };
Child kids[NUM_KIDS];
struct sigaction act;
sigfillset(&act.sa_mask);
act.sa_flags = 0;
act.sa_handler = SIG_DFL;
sigaction(SIGCHLD, &act, 0);
for (int i = 0; i < NUM_KIDS; i++)
{
if (make_kid(&kids[i]) != 0)
err_exit("Fault starting child %d\n", i);
}
distribute(NUM_KIDS, kids);
merge(NUM_KIDS, kids);
wait_for_kids(NUM_KIDS, kids);
return(0);
}
static void err_exit(const char *fmt, ...)
{
va_list args;
va_start(args, fmt);
vfprintf(stderr, fmt, args);
va_end(args);
exit(1);
}
static int qs_compare(const void *v1, const void *v2)
{
const char *s1 = *(char **)v1;
const char *s2 = *(char **)v2;
return(strcmp(s1, s2));
}
static char *estrdup(const char *str)
{
size_t len = strlen(str) + 1;
char *out = malloc(len);
if (out == 0)
err_exit("Out of memory!\n");
memmove(out, str, len);
return(out);
}
static void be_childish(void)
{
char **lines = 0;
size_t num_lines = 0;
size_t max_lines = 0;
char input[MAX_LINE];
while (fgets(input, sizeof(input), stdin) != 0)
{
if (num_lines >= max_lines)
{
size_t n = (2 * max_lines + 2);
void *space = realloc(lines, n * sizeof(char *));
if (space == 0)
err_exit("Out of memory!\n");
lines = space;
max_lines = n;
}
lines[num_lines++] = estrdup(input);
}
qsort(lines, num_lines, sizeof(char *), qs_compare);
for (size_t i = 0; i < num_lines; i++)
{
if (fputs(lines[i], stdout) == EOF)
err_exit("Short write to parent from %d\n", (int)getpid());
}
exit(0);
}
static void distribute(size_t nkids, Child *kids)
{
char input[MAX_LINE];
size_t n = 0;
while (fgets(input, sizeof(input), stdin) != 0)
{
if (fputs(input, kids[n].fp_to) == EOF)
err_exit("Short write to child %d\n", (int)kids[n].pid);
if (++n >= nkids)
n = 0;
}
/* Close pipes to children - let's them get on with sorting */
for (size_t i = 0; i < nkids; i++)
{
fclose(kids[i].fp_to);
kids[i].fp_to = 0;
}
}
static void read_line(Child *kid, char *buffer, size_t maxlen, int *length)
{
if (fgets(buffer, maxlen, kid->fp_from) != 0)
{
*length = strlen(buffer);
buffer[*length] = '\0';
}
else
{
buffer[0] = '\0';
*length = -1;
fclose(kid->fp_from);
kid->fp_from = 0;
}
}
static int not_all_done(size_t nkids, int *lengths)
{
for (size_t i = 0; i < nkids; i++)
{
if (lengths[i] > 0)
return 1;
}
return 0;
}
static void min_line(size_t nkids, int *len, char **lines, size_t maxlen,
Child *kids, char *output)
{
size_t min_kid = 0;
char *min_str = 0;
for (size_t i = 0; i < nkids; i++)
{
if (len[i] <= 0)
continue;
if (min_str == 0 || strcmp(min_str, lines[i]) > 0)
{
min_str = lines[i];
min_kid = i;
}
}
strcpy(output, min_str);
read_line(&kids[min_kid], lines[min_kid], maxlen, &len[min_kid]);
}
static void merge(size_t nkids, Child *kids)
{
char line_data[nkids][MAX_LINE];
char *lines[nkids];
int len[nkids];
char output[MAX_LINE];
for (size_t i = 0; i < nkids; i++)
lines[i] = line_data[i];
/* Preload first line from each kid */
for (size_t i = 0; i < nkids; i++)
read_line(&kids[i], lines[i], MAX_LINE, &len[i]);
while (not_all_done(nkids, len))
{
min_line(nkids, len, lines, MAX_LINE, kids, output);
fputs(output, stdout);
}
}
static void wait_for_kids(size_t nkids, Child *kids)
{
int pid;
int status;
while ((pid = waitpid(-1, &status, 0)) != -1)
{
for (size_t i = 0; i < nkids; i++)
{
if (pid == kids[i].pid)
kids[i].pid = -1;
}
}
/* This check loop is not really necessary */
for (size_t i = 0; i < nkids; i++)
{
if (kids[i].pid != -1)
err_exit("Child %d died without being tracked\n", (int)kids[i].pid);
}
}
总体情况通常是:
pid_t pids[3];
int fd[3][2];
int i;
for (i = 0; i < 3; ++i) {
/* create the pipe */
if (pipe(fd[i]) < 0) {
perror("pipe error");
exit(1);
}
/* fork the child */
pid[i] = fork();
if (pid[i] < 0) {
perror("fork error");
} else if (pid[i] > 0) {
/* in parent process */
/* close reading end */
close(fd[i][0]);
} else {
/* in child process */
/* close writing end */
close(fd[i][1]);
/* read from parent */
read(fd[i][0], line, max);
...
}
}
/* in parent process */
char words[100][10] = {...};
int j, child = 0;
/* for all words */
for (j = 0; j < 100; ++j) {
/* write to child */
write(fd[child][1], words[j], strlen(words[j]));
...
++child;
if (child >= 3)
child = 0;
}
重复从子传送回母管部。 要小心,不要当家长和孩子都试图在两个方向上同时进行通信死锁。
有什么魔力管道 - 他们只是两个端点通信介质。 该逻辑是大约:
创建3个管道和守住一个端点。 叉三次,并获得每个分叉那些孩子守住管道的另一端。 这个孩子将会进入读循环,等待输入和写回输出。 父进程可以循环所有输出,然后执行一循环读取的输入。 这是不是最好的策略,但它是迄今为止最简单的。 即
while there is work left to do:
for i in 1..3
write current work unit to pipe[i]
for i in 1..3
read back response from pipe[i]
一个给定的孩子刚看起来是这样的:
while(input = read from pipe)
result = do work on input
write result to pipe
下一步将做异步的回读的父进程,非阻塞方式(可能使用select
,或者只是一个忙等轮询循环)。 这就要求孩子报到哪个任务它们返回结果为,因为排序可能会导致混乱(例如,你可以不再依赖你发是你的第一反应的第一项工作单位)。 欢迎到并发错误的乐趣的世界。
鉴于你的问题有些尚未得以确认的性质,我希望这是某种有用的。