In an ETL process I'm retrieving a lot of entities from a Spring Data Repository. I'm then using a parallel stream to map the entities to different ones. I can either use a consumer to store the new entities in another repository one by one, or collect them into a List and store them in a single bulk operation. The first is costly, while the latter might exceed the available memory.
Is there a good way to collect a certain amount of elements in the stream (like limit does), consume that chunk, and keep on going in parallel until all elements are processed?
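To illustrate, the two options I have now look roughly like this (the repository and mapper names are placeholders, and saveAll stands in for whatever bulk-store method the target repository offers):

```java
// Option 1: store each mapped entity individually (one store call per element).
sourceRepository.findAll().parallelStream()
        .map(this::toTarget)
        .forEach(targetRepository::save);

// Option 2: collect everything first, then store in a single bulk call
// (the intermediate List may not fit in memory).
List<TargetEntity> all = sourceRepository.findAll().parallelStream()
        .map(this::toTarget)
        .collect(Collectors.toList());
targetRepository.saveAll(all);
```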
You might be able to write your own Collector that accumulates entities and then performs bulk updates.

The Collector.accumulator() method can add the entities to an internal temporary cache until the cache grows too large. When the cache is large enough you can do a bulk store into your other repository.

Collector.combiner() needs to combine two threads' caches into a single cache (and possibly flush it with another bulk store).

Finally, the Collector.finisher() method is called when the Stream is done, so store anything left in the cache there too.

Since you are already using a parallel stream and seem OK with doing multiple loads at the same time, I assume you have thread safety already handled.
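A minimal sketch of such a collector, assuming a bulkSave callback such as targetRepository::saveAll and a simple flush-when-full policy (the names and the threshold handling are illustrative, not a definitive implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;

public final class BatchingCollectors {

    private BatchingCollectors() {
    }

    // Buffers elements and hands full batches to bulkSave; the finisher flushes the rest.
    public static <T> Collector<T, List<T>, Void> batching(int batchSize,
                                                           Consumer<List<T>> bulkSave) {
        return Collector.of(
                ArrayList::new,                    // supplier: one buffer per stream segment
                (buffer, element) -> {             // accumulator: add, flush when full
                    buffer.add(element);
                    if (buffer.size() >= batchSize) {
                        bulkSave.accept(new ArrayList<>(buffer));
                        buffer.clear();
                    }
                },
                (left, right) -> {                 // combiner: merge two buffers, flush if full
                    left.addAll(right);
                    if (left.size() >= batchSize) {
                        bulkSave.accept(new ArrayList<>(left));
                        left.clear();
                    }
                    return left;
                },
                buffer -> {                        // finisher: store whatever is left over
                    if (!buffer.isEmpty()) {
                        bulkSave.accept(buffer);
                    }
                    return null;
                });
    }
}
```

Used from the question's pipeline this would look something like entities.parallelStream().map(this::toTarget).collect(BatchingCollectors.batching(1000, targetRepository::saveAll)); note that bulkSave can then be invoked from several worker threads at once, so the store itself must tolerate concurrent calls.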
UPDATE
My comment regarding thread safety and parallel streams was referring to the actual saving/storing into the repository, not concurrency in your temp collection.
Each accumulation buffer should (I think) be used by only one thread at a time. A parallel stream creates a separate buffer for each segment of work by calling supplier() multiple times, so you can treat each buffer as single-threaded and it should work fine. For example, the Javadoc for java.util.IntSummaryStatistics says:

"This implementation is not thread safe. However, it is safe to use Collectors.summarizingInt(ToIntFunction) on a parallel stream, because the parallel implementation of Stream.collect() provides the necessary partitioning, isolation, and merging of results for safe and efficient parallel execution."

This is maybe somewhat old-fashioned, but it should achieve batching with a minimum of locking.
It will produce output one batch at a time, as each chunk of elements fills up.
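A minimal sketch of that idea, assuming a shared buffer guarded by a short synchronized block and a batchConsumer such as targetRepository::saveAll (all names here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

public final class BatchProcessor {

    private BatchProcessor() {
    }

    // Buffers elements under a lock; a thread that completes a batch hands it to
    // batchConsumer outside the lock, so the bulk store never blocks other threads.
    public static <T> void processInBatches(Stream<T> stream,
                                            int batchSize,
                                            Consumer<List<T>> batchConsumer) {
        List<T> buffer = new ArrayList<>(batchSize);
        stream.forEach(element -> {
            List<T> fullBatch = null;
            synchronized (buffer) {
                buffer.add(element);
                if (buffer.size() >= batchSize) {
                    fullBatch = new ArrayList<>(buffer);
                    buffer.clear();
                }
            }
            if (fullBatch != null) {
                batchConsumer.accept(fullBatch);
            }
        });
        synchronized (buffer) {            // flush whatever is left after the stream ends
            if (!buffer.isEmpty()) {
                batchConsumer.accept(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
    }
}
```

Calling it as processInBatches(entities.parallelStream().map(this::toTarget), 1000, targetRepository::saveAll) keeps the lock hold time down to a single add or copy, while the actual bulk stores run concurrently outside the lock.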
Here is a solution using my library AbacusUtil: it lets you split the stream into fixed-size chunks so that each chunk can be stored with a single bulk call.
My approach to bulk operations with chunking is to use a partitioning spliterator wrapper, and another wrapper which overrides the default splitting policy (arithmetic progression of batch sizes in increments of 1024) to simple fixed-batch splitting. Use it like this:
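An illustrative call, assuming the FixedBatchSpliterator wrapper sketched below and placeholder repository names:

```java
// Wrap the mapped stream so that parallel workers pull fixed-size batches of
// 1000 elements instead of the default arithmetic-progression batches.
Stream<TargetEntity> mapped = sourceRepository.findAll().stream()
                                              .map(this::toTarget);

StreamSupport.stream(new FixedBatchSpliterator<>(mapped.spliterator(), 1000), true)
             .forEach(targetRepository::save);
```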
Here is the full code:
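An illustrative sketch of such a wrapper (the class name FixedBatchSpliterator is assumed here; this shows the technique rather than a definitive implementation):

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;

// Wraps another spliterator and overrides trySplit() so that every split hands
// off exactly batchSize elements (copied into an array) to a new parallel task.
public class FixedBatchSpliterator<T> extends Spliterators.AbstractSpliterator<T> {

    private final Spliterator<T> wrapped;
    private final int batchSize;

    public FixedBatchSpliterator(Spliterator<T> wrapped, int batchSize) {
        // Drop SIZED/SUBSIZED: the wrapper does not keep its size estimate exact.
        super(wrapped.estimateSize(),
              wrapped.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED));
        this.wrapped = wrapped;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        return wrapped.tryAdvance(action);
    }

    @Override
    public Spliterator<T> trySplit() {
        final HoldingConsumer<T> holder = new HoldingConsumer<>();
        if (!wrapped.tryAdvance(holder)) {
            return null;                           // nothing left to split off
        }
        final Object[] batch = new Object[batchSize];
        int copied = 0;
        do {
            batch[copied++] = holder.value;
        } while (copied < batchSize && wrapped.tryAdvance(holder));
        // The split-off part is a plain array spliterator over the copied batch.
        return Spliterators.spliterator(batch, 0, copied, characteristics());
    }

    // Tiny consumer that just remembers the last element it was handed.
    private static final class HoldingConsumer<T> implements Consumer<T> {
        T value;

        @Override
        public void accept(T t) {
            this.value = t;
        }
    }
}
```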
You could use a custom collector to do this elegantly.
Please see my answer to a similar question here: Custom batch processing collector.
Then you can simply batch-process the stream in parallel, using the above collector to store the records back in your repository. Example usage:
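A hypothetical usage along those lines, where batchCollector(batchSize, batchProcessor) stands in for the collector from the linked answer and the repository names are placeholders:

```java
// Collect the mapped entities in batches of 1000 and store each batch in one bulk call.
sourceRepository.findAll().parallelStream()
        .map(this::toTarget)
        .collect(batchCollector(1000, targetRepository::saveAll));
```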