Can I report progress for openmp tasks?

2020-05-27 04:47发布

问题:

Imagine a classic OMP task:

  • Summing a large vector of doubles in the range [0.0, 1.0)

Live On Coliru

using namespace std;

int main() {
    vector<double> v;

    // generate some data
    generate_n(back_inserter(v), 1ul << 18, 
       bind(uniform_real_distribution<double>(0,1.0), default_random_engine { random_device {}() }));

    long double sum = 0;

    {
#pragma omp parallel for reduction(+:sum)
        for(size_t i = 0; i < v.size(); i++)
        {
            sum += v[i];
        }
    }
    std::cout << "Done: sum = " << sum << "\n";
}

I have trouble coming up with an idea how to report progress. After all, OMP is handling all the coordination between team threads for me, and I don't have a piece of global state.

I could potentially use a regular std::thread and observe some shared variable from there, but isn't there a more "omp-ish" way to achieve this?

回答1:

Just let each thread in the team track local progress and update a global counter atomically. You could still make another thread observe it, or, as in my sample below, you could just do the terminal output within an OMP critical section.

The key here is to tune for a stepsize that doesn't lead to highly frequent updates, because then the locking for the critical region (and to a lesser extent the atomic load/stores) would degrade performance.

Live On Coliru

#include <omp.h>
#include <vector>
#include <random>
#include <algorithm>
#include <iterator>
#include <functional>
#include <iostream>
#include <iomanip>

using namespace std;

int main() {
    vector<double> v;
    // generate some data
    generate_n(back_inserter(v), 1ul << 18, bind(uniform_real_distribution<double>(0,1.0), default_random_engine { random_device {}() }));

    auto step_size   = 100ul;
    auto total_steps = v.size() / step_size + 1;

    size_t steps_completed = 0;
    long double sum = 0;

#pragma omp parallel 
    {
        size_t local_count = 0;


#pragma omp for reduction(+:sum)
        for(size_t i = 0; i < v.size(); i++)
        {
            sum += v[i];

            if (local_count++ % step_size == step_size-1)
            {
#pragma omp atomic
                ++steps_completed;

                if (steps_completed % 100 == 1)
                {
#pragma omp critical
                    std::cout << "Progress: " << steps_completed << " of " << total_steps << " (" << std::fixed << std::setprecision(1) << (100.0*steps_completed/total_steps) << "%)\n";
                }
            }
        }
    }
    std::cout << "Done: sum = " << sum << "\n";
}

Finally, print the result. Output:

Progress: 1 of 2622 (0.0%)
Progress: 191 of 2622 (7.3%)
Progress: 214 of 2622 (8.2%)
Progress: 301 of 2622 (11.5%)
Progress: 401 of 2622 (15.3%)
Progress: 501 of 2622 (19.1%)
Progress: 601 of 2622 (22.9%)
Progress: 701 of 2622 (26.7%)
Progress: 804 of 2622 (30.7%)
Progress: 901 of 2622 (34.4%)
Progress: 1003 of 2622 (38.3%)
Progress: 1101 of 2622 (42.0%)
Progress: 1201 of 2622 (45.8%)
Progress: 1301 of 2622 (49.6%)
Progress: 1402 of 2622 (53.5%)
Progress: 1501 of 2622 (57.2%)
Progress: 1601 of 2622 (61.1%)
Progress: 1701 of 2622 (64.9%)
Progress: 1801 of 2622 (68.7%)
Progress: 1901 of 2622 (72.5%)
Progress: 2001 of 2622 (76.3%)
Progress: 2101 of 2622 (80.1%)
Progress: 2203 of 2622 (84.0%)
Progress: 2301 of 2622 (87.8%)
Progress: 2402 of 2622 (91.6%)
Progress: 2501 of 2622 (95.4%)
Progress: 2601 of 2622 (99.2%)
Done: sum = 130943.8


回答2:

