C++ lambda callback to trigger event

Published 2019-05-24 12:01

I have been trying to wrap my head around callback functionality in C++. What I am trying to achieve is the following:

I have two objects, each one with its own thread. One object A has a pointer to the second object B. See example:

class B;  // forward declaration so A can hold a pointer to B

class A
{
  public:
   // ...
  private:
   std::unique_ptr<B> b;
};

class B
{
  public:
   void add_message(MessageType msg);
   // ...
};

What I am trying to achieve is having object A add a message through its pointer to B and then continue doing other work, with some callback or handler that gets triggered when B has a reply to that message. B does some processing with the message, and might pass it to other objects for further processing on its own thread, but eventually it will come up with a reply. So how can I know when B has a reply to my message? For example:

// In class A
MessageType m;  // note: "MessageType m();" would declare a function, not an object
b->add_message(m);
// A's thread continues doing other stuff
...
// some notification that b has a reply?

I know I might have to use std::function for a callback, which I would like to use, but even after looking at a lot of examples I can't work out exactly how to do this. Any help is appreciated; note that I have looked at many examples but can't tie them back to what I am trying to achieve, or am not understanding them.

1 Answer
爱情/是我丢掉的垃圾
Answered 2019-05-24 12:18

Threads are sequences of execution. They behave roughly like linear C++ programs, embedded within a memory model that lets them communicate and notice state changes caused by other threads of execution.

A callback to a thread cannot take over a sequence of execution without cooperation from the thread. The thread you want to notify has to explicitly check to see if a message has arrived and process it.


There are two common ways to handle responses to messages.

The first is a std::future-like method: the caller gets a token of some kind, and that token represents the answer that may or will be produced in the future.

The second is to just use messaging again. You send a message to B requesting a response. B sends a message back to A containing the response. The same way that B receives messages, A receives messages back. The message may contain a "return target" of some kind to help A link it to the original message.

In a message-based system, it is common to have an "event loop". Instead of a large, linear program, you have a thread that repeatedly returns back to the "event loop". There it checks a queue for messages, and if none are there waits for some.

Tasks have to be broken down into bite sized chunks under such a system, so that you check the event loop often enough to be responsive.

One way to do this is with coroutines: a coroutine is a state of execution that does not own its own executor (unlike a thread, which owns both). Coroutines periodically give up priority and "save their state for later".


The future solution is often the easiest, but it relies on A periodically checking for a response.

First, a threaded_queue<T>, which lets any number of producers and consumers pass things into a queue and eat them off the front:

// Needs <condition_variable>, <deque>, <mutex>, and boost::optional
// (std::optional also works in C++17 and later).
template<class T>
struct threaded_queue {
  using lock = std::unique_lock<std::mutex>;
  void push_back( T t ) {
    {
      lock l(m);
      data.push_back(std::move(t));
    }
    cv.notify_one();
  }
  boost::optional<T> pop_front() {
    lock l(m);
    cv.wait(l, [this]{ return abort || !data.empty(); } );
    if (abort) return {};
    auto r = std::move(data.front());  // take from the front, not the back
    data.pop_front();
    return r;
  }
  void terminate() {
    {
      lock l(m);
      abort = true;
      data.clear();
    }
    cv.notify_all();
  }
  ~threaded_queue()
  {
    terminate();
  }
private:
  std::mutex m;
  std::deque<T> data;
  std::condition_variable cv;
  bool abort = false;
};

Now, we want to pass tasks into such a queue, and have the one passing the task in get a result back. Here is a use of the above with packaged tasks:

// Needs <functional>, <future>, <memory>, and <type_traits>.
template<class...Args>
struct threaded_task_queue {
  threaded_task_queue() = default;
  threaded_task_queue( threaded_task_queue&& ) = delete;
  threaded_task_queue& operator=( threaded_task_queue&& ) = delete;
  ~threaded_task_queue() = default;
  template<class F, class R=std::result_of_t<F&(Args...)>>
  std::future<R> queue_task( F task ) {
    std::packaged_task<R(Args...)> p(std::move(task));
    auto r = p.get_future();
    // a packaged_task<void(Args...)> can wrap a packaged_task<R(Args...)>;
    // the R reaches the caller through the future we already extracted
    tasks.push_back( std::move(p) );
    return r;
  }
  void terminate() {
    tasks.terminate();
  }
  std::function<void(Args...)> pop_task() {
    auto task = tasks.pop_front();
    if (!task) return {};
    // std::function must be copyable, so park the move-only task in a shared_ptr
    auto task_ptr = std::make_shared<std::packaged_task<void(Args...)>>(std::move(*task));
    return [task_ptr](Args...args){
      (*task_ptr)(std::forward<Args>(args)...);
    };
  }
private:
  threaded_queue<std::packaged_task<void(Args...)>> tasks;
};

If I did that right, it works like this:

  • A queues a task to B in the form of a lambda. This lambda takes some fixed set of arguments (provided by B), and returns some value.

  • B pops the queue, and gets a std::function that takes the arguments. It invokes it; it returns void in B's context.

  • A was given a future<R> when it queued the task. It can query this to see if it is finished.

You'll note that A cannot be "notified" that things are done. That requires a different solution. But if A eventually gets to a point where it cannot progress without waiting on the result from B, this system works.

On the other hand, if A accumulates a large supply of such messages and sometimes needs to wait on input from many such Bs until any one of them return data (or the user does something), you need something more advanced than a std::future<R>. The general pattern -- having a token that represents future computation to be delivered -- is solid. But you need to augment it to play well with multiple sources of future computation and message loops and the like.

Code not tested.

One approach for threaded_task_queue when you are sending messages is:

template<class Signature>
struct message_queue;
template<class R, class...Args>
struct message_queue<R(Args...)> :
  threaded_task_queue< std::function<R(Args...)> >
{
  std::future<R> queue_message(Args...args) {
    return this->queue_task(
      [tup = std::make_tuple(std::forward<Args>(args)...)]
      ( std::function<R(Args...)> f ) mutable
      {
        return std::apply( f, std::move(tup) );
      }
    );
  }
  bool consume_message( std::function<R(Args...)> f )
  {
    auto task = this->pop_task(); // this-> needed: the base class is dependent
    if (!task) return false;
    task( std::move(f) );
    return true;
  }
};

Here the provider side supplies Args..., the consumer side consumes the Args... and returns an R, and the provider holds a future<R> from which it can get the result once the consumer is done.

This may be more natural than the raw threaded_task_queue I wrote.

std::apply is C++17 but there are implementations in the wild for C++11 and C++14.
