I've got a C++ application that is using ZeroMQ for some messaging. But it also has to provide a SGCI connection for an AJAX / Comet based web service.
For this I need a normal TCP socket. I could do that by normal Posix sockets, but to stay cross platform portable and make my life easier (I hope...) I was thinking of using Boost::ASIO.
But now I have the clash of ZMQ wanting to use it's own zmq_poll()
and ASIO it's io_service.run()
...
Is there a way to get ASIO to work together with the 0MQ zmq_poll()
?
Or is there an other recommended way to achieve such a setup?
Note: I could solve that by using multiple threads - but it's only a little single core / CPU box that'll run that program with a very low amount of SCGI traffic, so multithreading would be a waste of resources...
After reading the documentation here and here, specifically this paragraph
ZMQ_FD: Retrieve file descriptor associated with the socket The ZMQ_FD
option shall retrieve the file descriptor associated with the
specified socket. The returned file descriptor can be used to
integrate the socket into an existing event loop; the ØMQ library
shall signal any pending events on the socket in an edge-triggered
fashion by making the file descriptor become ready for reading.
I think you can use null_buffers
for every zmq_pollitem_t
and defer the event loop to an io_service
, completely bypassing zmq_poll()
altogether. There appear to be some caveats in the aforementioned documentation however, notably
The ability to read from the returned file descriptor does not
necessarily indicate that messages are available to be read from, or
can be written to, the underlying socket; applications must retrieve
the actual event state with a subsequent retrieval of the ZMQ_EVENTS
option.
So when the handler for one of your zmq sockets is fired, you'll have to do a little more work before handling the event I think. Uncompiled pseudo-code is below
const int fd = getZmqDescriptorSomehow();
boost::asio::posix::stream_descriptor socket( _io_service, fd );
socket->async_read_some(
boost::asio::null_buffers(),
[=](const boost::system::error_code& error)
{
if (!error) {
// handle data ready to be read
}
}
);
note you don't have to use a lambda here, boost::bind
to a member function would be sufficient.
In the end I figured out there are two possible solutions:
- Sam Miller's where we use the event loop of ASIO
- The ZeroMQ's event loop by getting the ASIO file descriptors though the
.native()
methods of the acceptor
and the socket
and inserting them into the array of zmq_pollitem_t
I have accepted the answer of Sam Miller as that's for me the best solution in SCGI case where constantly new connections are created and ended. Handling the thus every changing zmq_pollitem_t
array is big hassle that can be avoided by using the ASIO event loop.
Obtaining the socket to ZeroMQ is the smallest part of the battle. ZeroMQ is based on a protocol which is layered over TCP, so you will have to reimplement ZeroMQ within a custom Boost.Asio io_service if you go this route. I ran into the same problem when creating an asynchronous ENet service using Boost.Asio by first simply trying to catch traffic from an ENet client using a Boost.Asio UDP service. ENet is a TCP like protocol layered over UDP, so all I achieved at that point was catching packets in a virtually useless state.
Boost.Asio is template based, and the built-in io_service's use templates to basically wrap the system socket library to create TCP and UDP service. My final solution was to create a custom io_service that wrapped the ENet library rather than the systems socket library, allowing it to use ENet's transport functions rather than having to reimplement them using the built-in UDP transport.
The same can be done for ZeroMQ, but ZeroMQ is already a very high performance network library in it's own right that already provides async I/O. I think you can create a viable solution by receiving messages using ZeroMQ's existing API and passing the messages into a io_service thread pool. That way messages/tasks will still be handled asynchronously using Boost.Asio's reactor pattern without having to re-write anything. ZeroMQ will provide the async I/O, Boost.Asio will provide the async task handlers/workers.
The existing io_service can still be coupled to an existing TCP socket as well, allowing the threadpool to handle both TCP (HTTP in your case) and ZeroMQ. It's entirely possible in such a setup for the ZeroMQ task handlers to access the TCP services session objects, allowing you to send results of the ZeroMQ message/task back to a TCP client.
The following is just to illustrate the concept.
// Create a pool of threads to run all of the io_services.
std::vector<boost::shared_ptr<boost::thread> > threads;
for(std::size_t i = 0; i < thread_pool_size_; ++i) {
boost::shared_ptr<boost::thread> thread(new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_)));
threads.push_back(thread);
}
while (1) {
char buffer [10];
zmq_recv (responder_, buffer, 10, 0);
io_service_.post(boost::bind(&server::handle_zeromq_message, buffer, this));
}
2 years after this question someone posted a project which does exactly this. The project is here: https://github.com/zeromq/azmq. The blog post discussing the design is here: https://rodgert.github.io/2014/12/24/boost-asio-and-zeromq-pt1/.
Here is the sample code copied from the readme:
#include <azmq/socket.hpp>
#include <boost/asio.hpp>
#include <array>
namespace asio = boost::asio;
int main(int argc, char** argv) {
asio::io_service ios;
azmq::sub_socket subscriber(ios);
subscriber.connect("tcp://192.168.55.112:5556");
subscriber.connect("tcp://192.168.55.201:7721");
subscriber.set_option(azmq::socket::subscribe("NASDAQ"));
azmq::pub_socket publisher(ios);
publisher.bind("ipc://nasdaq-feed");
std::array<char, 256> buf;
for (;;) {
auto size = subscriber.receive(asio::buffer(buf));
publisher.send(asio::buffer(buf));
}
return 0;
}
Looks nice. If you try, let me know in the comments if it still works in 2019 [I will probably try in a couple of months and then update this answer] (the repo is stale, last commit was a year ago)
The solution is to poll your io_service as well instead of run().
Check out this solution for some poll() info.
Using poll instead of run will allow you to poll zmq's connections without any blocking issues.