使用升压ASIO线程池(Thread pool using boost asio)

2019-06-17 22:46发布

我想创建使用boost :: ASIO有限的线程池类。 但我停留在一个点上可有一个人帮我。

唯一的问题是我应该减少计数器的地方吗?

如预期的代码不起作用。

问题是我不知道什么时候我的线程执行完成,我怎么会知道,它已经回到泳池

#include <boost/asio.hpp>
#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>
#include <stack>

using namespace std;
using namespace boost;

class ThreadPool
{
    static int count;
    int NoOfThread;
    thread_group grp;
    mutex mutex_;
    asio::io_service io_service;
    int counter;
    stack<thread*> thStk ;

public:
    ThreadPool(int num)
    {   
        NoOfThread = num;
        counter = 0;
        mutex::scoped_lock lock(mutex_);

        if(count == 0)
            count++;
        else
            return;

        for(int i=0 ; i<num ; ++i)
        {
            thStk.push(grp.create_thread(boost::bind(&asio::io_service::run, &io_service)));
        }
    }
    ~ThreadPool()
    {
        io_service.stop();
        grp.join_all();
    }

    thread* getThread()
    {
        if(counter > NoOfThread)
        {
            cout<<"run out of threads \n";
            return NULL;
        }

        counter++;
        thread* ptr = thStk.top();
        thStk.pop();
        return ptr;
    }
};
int ThreadPool::count = 0;


struct callable
{
    void operator()()
    {
        cout<<"some task for thread \n";
    }
};

int main( int argc, char * argv[] )
{

    callable x;
    ThreadPool pool(10);
    thread* p = pool.getThread();
    cout<<p->get_id();

    //how i can assign some function to thread pointer ?
    //how i can return thread pointer after work done so i can add 
//it back to stack?


    return 0;
}

Answer 1:

总之,你需要与其他功能,将包裹用户提供的任务:

  • 调用用户函数或调用的对象。
  • 锁定互斥和递减计数器。

我可能不理解所有此线程池的要求。 因此,为了清楚,这里是一个明确的清单,以什么我相信是要求:

  • 池管理线程的寿命。 用户应该无法删除驻留在池中的线​​程。
  • 用户可以以非侵入性的方式分配给池的任务。
  • 当被分配了任务,如果池中的所有线程当前正在运行的其他任务,该任务将被丢弃。

之前我提供一个实现,还有我想强调几个关键点:

  • 一旦线程已经启动,它将运行直到完成,取消或终止。 线程执行的功能不能被重新分配。 要允许一个线程在其生命过程中执行多种功能,该线程将要使用的功能将从队列中读取,如推出io_service::run() ,并调用类型都贴到事件队列,如从io_service::post()
  • io_service::run() ,如果没有工作,在等候退回io_service时, io_service停止,或异常是从一个处理器的线程运行抛出。 为了防止io_serivce::run()距离时,有没有未完成的工作返回时, io_service::work ,可以使用类。
  • 定义任务的类型要求(即任务的类型必须是通过调用object()语法)而不是需要一种类型的(即,任务必须从继承process ),提供了更多的灵活性给用户。 它允许用户提供一个任务作为函数指针或一个类型提供无参operator()

实现使用boost::asio

#include <boost/asio.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
  boost::asio::io_service io_service_;
  boost::asio::io_service::work work_;
  boost::thread_group threads_;
  std::size_t available_;
  boost::mutex mutex_;
public:

  /// @brief Constructor.
  thread_pool( std::size_t pool_size )
    : work_( io_service_ ),
      available_( pool_size )
  {
    for ( std::size_t i = 0; i < pool_size; ++i )
    {
      threads_.create_thread( boost::bind( &boost::asio::io_service::run,
                                           &io_service_ ) );
    }
  }

  /// @brief Destructor.
  ~thread_pool()
  {
    // Force all threads to return from io_service::run().
    io_service_.stop();

    // Suppress all exceptions.
    try
    {
      threads_.join_all();
    }
    catch ( const std::exception& ) {}
  }

  /// @brief Adds a task to the thread pool if a thread is currently available.
  template < typename Task >
  void run_task( Task task )
  {
    boost::unique_lock< boost::mutex > lock( mutex_ );

    // If no threads are available, then return.
    if ( 0 == available_ ) return;

    // Decrement count, indicating thread is no longer available.
    --available_;

    // Post a wrapped task into the queue.
    io_service_.post( boost::bind( &thread_pool::wrap_task, this,
                                   boost::function< void() >( task ) ) );
  }

