Do I need to implement blocking when using boost::

2019-07-08 21:50发布

问题:

My question is, if I run io_service::run () on multiple threads, do I need to implement blocking on these asynchronous functions?

example:

int i = 0;
int j = 0;

void test_timer(boost::system::error_code ec)
{
    //I need to lock up here ?
    if (i++ == 10)
    {
        j = i * 10;
    }
    timer.expires_at(timer.expires_at() + boost::posix_time::milliseconds(500));
    timer.async_wait(&test_timer);
}

void threadMain()
{
    io_service.run();
}

int main()
{
    boost::thread_group workers;
    timer.async_wait(&test_timer);

    for (int i = 0; i < 5; i++){
        workers.create_thread(&threadMain);
    }

    io_service.run();
    workers.join_all();
    return 0;
}

回答1:

The definition of async is that it is non-blocking.

If you mean to ask "do I have to synchronize access to shared objects from different threads" - that question is unrelated and the answer depends on the thread-safety documented for the object you are sharing.

For Asio, basically (rough summary) you need to synchronize concurrent access (concurrent as in: from multiple threads) to all types except boost::asio::io_context¹,².

Your Sample

Your sample uses multiple threads running the io service, meaning handlers run on any of those threads. This means that effectively you're sharing the globals and indeed they need protection.

However Because your application logic (the async call chain) dictates that only one operation is ever pending, and the next async operation on the shared timer object is always scheduled from within that chain, the access is logically all from a single thread (called an implicit strand. See Why do I need strand per connection when using boost::asio?

The simplest thing that would work:

Logical Strand

Live On Coliru

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iostream>

boost::asio::io_service io_service;
boost::asio::deadline_timer timer { io_service };

struct state_t {
    int i = 0;
    int j = 0;
} state;

void test_timer(boost::system::error_code ec)
{
    if (ec != boost::asio::error::operation_aborted) {
        {
            if (state.i++ == 10) {
                state.j = state.i * 10;
                if (state.j > 100)
                    return; // stop after 5 seconds
            }
        }
        timer.expires_at(timer.expires_at() + boost::posix_time::milliseconds(50));
        timer.async_wait(&test_timer);
    }
}

int main()
{
    boost::thread_group workers;
    timer.expires_from_now(boost::posix_time::milliseconds(50));
    timer.async_wait(&test_timer);

    for (int i = 0; i < 5; i++){
        workers.create_thread([] { io_service.run(); });
    }

    workers.join_all();
    std::cout << "i = " << state.i << std::endl;
    std::cout << "j = " << state.j << std::endl;
}

Note I removed the io_service::run() from the main thread as it is redundant with the join() (unless you really wanted 6 threads running the handlers, not 5).

Prints

i = 11
j = 110

Caveat

There's a pitfall lurking here. Say, you didn't want to bail at a fixed number, like I did, but want to stop, you'd be tempted to do:

timer.cancel();

from main. That's not legal, because the deadline_timer object is not thread safe. You'd need to either

  • use a global atomic_bool to signal the request for termination
  • post the timer.cancel() on the same strand as the timer async chain. However, there is only an explicit strand, so you can't do it without changing the code to use an explicit strand.

More Timers

Let's complicate things by having two timers, with their own implicit strands. This means access to the timer instances still need not be synchronized, but access to i and j does need to be.

Note In this demo I use synchronized_value<> for elegance. You can write similar logic manually using mutex and lock_guard.

Live On Coliru

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/synchronized_value.hpp>
#include <iostream>

boost::asio::io_service io_service;

struct state {
    int i = 0;
    int j = 0;
};

boost::synchronized_value<state> shared_state;

struct TimerChain {
    boost::asio::deadline_timer _timer;

    TimerChain() : _timer{io_service} {
        _timer.expires_from_now(boost::posix_time::milliseconds(50));
        resume();
    }

    void resume() {
        _timer.async_wait(boost::bind(&TimerChain::test_timer, this, _1));
    };

    void test_timer(boost::system::error_code ec)
    {
        if (ec != boost::asio::error::operation_aborted) {
            {
                auto state = shared_state.synchronize();
                if (state->i++ == 10) {
                    state->j = state->i * 10;
                }
                if (state->j > 100) return; // stop after some iterations
            }
            _timer.expires_at(_timer.expires_at() + boost::posix_time::milliseconds(50));
            resume();
        }
    }
};

int main()
{
    boost::thread_group workers;
    TimerChain timer1;
    TimerChain timer2;

    for (int i = 0; i < 5; i++){
        workers.create_thread([] { io_service.run(); });
    }

    workers.join_all();
    auto state = shared_state.synchronize();
    std::cout << "i = " << state->i << std::endl;
    std::cout << "j = " << state->j << std::endl;
}

Prints

i = 12
j = 110

Adding The Explicit Strands

Now it's pretty straight-forward to add them:

struct TimerChain {
    boost::asio::io_service::strand _strand;
    boost::asio::deadline_timer _timer;

    TimerChain() : _strand{io_service}, _timer{io_service} {
        _timer.expires_from_now(boost::posix_time::milliseconds(50));
        resume();
    }

    void resume() {
        _timer.async_wait(_strand.wrap(boost::bind(&TimerChain::test_timer, this, _1)));
    };

    void stop() { // thread safe
        _strand.post([this] { _timer.cancel(); });
    }

    // ...

Live On Coliru

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/synchronized_value.hpp>
#include <iostream>

boost::asio::io_service io_service;

struct state {
    int i = 0;
    int j = 0;
};

boost::synchronized_value<state> shared_state;

struct TimerChain {
    boost::asio::io_service::strand _strand;
    boost::asio::deadline_timer _timer;

    TimerChain() : _strand{io_service}, _timer{io_service} {
        _timer.expires_from_now(boost::posix_time::milliseconds(50));
        resume();
    }

    void resume() {
        _timer.async_wait(_strand.wrap(boost::bind(&TimerChain::test_timer, this, _1)));
    };

    void stop() { // thread safe
        _strand.post([this] { _timer.cancel(); });
    }

    void test_timer(boost::system::error_code ec)
    {
        if (ec != boost::asio::error::operation_aborted) {
            {
                auto state = shared_state.synchronize();
                if (state->i++ == 10) {
                    state->j = state->i * 10;
                }
            }
            // continue indefinitely
            _timer.expires_at(_timer.expires_at() + boost::posix_time::milliseconds(50));
            resume();
        }
    }
};

int main()
{
    boost::thread_group workers;
    TimerChain timer1;
    TimerChain timer2;

    for (int i = 0; i < 5; i++){
        workers.create_thread([] { io_service.run(); });
    }

    boost::this_thread::sleep_for(boost::chrono::seconds(10));
    timer1.stop();
    timer2.stop();

    workers.join_all();

    auto state = shared_state.synchronize();
    std::cout << "i = " << state->i << std::endl;
    std::cout << "j = " << state->j << std::endl;
}

Prints

i = 400
j = 110

¹ (or using the legacy name boost::asio::io_service)

² lifetime mutations are not considered member operations in this respect (you have to manually synchronize construction/destruction of shared objects even for thread-safe objects)