using zmq::poll in multithreaded c++0x11 program i

2019-06-04 02:40发布

问题:

For a custom server I intent to use the int zmq::poll( zmq_pollitemt_t * items, int nitems, long timeout = -1). function which is I think is a wrapper around the unix poll function but includes zmq::socket_t next to file descriptors. The function works as I expected until I press ctrl+x or run $kill my_server_pid in the terminal. I would expect that the poll to terminate with -1 or throws a zmq::error_t (which derives from std::exception) which includes a errno and the strerr message. This should indicate there was a interrupt. Then my server should handle the signal gracefully and save some data and shut down.

Below I have a fragment of code that demonstrates the problem. First I show a bit of my environment and how I compile it:

    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ g++ -v
    Using built-in specs.
    COLLECT_GCC=g++
    COLLECT_LTO_WRAPPER=/usr/lib/gcc/x86_64-linux-gnu/4.6/lto-wrapper
    Target: x86_64-linux-gnu
    Configured with: ../src/configure -v --with-pkgversion='Ubuntu/Linaro      4.6.3-1ubuntu5' --with-bugurl=file:///usr/share/doc/gcc-4.6/README.Bugs --enable-languages=c,c++,fortran,objc,obj-c++ --prefix=/usr --program-suffix=-4.6 --enable-shared --enable-linker-build-id --with-system-zlib --libexecdir=/usr/lib --without-included-gettext --enable-threads=posix --with-gxx-include-dir=/usr/include/c++/4.6 --libdir=/usr/lib --enable-nls --with-sysroot=/ --enable-clocale=gnu --enable-libstdcxx-debug --enable-libstdcxx-time=yes --enable-gnu-unique-object --enable-plugin --enable-objc-gc --disable-werror --with-arch-32=i686 --with-tune=generic --enable-checking=release --build=x86_64-linux-gnu --host=x86_64-linux-gnu --target=x86_64-linux-gnu
    Thread model: posix
    gcc version 4.6.3 (Ubuntu/Linaro 4.6.3-1ubuntu5) 
    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ pkg-config --modversion `enter code here`libzmq
    2.2.0
    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ g++ -pthread -std=c++0x -Wall -Wextra -pedantic -o poll polling.cpp $(pkg-config --cflags --libs libzmq )
    mja@gijs:~/Desktop/sample_programs/zeromq/poll$

and now the code of polling.cpp :

#include <zmq.hpp>
#include <thread>
#include <cstdlib>
#include <string>
#include <iostream>
#include <signal.h>

const char* bind_addres = "tcp://*:2345";
const char* connect_addres = "tcp://localhost:2345";

inline void
send_str( zmq::socket_t& sock, const std::string& s) throw (zmq::error_t) {
    zmq::message_t msg ( s.size() );
    memcpy( msg.data(), s.c_str(), msg.size() );
    sock.send( msg );
}

inline void recv_str( zmq::socket_t& sock, std::string& s) throw( zmq::error_t) {
    zmq::message_t msg;
    sock.recv(&msg);
    s = std::string( static_cast<const char*>(msg.data()), msg.size());
}
static int interrupted = 0;

static void handle_signal ( int signal ) 
{
    interrupted = signal;
    std::cerr << "Interrupted by signal: " << signal << std::endl;
}

void catch_signals (void) 
{
    struct sigaction action;
    action.sa_handler = handle_signal;
    action.sa_flags = 0;
    sigemptyset(&action.sa_mask);

    sigaction(SIGINT,  &action, NULL);
    sigaction(SIGTERM, &action, NULL);
}

int get_interrupted(void)
{
    std::cout << "interrupted = " << interrupted << std::endl;
    return interrupted;
}

