First a little context: I'm in the process of learning about threading in C++11 and for this purpose, I'm trying to build a small actor
class, essentially (I left the exception handling and propagation stuff out) like so:
class actor {
    private: std::atomic<bool> stop;
    private: std::condition_variable interrupt;
    private: message_queue incoming_msgs;
    // must be declared last: members are constructed in declaration order and
    // the thread uses the members above as soon as it starts
    private: std::thread actor_thread;

    public: actor()
    : stop(false),
      actor_thread([&]{ run_actor(); })
    {}

    public: virtual ~actor() {
        // if the actor is destroyed, we must ensure the thread dies too
        stop = true;
        // to this end, we have to interrupt the actor thread which is most probably
        // waiting on the incoming_msgs queue:
        interrupt.notify_all();
        actor_thread.join();
    }

    private: virtual void run_actor() {
        try {
            while(!stop)
                // wait for new message and process it
                // but interrupt the waiting process if interrupt is signaled:
                process(incoming_msgs.wait_and_pop(interrupt));
        }
        catch(const interrupted_exception&) {
            // ...
        }
    }

    private: virtual void process(const message&) = 0;
    // ...
};
Every actor runs in its own actor_thread, waits for a new incoming message on incoming_msgs and, when a message arrives, processes it.
The actor_thread is created together with the actor and has to die together with it, which is why I need some kind of interrupt mechanism in message_queue::wait_and_pop(std::condition_variable& interrupt).
Essentially, I require that wait_and_pop blocks until either
a) a new message arrives or
b) the interrupt is fired, in which case, ideally, an interrupted_exception is to be thrown.
The arrival of a new message in the message_queue is presently also modeled by a std::condition_variable new_msg_notification:
// ...
// in class message_queue:
message wait_and_pop(std::condition_variable& interrupt) {
    std::unique_lock<std::mutex> lock(mutex);
    // How to interrupt the following, when interrupt fires??
    new_msg_notification.wait(lock,[&]{
        return !queue.empty();
    });
    auto msg(std::move(queue.front()));
    queue.pop();
    return msg;
}
To cut a long story short, the question is this: How do I interrupt the waiting for a new message in new_msg_notification.wait(...) when the interrupt is triggered (without introducing a time-out)?
Alternatively, the question may be read as: How do I wait until either of two std::condition_variables is signaled?
One naive approach seems to be not to use std::condition_variable at all for the interrupt and instead just use an atomic flag std::atomic<bool> interrupted and then busy wait on new_msg_notification with a very small time-out until either a new message has arrived or until true==interrupted. However, I would very much like to avoid busy waiting.
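For reference, one common way to get the "wait for either event" effect without busy waiting is to let both events share a single mutex and a single condition variable, and to test both conditions in the wait predicate. The following is only a minimal sketch under assumptions: the class name interruptible_queue and its interrupt() member are hypothetical, and std::string stands in for the message type.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <utility>

struct interrupted_exception {};

// Hypothetical queue: std::string stands in for `message`.
class interruptible_queue {
    std::mutex mutex;
    std::condition_variable changed;   // notified on push and on interrupt
    std::queue<std::string> queue;     // guarded by `mutex`
    bool interrupted = false;          // guarded by `mutex`

public:
    void push(std::string msg) {
        {
            std::lock_guard<std::mutex> lock(mutex);
            queue.push(std::move(msg));
        }
        changed.notify_one();
    }

    // Wakes all waiters; later calls to wait_and_pop() throw as well.
    void interrupt() {
        {
            std::lock_guard<std::mutex> lock(mutex);
            interrupted = true;
        }
        changed.notify_all();
    }

    // Blocks until a message is available or interrupt() has been called.
    std::string wait_and_pop() {
        std::unique_lock<std::mutex> lock(mutex);
        changed.wait(lock, [this] { return interrupted || !queue.empty(); });
        if (interrupted)
            throw interrupted_exception();   // takes priority over queued messages
        assert(!queue.empty());
        std::string msg(std::move(queue.front()));
        queue.pop();
        return msg;
    }
};
```

Because the flag is checked before the queue, an interrupt wins even when messages are still pending, which matches the requirement of terminating quickly.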
EDIT:
From the comments and the answer by pilcrow, it looks like there are basically two approaches possible.
- Enqueue a special "Terminate" message, as proposed by Alan, mukunda and pilcrow. I decided against this option because I have no idea how large the queue will be at the time I want the actor to terminate. It may very well be (as is mostly the case when I want something to terminate quickly) that there are thousands of messages left in the queue, and it seems unacceptable to wait for all of them to be processed before the terminate message finally gets its turn.
- Implement a custom version of a condition variable that may be interrupted by another thread by forwarding the notification to the condition variable that the first thread is waiting on. I opted for this approach.
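To make the drawback of the first option concrete, here is a small single-threaded sketch of a consumer loop that stops at a sentinel. The message struct with its terminate flag and the function run_until_terminate are hypothetical names chosen for illustration only.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <string>

// Hypothetical message type: a payload plus a flag marking the sentinel.
struct message {
    std::string payload;
    bool terminate;
};

// Handles messages in FIFO order until the sentinel is reached and returns
// how many ordinary messages had to be processed before that point.
std::size_t run_until_terminate(std::queue<message>& inbox) {
    std::size_t processed = 0;
    while (!inbox.empty()) {
        message msg = inbox.front();
        inbox.pop();
        if (msg.terminate)
            break;        // the sentinel only takes effect once it is dequeued
        ++processed;      // stand-in for process(msg)
    }
    return processed;
}
```

If a thousand messages are queued ahead of the sentinel, all thousand are processed before the loop ends, which is exactly the latency the second option avoids.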
For those of you interested, my implementation goes as follows. The condition variable in my case is actually a semaphore (because I like them more and because I liked the exercise of doing so). I equipped this semaphore with an associated interrupt which can be obtained from the semaphore via semaphore::get_interrupt(). If one thread now blocks in semaphore::wait(), another thread has the possibility to call semaphore::interrupt::trigger() on the interrupt of the semaphore, causing the first thread to unblock and propagate an interrupt_exception.
struct interrupt_exception {};

class semaphore {
    public: class interrupt;
    private: mutable std::mutex mutex;
    // must be declared after our mutex due to construction order!
    private: interrupt* informed_by;
    private: std::atomic<long> counter;
    private: std::condition_variable cond;

    public: semaphore();
    public: ~semaphore() throw();
    public: void wait();
    public: interrupt& get_interrupt() const { return *informed_by; }

    public: void post() {
        std::lock_guard<std::mutex> lock(mutex);
        counter++;
        cond.notify_one(); // never throws
    }

    public: unsigned long load() const {
        return counter.load();
    }
};
class semaphore::interrupt {
    private: semaphore *forward_posts_to;
    private: std::atomic<bool> triggered;

    // initializers are listed in declaration order
    public: interrupt(semaphore *forward_posts_to) : forward_posts_to(forward_posts_to), triggered(false) {
        assert(forward_posts_to);
        std::lock_guard<std::mutex> lock(forward_posts_to->mutex);
        forward_posts_to->informed_by = this;
    }

    public: void trigger() {
        assert(forward_posts_to);
        // the guard must be a named object; an unnamed temporary would unlock immediately
        std::lock_guard<std::mutex> lock(forward_posts_to->mutex);
        triggered = true;
        forward_posts_to->cond.notify_one(); // never throws
    }

    public: bool is_triggered() const throw() {
        return triggered.load();
    }

    public: void reset() throw() {
        triggered.store(false);
    }
};
// initializers are listed in declaration order (informed_by is declared before counter)
semaphore::semaphore() : informed_by(new interrupt(this)), counter(0L) {}

// must be defined here because otherwise semaphore::interrupt is an incomplete type
semaphore::~semaphore() throw() {
    delete informed_by;
}
void semaphore::wait() {
    std::unique_lock<std::mutex> lock(mutex);
    if(0L==counter) {
        cond.wait(lock,[&]{
            if(informed_by->is_triggered())
                throw interrupt_exception();
            return counter>0;
        });
    }
    counter--;
}
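The intended interplay between a blocked waiter and a triggering thread can be sketched in a condensed form. This is not the class above: mini_semaphore is a hypothetical stand-in that keeps the interrupt flag directly in the semaphore, and waiter_is_interrupted is an illustrative helper.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

struct interrupt_exception {};

// Hypothetical condensed semaphore: counter and interrupt flag share one
// mutex and one condition variable.
class mini_semaphore {
    std::mutex mutex;
    std::condition_variable cond;
    long counter = 0;        // guarded by `mutex`
    bool triggered = false;  // guarded by `mutex`

public:
    void post() {
        std::lock_guard<std::mutex> lock(mutex);
        ++counter;
        cond.notify_one();
    }

    void trigger() {
        std::lock_guard<std::mutex> lock(mutex);
        triggered = true;
        cond.notify_all();
    }

    // Blocks until post() or trigger() is called; throws on trigger.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex);
        cond.wait(lock, [this] { return triggered || counter > 0; });
        if (triggered)
            throw interrupt_exception();
        --counter;
    }
};

// Starts a thread that blocks in wait(), interrupts it from the calling
// thread, and reports whether the waiter observed interrupt_exception.
bool waiter_is_interrupted() {
    mini_semaphore sem;
    bool interrupted = false;
    std::thread waiter([&] {
        try {
            sem.wait();
        } catch (const interrupt_exception&) {
            interrupted = true;
        }
    });
    sem.trigger();
    waiter.join();   // join() synchronizes, so reading `interrupted` is safe
    return interrupted;
}
```

Since the flag is set under the same mutex the waiter holds while evaluating its predicate, the wake-up cannot be lost regardless of whether trigger() runs before or after the waiter starts blocking.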
Using this semaphore, my message queue implementation now looks like this (using the semaphore instead of the std::condition_variable, I could get rid of the std::mutex):
class message_queue {
    // NOTE: std::queue is not thread-safe by itself; if push() and
    // wait_and_pop() can run concurrently, access to it still needs a lock
    private: std::queue<message> queue;
    private: semaphore new_msg_notification;

    public: void push(message&& msg) {
        queue.push(std::move(msg));
        new_msg_notification.post();
    }

    public: const message wait_and_pop() {
        new_msg_notification.wait();
        auto msg(std::move(queue.front()));
        queue.pop();
        return msg;
    }

    public: semaphore::interrupt& get_interrupt() const { return new_msg_notification.get_interrupt(); }
};
My actor is now able to interrupt its thread with very low latency. The implementation presently looks like this:
class actor {
    private: message_queue incoming_msgs;
    // must be declared after incoming_msgs due to construction order!
    private: semaphore::interrupt& interrupt;
    private: std::thread my_thread;
    private: std::exception_ptr exception;

    public: actor()
    : interrupt(incoming_msgs.get_interrupt()), my_thread(
        [&]{
            try {
                run_actor();
            }
            catch(...) {
                exception = std::current_exception();
            }
        })
    {}

    private: virtual void run_actor() {
        while(!interrupt.is_triggered())
            process(incoming_msgs.wait_and_pop());
    }

    private: virtual void process(const message&) = 0;

    public: void notify(message&& msg_in) {
        incoming_msgs.push(std::forward<message>(msg_in));
    }

    public: virtual ~actor() throw (interrupt_exception) {
        interrupt.trigger();
        my_thread.join();
        if(exception)
            std::rethrow_exception(exception);
    }
};