On processors without native atomic support (and even with them) using #pragma omp atomic, as the other answers here suggest, can slow your program down.

The idea of a progress indicator is to give the user an idea of when something will finish. If you're on target plus/minus a smallish fraction of the total run-time, the user isn't going to be too bothered. That is, the user would prefer that things finish sooner at the expense of knowing more exactly when things will finish.

For this reason, I usually track progress on only a single thread and use it to estimate total progress. This is just fine for situations in which each thread has a similar workload. Since you are using #pragma omp parallel for, you're likely working over a series of similar elements without interdependencies, so my assumption is probably valid for your use-case.

I've wrapped this logic in a class ProgressBar, which I usually include in a header file, along with its helper class Timer. The class uses ANSI control signals to keep things looking nice.

The output looks like this:

[======                                            ] (12% - 22.0s - 4 threads)

It's also easy to have the compiler eliminate all the overhead of the progressbar by declaring the -DNOPROGRESS compilation flag.

Code and an example usage follows:

#include <iostream>
#include <chrono>
#include <thread>
#include <iomanip>
#include <stdexcept>

#ifdef _OPENMP
  ///Multi-threading - yay!
  #include <omp.h>
#else
  ///Macros used to disguise the fact that we do not have multithreading enabled.
  #define omp_get_thread_num()  0
  #define omp_get_num_threads() 1
#endif


///@brief Used to time how intervals in code.
///
///Such as how long it takes a given function to run, or how long I/O has taken.
class Timer{
 private:
  typedef std::chrono::high_resolution_clock clock;
  typedef std::chrono::duration<double, std::ratio<1> > second;

  std::chrono::time_point<clock> start_time; ///< Last time the timer was started
  double accumulated_time;                   ///< Accumulated running time since creation
  bool running;                              ///< True when the timer is running

 public:
  Timer(){
    accumulated_time = 0;
    running          = false;
  }

  ///Start the timer. Throws an exception if timer was already running.
  void start(){
    if(running)
      throw std::runtime_error("Timer was already started!");
    running=true;
    start_time = clock::now();
  }

  ///Stop the timer. Throws an exception if timer was already stopped.
  ///Calling this adds to the timer's accumulated time.
  ///@return The accumulated time in seconds.
  double stop(){
    if(!running)
      throw std::runtime_error("Timer was already stopped!");

    accumulated_time += lap();
    running           = false;

    return accumulated_time;
  }

  ///Returns the timer's accumulated time. Throws an exception if the timer is
  ///running.
  double accumulated(){
    if(running)
      throw std::runtime_error("Timer is still running!");
    return accumulated_time;
  }

  ///Returns the time between when the timer was started and the current
  ///moment. Throws an exception if the timer is not running.
  double lap(){
    if(!running)
      throw std::runtime_error("Timer was not started!");
    return std::chrono::duration_cast<second> (clock::now() - start_time).count();
  }

  ///Stops the timer and resets its accumulated time. No exceptions are thrown
  ///ever.
  void reset(){
    accumulated_time = 0;
    running          = false;
  }
};


///@brief Manages a console-based progress bar to keep the user entertained.
///
///Defining the global `NOPROGRESS` will
///disable all progress operations, potentially speeding up a program. The look
///of the progress bar is shown in ProgressBar.hpp.
class ProgressBar{
 private:
  uint32_t total_work;    ///< Total work to be accomplished
  uint32_t next_update;   ///< Next point to update the visible progress bar
  uint32_t call_diff;     ///< Interval between updates in work units
  uint32_t work_done;
  uint16_t old_percent;   ///< Old percentage value (aka: should we update the progress bar) TODO: Maybe that we do not need this
  Timer    timer;         ///< Used for generating ETA

  ///Clear current line on console so a new progress bar can be written
  void clearConsoleLine() const {
    std::cerr<<"\r\033[2K"<<std::flush;
  }

