How to connect signal to boost::asio::io_service w

Posted 2019-09-04 07:34

Question:

I'm trying to use a boost::lockfree queue to manage tasks. These tasks retrieve data and are processed on a worker thread. Once the data is retrieved, a signal should be sent to the main thread with the data. The worker thread is spawned at the start of the application and keeps polling the queue. I'm new to Boost.Asio, but from my research it seems to be the best mechanism for sending signals between threads.

I've looked at several examples, in particular:

  • Confused when boost::asio::io_service run method blocks/unblocks
  • boost asio post not working , io_service::run exits right after post

Here is my code:

#include "stdafx.h"
#include <thread>

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/optional.hpp>
#include <boost/thread.hpp>
#include <boost/signals2.hpp>

typedef boost::signals2::signal<void(int)> signal_type;

class Task
{
public:
    Task(int handle) : _handle(handle) {};
   ~Task() {};

   virtual void Execute()
   {
      int result = _handle * 2;
   }

private:
   int _handle;
};


class Manager
{
public:
   Manager() 
   {
      _mainService = std::make_shared<boost::asio::io_service>();
      _workerService = std::make_shared<boost::asio::io_service>();
      _work = std::make_shared<boost::asio::io_service::work>(*_workerService);

      _threadStarted = false;
      Start();
   };

   ~Manager() {};

   void WorkerMain()
   {
      _workerService->poll();
   }

   void Start()
   {
      if (_threadStarted) return;

      _workerThread = std::thread(&Manager::WorkerMain, this);
      _threadStarted = true;
   }

   void Stop()
   {
      if (_threadStarted == false) return;

      _mainService->stop();
      _workerThread.join();
      _mainService.reset();
   }

   void OnSignalFetchCompleted(int value)
   {
      int asdf = 0; //do stuff with data on main thread
   }

   void ProcessData(signal_type& signal)
   {
      int i = 0;

      do
      {
         _queue.consume_one([&](std::shared_ptr<Task> task)
         {
            task->Execute();
            //get data from task; send out signal with data
         });

         i++;
      } while (i < 3);
   }

   void QueueData(int handle)
   {
      _signalFetchCompleted.connect(boost::bind(&Manager::OnSignalFetchCompleted, this, _1));
      _workerService->post(boost::bind(&Manager::ProcessData, boost::ref(_signalFetchCompleted))); //!!does not compile

      std::shared_ptr<Task> task = std::make_shared<Task>(handle);
      _queue.push(task);
   }

private:
   boost::lockfree::spsc_queue<std::shared_ptr<Task>, boost::lockfree::capacity<1024>> _queue;
   std::thread _workerThread;
   bool _threadStarted;

   std::shared_ptr<boost::asio::io_service> _mainService;
   std::shared_ptr<boost::asio::io_service> _workerService;
   std::shared_ptr<boost::asio::io_service::work> _work;

   signal_type _signalFetchCompleted;
};


int _tmain(int argc, _TCHAR* argv[])
{
   std::shared_ptr<Manager> mgr = std::make_shared<Manager>();
   mgr->QueueData(5);
   mgr->QueueData(10);

   mgr->Stop();

   return 0;
}

I'm getting a compile error on the _workerService->post line that I haven't been able to resolve:

1>C:\Boost\boost/bind/mem_fn.hpp(333): error C2784: 'T *boost::get_pointer(const boost::scoped_ptr<T> &)' : could not deduce template argument for 'const boost::scoped_ptr<T> &' from 'const signal_type'
1>          C:\Boost\boost/smart_ptr/scoped_ptr.hpp(150) : see declaration of 'boost::get_pointer'
1>          C:\Boost\boost/bind/mem_fn.hpp(352) : see reference to function template instantiation 'R (__cdecl &boost::_mfi::dm<R,Manager>::call<const U>(U &,const void *) const)' being compiled
1>          with
1>          [
1>              R=void (signal_type &)
1>  ,            U=signal_type
1>          ]
1>          C:\Boost\boost/bind/mem_fn.hpp(352) : see reference to function template instantiation 'R (__cdecl &boost::_mfi::dm<R,Manager>::call<const U>(U &,const void *) const)' being compiled
1>          with
1>          [
1>              R=void (signal_type &)
1>  ,            U=signal_type
1>          ]
1>          C:\Boost\boost/bind/bind.hpp(243) : see reference to function template instantiation 'R (__cdecl &boost::_mfi::dm<R,Manager>::operator ()<T>(const U &) const)' being compiled
1>          with
1>          [
1>              R=void (signal_type &)
1>  ,            T=signal_type
1>  ,            U=signal_type
1>          ]

Any help resolving this compile error or general comments on this approach would be greatly appreciated. Thanks.

Answer 1:

In light of the new information, the problem is in your boost::bind call. ProcessData is a non-static member function, but the bind expression does not say which object to call it on. Pass a Manager instance (here, this) as the first bound argument:

_workerService->post(boost::bind(&Manager::ProcessData, this, boost::ref(_signalFetchCompleted)));

This calls ProcessData on this and passes in a reference to _signalFetchCompleted.
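
The same rule can be seen without Asio. The following is a minimal sketch under stated assumptions: it uses std::bind in place of boost::bind (both expect the object as the first bound argument when binding a member function), and Worker, MakeTask, and run_bound_task are hypothetical names chosen for illustration, not part of the question's code.

```cpp
#include <cassert>
#include <functional>

// Hypothetical stand-in for the question's Manager; not the original class.
class Worker
{
public:
   // Member function taking a reference, like ProcessData(signal_type&).
   void ProcessData(int& counter)
   {
      counter += _step;
   }

   // Builds a nullary callable of the kind that would be handed to post().
   std::function<void()> MakeTask(int& counter)
   {
      // The first bound argument is the object (this). std::ref keeps
      // counter as a reference instead of copying it into the binder.
      return std::bind(&Worker::ProcessData, this, std::ref(counter));
   }

private:
   int _step = 2;
};

// Hypothetical helper: runs the bound task twice and returns the counter.
int run_bound_task()
{
   Worker worker;
   int counter = 0;
   std::function<void()> task = worker.MakeTask(counter);
   assert(task); // the binder produced a callable target

   task();
   task();
   return counter;
}
```

If this is left out of the bind expression, the binder treats the first bound argument as the object to call the member on, which appears to be what the get_pointer error mentioning signal_type is reporting.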



Answer 2:

The compiler error seems to be talking about you constructing a boost::asio::io_service::work object and that you are passing it incorrect parameters:

error C2664: 'boost::asio::io_service::work::work(const boost::asio::io_service::work &)' : cannot convert argument 1 from 'std::shared_ptr<boost::asio::io_service>' to 'boost::asio::io_service &'

boost::asio::io_service::work has a constructor which takes a boost::asio::io_service& and a copy constructor; however, you are passing it a std::shared_ptr<boost::asio::io_service>:

_work = std::make_shared<boost::asio::io_service::work>(_workerService);

Here, _workerService is a std::shared_ptr<boost::asio::io_service>, but you need a boost::asio::io_service&. Try the following instead:

_work = std::make_shared<boost::asio::io_service::work>(*_workerService);
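
The point about dereferencing can be shown in isolation. This is only a sketch with hypothetical stand-in types: Service and Work are not the Asio classes, and count_outstanding_work is an illustrative helper. The only property borrowed from the answer is that the work object's constructor takes the service by reference.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for the service; not boost::asio::io_service.
struct Service
{
   int outstanding = 0;
};

// Hypothetical stand-in for the work object: its constructor takes the
// service by reference, so a shared_ptr<Service> is not an acceptable argument.
class Work
{
public:
   explicit Work(Service& service) : _service(service) { ++_service.outstanding; }
   ~Work() { --_service.outstanding; }

   Work(const Work&) = delete;
   Work& operator=(const Work&) = delete;

private:
   Service& _service;
};

// Returns how many Work objects were registered while one was alive.
int count_outstanding_work()
{
   std::shared_ptr<Service> service = std::make_shared<Service>();

   // make_shared forwards its arguments to Work's constructor, so the
   // pointer is dereferenced to produce the Service& that it expects.
   std::shared_ptr<Work> work = std::make_shared<Work>(*service);
   int during = service->outstanding;

   work.reset();
   assert(service->outstanding == 0); // the destructor unregistered the work
   return during;
}
```

Passing service instead of *service to make_shared<Work> fails to compile for the same reason as in the error quoted above: a std::shared_ptr<Service> does not convert to Service&.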


Answer 3:

I don't think boost::asio is the best fit for this task. Have you read about condition variables? They are much simpler and can be used to achieve the same goal.
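
As a rough illustration of the condition variable approach, here is a sketch that uses only the standard library. ResultChannel and fetch_on_worker are hypothetical names, and doubling the handle simply mirrors what Task::Execute does in the question. It assumes the main thread is free to block while waiting for results.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical result channel: the worker pushes, the main thread pops.
class ResultChannel
{
public:
   void Push(int value)
   {
      {
         std::lock_guard<std::mutex> lock(_mutex);
         _results.push(value);
      }
      _ready.notify_one(); // wake the waiting consumer, if any
   }

   int Pop()
   {
      std::unique_lock<std::mutex> lock(_mutex);
      // The predicate guards against spurious wakeups.
      _ready.wait(lock, [this] { return !_results.empty(); });
      int value = _results.front();
      _results.pop();
      return value;
   }

private:
   std::mutex _mutex;
   std::condition_variable _ready;
   std::queue<int> _results;
};

// Hypothetical driver: a worker thread doubles each handle and the
// calling thread collects the results in the order they were produced.
std::vector<int> fetch_on_worker(const std::vector<int>& handles)
{
   ResultChannel channel;
   std::thread worker([&channel, &handles] {
      for (int handle : handles)
         channel.Push(handle * 2);
   });

   std::vector<int> results;
   for (std::size_t i = 0; i < handles.size(); ++i)
      results.push_back(channel.Pop());

   worker.join();
   assert(results.size() == handles.size());
   return results;
}
```

Calling fetch_on_worker({5, 10}) corresponds to the two QueueData calls in the question. One trade-off of this design is that Pop blocks the caller, so a main thread that must stay responsive (a UI loop, for example) would need a non-blocking check instead.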