I am working on an application that sends data to ZeroMQ. Below is what my application does:
- I have a class SendToZeroMQ that sends data to ZeroMQ.
- It adds the same data to retryQueue in the same class so that it can be retried later if an acknowledgement is not received. The queue is a Guava cache with a maximumSize limit.
- A separate thread receives acknowledgements from ZeroMQ for the data that was sent earlier. If an acknowledgement is not received, SendToZeroMQ retries sending that same piece of data. If an acknowledgement is received, we remove the record from retryQueue so that it is not retried again.
The idea is very simple, and I have to make sure my retry policy works correctly so that I don't lose data. Missing acknowledgements are very rare, but they can happen.
I am thinking of building two types of RetryPolicies, but I don't understand how to build them for my program:
- RetryNTimes: retries N times with a fixed sleep between retries and, after that, drops the record.
- ExponentialBackoffRetry: retries with an exponentially growing delay. We can set a maximum retry limit, after which it stops retrying and drops the record.
Below is my SendToZeroMQ class, which sends data to ZeroMQ, retries every 30 seconds from a background thread, and starts the ResponsePoller runnable, which keeps running forever:
    public class SendToZeroMQ {
        private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        private final Cache<Long, byte[]> retryQueue =
            CacheBuilder
                .newBuilder()
                .maximumSize(10000000)
                .concurrencyLevel(200)
                .removalListener(
                    RemovalListeners.asynchronous(new CustomListener(), executorService)).build();

        private static class Holder {
            private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
        }

        public static SendToZeroMQ getInstance() {
            return Holder.INSTANCE;
        }

        private SendToZeroMQ() {
            executorService.submit(new ResponsePoller());
            // retry every 30 seconds for now
            executorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    for (Entry<Long, byte[]> entry : retryQueue.asMap().entrySet()) {
                        sendTo(entry.getKey(), entry.getValue());
                    }
                }
            }, 0, 30, TimeUnit.SECONDS);
        }

        public boolean sendTo(final long address, final byte[] encodedRecords) {
            Optional<ZMQSocketInfo> liveSockets = PoolManager.getInstance().getNextSocket();
            if (!liveSockets.isPresent()) {
                return false;
            }
            return sendTo(address, encodedRecords, liveSockets.get().getSocket());
        }

        public boolean sendTo(final long address, final byte[] encodedByteArray, final Socket socket) {
            ZMsg msg = new ZMsg();
            msg.add(encodedByteArray);
            boolean sent = msg.send(socket);
            msg.destroy();
            // adding to retry queue
            retryQueue.put(address, encodedByteArray);
            return sent;
        }

        public void removeFromRetryQueue(final long address) {
            retryQueue.invalidate(address);
        }
    }
Below is my ResponsePoller class, which polls for acknowledgements from ZeroMQ. If we get an acknowledgement back, we remove that record from the retry queue so that it is not retried; otherwise it will be retried.
    public class ResponsePoller implements Runnable {
        private static final Random random = new Random();

        @Override
        public void run() {
            ZContext ctx = new ZContext();
            Socket client = ctx.createSocket(ZMQ.PULL);
            String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
            client.setIdentity(identity.getBytes(ZMQ.CHARSET));
            client.bind("tcp://" + TestUtils.getIpaddress() + ":8076");
            PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};
            while (!Thread.currentThread().isInterrupted()) {
                // Tick once per second, pulling in arriving messages
                for (int centitick = 0; centitick < 100; centitick++) {
                    ZMQ.poll(items, 10);
                    if (items[0].isReadable()) {
                        ZMsg msg = ZMsg.recvMsg(client);
                        Iterator<ZFrame> it = msg.iterator();
                        while (it.hasNext()) {
                            ZFrame frame = it.next();
                            try {
                                long address = TestUtils.getAddress(frame.getData());
                                // remove from retry queue since we got the acknowledgment for this record
                                SendToZeroMQ.getInstance().removeFromRetryQueue(address);
                            } catch (Exception ex) {
                                // log error
                            } finally {
                                frame.destroy();
                            }
                        }
                        msg.destroy();
                    }
                }
            }
            ctx.destroy();
        }
    }
Question:
As you can see above, I am sending encodedRecords to ZeroMQ using the SendToZeroMQ class, and each record is then retried every 30 seconds depending on whether the ResponsePoller class received an acknowledgement for it.
For each encodedRecords there is a unique key called address, and that is what we get back from ZeroMQ as an acknowledgement.
How can I extend this example to build the two retry policies mentioned above, so that I can pick which retry policy to use while sending data? I came up with the interface below, but I don't understand how to implement those retry policies and use them in my code above.
    public interface RetryPolicy {
        /**
         * Called when an operation has failed for some reason. This method should return
         * true to make another attempt.
         */
        public boolean allowRetry(int retryCount, long elapsedTimeMs);
    }
Can I use guava-retrying or failsafe here, since these libraries already have many retry policies that I could use?
You can use Apache Camel. It provides a component for ZeroMQ, and tools such as an error handler, a redelivery policy, and a dead letter channel are provided natively.
This is not a perfect way, but it can also be achieved as follows.
Create two implementations of the interface: one for RetryNTimes and one for ExponentialBackoffRetry.
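The two implementations could look roughly like the sketch below. It is only an illustration under assumptions, not a definitive implementation: the allowRetry signature comes from the question, while sleepTimeMs, the constructor parameters, and the 0-based retryCount convention are hypothetical additions, needed because the question's interface has no way to say how long to wait.

```java
/** The interface from the question, plus one hypothetical method for the delay. */
interface RetryPolicy {
    /** Returns true if another attempt should be made. */
    boolean allowRetry(int retryCount, long elapsedTimeMs);

    /** Hypothetical addition: wait time before retry number retryCount (0-based). */
    long sleepTimeMs(int retryCount);
}

/** Retries a fixed number of times with the same sleep between attempts. */
final class RetryNTimes implements RetryPolicy {
    private final int maxRetries;
    private final long sleepBetweenRetriesMs;

    RetryNTimes(int maxRetries, long sleepBetweenRetriesMs) {
        this.maxRetries = maxRetries;
        this.sleepBetweenRetriesMs = sleepBetweenRetriesMs;
    }

    @Override
    public boolean allowRetry(int retryCount, long elapsedTimeMs) {
        return retryCount < maxRetries;
    }

    @Override
    public long sleepTimeMs(int retryCount) {
        return sleepBetweenRetriesMs;
    }
}

/** Doubles the sleep after every retry, up to a cap, and gives up after maxRetries. */
final class ExponentialBackoffRetry implements RetryPolicy {
    // Limits the shift so baseSleepMs << shift cannot overflow for realistic base values.
    private static final int MAX_SHIFT = 20;

    private final long baseSleepMs;
    private final long maxSleepMs;
    private final int maxRetries;

    ExponentialBackoffRetry(long baseSleepMs, long maxSleepMs, int maxRetries) {
        this.baseSleepMs = baseSleepMs;
        this.maxSleepMs = maxSleepMs;
        this.maxRetries = maxRetries;
    }

    @Override
    public boolean allowRetry(int retryCount, long elapsedTimeMs) {
        return retryCount < maxRetries;
    }

    @Override
    public long sleepTimeMs(int retryCount) {
        int shift = Math.min(Math.max(retryCount, 0), MAX_SHIFT);
        // baseSleepMs * 2^retryCount, capped at maxSleepMs
        return Math.min(maxSleepMs, baseSleepMs << shift);
    }
}
```

With this shape, the retry loop would ask allowRetry before resending a record and drop the record from the retry queue once it returns false.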
You also need to make some changes in the SendToZeroMQ class.
I am not able to work out all the details of how to use the relevant APIs, but as for the algorithm, you could try:
scheduleAtFixedRate, find the message in the retry queue which has the lowest when_is_next_retry (possibly by sorting on absolute retry timestamp and picking the first), and let the executorService reschedule itself using schedule and the time_to_next_retry
when_is_next_retry (if the RetryPolicy returns -1, it could mean that the message shall not be retried any more)
Here is a working little simulation of your environment that shows how this can be done. Note that the Guava cache is the wrong data structure here, since you aren't interested in eviction (I think), so I'm using a concurrent hash map:
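A minimal sketch of that idea follows. It is not the original listing from this answer, only an illustration under assumptions: DelayPolicy, PendingRecord, RetryTracker, and all of their methods are hypothetical names, time is passed in explicitly so the logic can be exercised without sleeping, and a policy result of -1 is taken to mean "do not retry any more", as suggested above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical policy shape: delay before the given retry (1-based), or -1 to stop. */
interface DelayPolicy {
    long nextDelayMs(int retryNumber);
}

/** Immutable state of one unacknowledged record. */
final class PendingRecord {
    final byte[] payload;
    final int retriesDone;
    final long nextRetryAtMs;

    PendingRecord(byte[] payload, int retriesDone, long nextRetryAtMs) {
        this.payload = payload;
        this.retriesDone = retriesDone;
        this.nextRetryAtMs = nextRetryAtMs;
    }
}

final class RetryTracker {
    private final ConcurrentHashMap<Long, PendingRecord> pending = new ConcurrentHashMap<>();
    private final DelayPolicy policy;

    RetryTracker(DelayPolicy policy) {
        this.policy = policy;
    }

    /** Call right after the first send of a record. */
    void track(long address, byte[] payload, long nowMs) {
        long delay = policy.nextDelayMs(1);
        if (delay >= 0) {
            pending.put(address, new PendingRecord(payload, 0, nowMs + delay));
        }
    }

    /** Call from the acknowledgement poller. */
    void acknowledge(long address) {
        pending.remove(address);
    }

    /** Returns the records that must be resent now and reschedules or drops them. */
    Map<Long, byte[]> collectDue(long nowMs) {
        Map<Long, byte[]> due = new HashMap<>();
        for (Map.Entry<Long, PendingRecord> e : pending.entrySet()) {
            PendingRecord r = e.getValue();
            if (r.nextRetryAtMs > nowMs) {
                continue; // not due yet
            }
            due.put(e.getKey(), r.payload);
            int retriesDone = r.retriesDone + 1;
            long delay = policy.nextDelayMs(retriesDone + 1);
            if (delay < 0) {
                // This was the last retry the policy allows: drop the record.
                pending.remove(e.getKey(), r);
            } else {
                // Conditional replace: an acknowledgement that raced in is not undone.
                pending.replace(e.getKey(), r,
                        new PendingRecord(r.payload, retriesDone, nowMs + delay));
            }
        }
        return due;
    }

    /** Time until the earliest pending retry, or -1 if nothing is pending. */
    long millisUntilNextRetry(long nowMs) {
        long earliest = Long.MAX_VALUE;
        for (PendingRecord r : pending.values()) {
            earliest = Math.min(earliest, r.nextRetryAtMs);
        }
        return earliest == Long.MAX_VALUE ? -1 : Math.max(0, earliest - nowMs);
    }

    int size() {
        return pending.size();
    }
}
```

A background task could call collectDue(System.currentTimeMillis()), resend whatever it returns, and then reschedule itself with ScheduledExecutorService.schedule using millisUntilNextRetry, instead of waking up at a fixed rate.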