这个问题应该比我过去几年稍微简单一些。 我实现了我的程序如下工作队列:
Pool.h:
// tpool class
// It's always closed. :glasses:
#ifndef __POOL_H
#define __POOL_H
class tpool {
public:
tpool( std::size_t tpool_size );
~tpool();
template< typename Task >
void run_task( Task task ){
boost::unique_lock< boost::mutex > lock( mutex_ );
if( 0 < available_ ) {
--available_;
io_service_.post( boost::bind( &tpool::wrap_task, this, boost::function< void() > ( task ) ) );
}
}
private:
boost::asio::io_service io_service_;
boost::asio::io_service::work work_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
void wrap_task( boost::function< void() > task );
};
extern tpool dbpool;
#endif
pool.cpp:
#include <boost/asio/io_service.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include "pool.h"
tpool::tpool( std::size_t tpool_size ) : work_( io_service_ ), available_( tpool_size ) {
for ( std::size_t i = 0; i < tpool_size; ++i ){
threads_.create_thread( boost::bind( &boost::asio::io_service::run, &io_service_ ) );
}
}
tpool::~tpool() {
io_service_.stop();
try {
threads_.join_all();
}
catch( ... ) {}
}
void tpool::wrap_task( boost::function< void() > task ) {
// run the supplied task
try {
task();
} // suppress exceptions
catch( ... ) {
}
boost::unique_lock< boost::mutex > lock( mutex_ );
++available_;
}
tpool dbpool( 50 );
问题是,虽然,是不是所有的我的电话给run_task()
正在工作线程完成。 我不知道这是否是因为它没有进入队列或因为当创建它的线程退出任务消失。
所以我的问题是,是否有什么特别的东西我必须给boost::thread
,使其等待队列解锁? 什么是任务的预期寿命进入到一个队列? 完成的任务出门时创建它们的线程退出范围是什么? 如果是这样,我怎么能防止这种情况发生?
编辑:我做了我的代码以下更改:
template< typename Task >
void run_task( Task task ){ // add item to the queue
io_service_.post( boost::bind( &tpool::wrap_task, this, boost::function< void() > ( task ) ) );
}
和我现在看到的所有条目被正确输入。 不过,我留下了一个挥之不去的问题:什么是任务的一生添加到队列? 难道他们不再存在,一旦创建它们退出线程?
好。 这真的很简单; 你拒绝的任务发布!
template< typename Task >
void run_task(task task){
boost::unique_lock<boost::mutex> lock( mutex_ );
if(0 < available_) {
--available_;
io_service_.post(boost::bind(&tpool::wrap_task, this, boost::function< void() > ( task )));
}
}
需要注意的是lock
“等待”,直到互斥不是由一个线程拥有。 这可能已经是情况下,可能当available_
已经是0。现在行
if(0 < available_) {
这条线路是简单的条件。 这不是“神奇”,因为你持有的mutex_
锁定。 (该计划甚至不知道之间存在的关系mutex_
和available_
)。 所以,如果available_ <= 0
,你只跳过发布工作。
解决方案#1
您应该使用io_service
排队等你。 这可能是你想在第一时间达到什么目的。 相反,保持的“可用”主题赛道, io_service
为你做的工作。 您可以控制多少个线程,它可以使用,通过运行io_service
尽可能多的线程。 简单。
由于io_service
已经是线程安全的,你可以不用锁。
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iostream>
// tpool class
// It's always closed. :glasses:
#ifndef __POOL_H
#define __POOL_H
class tpool {
public:
tpool( std::size_t tpool_size );
~tpool();
template<typename Task>
void run_task(Task task){
io_service_.post(task);
}
private:
// note the order of destruction of members
boost::asio::io_service io_service_;
boost::asio::io_service::work work_;
boost::thread_group threads_;
};
extern tpool dbpool;
#endif
#include <boost/asio/io_service.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
//#include "pool.h"
tpool::tpool(std::size_t tpool_size) : work_(io_service_) {
for (std::size_t i = 0; i < tpool_size; ++i)
{
threads_.create_thread(
boost::bind(&boost::asio::io_service::run, &io_service_)
);
}
}
tpool::~tpool() {
io_service_.stop();
try {
threads_.join_all();
}
catch(...) {}
}
void foo() { std::cout << __PRETTY_FUNCTION__ << "\n"; }
void bar() { std::cout << __PRETTY_FUNCTION__ << "\n"; }
int main() {
tpool dbpool(50);
dbpool.run_task(foo);
dbpool.run_task(bar);
boost::this_thread::sleep_for(boost::chrono::seconds(1));
}
对于停产的目的,你将需要启用“清算”的io_service::work
对象,否则您的池将永远不会退出。
解决方案#2
不要使用io_service
,而不是推出自己的队列实现了条件变量,通知被张贴的新作一个工作线程。 再次,工人的数量由该组中的线程的数目来确定。
#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
using namespace boost;
using namespace boost::phoenix::arg_names;
class thread_pool
{
private:
mutex mx;
condition_variable cv;
typedef function<void()> job_t;
std::deque<job_t> _queue;
thread_group pool;
boost::atomic_bool shutdown;
static void worker_thread(thread_pool& q)
{
while (auto job = q.dequeue())
(*job)();
}
public:
thread_pool() : shutdown(false) {
for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
pool.create_thread(bind(worker_thread, ref(*this)));
}
void enqueue(job_t job)
{
lock_guard<mutex> lk(mx);
_queue.push_back(std::move(job));
cv.notify_one();
}
optional<job_t> dequeue()
{
unique_lock<mutex> lk(mx);
namespace phx = boost::phoenix;
cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));
if (_queue.empty())
return none;
auto job = std::move(_queue.front());
_queue.pop_front();
return std::move(job);
}
~thread_pool()
{
shutdown = true;
{
lock_guard<mutex> lk(mx);
cv.notify_all();
}
pool.join_all();
}
};
void the_work(int id)
{
std::cout << "worker " << id << " entered\n";
// no more synchronization; the pool size determines max concurrency
std::cout << "worker " << id << " start work\n";
this_thread::sleep_for(chrono::seconds(2));
std::cout << "worker " << id << " done\n";
}
int main()
{
thread_pool pool; // uses 1 thread per core
for (int i = 0; i < 10; ++i)
pool.enqueue(bind(the_work, i));
}