Producer/Consumer - producer adds data to collecti

2020-07-29 17:24发布

问题:

I have a Producer/Consumer usecase which is a bit unusual. I have a real world use case with some producers which I want them to be able to add objects into a collection without blocking. The consumer (just one) should block until a certain amount of objects are available in the collection (eg. 500) and then consume them in bulk. While there are less than 500 it should block and wait for the collection to fill. I don't mind if the queue exceeds this value (700, 1000 etc.) for short amount of times.

I currently don't seem to find a solution to fix this exact problem. I was thinking about using a ConcurrentLinkedQueue and to have the consumer periodically check if the queue has enough data, but that seems rather counterproductive.

Another idea is to use a LinkedBlockingQueue. The producers are not going to block (unless the queue is full which means it has Integer.MAX_VALUE values - this is not my case, so it's all good). The consumer will do a queue.take() and add the elements into an internal collection. When the internal collection reaches 500 elements it is going to consume them in batch.

Do you have any tips?

Thanks!

回答1:

In the end I went with the LinkedBlockingQueue.

The producers add items into the queue.

The consumer will do a queue.poll() and keep the items into an internal collection. When the internal collection reaches 500 elements it is going to consume them in bulk. I can also set some timeouts. For instance, if X seconds has passed I'll consume the collection even if it has less items than required (eg. 220).



回答2:

You could simply introduce an intermediate queue and consumer/producer, which would get the items from the queue filled by the producer, store them in a list, and once the list size is 500, put the list itself in the queue read by the consumer. The consumer would block until the next list of 500 items is available.

Of course you could also encapsulate this logic inside an object shared by the producers: the producers add their item to a "Batcher" object. The batcher object adds the items to a private list, and once the list size is 500, the batcher adds the list to the queue. Just make sure the batcher's addItem() method is synchronized. This would avoid the additional thread and queue.