Non-blocking pthread stop - or why does std::atomi

Posted 2019-07-24 05:47

Question:

I implemented a little multithreaded application which does the following:

MainThread

A main thread starts a timer using setitimer and starts up to 8 threads. The timer from the main thread is used to read repeatedly from a file (every 0.25s). When the timer has fired 20 times (after ~5s), I want to stop the threads and get the number of computations done by every thread.

MainThread.h

class MainThread {
  private:
    int counter;
    ThreadManager tm;
    bool registerTimer(double seconds);
    void startTimerWithInterval(double interval);
    // Timer callback, invoked every 0.25s
    void read() {
      /**
       *  If counter >= 20, call stopWorker on all threads
       */
      if (++counter >= 20) {
        tm.stopWorkers();
      }
    }
  public:
    MainThread():counter(0){}
};

WorkerThreads

Perform some expensive computations within an infinite loop. After a certain number of computations, the thread has to store the number of computations it performed. This value (the number of computations) has to be quite accurate, so I think I have to stop the threads (almost) immediately.

ThreadClass.h

class WorkerThread { 
  private:
    /**
     * ...
     */
    std::atomic_flag keep_Running = ATOMIC_FLAG_INIT;

    static void* run(void* args) {
      ((WorkerThread*)args)->process();
      pthread_exit(nullptr);
      return nullptr;
    }

  public:
    /**
     * ...
     */
    bool startWorker() {
      keep_Running.test_and_set();
      bool result = (pthread_create(&thread, pthread_attr, run, this) == 0);
      if(!result) {
        keep_Running.clear();
      }
      return result;
    }
    void stopWorker() {
      keep_Running.clear();
    }
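    // Returns the previous value of the flag and sets it again
    // (an atomic read-modify-write on every call).
    bool keepRunning() {
      return keep_Running.test_and_set();
    }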
    virtual void process() = 0;
};

ComputationThread.h

class ComputationThread : public WorkerThread {
  public:
    virtual void process() override {
      /**
       *  Perform computations on ~400MB of data
       *  Check every 16B whether keepRunning() is still true
       */
      bool keep_running = true;
      while(keep_running) {
        /**
         * Process 4B
         */
        keep_running = keepRunning();
      }
    }
};

If I use some kind of flag to track the running state of a thread, I have to make this flag thread-safe, don't I? I tried a std::atomic_flag because it should be lock-free and has atomic operations, but this results in a dramatic drop in performance. My question is: does the std::atomic_flag cause the performance drop, or is it just because I perform the check way too often? Does anyone know a better way?

Before you ask, I HAVE to use pthread instead of std::thread so that I can assign a thread to a specific core at thread creation (using pthread_attr_t).
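
For readers unfamiliar with this, here is a minimal sketch of pinning a thread to a core at creation time through a pthread_attr_t. It assumes Linux with glibc, where the non-portable pthread_attr_setaffinity_np and pthread_getaffinity_np extensions are available. The helper names first_allowed_cpu, start_pinned and check_affinity are made up for illustration and are not from the question.

```cpp
#include <pthread.h>
#include <sched.h>

// Thread body (hypothetical): verifies that it may run only on the requested CPU.
static void* check_affinity(void* arg) {
    int cpu = *static_cast<int*>(arg);
    cpu_set_t mask;
    CPU_ZERO(&mask);
    bool ok = pthread_getaffinity_np(pthread_self(), sizeof(mask), &mask) == 0
              && CPU_COUNT(&mask) == 1 && CPU_ISSET(cpu, &mask);
    return ok ? arg : nullptr;
}

// First CPU the calling thread is allowed to run on, or -1 on error.
inline int first_allowed_cpu() {
    cpu_set_t mask;
    CPU_ZERO(&mask);
    if (sched_getaffinity(0, sizeof(mask), &mask) != 0) return -1;
    for (int i = 0; i < CPU_SETSIZE; ++i) {
        if (CPU_ISSET(i, &mask)) return i;
    }
    return -1;
}

// Creates a thread whose affinity is set in the attribute object,
// i.e. before the thread starts running. Returns true if the thread
// started and observed the expected affinity mask.
inline bool start_pinned(int cpu) {
    cpu_set_t mask;
    CPU_ZERO(&mask);
    CPU_SET(cpu, &mask);

    pthread_attr_t attr;
    if (pthread_attr_init(&attr) != 0) return false;
    int rc = pthread_attr_setaffinity_np(&attr, sizeof(mask), &mask);

    pthread_t handle;
    if (rc == 0) rc = pthread_create(&handle, &attr, check_affinity, &cpu);
    pthread_attr_destroy(&attr);
    if (rc != 0) return false;

    void* result = nullptr;
    pthread_join(handle, &result);  // cpu stays alive until the thread is done
    return result != nullptr;
}
```

Calling start_pinned(first_allowed_cpu()) avoids hard-coding core 0, which may not be available inside a container or a restricted cpuset.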

Answer 1:

Don't use std::atomic_flag.

It is meant as a low-level atomic primitive and therefore has a very limited interface.
Its main limitation is that you can only test its value by setting it to true in the same atomic call, test_and_set().
This is a read-modify-write (RMW) operation, which performs expensive synchronization between all cores. Since you call it on every loop iteration, the loop slows down significantly.
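
To make the "test by setting" behaviour concrete, here is a small single-threaded sketch. The Observed struct and probe_flag function are illustrative names, not part of the question's code.

```cpp
#include <atomic>

// Values returned by successive test_and_set() calls (illustrative helper).
struct Observed {
    bool first;
    bool second;
    bool third;
};

inline Observed probe_flag() {
    std::atomic_flag flag = ATOMIC_FLAG_INIT;  // starts in the clear (false) state
    Observed o;
    o.first = flag.test_and_set();   // false: the flag was clear, and is now set
    o.second = flag.test_and_set();  // true: the flag was already set
    flag.clear();                    // this is what stopWorker() does
    o.third = flag.test_and_set();   // false exactly once; the flag is set again
    return o;
}
```

Every call writes to the flag, even when all the caller wants is to read it. That write is why polling it in a tight loop is costly.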

Use a regular std::atomic<bool> instead and set it once, when the workers should stop. Inside the loop you then only have to read it, which is an atomic load that translates to a regular mov instruction on x86. Choosing a specific memory order for that load has no impact on performance (at least on x86).
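
A minimal sketch of that approach with pthreads might look like the following. The Worker class and run_for helper are simplified stand-ins for the question's WorkerThread, and the counter increment is only a placeholder for the real computation.

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <pthread.h>
#include <thread>

class Worker {
  public:
    bool start() {
        return pthread_create(&handle, nullptr, run, this) == 0;
    }
    // Called from the main thread: a single store, done once.
    void stop() { stop_requested.store(true, std::memory_order_release); }
    void join() { pthread_join(handle, nullptr); }
    // Only valid after join(), which orders the worker's writes before this read.
    std::uint64_t iterations() const { return count; }

  private:
    static void* run(void* arg) {
        static_cast<Worker*>(arg)->process();
        return nullptr;
    }
    void process() {
        // Process one chunk, then poll the flag with a plain atomic load.
        do {
            ++count;  // placeholder for the real computation
        } while (!stop_requested.load(std::memory_order_acquire));
    }

    std::atomic<bool> stop_requested{false};
    std::uint64_t count = 0;
    pthread_t handle{};
};

// Runs one worker for roughly `millis` milliseconds and returns its count.
inline std::uint64_t run_for(unsigned millis) {
    Worker w;
    if (!w.start()) return 0;
    std::this_thread::sleep_for(std::chrono::milliseconds(millis));
    w.stop();
    w.join();
    return w.iterations();
}
```

The loop never writes to the flag, so the cache line holding it can stay shared in the worker's cache until the main thread finally stores true.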



Answer 2:

std::atomic_flag::test_and_set() has a memory-order parameter with the default argument std::memory_order order = memory_order_seq_cst:

Atomic operations tagged memory_order_seq_cst not only order memory the same way as release/acquire ordering (everything that happened-before a store in one thread becomes a visible side effect in the thread that did a load), but also establish a single total modification order of all atomic operations that are so tagged.

...

Total sequential ordering requires a full memory fence CPU instruction on all multi-core systems. This may become a performance bottleneck since it forces the affected memory accesses to propagate to every core.

With this memory order, each thread has to perform the memory operations for test_and_set in a single sequential order, loading from and storing to memory one after another. This is slower, because each thread spends time waiting for the other threads to complete their memory operations.
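
As an illustration, the default can be overridden by passing a memory order explicitly. The StopFlag wrapper below is a hypothetical sketch, not the question's code. Whether the weaker orders help depends on the target: on x86 test_and_set remains a locked read-modify-write whichever order is requested, so it is worth measuring.

```cpp
#include <atomic>

// Hypothetical wrapper: same polling logic as keepRunning(), but with
// explicit memory orders instead of the memory_order_seq_cst default.
class StopFlag {
  public:
    // Put the flag into the "running" (set) state.
    StopFlag() { flag.test_and_set(std::memory_order_relaxed); }

    void request_stop() { flag.clear(std::memory_order_release); }

    // Returns the previous state and sets the flag again.
    bool keep_running() { return flag.test_and_set(std::memory_order_acquire); }

  private:
    std::atomic_flag flag = ATOMIC_FLAG_INIT;
};
```

Because keep_running() sets the flag again, a stop request is reported exactly once. The worker has to leave its loop the first time it sees false.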