我实现了从Cormen著名的文本并行合并排序算法。 我使用并行线程写在C,并使用MinGW在Win7的X64(也与GCC后来测试在Ubuntu有相同的结果)编制。 我在并行第一种方法是幼稚的......我产生了一个新的线程在每次递归级别(这实际上是什么Cormen的伪代码暗示)。 然而,这通常最终要么采取太长或崩溃由于分段错误(我可以假设有一些硬性限制系统可以多少线程处理)。 这似乎是递归的并行共同新手的错误,其实我发现了一个类似的讨论在这个网站。 所以我代替建议在该线程,即设置问题大小阈值,如果该产卵新线程函数给出小于阈值的集合(说10,000个元素),那么它只是操作上的元素直接,而不是创造这样一个小套一个新的线程。
现在,一切似乎做工精细。 我列出了一些我下面的结果。 N是问题的大小(一个整数集的[1,2,3,...,N]彻底加扰)和阈值是低于该我的并行排序和并行合并函数将拒绝生成新线程的值。 第一个表显示在毫秒时间排序,第二显示工作线程都多少排序/合并在每种情况下产生了。 纵观N = 1E6和N = 1E7排在底部的表,你可以看到,只要我降低门槛使得多于〜8000合并工人被允许,我得到段错误。 同样,我认为是由于某些限制,系统给出的线程,我很乐意听到更多有关这一点,但它不是我的主要问题。
主要的问题,就是为什么要使用一个相当高的门槛,这将催生一个预期15/33工作线程(以下从以前的行模式)时,最后一行得到段错误。 当然这不是太多的线程我的系统来处理。 它没有完整的一个实例(表右下单元格)使用了大约1.2GB RAM(我的系统有6GB),以及线程版本似乎从来没有比在每个行右侧0线程的那些采取更多的内存。
- 我不认为我打任何形式的堆限制...可用吨的RAM,它应该只需要1GB〜即使被允许产卵15/33线程。
- 我也并不认为这是一个堆栈问题。 我设计的方案,以使用最少的堆栈,我不认为脚印每个线程将在所有相关问题的大小N,只有堆。 我与这个漂亮的经验不足......但我确实在gdb核心转储栈回溯,从车顶到栈的底部的地址近得似乎排除溢出那里。
- 我想读在pthread_create的返回值...在Windows我在飞机坠毁前几次得到的值11(但它似乎没有触发崩溃,因为有几个11的,然后几个0的,即没有错误,则另一个11)。 该错误代码是EAGAIN,资源不可用......但我不知道它真正的意思在这里。 此外,在Ubuntu的错误代码为0,甚至一直到崩溃每次。
- 我尝试Valgrind的,并得到了很多关于内存泄漏的消息,但我不知道那些是合法的,因为我知道Valgrind的需要额外的资源,我能得到那些没有Valgrind的工作正常类型的其他问题集大小错误。
但是很明显这是涉及到问题的规模和系统资源......我希望有一些片一般知识的我失踪,使答案是很清楚。
有任何想法吗? 对不起,文字的长壁...谢谢如果你已经远远阅读! 如果它似乎相关,我可以张贴的来源。
编辑:来源添加以供参考:
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <pthread.h>
const int N = 100000000;
const int SORT_THRESHOLD = 10000000;
const int MERGE_THRESHOLD = 10000000;
int sort_thread_count = 0;
int merge_thread_count = 0;
typedef struct s_pmergesort_args {
int *vals_in, p, r, *vals_out, s;
} pmergesort_args;
typedef struct s_pmerge_args {
int *temp, p1, r1, p2, r2, *vals_out, p3;
} pmerge_args;
void *p_merge_sort(void *v_pmsa);
void *p_merge(void *v_pma);
int binary_search(int val, int *temp, int p, int r);
int main() {
int *values, i, rand1, rand2, temp, *sorted;
long long rand1a, rand1b, rand2a, rand2b;
struct timeval start, end;
/* allocate values on heap and initialize */
values = malloc(N * sizeof(int));
sorted = malloc(N * sizeof(int));
for (i = 0; i < N; i++) {
values[i] = i + 1;
sorted[i] = 0;
}
/* scramble
* - complicated logic to maximize swapping
* - lots of testing (not shown) was done to verify optimal swapping */
srand(time(NULL));
for (i = 0; i < N/10; i++) {
rand1a = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
rand1b = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
rand1 = (int)((rand1a * rand1b + rand()) % N);
rand2a = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
rand2b = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
rand2 = (int)((rand2a * rand2b + rand()) % N);
temp = values[rand1];
values[rand1] = values[rand2];
values[rand2] = temp;
}
/* set up args for p_merge_sort */
pmergesort_args pmsa;
pmsa.vals_in = values;
pmsa.p = 0;
pmsa.r = N-1;
pmsa.vals_out = sorted;
pmsa.s = 0;
/* sort */
gettimeofday(&start, NULL);
p_merge_sort(&pmsa);
gettimeofday(&end, NULL);
/* verify sorting */
for (i = 1; i < N; i++) {
if (sorted[i] < sorted[i-1]) {
fprintf(stderr, "Error: array is not sorted.\n");
exit(0);
}
}
printf("Success: array is sorted.\n");
printf("Sorting took %dms.\n", (int)(((end.tv_sec * 1000000 + end.tv_usec) - (start.tv_sec * 1000000 + start.tv_usec))/1000));
free(values);
free(sorted);
printf("( sort threads created: %d )\n", sort_thread_count);
printf("( merge threads created: %d )\n", merge_thread_count);
return 0;
}
void *p_merge_sort(void *v_pmsa) {
pmergesort_args pmsa = *((pmergesort_args *) v_pmsa);
int *vals_in = pmsa.vals_in;
int p = pmsa.p;
int r = pmsa.r;
int *vals_out = pmsa.vals_out;
int s = pmsa.s;
int n = r - p + 1;
pthread_t worker;
if (n > SORT_THRESHOLD) {
sort_thread_count++;
}
if (n == 1) {
vals_out[s] = vals_in[p];
} else {
int *temp = malloc(n * sizeof(int));
int q = (p + r) / 2;
int q_ = q - p + 1;
pmergesort_args pmsa_l;
pmsa_l.vals_in = vals_in;
pmsa_l.p = p;
pmsa_l.r = q;
pmsa_l.vals_out = temp;
pmsa_l.s = 0;
pmergesort_args pmsa_r;
pmsa_r.vals_in = vals_in;
pmsa_r.p = q+1;
pmsa_r.r = r;
pmsa_r.vals_out = temp;
pmsa_r.s = q_;
if (n > SORT_THRESHOLD) {
pthread_create(&worker, NULL, p_merge_sort, &pmsa_l);
} else {
p_merge_sort(&pmsa_l);
}
p_merge_sort(&pmsa_r);
if (n > SORT_THRESHOLD) {
pthread_join(worker, NULL);
}
pmerge_args pma;
pma.temp = temp;
pma.p1 = 0;
pma.r1 = q_ - 1;
pma.p2 = q_;
pma.r2 = n - 1;
pma.vals_out = vals_out;
pma.p3 = s;
p_merge(&pma);
free(temp);
}
}
void *p_merge(void *v_pma) {
pmerge_args pma = *((pmerge_args *) v_pma);
int *temp = pma.temp;
int p1 = pma.p1;
int r1 = pma.r1;
int p2 = pma.p2;
int r2 = pma.r2;
int *vals_out = pma.vals_out;
int p3 = pma.p3;
int n1 = r1 - p1 + 1;
int n2 = r2 - p2 + 1;
int q1, q2, q3, t;
pthread_t worker;
if (n1 < n2) {
t = p1; p1 = p2; p2 = t;
t = r1; r1 = r2; r2 = t;
t = n1; n1 = n2; n2 = t;
}
if (n1 > MERGE_THRESHOLD) {
merge_thread_count++;
}
if (n1 == 0) {
return;
} else {
q1 = (p1 + r1) / 2;
q2 = binary_search(temp[q1], temp, p2, r2);
q3 = p3 + (q1 - p1) + (q2 - p2);
vals_out[q3] = temp[q1];
pmerge_args pma_l;
pma_l.temp = temp;
pma_l.p1 = p1;
pma_l.r1 = q1-1;
pma_l.p2 = p2;
pma_l.r2 = q2-1;
pma_l.vals_out = vals_out;
pma_l.p3 = p3;
if (n1 > MERGE_THRESHOLD) {
pthread_create(&worker, NULL, p_merge, &pma_l);
} else {
p_merge(&pma_l);
}
pmerge_args pma_r;
pma_r.temp = temp;
pma_r.p1 = q1+1;
pma_r.r1 = r1;
pma_r.p2 = q2;
pma_r.r2 = r2;
pma_r.vals_out = vals_out;
pma_r.p3 = q3+1;
p_merge(&pma_r);
if (n1 > MERGE_THRESHOLD) {
pthread_join(worker, NULL);
}
}
}
int binary_search(int val, int *temp, int p, int r) {
int low = p;
int mid;
int high = (p > r+1)? p : r+1;
while (low < high) {
mid = (low + high) / 2;
if (val <= temp[mid]) {
high = mid;
} else {
low = mid + 1;
}
}
return high;
}
编辑2:添加下面显示“最大”和每个版本(最大意义最高同时分配/使用情况和总的意思所有分配请求通过程序的生命之和)中使用“总” RAM的新形象。 这表明,与N = 1E8和阈值= 1E7我应该得到3.2GB的最大使用量(这在我的系统应该能够支持)。 但同样...我想这是关系到pthread库其他一些限制......不是我的实际系统资源。