void req_thread ( zmq::context_t* c ){


    zmq::socket_t sock( *c, ZMQ_REQ);
    sock.connect(connect_addres);

    zmq::pollitem_t items[]{
        { sock, 0, ZMQ_POLLIN, 0}
    };

    while (true){
        try {
            // zmq 3.x.x takes ms instead of us so change to eg 1000 or be patient.
            int rc = zmq::poll(items, 1, 1000000);
            if (rc > 0){
                if ( items[0].revents & ZMQ_POLLIN){
                    std::string s;
                    recv_str(sock, s);
                    std::cout << s << std::endl;
                }
            }
            else if ( rc == 0){ //timeout
                send_str( sock, "Hello");
            }
            else{
                std::cout << __func__ << " " << __LINE__ << get_interrupted() << std::endl;
            }
        }
        catch( zmq::error_t& e ){
            std::cout << __func__ << " " << __LINE__ << e.what() << std::endl;
        }
    }
}

void rep_thread ( zmq::context_t* c ){

    zmq::socket_t sock( *c, ZMQ_REP);
    sock.bind(bind_addres);

    zmq::pollitem_t items[]{
        { sock, 0, ZMQ_POLLIN, 0}
    };

    while (true){
        try{
            int rc = zmq::poll(items, 1 , -1);
            if (rc > 0){
                if ( items[0].revents & ZMQ_POLLIN){
                    std::string s;
                    recv_str(sock, s);
                    s+=" world!";
                    send_str(sock, s);
                }
            }
            else{
                std::cout << __func__ << " " << __LINE__ << get_interrupted() << std::endl;
            }
        }
        catch( zmq::error_t& e ){
            std::cout << __func__ << " " << __LINE__ << e.what() << std::endl;
        }
    }
}

int main(){

    zmq::context_t context(1);
    catch_signals();

    std::thread t1 ( rep_thread, &context);
    std::thread t2 ( req_thread, &context);

    t1.join();
    t2.join();

    return 0;
}

and finally I show some example output that demonstrates my issue that the zmq_poll does not seem to be affected by pressing ctrl+c in the terminal:

    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ ./poll 
    Hello world!
    Hello world!
    Hello world!
    ^CInterrupted by signal: 2
    Hello world!
    Hello world!
    ^Z
    [1]+  Stopped                 ./poll
    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ kill -9 %1

    [1]+  Stopped                 ./poll
    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ 

So here one could see by the output on the terminal that no zmq::error_t is thrown neither zmq::poll() returns -1;

how it should work is in the next example simple_poll.cpp one can see that a zmq::error_t is thrown:

#include<zmq.hpp>
#include<signal.h>
#include<iostream>


static int interrupted = 0;

static void handle_signal ( int signal ) 
{
    interrupted = signal;
    /*just to show that the signal handler works*/
    std::cerr << "Interrupted by signal: " << signal << std::endl;
}

void catch_signals (void) 
{
    struct sigaction action;
    action.sa_handler = handle_signal;
    action.sa_flags = 0;
    sigemptyset(&action.sa_mask);

    sigaction(SIGINT,  &action, NULL);
    sigaction(SIGTERM, &action, NULL);
}

using namespace std;

int main(){

    zmq::context_t context(1);
    catch_signals();
    zmq::socket_t sock( context, ZMQ_REP );
    /*listen on port 2346 on all available interfaces.*/
    sock.bind("tcp://*:2346");

    zmq::pollitem_t items[] = {
        {sock, 0 , ZMQ_POLLIN, 0}
    };

    try {
        /*wait for a event*/
        zmq::poll( items, 1, -1);
        /*for zmq users read message and respond*/
    }
    catch (zmq::error_t& e){
        cout << "error occured: " <<e.what() << endl;
        cout << "We were interrupted by: " << interrupted << endl;
    }
    return 0;
}

This yields the following results on ctrl+x in the terminal showing that the zmq::error_t is caught and the signal has been handled.

mja@gijs:~/Desktop/sample_programs/zeromq/poll$ ./simple_poll ^CInterrupted by signal: 2
error occured: Interrupted system call
We were interrupted by: 2
mja@gijs:~/Desktop/sample_programs/zeromq/poll$

回答1:

You've got a signal handler - but you don't do anything in it. In your handler, interrupt the polling loop (rather than true, check for some condition that you set in your signal handler.)

Let's say for argument's sake you are using c++11, try something like..

// Global which indicates that we are running..
std::atomic<bool> running = true;


// In your handler - reset this flag
static void handle_signal ( int signal ) {
  running = false;
}

Now your loops become:

while (running)
{
:
}