I am trying to synchronize operations on hardware devices controlled by my C++ code.
Suppose there are two types of devices, on each of which I can perform Open/Close.
What I need to achieve is to open one type of device for a specified duration. The same is true for the second type of device.
I have written the following code using boost::asio::deadline_timer:
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
// Controls two kinds of devices. Each kind has its own deadline timer; both
// timers are driven by the same io_service.
class Test : public std::enable_shared_from_this<Test>
{
public:
    // io_ is default-constructed first (it is declared first), then both
    // timers are attached to it.
    Test() : timerOne_(io_), timerTwo_(io_) {}

    // Report device `num` as opened / closed.
    void Open(int num);
    void Close(int num);

    // Opens device `num` and schedules the matching Close() `dur` seconds later.
    void TimedOpen(int num, int dur);

    // Starts a thread that runs the io_service.
    void Run();

private:
    // Must stay declared ahead of the timers: they are constructed from it.
    boost::asio::io_service io_;
    boost::asio::deadline_timer timerOne_;  // armed for type 1
    boost::asio::deadline_timer timerTwo_;  // armed for type 2
};
// Logs that the device of the given type was opened; no hardware is touched here.
void Test::Open(int type)
{
    std::cout << "Open for Number : " << type;
    std::cout << std::endl;  // newline plus flush, so the line shows up immediately
}
// Logs that the device of the given type was closed; no hardware is touched here.
void Test::Close(int type)
{
    std::cout << "Close for Number : " << type;
    std::cout << std::endl;  // newline plus flush, so the line shows up immediately
}
void Test::TimedOpen(int type, int dur)
{
switch (type)
{
case 1:
{
io_.reset();
auto fn = std::bind(&Test::Open, shared_from_this(), std::placeholders::_1);
fn(type);
timerOne_.expires_from_now(boost::posix_time::seconds(dur));
timerOne_.async_wait(std::bind(&Test::Close, shared_from_this(), type));
Run();
std::cout << "Function Exiting" << std::endl;
std::cout << "-----------------------------------------------" << std::endl;
return;
}
case 2:
{
io_.reset();
auto fn = std::bind(&Test::Open, shared_from_this(), std::placeholders::_1);
fn(type);
timerTwo_.expires_from_now(boost::posix_time::seconds(dur));
timerTwo_.async_wait(std::bind(&Test::Close, shared_from_this(), type));
Run();
std::cout << "Function Exiting" << std::endl;
std::cout << "-----------------------------------------------" << std::endl;
return;
}
}
}
void Test::Run()
{
boost::thread th(boost::bind(&boost::asio::io_service::run, &io_));
}
// Demo driver: issues four timed opens, then keeps the process alive until the
// user submits an empty line (or stdin reaches end-of-file).
int main()
{
    auto t = std::make_shared<Test>();
    t->TimedOpen(1, 60);
    t->TimedOpen(2, 30);
    t->TimedOpen(1, 5);
    t->TimedOpen(2, 2);

    char line[128];
    while (std::cin.getline(line, sizeof line))
    {
        // getline() strips the '\n' delimiter, so pressing just Enter yields an
        // empty string. The former test, strcmp(line, "\n"), was non-zero for
        // every possible line and therefore always left the loop on the first
        // input, whatever it was.
        if (line[0] == '\0') break;
    }
    return 0;
}
The Output is:
Open for Number : 1
Function Exiting
-----------------------------------------------
Open for Number : 2
Function Exiting
-----------------------------------------------
Open for Number : 1
Close for Number : 1
Function Exiting
-----------------------------------------------
Open for Number : 2
Close for Number : 2
Function Exiting
-----------------------------------------------
Close for Number : 2
Close for Number : 1
For timerOne_, a new wait does not wait for the previous wait to expire, i.e. as soon as t->TimedOpen(1, 5) is executed, the previous action t->TimedOpen(1, 60) is cancelled.
So "Close for Number : 1" appears in the output without waiting for t->TimedOpen(1, 60) to elapse.
What I want to achieve is that if multiple waits are requested for any type of timer, all the operations should be queued, i.e. if I type:
t->TimedOpen(1, 60);
t->TimedOpen(1, 10);
t->TimedOpen(1, 5);
It should perform the TimedOpen operation for 60+10+5 seconds. Currently it does so for only 5 seconds. Also, it should be non-blocking, i.e. I cannot use wait() instead of async_wait().
How do I achieve this?
Summary:
My requirement is to schedule operations on a boost::asio::deadline_timer, i.e. multiple operations on it should be queued until the previous wait has expired.
As was mentioned in a comment, you will want to have a queue per "type".
Let's name the per-type queue a "session".
By chaining all async waits from a single queue on a single strand¹, you get effective serialization (this also avoids the need for explicit synchronization on the queue/session).
The only tricky bit is starting an async wait when none is in flight. The invariant is that an async operation is in flight iff !queue_.empty():
// FIFO of pending "open" durations for one device type. The queue and the
// timer are only touched from handlers that run through strand_.
struct Session : std::enable_shared_from_this<Session> {
    Session(boost::asio::io_service &io, int type) : strand_(io), timer_(io), type(type) {}

    // Appends `duration` (interpreted as milliseconds by Wait()) to the queue.
    // The work is posted to the strand, so this call itself does not wait.
    void Enqueue(int duration) {
        auto self = shared_from_this();  // keeps the session alive until the handler has run
        strand_.post([self, duration] {
            std::cout << "t0 + " << std::setw(4) << mark() << "ms Enqueue for Number: " << self->type << " (dur:" << duration << ")\n";
            self->queue_.push(duration);
            // A size of exactly one means the queue was empty before this push,
            // so no wait is in flight and the chain has to be started here.
            if (self->queue_.size() == 1)
                self->Wait();
        });
    }

private:
    boost::asio::strand strand_;
    boost::asio::deadline_timer timer_;
    int type;
    std::queue<int> queue_;

