I have a class in which I am populating a map liveSocketsByDatacenter
from a single background thread every 30 seconds and then I have a method getNextSocket
which will be called by multiple reader threads to get a live socket available which uses the same map to get this info.
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new HashMap<>();
private final ZContext ctx = new ZContext();
// Lazy Loaded Singleton Pattern
private static class Holder {
private static final SocketManager instance = new SocketManager();
}
public static SocketManager getInstance() {
return Holder.instance;
}
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
updateLiveSockets();
}
}, 30, 30, TimeUnit.SECONDS);
}
private void connectToZMQSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
}
}
private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
for (String address : addresses) {
try {
Socket client = ctx.createSocket(socketType);
// Set random identity to make tracing easier
String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
client.setIdentity(identity.getBytes(ZMQ.CHARSET));
client.setTCPKeepAlive(1);
client.setSendTimeOut(7);
client.setLinger(0);
client.connect(address);
SocketHolder zmq = new SocketHolder(client, ctx, address, true);
socketList.add(zmq);
} catch (Exception ex) {
// log error
}
}
return socketList;
}
// this method will be called by multiple threads to get the next live socket
public Optional<SocketHolder> getNextSocket() {
Optional<SocketHolder> liveSocket = Optional.absent();
List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
for (Datacenters dc : dcs) {
liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
break;
}
}
return liveSocket;
}
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
Collections.shuffle(listOfEndPoints);
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
return Optional.of(obj);
}
}
}
return Optional.absent();
}
private void updateLiveSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
boolean status = SendToSocket.getInstance().execute(3, holder, socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), liveUpdatedSockets);
}
}
}
As you can see in my above class:
- From a single background thread which runs every 30 seconds, I populate
liveSocketsByDatacenter
map with all the live sockets. - And then from multiple threads, I call
getNextSocket
method to give me live socket available which usesliveSocketsByDatacenter
map to get the required information.
Is my above code thread safe and all the reader threads will see liveSocketsByDatacenter
accurately? Since I am modifying liveSocketsByDatacenter
map every 30 seconds from a single background thread and then from a lot of reader threads, I am calling getNextSocket
method so I am not sure if I did anything wrong here.
It looks like there might be a thread safety issue in my "getLiveSocket" method as every read gets a shared ArrayList
out of the map and shuffles it? And there might be few more places as well which I might have missed. What is the best way to fix these thread safety issues in my code?
If there is any better way to rewrite this, then I am open for that as well.
As you can read in detail e.g. here, if multiple threads access a hash map concurrently, and at least one of the threads modifies the map structurally, it must be synchronized externally to avoid an inconsistent view of the contents. So to be thread safe you should use either Java Collections synchronizedMap() method or a ConcurrentHashMap.
or
As you have very highly concurrent application modifying and reading key value in different threads, you should also have a look at the Producer-Consumer principle, e.g. here.
To be thread-safe, your code must synchronize any access to all shared mutable state.
Here you share
liveSocketsByDatacenter
, an instance ofHashMap
a non thread-safe implementation of aMap
that can potentially be concurrently read (byupdateLiveSockets
andgetNextSocket
) and modified (byconnectToZMQSockets
andupdateLiveSockets
) without synchronizing any access which is already enough to make your code non thread safe. Moreover, the values of thisMap
are instances ofArrayList
a non thread-safe implementation of aList
that can also potentially be concurrently read (bygetNextSocket
andupdateLiveSockets
) and modified (bygetLiveSocket
more precisely byCollections.shuffle
).The simple way to fix your 2 thread safety issues could be to:
ConcurrentHashMap
instead of aHashMap
for your variableliveSocketsByDatacenter
as it is a natively thread safe implementation of aMap
.ArrayList
instances as value of your map usingCollections.unmodifiableList(List<? extends T> list)
, your lists would then be immutable so thread safe.For example:
getLiveSocket
to avoid callingCollections.shuffle
directly on your list, you could for example shuffle only the list of live sockets instead of all sockets or use a copy of your list (with for examplenew ArrayList<>(listOfEndPoints)
) instead of the list itself.For example:
For #1 as you seem to frequently read and rarely (only once every 30 seconds) modify your map, you could consider to rebuild your map then share its immutable version (using
Collections.unmodifiableMap(Map<? extends K,? extends V> m)
) every 30 seconds, this approach is very efficient in mostly read scenario as you no longer pay the price of any synchronization mechanism to access to the content of your map.Your code would then be:
Your field
liveSocketsByDatacenter
could also be of typeAtomicReference<Map<Datacenters, List<SocketHolder>>>
, it would then befinal
, your map will still be stored in avolatile
variable but within the classAtomicReference
.The previous code would then be:
It seems, that you can safely use
ConcurrentHashMap
here instead of regularHashMap
and it should work.In your current approach, using regular HashMap, you need to have synchronization of methods:
getNextSocket
,connectToZMQSockets
andupdateLiveSockets
(everywhere you update or read the HashMap) like asychronized
word before those methods or other lock on a monitor common for all these methods - And this is not because ofConcurrentModificationException
, but because without synchornization reading threads can see not updated values.There is also problem with concurrent modification in the getLiveSocket, one of the simplest way to avoid this problem is to copy the listOfEndpoints to a new list before shuffle, like this:
Using ConcurrentHashMap should make your code threadsafe. Alternatively use synchronized methods to access existing hashmap.