我想创建使用boost :: ASIO有限的线程池类。 但我停留在一个点上可有一个人帮我。
唯一的问题是我应该减少计数器的地方吗?
如预期的代码不起作用。
问题是我不知道什么时候我的线程执行完成,我怎么会知道,它已经回到泳池
#include <boost/asio.hpp>
#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>
#include <stack>
using namespace std;
using namespace boost;
class ThreadPool
{
static int count;
int NoOfThread;
thread_group grp;
mutex mutex_;
asio::io_service io_service;
int counter;
stack<thread*> thStk ;
public:
ThreadPool(int num)
{
NoOfThread = num;
counter = 0;
mutex::scoped_lock lock(mutex_);
if(count == 0)
count++;
else
return;
for(int i=0 ; i<num ; ++i)
{
thStk.push(grp.create_thread(boost::bind(&asio::io_service::run, &io_service)));
}
}
~ThreadPool()
{
io_service.stop();
grp.join_all();
}
thread* getThread()
{
if(counter > NoOfThread)
{
cout<<"run out of threads \n";
return NULL;
}
counter++;
thread* ptr = thStk.top();
thStk.pop();
return ptr;
}
};
int ThreadPool::count = 0;
struct callable
{
void operator()()
{
cout<<"some task for thread \n";
}
};
int main( int argc, char * argv[] )
{
callable x;
ThreadPool pool(10);
thread* p = pool.getThread();
cout<<p->get_id();
//how i can assign some function to thread pointer ?
//how i can return thread pointer after work done so i can add
//it back to stack?
return 0;
}
总之,你需要与其他功能,将包裹用户提供的任务:
- 调用用户函数或调用的对象。
- 锁定互斥和递减计数器。
我可能不理解所有此线程池的要求。 因此,为了清楚,这里是一个明确的清单,以什么我相信是要求:
- 池管理线程的寿命。 用户应该无法删除驻留在池中的线程。
- 用户可以以非侵入性的方式分配给池的任务。
- 当被分配了任务,如果池中的所有线程当前正在运行的其他任务,该任务将被丢弃。
之前我提供一个实现,还有我想强调几个关键点:
- 一旦线程已经启动,它将运行直到完成,取消或终止。 线程执行的功能不能被重新分配。 要允许一个线程在其生命过程中执行多种功能,该线程将要使用的功能将从队列中读取,如推出
io_service::run()
,并调用类型都贴到事件队列,如从io_service::post()
-
io_service::run()
,如果没有工作,在等候退回io_service
时, io_service
停止,或异常是从一个处理器的线程运行抛出。 为了防止io_serivce::run()
距离时,有没有未完成的工作返回时, io_service::work
,可以使用类。 - 定义任务的类型要求(即任务的类型必须是通过调用
object()
语法)而不是需要一种类型的(即,任务必须从继承process
),提供了更多的灵活性给用户。 它允许用户提供一个任务作为函数指针或一个类型提供无参operator()
实现使用boost::asio
:
#include <boost/asio.hpp>
#include <boost/thread.hpp>
class thread_pool
{
private:
boost::asio::io_service io_service_;
boost::asio::io_service::work work_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
public:
/// @brief Constructor.
thread_pool( std::size_t pool_size )
: work_( io_service_ ),
available_( pool_size )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &boost::asio::io_service::run,
&io_service_ ) );
}
}
/// @brief Destructor.
~thread_pool()
{
// Force all threads to return from io_service::run().
io_service_.stop();
// Suppress all exceptions.
try
{
threads_.join_all();
}
catch ( const std::exception& ) {}
}
/// @brief Adds a task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );
// If no threads are available, then return.
if ( 0 == available_ ) return;
// Decrement count, indicating thread is no longer available.
--available_;
// Post a wrapped task into the queue.
io_service_.post( boost::bind( &thread_pool::wrap_task, this,
boost::function< void() >( task ) ) );
}
private:
/// @brief Wrap a task so that the available count can be increased once
/// the user provided task has completed.
void wrap_task( boost::function< void() > task )
{
// Run the user supplied task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
// Task has finished, so increment count of available threads.
boost::unique_lock< boost::mutex > lock( mutex_ );
++available_;
}
};
有关落实情况的几点意见:
- 异常处理需要围绕用户的任务发生。 如果用户的函数或可调用对象抛出一个异常的类型是不
boost::thread_interrupted
,然后std::terminate()
被调用。 这是Boost.Thread的的结果在线程函数的异常行为。 这也是值得一读Boost.Asio的的从处理器抛出的异常的效果 。 - 如果用户提供的
task
通过boost::bind
,那么嵌套boost::bind
将编译失败。 一个下列选项是必需的: - 不支持
task
所创造boost::bind
。 - 元编程进行编译时分支基于是否将用户的类型,如果结果
boost::bind
,这样boost::protect
可以使用,如boost::protect
某些函数对象只正常功能。 - 使用另一种类型的传递
task
间接对象。 我选择使用boost::function
在失去的确切类型的成本以提高可读性。 boost::tuple
,而略少可读,也可以用来保存确切类型,如Boost.Asio的的看到系列化的例子。
应用程序代码现在可以使用thread_pool
型非侵入:
void work() {};
struct worker
{
void operator()() {};
};
void more_work( int ) {};
int main()
{
thread_pool pool( 2 );
pool.run_task( work ); // Function pointer.
pool.run_task( worker() ); // Callable object.
pool.run_task( boost::bind( more_work, 5 ) ); // Callable object.
}
该thread_pool
可以在没有Boost.Asio的创建,并可能对维护稍微容易些,因为他们不再需要了解Boost.Asio
行为,如确实在io_service::run()
回报,什么是io_service::work
目标:
#include <queue>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
class thread_pool
{
private:
std::queue< boost::function< void() > > tasks_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
boost::condition_variable condition_;
bool running_;
public:
/// @brief Constructor.
thread_pool( std::size_t pool_size )
: available_( pool_size ),
running_( true )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
}
}
/// @brief Destructor.
~thread_pool()
{
// Set running flag to false then notify all threads.
{
boost::unique_lock< boost::mutex > lock( mutex_ );
running_ = false;
condition_.notify_all();
}
try
{
threads_.join_all();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}
/// @brief Add task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );
// If no threads are available, then return.
if ( 0 == available_ ) return;
// Decrement count, indicating thread is no longer available.
--available_;
// Set task and signal condition variable so that a worker thread will
// wake up andl use the task.
tasks_.push( boost::function< void() >( task ) );
condition_.notify_one();
}
private:
/// @brief Entry point for pool threads.
void pool_main()
{
while( running_ )
{
// Wait on condition variable while the task is empty and the pool is
// still running.
boost::unique_lock< boost::mutex > lock( mutex_ );
while ( tasks_.empty() && running_ )
{
condition_.wait( lock );
}
// If pool is no longer running, break out.
if ( !running_ ) break;
// Copy task locally and remove from the queue. This is done within
// its own scope so that the task object is destructed immediately
// after running the task. This is useful in the event that the
// function contains shared_ptr arguments bound via bind.
{
boost::function< void() > task = tasks_.front();
tasks_.pop();
lock.unlock();
// Run the task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}
// Task has finished, so increment count of available threads.
lock.lock();
++available_;
} // while running_
}
};