I would like to parallelise a sequential operation (fitting a complicated mathematical function to some dataset) across multiple processors.
Assume I have 8 cores in my machine and I want to fit 1000 datasets. What I expect is some system that takes the 1000 datasets as a queue and sends them to the 8 cores for processing, so it starts by taking the first 8 of the 1000 in FIFO order. The fitting time of each dataset is in general different from the others, so some of the 8 datasets being fitted could take longer than the rest. What I want from the system is to save the result of each fitted dataset and then take a new dataset from the big queue (1000 datasets) for each thread that is done. This has to continue until all 1000 datasets are processed, and then I could move on with my program.
What is such a system called? And are there models/libraries for that in C++?
I parallelise with OpenMP, and use advanced C++ techniques like templates and polymorphism.
Thank you for any efforts.
You can either use an OpenMP parallel for with dynamic schedule or OpenMP tasks. Both can be used to parallelise cases where each iteration takes a different amount of time to complete. With a dynamically scheduled for loop:
#pragma omp parallel
{
    Fitter fitter;
    fitter.init();
    #pragma omp for schedule(dynamic,1)
    for (int i = 0; i < numFits; i++)
        fitter.fit(..., &results[i]);
}
schedule(dynamic,1) makes each thread execute one iteration at a time, and threads are never left idle unless there are no more iterations to process.
With tasks:
#pragma omp parallel
{
    Fitter fitter;
    fitter.init();
    #pragma omp single
    for (int i = 0; i < numFits; i++)
    {
        #pragma omp task
        fitter.fit(..., &results[i]);
    }
    #pragma omp taskwait
    // ^^^ only necessary if more code follows before the end of the parallel region
}
Here one of the threads runs the for loop, which produces 1000 OpenMP tasks. The tasks are kept in a queue and processed by idle threads. This works somewhat like the dynamically scheduled loop but allows for greater freedom in the code constructs (e.g. with tasks you can parallelise recursive algorithms; see the sketch below). The taskwait construct waits for all pending tasks to be done. It is implied at the end of the parallel region, so it is really necessary only if more code follows before the end of the parallel region.
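To illustrate the recursive pattern that tasking enables, here is a minimal self-contained sketch; naive Fibonacci is used purely as a stand-in for a recursive workload and is not part of the question:

#include <omp.h>
#include <cstdio>

long fib(int n)
{
    if (n < 2)
        return n;
    long x, y;
    // Children must share x and y with the parent task,
    // since locals are firstprivate in tasks by default.
    #pragma omp task shared(x)
    x = fib(n - 1);
    #pragma omp task shared(y)
    y = fib(n - 2);
    #pragma omp taskwait  // wait for both children before combining
    return x + y;
}

int main()
{
    long result;
    #pragma omp parallel
    #pragma omp single     // one thread seeds the recursion; idle threads steal tasks
    result = fib(20);
    printf("fib(20) = %ld\n", result);
    return 0;
}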
In both cases each invocation of fit() will be done in a different thread. You have to make sure that fitting one set of parameters does not affect fitting other sets, i.e. that fit() is a thread-safe method/function. Both cases also require that the time to execute fit() is much higher than the overhead of the OpenMP constructs.
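As a concrete illustration of that thread-safety requirement, here is a runnable sketch (the Fitter below is a hypothetical stand-in, not the asker's class): each thread owns its own Fitter, each results[i] is written only by the iteration that owns index i, and the one genuinely shared variable is updated atomically:

#include <omp.h>
#include <vector>
#include <cstdio>

struct Fitter {
    void init() {}
    bool fit(int i, double *out) { *out = i * 0.5; return true; } // stand-in fit
};

int main()
{
    const int numFits = 1000;
    std::vector<double> results(numFits);
    int converged = 0;   // the only variable shared between threads

    #pragma omp parallel
    {
        Fitter fitter;   // one instance per thread: fit() touches no shared state
        fitter.init();
        #pragma omp for schedule(dynamic,1)
        for (int i = 0; i < numFits; i++)
            if (fitter.fit(i, &results[i]))
            {
                #pragma omp atomic
                converged++;   // the only shared write, made atomic
            }
    }
    printf("%d of %d fits converged\n", converged, numFits);
    return 0;
}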
OpenMP tasking requires an OpenMP 3.0 compliant compiler. This rules out all versions of MS VC++ (even the one in VS2012), should you happen to develop on Windows.
If you'd like to have only one instance of fitter ever initialised per thread, then you should take a somewhat different approach, e.g. make the fitter object global and threadprivate:
#include <omp.h>

Fitter fitter;
#pragma omp threadprivate(fitter)

...

int main()
{
    // Disable dynamic teams
    omp_set_dynamic(0);

    // Initialise all fitters once per thread
    #pragma omp parallel
    {
        fitter.init();
    }

    ...

    #pragma omp parallel
    {
        #pragma omp for schedule(dynamic,1)
        for (int i = 0; i < numFits; i++)
            fitter.fit(..., &results[i]);
    }

    ...

    return 0;
}
Here fitter is a global instance of the Fitter class. The omp threadprivate directive instructs the compiler to put it in thread-local storage, i.e. to make it a per-thread global variable, whose value persists between the different parallel regions. You can also use omp threadprivate on static local variables. These too persist between the different parallel regions (but only in the same function):
#include <omp.h>

int main()
{
    // Disable dynamic teams
    omp_set_dynamic(0);

    static Fitter fitter; // must be static
    #pragma omp threadprivate(fitter)

    // Initialise all fitters once per thread
    #pragma omp parallel
    {
        fitter.init();
    }

    ...

    #pragma omp parallel
    {
        #pragma omp for schedule(dynamic,1)
        for (int i = 0; i < numFits; i++)
            fitter.fit(..., &results[i]);
    }

    ...

    return 0;
}
The omp_set_dynamic(0) call disables dynamic teams, i.e. each parallel region will always execute with as many threads as specified by the OMP_NUM_THREADS environment variable.
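A quick way to convince yourself of that (a sketch, assuming the runtime honours the request) is to print the team size from inside a parallel region:

#include <omp.h>
#include <cstdio>

int main()
{
    omp_set_dynamic(0);   // teams always get the requested number of threads
    #pragma omp parallel
    {
        #pragma omp single
        printf("team size: %d\n", omp_get_num_threads());
    }
    return 0;
}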
What you basically want is a pool of workers (or a thread pool) that takes jobs from a queue, processes them, and proceeds with the next job afterwards. OpenMP provides different approaches to handle such workloads, e.g. barriers (all workers run until a certain point and only proceed when a certain requirement is fulfilled) or reductions to accumulate values into a global variable after the workers have computed their respective parts.
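To make the "pool of workers" idea concrete, here is a minimal hand-rolled C++11 sketch of what OpenMP's dynamic scheduling effectively does behind the scenes; the class and method names are made up for illustration, not a real library API:

#include <thread>
#include <mutex>
#include <vector>
#include <queue>
#include <functional>
#include <utility>

class WorkerPool {
public:
    explicit WorkerPool(unsigned n) {
        for (unsigned i = 0; i < n; i++)
            workers_.emplace_back([this] { run(); });
    }
    void submit(std::function<void()> job) {
        std::lock_guard<std::mutex> lock(m_);
        jobs_.push(std::move(job));
    }
    void wait() { // call after all jobs have been submitted
        {
            std::lock_guard<std::mutex> lock(m_);
            done_ = true;
        }
        for (auto &w : workers_) w.join();
    }
private:
    void run() {
        for (;;) {
            std::function<void()> job;
            {
                std::lock_guard<std::mutex> lock(m_);
                if (!jobs_.empty()) {
                    job = std::move(jobs_.front());
                    jobs_.pop();
                } else if (done_) {
                    return;  // queue drained and no more jobs coming
                }
            }
            if (job) job();
            else std::this_thread::yield(); // queue momentarily empty
        }
    }
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> jobs_;
    std::mutex m_;
    bool done_ = false;
};

Usage would look like this, matching the 8-cores/1000-datasets scenario:

WorkerPool pool(8);
for (int i = 0; i < 1000; i++)
    pool.submit([i] { /* fit dataset i and store the result */ });
pool.wait();

The point is not to use this in production (OpenMP's runtime does the same job with far less code) but to show the queue-plus-workers structure you described.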
Your question is very broad, but one more hint I can give you is to take a look at the MapReduce paradigm. In this paradigm a function is mapped over a dataset, and the results are sorted into buckets which are reduced using another function (possibly the same function again). In your case this would mean that each of your processors/cores/nodes maps a given function over its assigned set of data and sends the result buckets to another node responsible for combining them. I guess you would have to look into MPI if you want to use MapReduce with C++ without a dedicated MapReduce framework. Since you are running the program on one node, you may be able to do something similar with OpenMP (see the sketch below), so searching the web for that might help.
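On a single node, the map-reduce shape can be sketched with OpenMP alone. In the toy example below, chi2_of() is a made-up stand-in for a per-dataset fit: the parallel loop is the "map", and an OpenMP reduction plays the role of the "reduce":

#include <omp.h>
#include <vector>
#include <cstdio>

double chi2_of(int dataset) { return dataset * 0.001; } // stand-in for a fit

int main()
{
    const int numFits = 1000;
    std::vector<double> chi2(numFits);
    double total = 0.0;

    // Map: independent per-dataset work, dynamically scheduled.
    #pragma omp parallel for schedule(dynamic,1) reduction(+:total)
    for (int i = 0; i < numFits; i++)
    {
        chi2[i] = chi2_of(i);  // per-dataset result (the "bucket")
        total += chi2[i];      // Reduce: accumulated via the OpenMP reduction
    }
    printf("mean chi2 = %f\n", total / numFits);
    return 0;
}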
TL;DR: search for pool of workers (thread pool), barriers, and MapReduce.