Consider the following method:
public void add(final List<ReportingSTG> message) {
if(stopRequested.get()) {
synchronized (this) {
if(stopRequested.get()) {
retryQueue.put(message);
}
}
}
messages.add(message);
if(messages.size() >= batchSize && waitingThreads.get() == 0) {
synchronized (this) {
if(messages.size() >= batchSize && waitingThreads.get() == 0) {
final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
messages.clear();
if(processors.size()>=numOfProcessors) {
waitingThreads.incrementAndGet();
waitForProcessor();
waitingThreads.decrementAndGet();
}
startProcessor(clone);
}
}
}
}
Particularly these 2 lines:
1: final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
2: messages.clear();
If thread A enters synchronized block and acquires a lock on current object, does this means that state of instance properties of this object can't be changed by other threads outside synchronized block (while thread A is in synchronized block)?
For example, thread A executed line 1 -> thread B entered the method and added new list entry (messages.add(message)) -> Thread a executed line 2 -> entry what was added by thread B removed (together with other entries). Is this scenario possible? Or thread B will wait while lock is released by thread A and only then will remove the List entry
messages is a non-static synchronizedList
UPD: updated method, possible solution:
public void add(final List<ReportingSTG> message) {
if(stopRequested.get()) {
synchronized (this) {
if(stopRequested.get()) {
retryQueue.put(message);
}
}
}
while (addLock.get()){
try {
Thread.sleep(1);
} catch (InterruptedException e) {}
}
messages.add(message);
if(messages.size() >= batchSize && waitingThreads.get() == 0) {
synchronized (this) {
if(messages.size() >= batchSize && waitingThreads.get() == 0) {
addLock.set(true);
final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
messages.clear();
addLock.set(false);
if(processors.size()>=numOfProcessors) {
waitingThreads.incrementAndGet();
waitForProcessor();
waitingThreads.decrementAndGet();
}
startProcessor(clone);
}
}
}
}
addLock - AtomicBoolean, false by default
The described scenario is possible. i.e. you may loose messages.
The synchronized
keyword ensure that you never have 2 threads running the synchronized
section simultaneously. It doesn't prevent modification by another thread of objects that are manipulated inside the synchronized
block (as soon as this other thread have access to them).
This is a possible solution since it synchronize the add and the clear.
private Object lock = new Object();
public void add(final List<ReportingSTG> message) {
if(stopRequested.get()) {
synchronized (this) {
if(stopRequested.get()) {
retryQueue.put(message);
}
}
}
synchronized(lock){
messages.add(message);
if(messages.size() >= batchSize && waitingThreads.get() == 0) {
final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
messages.clear();
if(processors.size()>=numOfProcessors) {
waitingThreads.incrementAndGet();
waitForProcessor();
waitingThreads.decrementAndGet();
}
startProcessor(clone);
}
}
}
I put together a DoubleBufferedList
class recently. Perhaps using that would avoid your issue completely. As it's name suggests it implements the double-buffering algorithm but for lists.
This class allow you to have many producer threads and many consumer threads. Each producer thread can add to the current list. Each consumer thread gets the whole current list for processing.
This also uses no locks, just atomics so it should run efficiently.
Note that much of this is test code. You can remove everything after the // TESTING
comment but you may find the rigour of the tests comforting.
public class DoubleBufferedList<T> {
// Atomic reference so I can atomically swap it through.
// Mark = true means I am adding to it so unavailable for iteration.
private AtomicMarkableReference<List<T>> list = new AtomicMarkableReference<>(newList(), false);
// Factory method to create a new list - may be best to abstract this.
protected List<T> newList() {
return new ArrayList<>();
}
// Get and replace the current list.
public List<T> get() {
// Atomically grab and replace the list with an empty one.
List<T> empty = newList();
List<T> it;
// Replace an unmarked list with an empty one.
if (!list.compareAndSet(it = list.getReference(), empty, false, false)) {
// Failed to replace!
// It is probably marked as being appended to but may have been replaced by another thread.
// Return empty and come back again soon.
return Collections.EMPTY_LIST;
}
// Successfull replaced an unmarked list with an empty list!
return it;
}
// Grab and lock the list in preparation for append.
private List<T> grab() {
List<T> it;
// We cannot fail so spin on get and mark.
while (!list.compareAndSet(it = list.getReference(), it, false, true)) {
// Spin on mark.
}
return it;
}
// Release the list.
private void release(List<T> it) {
// Unmark it. Should never fail because once marked it will not be replaced.
if (!list.attemptMark(it, false)) {
throw new IllegalMonitorStateException("it changed while we were adding to it!");
}
}
// Add an entry to the list.
public void add(T entry) {
List<T> it = grab();
try {
// Successfully marked! Add my new entry.
it.add(entry);
} finally {
// Always release after a grab.
release(it);
}
}
// Add many entries to the list.
public void add(List<T> entries) {
List<T> it = grab();
try {
// Successfully marked! Add my new entries.
it.addAll(entries);
} finally {
// Always release after a grab.
release(it);
}
}
// Add a number of entries.
public void add(T... entries) {
// Make a list of them.
add(Arrays.asList(entries));
}
// TESTING.
// How many testers to run.
static final int N = 10;
// The next one we're waiting for.
static final AtomicInteger[] seen = new AtomicInteger[N];
// The ones that arrived out of order.
static final Set<Widget>[] queued = new ConcurrentSkipListSet[N];
static {
// Populate the arrays.
for (int i = 0; i < N; i++) {
seen[i] = new AtomicInteger();
queued[i] = new ConcurrentSkipListSet();
}
}
// Thing that is produced and consumed.
private static class Widget implements Comparable<Widget> {
// Who produced it.
public final int producer;
// Its sequence number.
public final int sequence;
public Widget(int producer, int sequence) {
this.producer = producer;
this.sequence = sequence;
}
@Override
public String toString() {
return producer + "\t" + sequence;
}
@Override
public int compareTo(Widget o) {
// Sort on producer
int diff = Integer.compare(producer, o.producer);
if (diff == 0) {
// And then sequence
diff = Integer.compare(sequence, o.sequence);
}
return diff;
}
}
// Produces Widgets and feeds them to the supplied DoubleBufferedList.
private static class TestProducer implements Runnable {
// The list to feed.
final DoubleBufferedList<Widget> list;
// My ID
final int id;
// The sequence we're at
int sequence = 0;
// Set this at true to stop me.
public volatile boolean stop = false;
public TestProducer(DoubleBufferedList<Widget> list, int id) {
this.list = list;
this.id = id;
}
@Override
public void run() {
// Just pump the list.
while (!stop) {
list.add(new Widget(id, sequence++));
}
}
}
// Consumes Widgets from the suplied DoubleBufferedList
private static class TestConsumer implements Runnable {
// The list to bleed.
final DoubleBufferedList<Widget> list;
// My ID
final int id;
// Set this at true to stop me.
public volatile boolean stop = false;
public TestConsumer(DoubleBufferedList<Widget> list, int id) {
this.list = list;
this.id = id;
}
@Override
public void run() {
// The list I am working on.
List<Widget> l = list.get();
// Stop when stop == true && list is empty
while (!(stop && l.isEmpty())) {
// Record all items in list as arrived.
arrived(l);
// Grab another list.
l = list.get();
}
}
private void arrived(List<Widget> l) {
for (Widget w : l) {
// Mark each one as arrived.
arrived(w);
}
}
// A Widget has arrived.
private static void arrived(Widget w) {
// Which one is it?
AtomicInteger n = seen[w.producer];
// Don't allow multi-access to the same producer data or we'll end up confused.
synchronized (n) {
// Is it the next to be seen?
if (n.compareAndSet(w.sequence, w.sequence + 1)) {
// It was the one we were waiting for! See if any of the ones in the queue can now be consumed.
for (Iterator<Widget> i = queued[w.producer].iterator(); i.hasNext();) {
Widget it = i.next();
// Is it in sequence?
if (n.compareAndSet(it.sequence, it.sequence + 1)) {
// Done with that one too now!
i.remove();
} else {
// Found a gap! Stop now.
break;
}
}
} else {
// Out of sequence - Queue it.
queued[w.producer].add(w);
}
}
}
}
// Main tester
public static void main(String args[]) {
try {
System.out.println("DoubleBufferedList:Test");
// Create my test buffer.
DoubleBufferedList<Widget> list = new DoubleBufferedList<>();
// All threads running - Producers then Consumers.
List<Thread> running = new LinkedList<>();
// Start some producer tests.
List<TestProducer> producers = new ArrayList<>();
for (int i = 0; i < N; i++) {
TestProducer producer = new TestProducer(list, i);
Thread t = new Thread(producer);
t.setName("Producer " + i);
t.start();
producers.add(producer);
running.add(t);
}
// Start the same number of consumers.
List<TestConsumer> consumers = new ArrayList<>();
for (int i = 0; i < N; i++) {
TestConsumer consumer = new TestConsumer(list, i);
Thread t = new Thread(consumer);
t.setName("Consumer " + i);
t.start();
consumers.add(consumer);
running.add(t);
}
// Wait for a while.
Thread.sleep(5000);
// Close down all.
for (TestProducer p : producers) {
p.stop = true;
}
for (TestConsumer c : consumers) {
c.stop = true;
}
// Wait for all to stop.
for (Thread t : running) {
System.out.println("Joining " + t.getName());
t.join();
}
// What results did we get?
for (int i = 0; i < N; i++) {
// How far did the producer get?
int gotTo = producers.get(i).sequence;
// The consumer's state
int seenTo = seen[i].get();
Set<Widget> queue = queued[i];
if (seenTo == gotTo && queue.isEmpty()) {
System.out.println("Producer " + i + " ok.");
} else {
// Different set consumed as produced!
System.out.println("Producer " + i + " Failed: gotTo=" + gotTo + " seenTo=" + seenTo + " queued=" + queue);
}
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}