Is there an elegant way to process a stream in chunks?

Posted 2020-02-02 10:08

My exact scenario is inserting data into a database in batches, so I want to accumulate DOM objects and flush them every 1000.

I implemented it by putting code in the accumulator that detects when it is full and then flushes, but that seems wrong: the flush control should come from the caller.

I could convert the stream to a List and then iterate over it with subList, but that too seems clunky.
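
For reference, a minimal sketch of what that subList approach might look like. The class and method names (SubListBatches, partition) are hypothetical, and the whole stream is held in memory before the first batch is available, which is part of why it feels clunky:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class SubListBatches {
    // Hypothetical helper: materializes the stream, then slices it into
    // consecutive subList views of at most batchSize elements each.
    static <T> List<List<T>> partition(Stream<T> stream, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<T> all = stream.collect(Collectors.toList());
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < all.size(); from += batchSize) {
            int to = Math.min(from + batchSize, all.size());
            batches.add(all.subList(from, to)); // a view, not a copy
        }
        return batches;
    }

    public static void main(String[] args) {
        // Stand-in for the database flush: print each batch.
        partition(IntStream.range(0, 10).boxed(), 4)
                .forEach(batch -> System.out.println("Flush: " + batch));
    }
}
```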

Is there a neat way to take action every n elements and then continue with the stream, while processing the stream only once?

7 answers
Ridiculous、
#2 · 2020-02-02 11:07

Given a stream of items and a chunk size, you can create a stream of chunks (List<T>) by

  • grouping the items by their chunk index (element index / chunk size)
  • ordering the chunks by that index
  • mapping each map entry to its value, so that only the ordered chunks remain

Code:

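import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Note: collects the whole stream first; intended for sequential streams only.
public static <T> Stream<List<T>> chunked(Stream<T> stream, int chunkSize) {
    AtomicInteger index = new AtomicInteger(0);

    return stream.collect(Collectors.groupingBy(x -> index.getAndIncrement() / chunkSize))
            .entrySet().stream()
            .sorted(Map.Entry.comparingByKey()).map(Map.Entry::getValue);
}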

Example usage:

Stream<Integer> stream = IntStream.range(0, 100).mapToObj(Integer::valueOf);
Stream<List<Integer>> chunked = chunked(stream, 8);
chunked.forEach(chunk -> System.out.println("Chunk: " + chunk));

Output:

Chunk: [0, 1, 2, 3, 4, 5, 6, 7]
Chunk: [8, 9, 10, 11, 12, 13, 14, 15]
Chunk: [16, 17, 18, 19, 20, 21, 22, 23]
Chunk: [24, 25, 26, 27, 28, 29, 30, 31]
Chunk: [32, 33, 34, 35, 36, 37, 38, 39]
Chunk: [40, 41, 42, 43, 44, 45, 46, 47]
Chunk: [48, 49, 50, 51, 52, 53, 54, 55]
Chunk: [56, 57, 58, 59, 60, 61, 62, 63]
Chunk: [64, 65, 66, 67, 68, 69, 70, 71]
Chunk: [72, 73, 74, 75, 76, 77, 78, 79]
Chunk: [80, 81, 82, 83, 84, 85, 86, 87]
Chunk: [88, 89, 90, 91, 92, 93, 94, 95]
Chunk: [96, 97, 98, 99]
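
One caveat: the groupingBy collector above has to consume the entire stream before the first chunk comes out, so nothing is flushed until everything is in memory. If the goal is to flush each batch as soon as it fills, a plain iterator loop is one alternative. This is only a sketch under that assumption, not part of the approach above; the names BatchFlush and forEachBatch are hypothetical, and the Consumer stands in for whatever does the database insert:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class BatchFlush {
    // Hypothetical helper: reads the stream once and hands every full batch,
    // plus the final partial one, to the caller-supplied flush action.
    static <T> void forEachBatch(Stream<T> stream, int batchSize, Consumer<List<T>> flush) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        Iterator<T> it = stream.iterator();
        List<T> batch = new ArrayList<>(batchSize);
        while (it.hasNext()) {
            batch.add(it.next());
            if (batch.size() == batchSize) {
                flush.accept(batch);
                batch = new ArrayList<>(batchSize); // start a fresh batch
            }
        }
        if (!batch.isEmpty()) {
            flush.accept(batch); // leftover elements
        }
    }

    public static void main(String[] args) {
        Stream<Integer> stream = IntStream.range(0, 20).boxed();
        forEachBatch(stream, 8, chunk -> System.out.println("Chunk: " + chunk));
    }
}
```

With 20 elements and a batch size of 8, the flush action is called three times: twice with 8 elements and once with the remaining 4. At most one batch is held in memory at a time, and the caller decides what a flush does.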