 public:
  ///@brief Start/reset the progress bar.
  ///@param total_work  The amount of work to be completed, usually specified in cells.
  void start(uint32_t total_work){
    timer = Timer();
    timer.start();
    this->total_work = total_work;
    next_update      = 0;
    call_diff        = total_work/200;
    old_percent      = 0;
    work_done        = 0;
    clearConsoleLine();
  }

  ///@brief Update the visible progress bar, but only if enough work has been done.
  ///
  ///Define the global `NOPROGRESS` flag to prevent this from having an
  ///effect. Doing so may speed up the program's execution.
  void update(uint32_t work_done0){
    //Provide simple way of optimizing out progress updates
    #ifdef NOPROGRESS
      return;
    #endif

    //Quick return if this isn't the main thread
    if(omp_get_thread_num()!=0)
      return;

    //Update the amount of work done
    work_done = work_done0;

    //Quick return if insufficient progress has occurred
    if(work_done<next_update)
      return;

    //Update the next time at which we'll do the expensive update stuff
    next_update += call_diff;

    //Use a uint16_t because using a uint8_t will cause the result to print as a
    //character instead of a number
    uint16_t percent = (uint8_t)(work_done*omp_get_num_threads()*100/total_work);

    //Handle overflows
    if(percent>100)
      percent=100;

    //In the case that there has been no update (which should never be the case,
    //actually), skip the expensive screen print
    if(percent==old_percent)
      return;

    //Update old_percent accordingly
    old_percent=percent;

    //Print an update string which looks like this:
    //  [================================================  ] (96% - 1.0s - 4 threads)
    std::cerr<<"\r\033[2K["
             <<std::string(percent/2, '=')<<std::string(50-percent/2, ' ')
             <<"] ("
             <<percent<<"% - "
             <<std::fixed<<std::setprecision(1)<<timer.lap()/percent*(100-percent)
             <<"s - "
             <<omp_get_num_threads()<< " threads)"<<std::flush;
  }

  ///Increment by one the work done and update the progress bar
  ProgressBar& operator++(){
    //Quick return if this isn't the main thread
    if(omp_get_thread_num()!=0)
      return *this;

    work_done++;
    update(work_done);
    return *this;
  }

  ///Stop the progress bar. Throws an exception if it wasn't started.
  ///@return The number of seconds the progress bar was running.
  double stop(){
    clearConsoleLine();

    timer.stop();
    return timer.accumulated();
  }

  ///@return Return the time the progress bar ran for.
  double time_it_took(){
    return timer.accumulated();
  }

  uint32_t cellsProcessed() const {
    return work_done;
  }
};

int main(){
  ProgressBar pg;
  pg.start(100);
  //You should use 'default(none)' by default: be specific about what you're
  //sharing
  #pragma omp parallel for default(none) schedule(static) shared(pg)
  for(int i=0;i<100;i++){
    pg.update(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }
}


回答3:

My code below is similar to the sehe one, but there are some differences, which allowed me to cope with skipped points to report because of exact equalities, involving division by modulo. Also, the global counter collects actual loop executions for all threads, but it might be imprecise - which is acceptable for this particular problem. I use only the master thread for reporting.

const size_t size = ...
const size_t step_size = size / 100;
const size_t nThreads = ...
const size_t local_count_max = step_size / nThreads;
size_t count = 0;
#pragma omp parallel num_threads(nThreads)
{
  size_t reported_count = 0;
  size_t local_count = 0;
  #pragma omp for
  for (size_t i = 0; i < size; ++i)
  {
    <... do some useful work ...>
    // -------------------------- update local and global progress counters
    if (local_count >= local_count_max)
    {
      #pragma omp atomic
      count += local_count_max;
      local_count = 0;
    }
    else
    {
      ++local_count;
    }
    // ------------------------------ report progress (in master thread only)
    #pragma omp master
    if (count - reported_count >= step_size)
    {
      <... report the progress ...>
      reported_count = count;
    }
  }
}