    // Timer handler: reports the front entry as closed, removes it, and arms
    // the timer for the next entry if there is one.
    void Close() {
        assert(!queue_.empty());
        std::cout << "t0 + " << std::setw(4) << mark() << "ms Close for Number : " << type << " (dur:" << queue_.front() << ") (depth " << queue_.size() << ")\n";
        queue_.pop();
        Wait();
    }

    // Reports the front entry as opened and starts its timed wait. Does
    // nothing when the queue is empty, which ends the chain.
    void Wait() {
        if (queue_.empty())
            return;

        const int ms = queue_.front();
        std::cout << "t0 + " << std::setw(4) << mark() << "ms Open for Number : " << type << " (dur:" << ms << ") (depth " << queue_.size() << ")\n";
        timer_.expires_from_now(boost::posix_time::milliseconds(ms));
        // Wrapping in the strand makes Close() run serialized with Enqueue()'s handler.
        timer_.async_wait(strand_.wrap(std::bind(&Session::Close, shared_from_this())));
    }
};
Now the Test class becomes much simpler (in fact, it doesn't need to be "shared" at all, but I've left that detail as the proverbial exercise for the reader):
// Owns the io_service, the thread that runs it, and one Session per device type.
class Test : public std::enable_shared_from_this<Test> {
    using guard = boost::lock_guard<boost::mutex>;

public:
    // Starts the worker thread right away. work_ keeps io_.run() from
    // returning while nothing is queued.
    Test() : io_(), work_(boost::asio::io_service::work(io_)) {
        io_thread = boost::thread([this] { io_.run(); });
    }

    // Queues an "open for `duration`" request on the session for type `num`.
    void TimedOpen(int num, int duration);

    // Releases the work guard so io_.run() can return once the pending
    // handlers are done, then waits for the worker thread. Calling it again
    // afterwards is a no-op.
    void Stop() {
        {
            guard lk(mx_);
            if (work_) work_.reset();
        }
        // boost::thread::join() requires a joinable thread. Stop() runs twice
        // whenever a caller stops explicitly and the destructor stops again,
        // so only the first call may join.
        if (io_thread.joinable())
            io_thread.join();
    }

    ~Test() {
        Stop();
        guard lk(mx_);
        timers_ex_.clear();
    }

private:
    mutable boost::mutex mx_;  // guards work_ and timers_ex_
    boost::asio::io_service io_;
    boost::optional<boost::asio::io_service::work> work_;
    std::map<int, std::shared_ptr<Session> > timers_ex_;  // device type -> its session
    boost::thread io_thread;
};
void Test::TimedOpen(int type, int duration) {
guard lk(mx_);
auto &session = timers_ex_[type];
if (!session) session = std::make_shared<Session>(io_, type);
session->Enqueue(duration);
}
As you can see, I've
- generalized to any number of types
- made the operations thread-safe
- added relative timestamps in milliseconds since t0
- fixed the completely broken io_service lifetime. Now, construction starts the service, the work_ variable keeps it alive when idle, and Stop() shuts it down (draining the session queues first).
- made destruction call Stop() implicitly
Here's a test run:
Live On Coliru
int main() {
auto t = std::make_shared<Test>();
t->TimedOpen(1, 300);
t->TimedOpen(2, 150);
t->TimedOpen(1, 50);
t->TimedOpen(2, 20);
boost::this_thread::sleep_for(boost::chrono::milliseconds(400));
std::cout << "================\n";
t->TimedOpen(1, 50);
t->TimedOpen(2, 20);
t->TimedOpen(1, 300);
t->TimedOpen(2, 150);
t->Stop();
}
Prints
t0 + 0ms Enqueue for Number: 1 (dur:300)
t0 + 0ms Open for Number : 1 (dur:300) (depth 1)
t0 + 0ms Enqueue for Number: 2 (dur:150)
t0 + 0ms Open for Number : 2 (dur:150) (depth 1)
t0 + 0ms Enqueue for Number: 1 (dur:50)
t0 + 0ms Enqueue for Number: 2 (dur:20)
t0 + 150ms Close for Number : 2 (dur:150) (depth 2)
t0 + 150ms Open for Number : 2 (dur:20) (depth 1)
t0 + 170ms Close for Number : 2 (dur:20) (depth 1)
t0 + 300ms Close for Number : 1 (dur:300) (depth 2)
t0 + 300ms Open for Number : 1 (dur:50) (depth 1)
t0 + 350ms Close for Number : 1 (dur:50) (depth 1)
================
t0 + 400ms Enqueue for Number: 1 (dur:50)
t0 + 400ms Open for Number : 1 (dur:50) (depth 1)
t0 + 400ms Enqueue for Number: 2 (dur:20)
t0 + 400ms Open for Number : 2 (dur:20) (depth 1)
t0 + 400ms Enqueue for Number: 1 (dur:300)
t0 + 400ms Enqueue for Number: 2 (dur:150)
t0 + 420ms Close for Number : 2 (dur:20) (depth 2)
t0 + 420ms Open for Number : 2 (dur:150) (depth 1)
t0 + 450ms Close for Number : 1 (dur:50) (depth 2)
t0 + 450ms Open for Number : 1 (dur:300) (depth 1)
t0 + 570ms Close for Number : 2 (dur:150) (depth 1)
t0 + 750ms Close for Number : 1 (dur:300) (depth 1)
¹ Why do I need strand per connection when using boost::asio?