Non-obvious lifetime issue with std::promise and std::future

Posted 2019-03-15 16:48

Question:

This question is very similar to a previous one here: race-condition in pthread_once()?

It is essentially the same issue: the lifetime of the std::promise ends during a call to promise::set_value (i.e. after the associated future has been flagged, but before the internal call to pthread_once has completed).

So I know that my usage has this issue, and that I therefore cannot use it in this way. However, I think this is non-obvious. (In the wise words of Scott Meyers: Make Interfaces Easy to Use Correctly and Hard to Use Incorrectly.)

I present an exemplar below:

  • I have a thread (dispatcher) which spins on a queue, popping a 'job' (a std::function) and executing it.
  • I have a utility class called synchronous_job which blocks the calling thread until the 'job' has been executed on the dispatcher thread
  • The std::promise and std::future are members of synchronous_job - once the future is set, the blocked calling thread continues, which results in the synchronous_job popping off the stack and being destructed.
  • Unfortunately, the dispatcher thread can be context-switched while still inside promise::set_value: the future has been flagged, but the internal call to pthread_once hasn't completed. The promise is then destroyed underneath it, the pthread_once state is somehow corrupted, and the next time around: deadlock

I would expect a call to promise::set_value to be atomic; the fact that it needs to do more work after it has flagged the future will inevitably lead to this kind of issue when using these classes in this manner.

So my question is: How to achieve this kind of synchronisation using std::promise and std::future, keeping their lifetime associated with the class which provides this synchronisation mechanism?

@Jonathan Wakely, could you perhaps use an RAII-style class internally which signals the condition_variable in its destructor, after the future has been flagged? That way, even if the promise is destroyed in the midst of a call to set_value, the remaining work of notifying the condition variable would still complete. Just an idea, not sure if you can use it...
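To make the suggestion concrete, here is a rough sketch of what I have in mind (notify_guard and toy_state are invented names for illustration only; this is not how libstdc++ actually structures set_value):

#include <condition_variable>
#include <mutex>

struct notify_guard
{
    explicit notify_guard(std::condition_variable& cv) : _cv(cv) {}
    ~notify_guard() { _cv.notify_all(); } // notifying is the very last thing that happens
private:
    std::condition_variable& _cv;
};

struct toy_state
{
    void set_value()
    {
        notify_guard guard(_cnd);
        std::unique_lock<std::mutex> l(_mtx);
        _ready = true;
    } // the lock is released first, then guard's destructor notifies the waiters
    std::mutex              _mtx;
    std::condition_variable _cnd;
    bool                    _ready = false;
};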

A full working example is below, followed by the stack trace of the deadlocked app:

#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

struct dispatcher
{
    dispatcher()
    {
        _thread = std::thread(&dispatcher::loop, this);
    }
    void post(std::function<void()> job)
    {
        std::unique_lock<std::mutex> l(_mtx);
        _jobs.push(job);
        _cnd.notify_one();
    }
private:
    void loop()
    {
        for (;;)
        {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> l(_mtx);
                while (_jobs.empty())
                    _cnd.wait(l);
                job.swap(_jobs.front());
                _jobs.pop();
            }
            job(); // execute the job outside the lock
        }
    }
    std::thread                       _thread;
    std::mutex                        _mtx;
    std::condition_variable           _cnd;
    std::queue<std::function<void()>> _jobs;
};
//-------------------------------------------------------------

struct synchronous_job
{
    synchronous_job(std::function<void()> job, dispatcher& d)
        : _job(job)
        , _d(d)
        , _f(_p.get_future())
    {
    }
    void run()
    {
        _d.post(std::bind(&synchronous_job::cb, this));
        _f.wait(); // block until cb() has run on the dispatcher thread
    }
private:
    void cb()
    {
        _job();
        _p.set_value(); // wakes the blocked caller; *this can be destroyed while set_value is still executing
    }
    std::function<void()> _job;
    dispatcher&           _d;
    std::promise<void>    _p;
    std::future<void>     _f;
};
//-------------------------------------------------------------

struct test
{
    test()
        : _count(0)
    {
    }
    void run()
    {
        synchronous_job job(std::bind(&test::cb, this), _d);
        job.run();
    }
private:
    void cb()
    {
        std::cout << ++_count << std::endl;
    }
    int _count;
    dispatcher _d;
};
//-------------------------------------------------------------

int main()
{
    test t;
    for (;;)
    {
        t.run();
    }
}

The stack trace of the deadlocked app:

Thread 1 (main thread)

#0  0x00007fa112ed750c in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007fa112a308ec in __gthread_cond_wait (__mutex=<optimized out>, __cond=<optimized out>) at /hostname/tmp/syddev/Build/gcc-4.6.2/gcc-build/x86_64-unknown-linux-gnu/libstdc++-v3/include/x86_64-unknown-linux-gnu/bits/gthr-default.h:846
#2  std::condition_variable::wait (this=<optimized out>, __lock=...) at ../../../../libstdc++-v3/src/condition_variable.cc:56
#3  0x00000000004291d9 in std::condition_variable::wait<std::__future_base::_State_base::wait()::{lambda()#1}>(std::unique_lock<std::mutex>&, std::__future_base::_State_base::wait()::{lambda()#1}) (this=0x78e050, __lock=..., __p=...) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/condition_variable:93
#4  0x00000000004281a8 in std::__future_base::_State_base::wait (this=0x78e018) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:331
#5  0x000000000042a2d6 in std::__basic_future<void>::wait (this=0x7fff0ae515c0) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:576
#6  0x0000000000428dd8 in synchronous_job::run (this=0x7fff0ae51580) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:60
#7  0x0000000000428f97 in test::run (this=0x7fff0ae51660) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:83
#8  0x0000000000427ad6 in main () at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:99

Thread 2 (dispatcher thread)

#0  0x00007fa112ed8b5b in pthread_once () from /lib64/libpthread.so.0
#1  0x0000000000427946 in __gthread_once (__once=0x78e084, __func=0x4272d0 <__once_proxy@plt>) at /hostname/sdk/gcc470/suse11/x86_64/bin/../lib/gcc/x86_64-unknown-linux-gnu/4.7.0/../../../../include/c++/4.7.0/x86_64-unknown-linux-gnu/bits/gthr-default.h:718
#2  0x000000000042948b in std::call_once<void (std::__future_base::_State_base::*)(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&), std::__future_base::_State_base* const, std::reference_wrapper<std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()> >, std::reference_wrapper<bool> >(std::once_flag&, void (std::__future_base::_State_base::*&&)(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&), std::__future_base::_State_base* const&&, std::reference_wrapper<std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()> >&&, std::reference_wrapper<bool>&&) (__once=..., __f=
    @0x7fa111ff6be0: (void (std::__future_base::_State_base::*)(std::__future_base::_State_base * const, std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter>()> &, bool &)) 0x42848a <std::__future_base::_State_base::_M_do_set(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&)>) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/mutex:819
#3  0x000000000042827d in std::__future_base::_State_base::_M_set_result(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>, bool) (this=0x78e018, __res=..., __ignore_failure=false) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:362
#4  0x00000000004288d5 in std::promise<void>::set_value (this=0x7fff0ae515a8) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:1206
#5  0x0000000000428e2a in synchronous_job::cb (this=0x7fff0ae51580) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:66
#6  0x000000000042df53 in std::_Mem_fn<void (synchronous_job::*)()>::operator() (this=0x78c6e0, __object=0x7fff0ae51580) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:554
#7  0x000000000042d77c in std::_Bind<std::_Mem_fn<void (synchronous_job::*)()> (synchronous_job*)>::__call<void, , 0ul>(std::tuple<>&&, std::_Index_tuple<0ul>) (this=0x78c6e0, __args=...) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:1156
#8  0x000000000042cb28 in std::_Bind<std::_Mem_fn<void (synchronous_job::*)()> (synchronous_job*)>::operator()<, void>() (this=0x78c6e0) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:1215
#9  0x000000000042b772 in std::_Function_handler<void (), std::_Bind<std::_Mem_fn<void (synchronous_job::*)()> (synchronous_job*)> >::_M_invoke(std::_Any_data const&) (__functor=...) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:1926
#10 0x0000000000429f2c in std::function<void ()>::operator()() const (this=0x7fa111ff6da0) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:2311
#11 0x0000000000428c3c in dispatcher::loop (this=0x7fff0ae51668) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:39

Answer 1:

std::promise is just like any other object: you can only access it from one thread at a time. In this case, you are calling set_value() and destroying the object from separate threads without sufficient synchronization: nowhere in the spec does it say that set_value will not touch the promise object after making the future ready.

However, since this future is used for a one-shot synchronization, you don't need to do that anyway: create the promise/future pair right in run(), and pass the promise to the thread:

struct synchronous_job
{
    synchronous_job(std::function<void()> job, dispatcher& d)
        : _job(job)
        , _d(d)
    {
    }
    void run(){
        std::promise<void> p;
        std::future<void> f=p.get_future();

        _d.post(
            [&]{
                cb(std::move(p));
            });

        f.wait();
    }
private:
    void cb(std::promise<void> p)
    {
        _job();
        p.set_value();
    }
    std::function<void()> _job;
    dispatcher&           _d;
};


Answer 2:

To answer your question directly: give the std::promise to the thread. That way, it is guaranteed to exist for as long as the thread needs it.

Under the hood, the std::future and std::promise both point to a shared state, which is guaranteed to remain available until both sides are destroyed. Conceptually, this is similar to the promise and the future each holding its own copy of a shared_ptr to the same object. That object contains the underlying mechanisms needed to pass the value, block, and perform the other operations.
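As a rough illustration of that analogy (toy_promise, toy_future and toy_shared_state are invented names, not the real library types):

#include <condition_variable>
#include <memory>
#include <mutex>

struct toy_shared_state
{
    std::mutex              _mtx;
    std::condition_variable _cnd;
    bool                    _ready = false;
};

struct toy_promise
{
    std::shared_ptr<toy_shared_state> _state = std::make_shared<toy_shared_state>();
    void set_value()
    {
        std::unique_lock<std::mutex> l(_state->_mtx);
        _state->_ready = true;
        _state->_cnd.notify_all();
    }
};

struct toy_future
{
    std::shared_ptr<toy_shared_state> _state; // a copy of the promise's pointer
    void wait()
    {
        std::unique_lock<std::mutex> l(_state->_mtx);
        while (!_state->_ready)
            _state->_cnd.wait(l);
    }
};

Because each side holds its own shared_ptr, the state block stays alive until the last of the two owners is destroyed. In the question's code, however, both the promise and the future are members of synchronous_job, so destroying that one object drops both owners at once and takes the shared block with it.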

As for attempting to signal on destruction, the problem is: where would this condition variable live? The shared area is destroyed once all of the associated futures and promises are destroyed. The deadlock occurs because that area is destroyed while it is still being used (nothing stops the waiting thread from destroying the promise while the other thread is still inside set_value). Adding extra condition variables to the shared state would not help, as they would be destroyed along with it.



Answer 3:

Answering my own question to offer a workable solution. It doesn't use std::promise or std::future, but it achieves the synchronisation I'm after.

Update synchronous_job to use a std::condition_variable and std::mutex instead:

Edit: Updated to include a boolean flag as suggested by Dave S

struct synchronous_job
{
    synchronous_job(std::function<void()> job, dispatcher& d)
        : _job(job)
        , _d(d)
        , _done(false)
    {
    }
    void run()
    {
        _d.post(std::bind(&synchronous_job::cb, this));
        std::unique_lock<std::mutex> l(_mtx);
        while (!_done) // loop to guard against spurious wakeups
            _cnd.wait(l);
    }
private:
    void cb()
    {
        _job();
        std::unique_lock<std::mutex> l(_mtx);
        _done = true;
        _cnd.notify_all();
    }
    std::function<void()>   _job;
    dispatcher&             _d;
    std::condition_variable _cnd;
    std::mutex              _mtx;
    bool                    _done;
};


Answer 4:

The canonical answer is to never std::bind to a raw this pointer, but rather to a std::weak_ptr. When the callback fires, lock() it and check for null before invoking the member function.

Or, re-stated, never call a member function (from outside) from a scope that doesn't hold a shared_ptr to the object.
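
For example, assuming the object is managed by a shared_ptr (listener and make_callback are invented names for this sketch):

#include <functional>
#include <memory>

struct listener
{
    void on_job_done() { /* ... */ }
};

std::function<void()> make_callback(const std::shared_ptr<listener>& obj)
{
    std::weak_ptr<listener> weak = obj; // bind to a weak_ptr, not to the raw this
    return [weak]()
    {
        if (std::shared_ptr<listener> strong = weak.lock()) // check before invoking
            strong->on_job_done();
        // otherwise the object has already been destroyed; silently drop the callback
    };
}

This keeps the object alive for the duration of the call, and skips the callback entirely if the object has already gone away.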