Consider the following method:
    public void add(final List<ReportingSTG> message) {
        if(stopRequested.get()) {
            synchronized (this) {
                if(stopRequested.get()) {
                    retryQueue.put(message);
                }
            }
        }
        messages.add(message);
        if(messages.size() >= batchSize && waitingThreads.get() == 0) {
            synchronized (this) {
                if(messages.size() >= batchSize && waitingThreads.get() == 0) {
                    final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
                    messages.clear();
                    if(processors.size()>=numOfProcessors) {
                        waitingThreads.incrementAndGet();
                        waitForProcessor();
                        waitingThreads.decrementAndGet();
                    }
                    startProcessor(clone);
                }
            }
        }
    }
Particularly these 2 lines:
1: final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
2: messages.clear();
If thread A enters the synchronized block and acquires a lock on the current object, does this mean that the state of this object's instance properties can't be changed by other threads outside the synchronized block (while thread A is inside it)?
For example: thread A executes line 1 -> thread B enters the method and adds a new list entry (messages.add(message)) -> thread A executes line 2 -> the entry that was added by thread B is removed (together with the other entries). Is this scenario possible? Or will thread B wait until thread A releases the lock and only then add its list entry?
messages is a non-static synchronizedList.
UPD: updated method, possible solution:
    public void add(final List<ReportingSTG> message) {
        if(stopRequested.get()) {
            synchronized (this) {
                if(stopRequested.get()) {
                    retryQueue.put(message);
                }
            }
        }
        while (addLock.get()){
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {}
        }
        messages.add(message);
        if(messages.size() >= batchSize && waitingThreads.get() == 0) {
            synchronized (this) {
                if(messages.size() >= batchSize && waitingThreads.get() == 0) {
                    addLock.set(true);
                    final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
                    messages.clear();
                    addLock.set(false);
                    if(processors.size()>=numOfProcessors) {
                        waitingThreads.incrementAndGet();
                        waitForProcessor();
                        waitingThreads.decrementAndGet();
                    }
                    startProcessor(clone);
                }
            }
        }
    }
addLock - AtomicBoolean, false by default
The described scenario is possible, i.e. you may lose messages.
The synchronized keyword ensures that two threads never run the synchronized section at the same time. It does not prevent another thread from modifying objects that are manipulated inside the synchronized block, as long as that other thread has access to them.
This is a possible solution since it synchronizes the add and the clear.
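To make the interleaving from the question concrete, here is a minimal sketch that forces it with latches and then shows one way to close the gap. The class LostMessageDemo and all of its method names are hypothetical and only illustrate the idea; the one thing it relies on from the question is that messages is a Collections.synchronizedList, which locks on the list wrapper itself rather than on the enclosing object.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class; all names here are illustrative, not from the question.
class LostMessageDemo {
    private final List<String> messages =
            Collections.synchronizedList(new ArrayList<String>());

    // add() locks the list's own monitor, not the LostMessageDemo instance.
    void add(String message) {
        messages.add(message);
    }

    int pending() {
        return messages.size();
    }

    // Mirrors the question: copy and clear under synchronized (this).
    // The latches force the order "copy -> add from another thread -> clear".
    List<String> unsafeDrain(CountDownLatch copied, CountDownLatch added) {
        synchronized (this) {
            List<String> clone = new ArrayList<String>(messages); // line 1
            copied.countDown();
            try {
                added.await(); // the other thread adds while we still hold the lock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            messages.clear(); // line 2
            return clone;
        }
    }

    // Copy and clear while holding the same monitor that add() uses.
    List<String> safeDrain() {
        synchronized (messages) {
            List<String> clone = new ArrayList<String>(messages);
            messages.clear();
            return clone;
        }
    }

    // Returns true when the message added between copy and clear was lost.
    static boolean reproduceLoss() {
        final LostMessageDemo demo = new LostMessageDemo();
        demo.add("m1");
        final CountDownLatch copied = new CountDownLatch(1);
        final CountDownLatch added = new CountDownLatch(1);
        Thread threadB = new Thread(() -> {
            try {
                copied.await();
                demo.add("m2"); // not blocked by synchronized (demo)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                added.countDown();
            }
        });
        threadB.start();
        List<String> batch = demo.unsafeDrain(copied, added);
        try {
            threadB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !batch.contains("m2") && demo.pending() == 0;
    }

    public static void main(String[] args) {
        System.out.println("message lost: " + reproduceLoss());
    }
}
```

In safeDrain() the copy and the clear happen under the monitor that the synchronized list uses for add(), so no entry can slip in between them. Note that a flag polled before the add (for example an AtomicBoolean checked in a loop) still leaves a window between the check and the add, so sharing one lock between the add and the copy-and-clear is the more robust choice.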
I put together a DoubleBufferedList class recently. Perhaps using that would avoid your issue completely. As its name suggests, it implements the double-buffering algorithm, but for lists.
This class allows you to have many producer threads and many consumer threads. Each producer thread can add to the current list. Each consumer thread gets the whole current list for processing.
This also uses no locks, just atomics, so it should run efficiently.
Note that much of this is test code. You can remove everything after the // TESTING comment, but you may find the rigour of the tests comforting.
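As a separate illustration of the "producers add, consumers take the whole list, atomics only" idea, here is a minimal sketch. It is not the DoubleBufferedList class mentioned above and contains no test code; AtomicSwapBuffer and its methods are hypothetical names, and the sketch swaps in a simpler copy-on-write technique: the current list is an immutable snapshot held in an AtomicReference, each add replaces it via compare-and-set, and drain swaps in an empty list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not the DoubleBufferedList class from the answer.
class AtomicSwapBuffer<T> {
    // Always holds an unmodifiable snapshot of the pending items.
    private final AtomicReference<List<T>> current =
            new AtomicReference<List<T>>(Collections.<T>emptyList());

    // Producer side: publish a new snapshot containing the extra item.
    // updateAndGet retries the function if another thread won the race,
    // so the function must stay free of side effects.
    void add(final T item) {
        current.updateAndGet(old -> {
            List<T> next = new ArrayList<T>(old.size() + 1);
            next.addAll(old);
            next.add(item);
            return Collections.unmodifiableList(next);
        });
    }

    // Consumer side: atomically take everything and leave an empty list.
    // An item is either in the returned snapshot or in a later one, never lost.
    List<T> drain() {
        return current.getAndSet(Collections.<T>emptyList());
    }
}
```

Each add copies the pending items, so this trades throughput for simplicity and suits small batches; a true double-buffered design avoids that copy at the cost of more careful coordination between producers and the swap.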