I have a bunch of keys and values that I want to send to our messaging queue by packing them into a single byte array. I build one byte array from all the keys and values, which should always be smaller than 50K, and then send it to our messaging queue.
Packet class:
public final class Packet implements Closeable {
private static final int MAX_SIZE = 50000;
private static final int HEADER_SIZE = 36;
private final byte dataCenter;
private final byte recordVersion;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte recordsPartition;
private final byte replicated;
private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
private int pendingItems = 0;
public Packet(final RecordPartition recordPartition) {
this.recordsPartition = (byte) recordPartition.getPartition();
this.dataCenter = Utils.LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
final long packedAddress = new Data().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;
}
private void addHeader(final ByteBuffer buffer, final int items) {
buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
.putLong(address).putLong(addressFrom).putLong(addressOrigin).put(recordsPartition)
.put(replicated);
}
private void sendData() {
if (itemBuffer.position() == 0) {
// no data to be sent
return;
}
final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
addHeader(buffer, pendingItems);
// flip first so that put() copies only the bytes written so far
itemBuffer.flip();
buffer.put(itemBuffer);
SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
// SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
// SendRecord.getInstance().sendToQueueSync(address, buffer.array());
// SendRecord.getInstance().sendToQueueSync(address, buffer.array(), socket);
itemBuffer.clear();
pendingItems = 0;
}
public void addAndSendJunked(final byte[] key, final byte[] data) {
if (key.length > 255) {
return;
}
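// key.length is at most 255 here, so it is stored as an unsigned byte
final byte keyLength = (byte) key.length;
// the data length is written with putShort below, so keep it as a short rather than truncating to a byte
final short dataLength = (short) data.length;
final int additionalSize = data.length + key.length + 1 + 1 + 8 + 2;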
final int newSize = itemBuffer.position() + additionalSize;
if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
sendData();
}
if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
throw new AppConfigurationException("Size of single item exceeds maximum size");
}
final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
// data layout
itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
.put(data);
pendingItems++;
}
@Override
public void close() {
if (pendingItems > 0) {
sendData();
}
}
}
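For reference, the per-item layout that addAndSendJunked writes can be illustrated in isolation. This is only a minimal sketch: the class ItemLayoutSketch and its encode and decodeKey helpers are hypothetical names that are not part of my code base, and it assumes the data fits into the two-byte length field.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public final class ItemLayoutSketch {
    // Per-item overhead: 1 flag byte + 1 key-length byte + 8 timestamp bytes + 2 data-length bytes.
    public static final int ITEM_OVERHEAD = 1 + 1 + 8 + 2;

    /** Encodes one item in the same field order that Packet uses (hypothetical helper). */
    public static byte[] encode(byte[] key, long timestamp, byte[] data) {
        if (key.length > 255 || data.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("key or data too large for the length fields");
        }
        ByteBuffer buf = ByteBuffer.allocate(ITEM_OVERHEAD + key.length + data.length);
        buf.put((byte) 0)
           .put((byte) key.length)          // must be read back as an unsigned byte
           .put(key)
           .putLong(timestamp)
           .putShort((short) data.length)
           .put(data);
        return buf.array();                 // the buffer was allocated to the exact size
    }

    /** Reads the key back out of an encoded item (hypothetical helper). */
    public static String decodeKey(byte[] item) {
        ByteBuffer buf = ByteBuffer.wrap(item);
        buf.get();                          // skip the flag byte
        int keyLength = buf.get() & 0xFF;   // undo the signed-byte cast
        byte[] key = new byte[keyLength];
        buf.get(key);
        return new String(key, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] key = "client-42".getBytes(StandardCharsets.UTF_8);
        byte[] item = encode(key, 1_000L, new byte[] {1, 2, 3});
        System.out.println(item.length);    // overhead + key bytes + data bytes
        System.out.println(decodeKey(item));
    }
}
```

The `& 0xFF` on the read side is what makes keys between 128 and 255 bytes long work even though the length is stored in a signed Java byte.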
Below is how I send the data. As of now, my design only permits sending data asynchronously, by calling the sendToQueueAsync method from the sendData() method above.
private void validateAndSend(final RecordPartition partition) {
final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
final Packet packet = new Packet(partition);
DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null) {
packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
dataHolder.getProcessBytes());
}
packet.close();
}
Now I need to extend my design so that I can send data in three different ways. It is up to the user to decide how the data is sent, either "sync" or "async".
- I need to send data asynchronously by calling the sender.sendToQueueAsync method.
- Or I need to send data synchronously by calling the sender.sendToQueueSync method.
- Or I need to send data synchronously but on a particular socket by calling the sender.sendToQueueSync method. In this case I need to pass the socket variable somehow so that sendData knows about this variable.
SendRecord class:
public class SendRecord {
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
.concurrencyLevel(100).build();
private static class Holder {
private static final SendRecord INSTANCE = new SendRecord();
}
public static SendRecord getInstance() {
return Holder.INSTANCE;
}
private SendRecord() {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
handleRetry();
}
}, 0, 1, TimeUnit.SECONDS);
}
private void handleRetry() {
List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
for (PendingMessage message : messages) {
if (message.hasExpired()) {
if (message.shouldRetry()) {
message.markResent();
doSendAsync(message);
} else {
cache.invalidate(message.getAddress());
}
}
}
}
// called by multiple threads concurrently
public boolean sendToQueueAsync(final long address, final byte[] encodedRecords) {
PendingMessage m = new PendingMessage(address, encodedRecords, true);
cache.put(address, m);
return doSendAsync(m);
}
// called by above method and also by handleRetry method
private boolean doSendAsync(final PendingMessage pendingMessage) {
Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// this returns instantly
return msg.send(liveSocket.get().getSocket());
} finally {
msg.destroy();
}
}
// called by send method below
private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// this returns instantly
return msg.send(socket);
} finally {
msg.destroy();
}
}
// called by multiple threads to send data synchronously without passing socket
public boolean sendToQueueSync(final long address, final byte[] encodedRecords) {
PendingMessage m = new PendingMessage(address, encodedRecords, false);
cache.put(address, m);
try {
if (doSendAsync(m)) {
return m.waitForAck();
}
return false;
} finally {
cache.invalidate(address);
}
}
// called by a thread to send data synchronously, with the socket passed as a parameter
public boolean sendToQueueSync(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
return false;
} finally {
cache.invalidate(address);
}
}
public void handleAckReceived(final long address) {
PendingMessage record = cache.getIfPresent(address);
if (record != null) {
record.ackReceived();
cache.invalidate(address);
}
}
}
Callers will only ever call one of the three methods below:
- sendToQueueAsync by passing two parameters
- sendToQueueSync by passing two parameters
- sendToQueueSync by passing three parameters
How should I design my Packet and SendRecord classes so that I can tell the Packet class that the data needs to be sent to my messaging queue in one of the above three ways? It is up to the user to decide how the data is sent to the messaging queue. As of now, the way my Packet class is structured, it can send data in only one way.