I've implemented a working multithreaded merge sort in C++, but I've hit a wall.
In my implementation, I recursively split an input vector into two parts, and then thread these two parts:
void MergeSort(vector<int> *in)
{
    if(in->size() < 2)
        return;
    vector<int>::iterator ite = in->begin();
    vector<int> left = vector<int> (ite, ite + in->size()/2);
    vector<int> right = vector<int> (ite + in->size()/2, in->end() );
    //current thread spawns 2 threads HERE
    thread t1 = thread(MergeSort, &left);
    thread t2 = thread(MergeSort, &right);
    t1.join();
    t2.join();
    vector<int> ret;
    ret.reserve(in->size() );
    ret = MergeSortMerge(left, right);
    in->clear();
    in->insert(in->begin(), ret.begin(), ret.end() );
    return;
}
The code appears to be pretty, but it's one of the most vicious pieces of code I've ever written. Trying to sort an array of more than 1000 int values spawns so many threads that I run out of stack space, and my computer BSODs :( Consistently.
I am well aware of the reason why this code spawns so many threads, which isn't so good, but technically (if not theoretically), is this not a proper implementation?
Based on a bit of Googling, I seem to have found the need for a threadpool. Would the use of a threadpool resolve the fundamental issue I am running into, the fact that I am trying to spawn too many threads? If so, do you have any recommendations on libraries?
Thank you for the advice and help!
As zdan explained, you should limit the number of threads. Two things determine the limit:
1. The number of CPU cores. In C++11, you can use std::thread::hardware_concurrency() to query the number of hardware threads. However, this function may return 0, meaning the value could not be determined; in that case, you can assume a value such as 2 or 4.
2. The amount of data to be processed. You could keep dividing the data among threads until each thread handles a single element, but the cost of a thread is far too high for one element. For example, you might decide that below 50 elements the data is not divided any further. The maximum number of useful threads is then something like total_data_number / 50 + 1.
Then take the smaller of the two values from cases 1 and 2 as the limit.
In your case, because you create threads by recursion, you can determine a maximum recursion depth in a similar way.
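The two limits above can be sketched roughly as follows. The function names, the fallback of 2 cores, and the conversion from a thread limit to a recursion depth are my own illustrative assumptions; only std::thread::hardware_concurrency() and the "50 elements per thread" rule of thumb come from the text.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <thread>

// Upper bound on worker threads for `item_count` elements.
// `min_items_per_thread` and `fallback_cores` are illustrative defaults.
inline unsigned max_threads(std::size_t item_count,
                            std::size_t min_items_per_thread = 50,
                            unsigned fallback_cores = 2) {
    assert(min_items_per_thread > 0);
    // Case 1: hardware threads; hardware_concurrency() may return 0 if unknown.
    unsigned cores = std::thread::hardware_concurrency();
    if (cores == 0) cores = fallback_cores;
    // Case 2: do not split the data below the minimum chunk size.
    std::size_t const by_data = item_count / min_items_per_thread + 1;
    // The limit is the smaller of the two cases.
    return static_cast<unsigned>(std::min<std::size_t>(cores, by_data));
}

// Recursion depth d such that 2^d does not exceed `thread_limit`
// (each level of a two-way recursion doubles the number of branches).
inline unsigned max_spawn_depth(unsigned thread_limit) {
    unsigned depth = 0;
    while (thread_limit > 1) {
        thread_limit /= 2;
        ++depth;
    }
    return depth;
}
```

For a recursive sort, max_spawn_depth(max_threads(in->size())) would then give the number of levels at which new threads may still be created.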
I don't think a threadpool is going to help you. Since your algorithm is recursive, you'll get to a point where all threads in your pool are consumed, the pool won't create any more threads, and your algorithm will block.
You could probably just limit your thread creation recursion depth to 2 or 3 (unless you've got a LOT of CPUs, going deeper won't make any difference in performance).
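Here is a minimal sketch of what a depth limit could look like. It is not the asker's code: it sorts in place on iterators with std::inplace_merge instead of copying into left/right vectors, and the names merge_sort_limited, merge_sort and the default depth of 2 are just assumptions for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <thread>
#include <vector>

// Merge sort on [first, last). A new thread is spawned only while
// depth_left > 0; below that depth the recursion is purely sequential.
inline void merge_sort_limited(std::vector<int>::iterator first,
                               std::vector<int>::iterator last,
                               int depth_left) {
    if (last - first < 2) return;
    auto mid = first + (last - first) / 2;
    if (depth_left > 0) {
        // Left half on a new thread, right half on the current thread.
        std::thread left(merge_sort_limited, first, mid, depth_left - 1);
        merge_sort_limited(mid, last, depth_left - 1);
        left.join();
    } else {
        // Depth budget used up: no more threads.
        merge_sort_limited(first, mid, 0);
        merge_sort_limited(mid, last, 0);
    }
    std::inplace_merge(first, mid, last);
}

// Convenience wrapper; max_depth = 2 means at most 4 branches run at once.
inline void merge_sort(std::vector<int>& v, int max_depth = 2) {
    assert(max_depth >= 0);
    merge_sort_limited(v.begin(), v.end(), max_depth);
}
```

With max_depth = 2, only three extra threads are ever created regardless of input size, because each spawning level reuses the current thread for one of the two halves.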
You can set your limits on stack space but it is futile. Too many threads, even with a pool, will eat it up at log2(N)*cost per thread. Go for an iterative approach and reduce your overhead. Overhead is the killer.
As far as performance goes, you are going to find that some level of overcommit on N threads, where N is the hardware concurrency, will probably yield the best results. That gives a good balance between overhead and work per core. If N gets very large, as on a GPU, then other options exist (e.g. bitonic sort) that make different trade-offs to reduce the communication (waiting/joining) overhead.
Assuming you have a task manager and a semaphore that is constructed to receive N notifies before allowing the waiting task through,
#include <algorithm>
#include <array>
#include <cstddef>
#include <utility>
#include <vector>
#include <sometaskmanager.h> // hypothetical: provides make_semaphore( ) and add_task( )
void parallel_merge( size_t N ) {
    if( N == 0 ) N = 1;
    std::array<int, 1000> ary {0};
    // fill array...
    // Chunk length, rounded up so that at most N chunks cover the array
    size_t const stride_size = (ary.size( ) + N - 1)/N; //TODO: Put a MIN size here
    using iterator = std::array<int, 1000>::iterator;
    std::vector<std::pair<iterator, iterator>> ranges;
    for( size_t pos=0; pos<ary.size( ); pos+=stride_size ) {
        size_t const len = std::min( stride_size, ary.size( ) - pos );
        ranges.emplace_back( ary.begin( ) + pos, ary.begin( ) + pos + len );
    }
    // Phase 1: sort every chunk in its own task; each task notifies once
    auto semaphore = make_semaphore( ranges.size( ) );
    for( auto const & rng: ranges ) {
        add_task( [&semaphore,rng]( ) {
            std::sort( rng.first, rng.second );
            semaphore.notify( );
        });
    }
    semaphore.wait( );
    // Phase 2: merge neighbouring pairs of sorted ranges until one remains
    while( ranges.size( ) > 1 ) {
        std::vector<std::pair<iterator, iterator>> new_rng;
        semaphore = make_semaphore( ranges.size( )/2 );
        for( size_t n=0; n+1<ranges.size( ); n+=2 ) {
            auto first=ranges[n].first;
            auto last=ranges[n+1].second;
            add_task( [&semaphore, first, mid=ranges[n].second, last]( ) {
                std::inplace_merge( first, mid, last );
                semaphore.notify( );
            });
            new_rng.emplace_back( first, last );
        }
        if( ranges.size( ) % 2 != 0 ) {
            new_rng.push_back( ranges.back( ) ); // odd range out carries over
        }
        semaphore.wait( ); // all merges of this round must finish first
        ranges = std::move( new_rng );
    }
}
As you can see, the bottleneck is in the merge phase, as there is a lot of coordination that must be done. Sean Parent gives a good presentation on building a task manager if you don't have one, and on how it compares, along with a relative performance analysis, in Better Code: Concurrency, http://sean-parent.stlab.cc/presentations/2016-11-16-concurrency/2016-11-16-concurrency.pdf . TBB and PPL have task managers.
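If you don't have a task manager at hand, the same two-phase idea can be tried with plain std::thread. This is only a sketch under that substitution: one thread per chunk or merge replaces add_task, and joining the threads plays the role of the semaphore. The name chunked_parallel_sort and its parameters are made up for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <thread>
#include <utility>
#include <vector>

// Sort `data` in at most `n_workers` chunks, then merge neighbouring
// chunks round by round until a single sorted range remains.
inline void chunked_parallel_sort(std::vector<int>& data, std::size_t n_workers) {
    assert(n_workers > 0);
    if (data.size() < 2) return;
    using iterator = std::vector<int>::iterator;
    // Chunk length, rounded up so that at most n_workers chunks are created.
    std::size_t const chunk = (data.size() + n_workers - 1) / n_workers;
    std::vector<std::pair<iterator, iterator>> ranges;
    for (std::size_t pos = 0; pos < data.size(); pos += chunk) {
        std::size_t const len = std::min(chunk, data.size() - pos);
        ranges.emplace_back(data.begin() + pos, data.begin() + pos + len);
    }
    // Phase 1: sort each chunk on its own thread.
    std::vector<std::thread> workers;
    for (auto const& rng : ranges)
        workers.emplace_back([rng] { std::sort(rng.first, rng.second); });
    for (auto& w : workers) w.join();
    // Phase 2: merge adjacent pairs; the number of ranges halves each round.
    while (ranges.size() > 1) {
        std::vector<std::pair<iterator, iterator>> merged;
        workers.clear();
        for (std::size_t n = 0; n + 1 < ranges.size(); n += 2) {
            auto first = ranges[n].first;
            auto mid = ranges[n].second;
            auto last = ranges[n + 1].second;
            workers.emplace_back(
                [first, mid, last] { std::inplace_merge(first, mid, last); });
            merged.emplace_back(first, last);
        }
        if (ranges.size() % 2 != 0) merged.push_back(ranges.back());
        for (auto& w : workers) w.join();  // finish the round before the next
        ranges = std::move(merged);
    }
}
```

The number of live threads never exceeds n_workers, and each merge round uses half as many threads as the previous one, which is where the coordination cost mentioned above shows up.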