Why might this thread management pattern result in a deadlock?

Posted 2019-06-24 06:19

Question:

I'm using a common base class has_threads to manage any type that should be allowed to instantiate a boost::thread.

Each instance of has_threads owns a set of threads (to support waitAll and interruptAll functions, which I do not include below), and should automatically invoke releaseThread when a thread terminates, to keep this set consistent.

In my program, I have just one of these. A thread is created every 10s, and each one performs a database lookup. When the lookup is complete, the thread runs to completion and releaseThread should be invoked; with the mutex held, the thread object is removed from internal tracking. I can see this working properly from the output ABC.

Once in a while, though, the mechanisms collide: releaseThread is executed perhaps twice concurrently. What I can't figure out is why this results in a deadlock. From this point on, no thread invocation ever outputs anything other than A. [It's worth noting that I'm using thread-safe stdlib, and that the issue remains when IOStreams are not used.] Stack traces indicate that the threads are blocked on the mutex, but why would the lock not eventually be released by the first thread for the second, then by the second for the third, and so on?

Am I missing something fundamental about how scoped_lock works? Is there anything obvious here that I've missed that could lead to a deadlock, despite (or even due to?) the use of a mutex lock?

Sorry for the poor question, but as I'm sure you're aware it's nigh-on impossible to present real testcases for bugs like this.

class has_threads {
    protected:
        template <typename Callable>
        void createThread(Callable f, bool allowSignals)
        {
            boost::mutex::scoped_lock l(threads_lock);

            // Create and run thread
            boost::shared_ptr<boost::thread> t(new boost::thread());

            // Track thread
            threads.insert(t);

            // Run thread (do this after inserting the thread for tracking so that we're ready for the on-exit handler)
            *t = boost::thread(&has_threads::runThread<Callable>, this, f, allowSignals);
        }

    private:

        /**
         * Entrypoint function for a thread.
         * Sets up the on-end handler then invokes the user-provided worker function.
         */
        template <typename Callable>
        void runThread(Callable f, bool allowSignals)
        {
            boost::this_thread::at_thread_exit(
                boost::bind(
                    &has_threads::releaseThread,
                    this,
                    boost::this_thread::get_id()
                )
            );

            if (!allowSignals)
                blockSignalsInThisThread();


            try {
                f();
            }
            catch (boost::thread_interrupted& e) {

                // Yes, we should catch this exception!
                // Letting it bubble over is _potentially_ dangerous:
                // http://stackoverflow.com/questions/6375121

                std::cout << "Thread " << boost::this_thread::get_id() << " interrupted (and ended)." << std::endl;
            }
            catch (std::exception& e) {
                std::cout << "Exception caught from thread " << boost::this_thread::get_id() << ": " << e.what() << std::endl;
            }
            catch (...) {
                std::cout << "Unknown exception caught from thread " << boost::this_thread::get_id() << std::endl;
            }
        }

        void releaseThread(boost::thread::id thread_id)
        {
            std::cout << "A";
            boost::mutex::scoped_lock l(threads_lock);

            std::cout << "B";
            for (threads_t::iterator it = threads.begin(), end = threads.end(); it != end; ++it) {

                if ((*it)->get_id() != thread_id)
                    continue;

                threads.erase(it);
                break;
            }
            std::cout << "C";
        }

        void blockSignalsInThisThread()
        {
            sigset_t signal_set;
            sigemptyset(&signal_set);
            sigaddset(&signal_set, SIGINT);
            sigaddset(&signal_set, SIGTERM);
            sigaddset(&signal_set, SIGHUP);
            sigaddset(&signal_set, SIGPIPE); // http://www.unixguide.net/network/socketfaq/2.19.shtml
            pthread_sigmask(SIG_BLOCK, &signal_set, NULL);
        }


        typedef std::set<boost::shared_ptr<boost::thread> > threads_t;
        threads_t threads;

        boost::mutex threads_lock;
};

struct some_component : has_threads {
    some_component() {
        // set a scheduler to invoke createThread(bind(&some_work, this)) every 10s
    }

    void some_work() {
        // usually pretty quick, but I guess sometimes it could take >= 10s
    }
};

Answer 1:

Well, a deadlock can occur if a thread locks a mutex that it has already locked (unless you use a recursive mutex).

If the release part is called a second time by the same thread, as seems to happen with your code, you have a deadlock.

I have not studied your code in detail, but you probably have to redesign (simplify?) it to make sure that a lock cannot be acquired twice by the same thread. You could also add a safeguard that checks ownership of the lock.

EDIT: As said in my comment and in IronMensan's answer, one possible case is that the thread stops during creation, so that the at_thread_exit handler is called before the mutex locked in the creation part of your code has been released.

EDIT2:

Well, with a mutex and a scoped lock, I can only imagine a recursive lock, or a lock that is never released. The latter can happen if a loop runs forever, for instance because of memory corruption.

I suggest adding more logging with the thread id to check whether there is a recursive lock or something else strange. Then I would check that the loop is correct. I would also check that the at_thread_exit handler is called only once per thread.

One more thing: check the effect of erasing a thread (and thus calling its destructor) from within the at_thread_exit handler.

my 2 cents



Answer 2:

You may need to do something like this:

    template <typename Callable>
    void createThread(Callable f, bool allowSignals)
    { 
        // Create and run thread 
        boost::shared_ptr<boost::thread> t(new boost::thread()); 

        {
            boost::mutex::scoped_lock l(threads_lock); 

            // Track thread 
            threads.insert(t);
        } 

        //Do not hold threads_lock while starting the new thread in case
        //it completes immediately

        // Run thread (do this after inserting the thread for tracking so that we're ready for the on-exit handler) 
        *t = boost::thread(&has_threads::runThread<Callable>, this, f, allowSignals); 
    } 

In other words, use threads_lock exclusively to protect threads.

Update:

To expand on something in the comments with speculation about how boost::thread works, the lock patterns could look something like this:

createThread:

  1. (createThread) obtain threads_lock
  2. (boost::thread::operator=) obtain a boost::thread internal lock
  3. (boost::thread::operator=) release a boost::thread internal lock
  4. (createThread) release threads_lock

thread end handler:

  1. (at_thread_exit) obtain a boost::thread internal lock
  2. (releaseThread) obtain threads_lock
  3. (releaseThread) release threads_lock
  4. (at_thread_exit) release a boost::thread internal lock

If those two boost::thread locks are the same lock, the potential for deadlock is clear. But this is speculation because much of the boost code scares me and I try not to look at it.

createThread could/should be reworked to move step 4 up between steps 1 and 2, so that threads_lock is no longer held while the internal lock is taken. That eliminates the potential deadlock.



Answer 3:

It is possible that the created thread finishes before or while the assignment operator in createThread completes. An event queue or some similar structure might be necessary, though a simpler, if hack-ish, solution might work as well. Don't change createThread, since you have to use threads_lock to protect both threads itself and the thread objects it points to. Instead, change runThread to this:

    template <typename Callable> 
    void runThread(Callable f, bool allowSignals) 
    { 
        //SNIP setup

        try { 
            f(); 
        } 
        //SNIP catch blocks

        //ensure that createThread is complete before this thread terminates
        boost::mutex::scoped_lock l(threads_lock);
    }