I have bunch of keys and values that I want to send to our messaging queue by packing them in one byte array. I will make one byte array of all the keys and values which should always be less than 50K and then send to our messaging queue.
Packet class:
public final class Packet implements Closeable {
private static final int MAX_SIZE = 50000;
private static final int HEADER_SIZE = 36;
private final byte dataCenter;
private final byte recordVersion;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte recordsPartition;
private final byte replicated;
private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
private int pendingItems = 0;
public Packet(final RecordPartition recordPartition) {
this.recordsPartition = (byte) recordPartition.getPartition();
this.dataCenter = Utils.LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
final long packedAddress = new Data().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;
}
private void addHeader(final ByteBuffer buffer, final int items) {
buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
.putLong(address).putLong(addressFrom).putLong(addressOrigin).put(recordsPartition)
.put(replicated);
}
private void sendData() {
if (itemBuffer.position() == 0) {
// no data to be sent
return;
}
final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
addHeader(buffer, pendingItems);
buffer.put(itemBuffer);
SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
// SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
// SendRecord.getInstance().sendToQueueSync(address, buffer.array());
// SendRecord.getInstance().sendToQueueSync(address, buffer.array(), socket);
itemBuffer.clear();
pendingItems = 0;
}
public void addAndSendJunked(final byte[] key, final byte[] data) {
if (key.length > 255) {
return;
}
final byte keyLength = (byte) key.length;
final byte dataLength = (byte) data.length;
final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
final int newSize = itemBuffer.position() + additionalSize;
if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
sendData();
}
if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
throw new AppConfigurationException("Size of single item exceeds maximum size");
}
final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
// data layout
itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
.put(data);
pendingItems++;
}
@Override
public void close() {
if (pendingItems > 0) {
sendData();
}
}
}
Below is the way I am sending data. As of now my design only permits to send data asynchronously by calling sendToQueueAsync
method in above sendData()
method.
private void validateAndSend(final RecordPartition partition) {
final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
final Packet packet = new Packet(partition);
DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null) {
packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
dataHolder.getProcessBytes());
}
packet.close();
}
Now I need to extend my design so that I can send data in three different ways. It is up to user to decide which way he wants to send data, either "sync" or "async".
- I need to send data asynchronously by calling
sender.sendToQueueAsync
method. - or I need to send data synchronously by calling
sender.sendToQueueSync
method. - or I need to send data synchronously but on a particular socket by calling
sender.sendToQueueSync
method. In this case I need to passsocket
variable somehow so thatsendData
knows about this variable.
SendRecord class:
public class SendRecord {
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
.concurrencyLevel(100).build();
private static class Holder {
private static final SendRecord INSTANCE = new SendRecord();
}
public static SendRecord getInstance() {
return Holder.INSTANCE;
}
private SendRecord() {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
handleRetry();
}
}, 0, 1, TimeUnit.SECONDS);
}
private void handleRetry() {
List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
for (PendingMessage message : messages) {
if (message.hasExpired()) {
if (message.shouldRetry()) {
message.markResent();
doSendAsync(message);
} else {
cache.invalidate(message.getAddress());
}
}
}
}
// called by multiple threads concurrently
public boolean sendToQueueAsync(final long address, final byte[] encodedRecords) {
PendingMessage m = new PendingMessage(address, encodedRecords, true);
cache.put(address, m);
return doSendAsync(m);
}
// called by above method and also by handleRetry method
private boolean doSendAsync(final PendingMessage pendingMessage) {
Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// this returns instantly
return msg.send(liveSocket.get().getSocket());
} finally {
msg.destroy();
}
}
// called by send method below
private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// this returns instantly
return msg.send(socket);
} finally {
msg.destroy();
}
}
// called by multiple threads to send data synchronously without passing socket
public boolean sendToQueueSync(final long address, final byte[] encodedRecords) {
PendingMessage m = new PendingMessage(address, encodedRecords, false);
cache.put(address, m);
try {
if (doSendAsync(m)) {
return m.waitForAck();
}
return false;
} finally {
cache.invalidate(address);
}
}
// called by a threads to send data synchronously but with socket as the parameter
public boolean sendToQueueSync(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
return false;
} finally {
cache.invalidate(address);
}
}
public void handleAckReceived(final long address) {
PendingMessage record = cache.getIfPresent(address);
if (record != null) {
record.ackReceived();
cache.invalidate(address);
}
}
}
Callers will only call either of below three methods:
- sendToQueueAsync by passing two parameters
- sendToQueueSync by passing two parameters
- sendToQueueSync by passing three parameters
How should I design my Packet
and SendRecord
class so that I can tell Packet
class that this data needs to be send in either of above three ways to my messaging queue. It is up to user to decide which way he wants to send data to messaging queue. As of now the way my Packet
class is structured, it can send data only in one way.
I don't see the definition of
sender
inPacket
. I assume it is defined as a private instance variable?The design indeed needs to be fixed. By having the
Packet
class do the sending, the design violates the Single responsibility principle. There should be a separate (possibly abstract) class that prepares the data to be sent (prepares ajava.nio.Buffer
instance) and it can have one or more sub classes, one of which returns ajava.nio.ByteBuffer
instance.A separate class that gets a
Buffer
and performs the sending. This (possibly abstract) class can have sub classes for the different sending platforms and methods.then, you need another class that implements the Builder pattern. Clients that wish to send packets, use the builder to specify concrete
Packet
andSender
(and possibly other needed properties, like a socket number) and then callsend()
that does the sending.First of you you need to have a clear answer to the question of who (or what part of your code) is responsible for deciding what sending method is to be used.
(Just to name a few possibilities)
The answer will determine what structure would be most appropriate.
Nevertheless, It is clear that current
sendData()
method is the place to put the decision to effect. Thus, this method needs to be provided the implementation to use. The actualsend()
likely is similar in all cases. It suggest to encapsulate the sending functionality into an interface that does provide thesend()
method signature:If the target socket is to be determined from the actual message data, then you might prefer a general signature of
and make that socket value optional or use a specific value for encoding "no specific socket" cases. Otherwise the you could use a specific
Sender
instance that has the socket passed in via constructor.I currently do not see a valid reason from what you have provided that calls for implementing the three different send methods as three different methods within one class. If common code is a reason then using a common base class will allow for appropriate sharing.
That leaves the question how the specific instance of the appropriate
Sender
implementation is to be made available withinsendData()
.If the sending strategy is to be determined outside of
sendData()
the implementation has to be handed in. Either as a parameter or as a field from the current class instance. If the local data is what is determining the sending strategy you should delegate the determination of the proper implementation to a selection class that will return the proper implementation. The call will then look similar to:Though, without having a clearer picture on what is fixed and what is variable in the execution, it is hard suggesting best approach.
In case the decision is based on data, the whole selection and diversion process is local to
Packet
class.If the decision is made externally to
Packet
you might want to get a sending strategy implementation at that location and pass that as parameter down toaddAndSendJunked()
(or more precisely down tosendData()
.You could have an enum class, say PacketTransportionMode which would have a 'send' method overridden for different types of enum values (SYNC,ASYNC,SYNC_ON_SOCKET), example: .
Also, in packet class, introduce transportationMode variable. In packet.send() implementation, this.packetTransportationMode.send(this) can be called
Client can create packet object and set its transportationMode in the beginning, similar to setting RecordPartition. Then client can call packet.send();
Or instead of putting transportationMode variable inside packet class and calling this.packetTransportationMode.send(this), client can also create Packet object and call PacketTransportionMode.SYNC.send(packet) directly.
Strategy. Difference from Kerri Brown's answer is that Packet should not make decision between strategies. Instead, decide it outside of the Packet class.
Single sending strategy interface should be implemented by 3 different classes, each corresponding to one of the mentioned sending approaches. Inject strategy interface into the Packet, so that Packet does not have to change regardless of which strategy it deals with.
You said it must be based on user's choice. So you can ask user upfront, what is the choice, and then based on that, instantiate the implementation of a sending strategy interface which corresponds to the user's choice. Then, instantiate the Packet with the selected sending strategy instance.
If you feel that later on choice may not depend on the user, then make that a Factory. So then your solution becomes combination of Factory and Strategy.
In that case, Packet can have Factory interface injected. Packet asks Factory to give it the sending strategy. Next it sends using the strategy acquired from the factory. Factory asks for user's input, which later on can be replaced by making a choice based on some other condition, and not the user input. You achieve that by implementing Factory interface differently in the future and injecting that new factory instead of this one (i.e. user input based factory vs some other condition based factory).
Both approaches will give you a code following Open/Close principle. But try not to overengineer if you don't really need a factory.
use enum varibale for defining the types of send message
I think your best option is the Strategy pattern (https://en.wikipedia.org/wiki/Strategy_pattern).
Using this pattern, you can encapsulate the behaviour of each type of "send", for example, an AsynchronousSend class, a SynchronousSend class and an AsynchronousSocketSend class. (You could probably come up with better names). The
Packet
class can then decide, based on some logic, which class to use to send the data to the queue.