private:
  /// @brief Wrap a task so that the available count can be increased once
  ///        the user provided task has completed.
  void wrap_task( boost::function< void() > task )
  {
    // Run the user supplied task.
    try
    {
      task();
    }
    // Suppress all exceptions.
    catch ( const std::exception& ) {}

    // Task has finished, so increment count of available threads.
    boost::unique_lock< boost::mutex > lock( mutex_ );
    ++available_;
  }
};

有关落实情况的几点意见:

  • 异常处理需要围绕用户的任务发生。 如果用户的函数或可调用对象抛出一个异常的类型是不boost::thread_interrupted ,然后std::terminate()被调用。 这是Boost.Thread的的结果在线程函数的异常行为。 这也是值得一读Boost.Asio的的从处理器抛出的异常的效果 。
  • 如果用户提供的task通过boost::bind ,那么嵌套boost::bind将编译失败。 一个下列选项是必需的:
    • 不支持task所创造boost::bind
    • 元编程进行编译时分支基于是否将用户的类型,如果结果boost::bind ,这样boost::protect可以使用,如boost::protect某些函数对象只正常功能。
    • 使用另一种类型的传递task间接对象。 我选择使用boost::function在失去的确切类型的成本以提高可读性。 boost::tuple ,而略少可读,也可以用来保存确切类型,如Boost.Asio的的看到系列化的例子。

应用程序代码现在可以使用thread_pool型非侵入:

void work() {};

struct worker
{
  void operator()() {};
};

void more_work( int ) {};

int main()
{ 
  thread_pool pool( 2 );
  pool.run_task( work );                        // Function pointer.
  pool.run_task( worker() );                    // Callable object.
  pool.run_task( boost::bind( more_work, 5 ) ); // Callable object.
}

thread_pool可以在没有Boost.Asio的创建,并可能对维护稍微容易些,因为他们不再需要了解Boost.Asio行为,如确实在io_service::run()回报,什么是io_service::work目标:

#include <queue>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
  std::queue< boost::function< void() > > tasks_;
  boost::thread_group threads_;
  std::size_t available_;
  boost::mutex mutex_;
  boost::condition_variable condition_;
  bool running_;
public:

  /// @brief Constructor.
  thread_pool( std::size_t pool_size )
    : available_( pool_size ),
      running_( true )
  {
    for ( std::size_t i = 0; i < pool_size; ++i )
    {
      threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
    }
  }

  /// @brief Destructor.
  ~thread_pool()
  {
    // Set running flag to false then notify all threads.
    {
      boost::unique_lock< boost::mutex > lock( mutex_ );
      running_ = false;
      condition_.notify_all();
    }

    try
    {
      threads_.join_all();
    }
    // Suppress all exceptions.
    catch ( const std::exception& ) {}
  }

  /// @brief Add task to the thread pool if a thread is currently available.
  template < typename Task >
  void run_task( Task task )
  {
    boost::unique_lock< boost::mutex > lock( mutex_ );

    // If no threads are available, then return.
    if ( 0 == available_ ) return;

    // Decrement count, indicating thread is no longer available.
    --available_;

    // Set task and signal condition variable so that a worker thread will
    // wake up andl use the task.
    tasks_.push( boost::function< void() >( task ) );
    condition_.notify_one();
  }

private:
  /// @brief Entry point for pool threads.
  void pool_main()
  {
    while( running_ )
    {
      // Wait on condition variable while the task is empty and the pool is
      // still running.
      boost::unique_lock< boost::mutex > lock( mutex_ );
      while ( tasks_.empty() && running_ )
      {
        condition_.wait( lock );
      }
      // If pool is no longer running, break out.
      if ( !running_ ) break;

      // Copy task locally and remove from the queue.  This is done within
      // its own scope so that the task object is destructed immediately
      // after running the task.  This is useful in the event that the
      // function contains shared_ptr arguments bound via bind.
      {
        boost::function< void() > task = tasks_.front();
        tasks_.pop();

        lock.unlock();

        // Run the task.
        try
        {
          task();
        }
        // Suppress all exceptions.
        catch ( const std::exception& ) {}
      }

      // Task has finished, so increment count of available threads.
      lock.lock();
      ++available_;
    } // while running_
  }
};


文章来源: Thread pool using boost asio