我实现了C ++中的多线程工作合并排序,但我已经碰了壁。
在我的实现,我递归拆分输入向量分成两个部分,然后线程这两个部分:
void MergeSort(vector<int> *in)
{
if(in->size() < 2)
return;
vector<int>::iterator ite = in->begin();
vector<int> left = vector<int> (ite, ite + in->size()/2);
vector<int> right = vector<int> (ite + in->size()/2, in->end() );
//current thread spawns 2 threads HERE
thread t1 = thread(MergeSort, &left);
thread t2 = thread(MergeSort, &right);
t1.join();
t2.join();
vector<int> ret;
ret.reserve(in->size() );
ret = MergeSortMerge(left, right);
in->clear();
in->insert(in->begin(), ret.begin(), ret.end() );
return;
}
该代码看起来是很漂亮,但它是我写过的最恶毒代码之一。 试图理清超过1000种int类型的数组引起这么多线程产卵,我跑出来的堆栈空间,和我的电脑BSODS :(始终如一。
我清楚地知道为什么这个代码滋生如此多线程的原因,这是不那么好,但在技术上(如果不是理论上),这是不是一个正确的实施?
基于一点谷歌搜索,我似乎找到了一个线程池的需要。 将使用线程池的解决我遇到的根本问题,我试图产卵线程太多的事实呢? 如果是这样,你有对图书馆的任何建议?
谢谢你的建议和帮助!
作为zdan解释,你要限制的线程数。 有考虑来确定什么是极限了两两件事,
CPU内核的数量。 在C ++ 11,可以使用std::thread::hardware_concurrency()
来确定硬件的核心。 然而,这个函数可以返回0意味着节目不知道多少个核心,在这种情况下,你可能会认为这个值是2或4。
通过数据的数目的限制进行处理。 您可以将数据由线程,直到每个线程1点的数据进行处理,但它会花费太多的只有1个数据,它不是成本效益。 例如,你也许可以说,当数据的数量小于50,你不想再划分。 因此可以判断基于类似要求的最大线程数total_data_number / 50 + 1
。
然后,你选择的情况下1:2的情况下之间的最小数量来确定极限。
在你的情况,因为你是递归生成的线程,你可以尝试确定以类似的方式递归深度。
我不认为一个线程池是要帮助你。 由于你的算法是递归的,你会去的地方池中的所有线程都消耗,池将不希望创建任何更多的线程和你的算法将阻止一个点。
你很可能只是限制你的线程创建递归深度为2或3(除非你有CPU的的。它不会让任何性能上的差异)。
您可以将您的堆栈空间的限制,但它是徒劳的。 线程太多,即使有游泳池,将在LOG 2(N)*每个线程的成本吃起来。 去迭代方法,并减少你的开销。 开销是杀手。 至于性能也越高,你会发现,使用过某种程度的N个线程的承诺,这里是是硬件并发可能会产生最好的结果。 将有开销,每个核心工作之间的良好平衡。 如果N获得的非常大,像在GPU上,那么其他的选项存在(双调),使不同的权衡,以降低通信(等待/加盟)的开销。
假设你有一个任务管理器,是用于池莉构建N至允许等待任务之前通知信号,`
#include <algorithm>
#include <array>
#include <cstdint>
#include <vector>
#include <sometaskmanager.h>
void parallel_merge( size_t N ) {
std::array<int, 1000> ary {0};
// fill array...
intmax_t stride_size = ary.size( )/N; //TODO: Put a MIN size here
auto semaphore = make_semaphore( N );
using iterator = typename std::array<int, 1000>::iterator;
std::vector<std::pair<iterator, iterator>> ranges;
auto last_it = ary.begin( );
for( intmax_t n=stride_size; n<N; n +=stride_size ) {
ranges.emplace_back( last_it, std::next(last_it, std::min(std::distance(last_it, ary.end()), stride_size)));
semaphore.notify( );
}
for( auto const & rng: ranges ) {
add_task( [&semaphore,rng]( ) {
std::sort( rng.first, rng.second );
});
}
semaphore.wait( );
std::vector<std::pair<iterator, iterator>> new_rng;
while( ranges.size( ) > 1 ) {
semaphore = make_semaphore( ranges.size( )/2 );
for( size_t n=0; n<ranges.size( ); n+=2 ) {
auto first=ranges[n].first;
auto last=ranges[n+1].second;
add_task( [&semaphore, first, mid=ranges[n].second, last]( ) {
std::inplace_merge( first, mid, last );
semaphore.notify( );
});
new_rng.emplace_back( first, last );
}
if( ranges.size( ) % 2 != 0 ) {
new_rng.push_back( ranges.back( ) );
}
ranges = new_rng;
semaphore.wait( );
}
}
正如你所看到的,瓶颈是在合并阶段是有很多cordination的是必须做到的。 肖恩家长做建设任务管理器的一个很好的演示,如果你没有一个和它是如何随着他的介绍更好的代码相对性能分析比较:并发, http://sean-parent.stlab.cc/presentations /2016-11-16-concurrency/2016-11-16-concurrency.pdf 。 TBB和PPL有任务管理器。