C++ multiple multicast receiver with boost asio

Posted 2019-04-02 14:16

Question:

I have to implement a multicast receiver that can join a list of multicast groups and process the received data in a dedicated thread, using Boost. I tried the following code:

boost::asio::io_service m_io_service;
boost::asio::ip::udp::socket m_multicast_socket(m_io_service);

// listen address
boost::asio::ip::address listen_address  
     = boost::asio::ip::address::from_string("0.0.0.0");

// listen port
unsigned short multicast_port = m_configuration->m_multicast_interface_port;

boost::asio::ip::udp::endpoint listen_endpoint( listen_address, multicast_port );

// open socket
m_multicast_socket.open( listen_endpoint.protocol() );

// set socket buffer size
m_multicast_socket.set_option( 
       boost::asio::ip::udp::socket::receive_buffer_size
               ( m_configuration->m_receiving_socket_buffer_size ) );

// other sockets could bind to listen_address
m_multicast_socket.set_option( boost::asio::ip::udp::socket::reuse_address(true) );

boost::asio::socket_base::bytes_readable num_of_bytes_readable(true);

m_multicast_socket.io_control(num_of_bytes_readable);

m_multicast_socket.bind(listen_endpoint);


// joining a list of multicast group
for ( size_t i=0; i < multicast_groups.size(); ++i )
{
    boost::asio::ip::address multicast_address 
         = boost::asio::ip::address::from_string( multicast_groups[i] );

    m_multicast_socket.set_option( 
        boost::asio::ip::multicast::join_group(
            multicast_address ) );

    std::cout << multicast_groups[i] << " multicast group joined!" << std::endl;
}

Then, to read the data, I run an infinite loop:

while ( !m_exit )
{
    while ( !num_of_bytes_readable.get() )
    {
        boost::this_thread::sleep( boost::posix_time::milliseconds( 1 ) );
    }

    boost::asio::ip::udp::endpoint sender_endpoint;

    size_t bytes_received = m_multicast_socket.receive_from(
        boost::asio::buffer( m_reading_buffer.get(), m_configuration->m_reading_buffer_size )
            , sender_endpoint );

    if ( bytes_received > 0 )
    {
       // process
    }

    boost::this_thread::yield();
}

But no data is received, and the loop

while ( !num_of_bytes_readable.get() )
{
    boost::this_thread::sleep( boost::posix_time::milliseconds( 1 ) );
}

never exits.

I also tried the multicast receiver example from the Boost.Asio documentation, but the async_receive_from call never completes.

Answer 1:

There are a few changes that need to occur:

  • As suggested in the comments, use a socket per multicast group.
  • If the sender and receiver are on the same machine, then verify that the ip::multicast::enable_loopback option is true.
  • socket_base::bytes_readable is an IO control command. socket.io_control executes the command on the socket, and bytes_readable.get() returns the value stored by the most recent execution. In the question, io_control is called only once, before the loop, so get() keeps returning the same stale value. The command needs to be executed each time you want to query how many bytes are available for reading. An alternative, and arguably slightly more readable, solution is to use the socket.available() function.

Here is a simple example that demonstrates the usage of socket_base::bytes_readable.

// Standard includes.
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// 3rd party includes.
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>

void read(boost::asio::ip::udp::socket& socket)
{
  boost::asio::ip::udp::endpoint sender;
  std::vector<char> buffer;
  for (int i=0; i < 3; ++i)
  {
    // Start from zero on every iteration so that each message is polled for.
    std::size_t bytes_readable = 0;

    // Poll until data is available.
    while (!bytes_readable)
    {
      // Issue command to socket to get number of bytes readable.
      boost::asio::socket_base::bytes_readable num_of_bytes_readable(true);
      socket.io_control(num_of_bytes_readable);

      // Get the value from the command.
      bytes_readable = num_of_bytes_readable.get();

      // If there is no data available, then sleep.
      if (!bytes_readable)
      {
        boost::this_thread::sleep(boost::posix_time::seconds(1));
      }
    }

    // Resize the buffer to store all available data.
    buffer.resize(bytes_readable);

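    // Read available data. receive_from returns the number of bytes read.
    std::size_t bytes_received = socket.receive_from(
      boost::asio::buffer(buffer, bytes_readable),
      sender);

    // Extract only the received bytes from the buffer.
    std::string message(buffer.begin(), buffer.begin() + bytes_received);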

    // Output data.
    std::cout << "Received message: ";
    std::cout << message << std::endl;
  }
}

void write(boost::asio::ip::udp::socket& socket,
           boost::asio::ip::udp::endpoint& destination)
{
  std::string message;
  for (unsigned int i=0; i < 3; ++i)
  {
    std::ostringstream stream;
    stream << i;
    message = stream.str();
    socket.send_to(boost::asio::buffer(message), destination);
    std::cout << "Sent message: " << message << std::endl;
  }
}

int main(int argc, char* argv[])
{
  // Extract command-line arguments.
  if (argc != 4)
  {
    std::cerr << "Usage: " << argv[0] << " <receive|send> <address> <port>\n";
    return 1;
  }
  bool receiver = std::string(argv[1]) == "receive";
  boost::asio::ip::address address =
    boost::asio::ip::address::from_string(argv[2]);
  unsigned short port = boost::lexical_cast<unsigned short>(argv[3]);

  // Create socket.
  using boost::asio::ip::udp;
  boost::asio::io_service service;
  udp::socket socket(service);
  socket.open(boost::asio::ip::udp::v4());

  // Allow other processes to reuse the address, permitting other processes on
  // the same machine to use the multicast address.
  socket.set_option(udp::socket::reuse_address(true));

  // Guarantee the loopback is enabled so that multiple processes on the same
  // machine can receive data that originates from the same socket.
  socket.set_option(boost::asio::ip::multicast::enable_loopback(true));
  socket.bind(
    udp::endpoint(boost::asio::ip::address_v4::any(),
    receiver ? port /* same as multicast port */
             : 0 /* any */));
  udp::endpoint destination(address, port);

  // Join group.
  namespace ip = boost::asio::ip;
  socket.set_option(ip::multicast::join_group(address));

  // Start read or write loops based on command line options.
  if (receiver) read(socket);
  else          write(socket, destination);
}

Usage:

$ ./a.out receive 235.55.55.55 55555 &
$ sleep 1
$ ./a.out send 235.55.55.55 55555
Sent message: 0
Sent message: 1
Sent message: 2
Received message: 0
Received message: 1
Received message: 2