Boost - periodic task scheduler

Posted 2019-08-05 19:41

Question:

I'm trying to figure out a simple scheduler for periodic tasks. The idea is to provide a means to schedule periodic execution of a std::function<void()> at any given time interval that is a multiple of one second. I was trying to write it using boost::asio, but so far I end up with strange behaviour: only one of the two scheduled tasks is executed repeatedly, and it doesn't follow the interval.

Here is the code:

#include <functional>
#include <iostream>

#include <boost/asio.hpp>
#include <boost/bind.hpp>

class PeriodicTask
{
public: 
     PeriodicTask(boost::asio::io_service * ioService, int interval, std::function<void()> task)
     : ioService(ioService), 
       interval(interval), 
       task(std::make_shared<std::function<void()>>(task)),
       timer(std::make_shared<boost::asio::deadline_timer>(*ioService, boost::posix_time::seconds(interval)))
    {}

    void execute()
    {
        task->operator()();
        timer->expires_at(timer->expires_at() + boost::posix_time::seconds(interval));
        timer->async_wait(boost::bind(&PeriodicTask::execute,this));
    }

private:
     std::shared_ptr<boost::asio::io_service> ioService;
     std::shared_ptr<boost::asio::deadline_timer> timer;
     std::shared_ptr<std::function<void()>> task;
     int interval;
};

class PeriodicScheduler
{
public:
    void run()
    {
        for each (auto task in tasks)
        {
            task.execute();
        }
        io_service.run();
    }
    void  addTask(std::function<void()> task, int interval)
    {
        tasks.push_back(PeriodicTask(&io_service, interval, task));
    }
    boost::asio::io_service io_service;

private:
    std::vector<PeriodicTask> tasks;
};


void printCPUUsage()
{
    std::cout << "CPU usage: " << std::endl;
}

void printMemoryUsage()
{
    std::cout << "Memory usage: " << std::endl;
}
int _tmain(int argc, _TCHAR* argv[])
{   
    PeriodicScheduler scheduler;

    scheduler.addTask(printCPUUsage, 5);
    scheduler.addTask(printMemoryUsage, 10);

    scheduler.run();

    return 0;
}

Does anyone know what could cause the problem? Or happen to know a better approach to the problem?

Many Thanks!

Answer 1:

Analysis

The main culprit seems to be the non-standard for each (auto task in tasks) (a Microsoft extension), which is basically equivalent to for (auto task : tasks). This means that you copy the elements of the tasks vector as you iterate over them, and work with the copy inside the loop body.

This becomes relevant in PeriodicTask::execute, specifically in

timer->async_wait(boost::bind(&PeriodicTask::execute, this));

where this points to the aforementioned copy, not the object stored in the vector.
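
The copy can be made visible without Boost. The following is a minimal sketch, not the traced program from this answer: Probe, matches_by_value and matches_by_reference are hypothetical names introduced for illustration. Each function counts how many loop iterations see the address of the real vector element.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for PeriodicTask: it only reports its own address.
struct Probe {
    const void* self() const { return this; }
};

// Iterate by value: each `probe` is a fresh copy that lives outside the vector,
// so its address never equals the address of the stored element.
std::size_t matches_by_value(const std::vector<Probe>& probes)
{
    std::size_t matches = 0;
    std::size_t index = 0;
    for (auto probe : probes) {
        if (probe.self() == &probes[index]) ++matches;
        ++index;
    }
    return matches;
}

// Iterate by reference: `probe` is the element stored in the vector.
std::size_t matches_by_reference(const std::vector<Probe>& probes)
{
    std::size_t matches = 0;
    std::size_t index = 0;
    for (auto& probe : probes) {
        if (probe.self() == &probes[index]) ++matches;
        ++index;
    }
    return matches;
}
```

For a vector of three probes, the by-value loop finds no matching address, while the by-reference loop matches all three. Any this pointer captured inside the by-value loop therefore refers to an object that is gone once the iteration ends.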

We can add some simple debugging traces that print the addresses of the objects in the vector as well as the address of the object on which execute is invoked. To keep things simple, we also reserve some space in the vector up front, so that no reallocations happen.

When we run it, we'll see something like this in the console:

>example.exe
02-11-2016 20-04-36 created this=22201304
02-11-2016 20-04-36 created this=22201332
02-11-2016 20-04-36 execute this=19922484
02-11-2016 20-04-36 CPU usage
02-11-2016 20-04-36 execute this=19922484
02-11-2016 20-04-36 Memory usage
02-11-2016 20-04-46 execute this=19922484
02-11-2016 20-04-46 Memory usage
02-11-2016 20-04-46 execute this=19922484
02-11-2016 20-04-46 Memory usage
02-11-2016 20-04-46 execute this=19922484
02-11-2016 20-04-46 Memory usage
02-11-2016 20-04-46 execute this=19922484
.... and so on and on and on....

Let's analyse it a little. Let us assume that t refers to the starting time.

  • Line 1: Created CPU timer @ address 22201304, set to expire at t + 5 seconds.
  • Line 2: Created Memory timer @ address 22201332, set to expire at t + 10 seconds.
  • Lines 3,4: Made copy of CPU timer to address 19922484. Ran the handler. Scheduled CPU timer to run execute on object at address 19922484 at t + 5 + 5 seconds.
  • Lines 5,6: Made copy of Memory timer to address 19922484. Ran the handler. Scheduled Memory timer to run execute on object at address 19922484 at t + 10 + 10 seconds.

At this stage, we have two timers pending, one in 10 seconds, one in 20 seconds from startup. Both of them are scheduled to run member function execute on an object at address 19922484, which doesn't exist anymore at that point (it was a temporary in the for loop). By chance, the memory still contains the data from the last object that occupied that location -- the copy of the Memory task.

Time passes...

  • Lines 7,8: The CPU timer fires, and runs execute on the object at address 19922484. As explained above, this means the method is running in the context of the copy of the Memory task. Therefore we see "Memory usage" printed.

At this point, a timer is rescheduled. Due to our context, instead of rescheduling the CPU timer, we reschedule the still pending Memory timer. This cancels the pending asynchronous wait operation, which in turn causes its expiration handler to be called with the error code boost::asio::error::operation_aborted. Your expiration handler, however, ignores the error code. Thus:

  • Lines 9,10: Cancellation triggers the Memory timer expiration handler, and execute runs on the object at address 19922484. As explained above, this means the method is running in the context of the copy of the Memory task. Therefore we see "Memory usage" printed. There is already a pending asynchronous wait on the Memory timer, so we cause another cancellation when rescheduling.

  • Lines 11,12: Cancellation ... you get the gist.

Simple Fix

Change the for loop to use a reference.

for (auto& task : tasks) {
    // ....
}

Console output:

>so02.exe
02-11-2016 20-39-30 created this=19628176
02-11-2016 20-39-30 created this=19628204
02-11-2016 20-39-30 execute this=19628176
02-11-2016 20-39-30 CPU usage
02-11-2016 20-39-30 execute this=19628204
02-11-2016 20-39-30 Memory usage
02-11-2016 20-39-40 execute this=19628176
02-11-2016 20-39-40 CPU usage
02-11-2016 20-39-45 execute this=19628176
02-11-2016 20-39-45 CPU usage
02-11-2016 20-39-50 execute this=19628176
02-11-2016 20-39-50 CPU usage
02-11-2016 20-39-50 execute this=19628204
02-11-2016 20-39-50 Memory usage
02-11-2016 20-39-55 execute this=19628176
02-11-2016 20-39-55 CPU usage

Further Analysis

We have fixed one small problem, but there are several other, more or less serious, issues with the code you presented.

A bad one is that you initialize a std::shared_ptr<boost::asio::io_service> with the address of an already existing io_service instance (the member of PeriodicScheduler).

The code is in essence like:

boost::asio::io_service io_service;
std::shared_ptr<boost::asio::io_service> ptr1(&io_service);
std::shared_ptr<boost::asio::io_service> ptr2(&io_service);

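which creates 3 owners of that object that don't know about each other. When either shared_ptr is destroyed, it will call delete on an object that was never allocated with new, which is undefined behaviour.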
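
That shared pointers built separately from the same raw address do not share a reference count can be checked directly. The following is a sketch only: OwnerCounts and count_independent_owners are hypothetical names, an int stands in for the io_service, and the no-op deleter is an assumption added here so the demonstration is safe to run (the question's code uses the default deleter).

```cpp
#include <memory>

struct OwnerCounts {
    long first;   // use_count() of the first pointer (it has one real copy)
    long second;  // use_count() of the pointer built separately
};

OwnerCounts count_independent_owners()
{
    int value = 42;                 // stands in for the io_service member
    auto no_op = [](int*) {};       // assumption: prevents delete on a stack object

    std::shared_ptr<int> first(&value, no_op);
    std::shared_ptr<int> second(&value, no_op);  // separate control block
    std::shared_ptr<int> copy_of_first = first;  // shares first's control block

    return OwnerCounts{first.use_count(), second.use_count()};
}
```

The first pointer reports two owners (itself and its copy), while the second reports one, even though all three point at the same object.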

Class PeriodicTask shouldn't be copyable -- copying it doesn't make sense, and making it non-copyable would have prevented the primary problem solved above. My guess is that the shared pointers in it were an attempt to solve a problem with it being copied (and io_service itself being non-copyable).
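
As a rough illustration of what disabling copies buys, here is a Boost-free sketch. NonCopyableTask and task_is_copyable are hypothetical names; deleting the copy operations by hand is assumed to be similar in effect to deriving from boost::noncopyable.

```cpp
#include <type_traits>

// Hypothetical task type with copying disabled.
class NonCopyableTask {
public:
    NonCopyableTask() = default;
    NonCopyableTask(NonCopyableTask const&) = delete;
    NonCopyableTask& operator=(NonCopyableTask const&) = delete;
};

// With copying disabled, `for (auto task : tasks)` no longer compiles,
// while `for (auto& task : tasks)` still does.
constexpr bool task_is_copyable()
{
    return std::is_copy_constructible<NonCopyableTask>::value
        || std::is_copy_assignable<NonCopyableTask>::value;
}

static_assert(!task_is_copyable(), "NonCopyableTask must not be copyable");
```

The accidental copy in the loop then becomes a compile-time error instead of a dangling pointer at run time.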

Finally, the completion handler for the timer should have a boost::system::error_code const& parameter and at the least handle cancellation correctly.

Complete Solution

Let's start with includes, and a little convenience logging function.

#include <ctime>
#include <functional>
#include <iomanip>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/noncopyable.hpp>

void log_text(std::string const& text)
{
    auto t = std::time(nullptr);
    auto tm = *std::localtime(&t);
    std::cout << std::put_time(&tm, "%d-%m-%Y %H-%M-%S") << " " << text << std::endl;
}

Next, let's make PeriodicTask explicitly non-copyable and hold a reference to the io_service instance. This means we can avoid the other shared pointers as well. We can write a separate method to start the timer for the first time, and post it on the io_service, so that it's executed by run(). Finally, let's modify our completion handler to handle the error states, and behave correctly when cancelled.

class PeriodicTask : boost::noncopyable
{
public:
    typedef std::function<void()> handler_fn;

    PeriodicTask(boost::asio::io_service& ioService
        , std::string const& name
        , int interval
        , handler_fn task)
        // Initializers are listed in member declaration order
        : ioService(ioService)
        , timer(ioService)
        , task(task)
        , name(name)
        , interval(interval)
    {
        log_text("Create PeriodicTask '" + name + "'");
        // Schedule start to be run by the io_service
        ioService.post(boost::bind(&PeriodicTask::start, this));
    }

    void execute(boost::system::error_code const& e)
    {
        if (e != boost::asio::error::operation_aborted) {
            log_text("Execute PeriodicTask '" + name + "'");

            task();

            timer.expires_at(timer.expires_at() + boost::posix_time::seconds(interval));
            start_wait();
        }
    }

    void start()
    {
        log_text("Start PeriodicTask '" + name + "'");

        // Uncomment if you want to call the handler on startup (i.e. at time 0)
        // task();

        timer.expires_from_now(boost::posix_time::seconds(interval));
        start_wait();
    }

private:
    void start_wait()
    {
        timer.async_wait(boost::bind(&PeriodicTask::execute
            , this
            , boost::asio::placeholders::error));
    }

private:
    boost::asio::io_service& ioService;
    boost::asio::deadline_timer timer;
    handler_fn task;
    std::string name;
    int interval;
};
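
One visible effect of the separate start method is when the first timed run happens. The sketch below is a plain integer model of the expiry arithmetic, not Boost code; runs_with_direct_execute and runs_with_start are hypothetical names. It compares calling execute directly at startup, as the question's run() does, with the start/execute split in the class above.

```cpp
#include <vector>

// Offsets (seconds after startup) of the first `count` timer-driven runs when
// execute() is called directly at startup: the timer is constructed to expire
// at `interval`, and execute() immediately pushes that expiry by `interval`.
std::vector<int> runs_with_direct_execute(int interval, int count)
{
    std::vector<int> offsets;
    int expiry = interval;   // set by the constructor
    expiry += interval;      // startup call to execute()
    for (int i = 0; i < count; ++i) {
        offsets.push_back(expiry);
        expiry += interval;  // each later execute() reschedules
    }
    return offsets;
}

// The same model for start(): expires_from_now(interval), then execute()
// adds `interval` after every run.
std::vector<int> runs_with_start(int interval, int count)
{
    std::vector<int> offsets;
    int expiry = interval;   // set by start()
    for (int i = 0; i < count; ++i) {
        offsets.push_back(expiry);
        expiry += interval;
    }
    return offsets;
}
```

For an interval of 5, the first model yields 10, 15, 20 and the second yields 5, 10, 15, which agrees with the CPU task timestamps in the two console outputs of this answer.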

Let's have PeriodicScheduler keep a vector of unique_ptr<PeriodicTask>. Since PeriodicTask now handles getting started itself, we can simplify the run method. Finally, let's also make it non-copyable, since copying it doesn't really make much sense.

class PeriodicScheduler : boost::noncopyable
{
public:
    void run()
    {
        io_service.run();
    }

    void addTask(std::string const& name
        , PeriodicTask::handler_fn const& task
        , int interval)
    {
        tasks.push_back(std::make_unique<PeriodicTask>(std::ref(io_service)
            , name, interval, task));
    }

private:
    boost::asio::io_service io_service;
    std::vector<std::unique_ptr<PeriodicTask>> tasks;
};
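
Storing unique_ptr elements also matters because each task binds this into its handlers, so a task must stay at one address for its whole lifetime. The following is a small Boost-free sketch of that property; Node and addresses_survive_growth are hypothetical names.

```cpp
#include <cstddef>
#include <memory>
#include <vector>

struct Node { int id; };

// Returns true if every object kept its address while the vector grew
// (and reallocated its internal storage) without any reserve() call.
bool addresses_survive_growth(std::size_t count)
{
    std::vector<std::unique_ptr<Node>> nodes;
    std::vector<const Node*> seen;

    for (std::size_t i = 0; i < count; ++i) {
        nodes.push_back(std::unique_ptr<Node>(new Node{static_cast<int>(i)}));
        seen.push_back(nodes.back().get());  // remember where the object lives
    }

    // Reallocation moved only the pointers, never the objects they own.
    for (std::size_t i = 0; i < count; ++i) {
        if (nodes[i].get() != seen[i]) return false;
    }
    return true;
}
```

With a plain std::vector of task objects, the same guarantee would require reserving enough capacity up front, as was done for the debugging traces earlier.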

Now, let's put it all together and try it out.

int main()
{
    PeriodicScheduler scheduler;

    scheduler.addTask("CPU", boost::bind(log_text, "* CPU USAGE"), 5);
    scheduler.addTask("Memory", boost::bind(log_text, "* MEMORY USAGE"), 10);

    log_text("Start io_service");

    scheduler.run();

    return 0;
}

Console output:

>example.exe
02-11-2016 19-20-42 Create PeriodicTask 'CPU'
02-11-2016 19-20-42 Create PeriodicTask 'Memory'
02-11-2016 19-20-42 Start io_service
02-11-2016 19-20-42 Start PeriodicTask 'CPU'
02-11-2016 19-20-42 Start PeriodicTask 'Memory'
02-11-2016 19-20-47 Execute PeriodicTask 'CPU'
02-11-2016 19-20-47 * CPU USAGE
02-11-2016 19-20-52 Execute PeriodicTask 'CPU'
02-11-2016 19-20-52 * CPU USAGE
02-11-2016 19-20-52 Execute PeriodicTask 'Memory'
02-11-2016 19-20-52 * MEMORY USAGE
02-11-2016 19-20-57 Execute PeriodicTask 'CPU'
02-11-2016 19-20-57 * CPU USAGE
02-11-2016 19-21-02 Execute PeriodicTask 'CPU'
02-11-2016 19-21-02 * CPU USAGE
02-11-2016 19-21-02 Execute PeriodicTask 'Memory'
02-11-2016 19-21-02 * MEMORY USAGE