takeWhile() working differently with flatmap

2020-05-13 20:51发布

I am creating snippets with takeWhile to explore its possibilities. When used in conjunction with flatMap, the behaviour is not in line with the expectation. Please find the code snippet below.

String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};

Arrays.stream(strArray)
        .flatMap(indStream -> Arrays.stream(indStream))
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
        .forEach(ele -> System.out.println(ele));

Actual Output:

Sample1
Sample2
Sample3
Sample5

ExpectedOutput:

Sample1
Sample2
Sample3

Reason for the expectation is that takeWhile should be executing till the time the condition inside turns true. I have also added printout statements inside flatmap for debugging. The streams are returned just twice which is inline with the expectation.

However, this works just fine without flatmap in the chain.

String[] strArraySingle = {"Sample3", "Sample4", "Sample5"};
Arrays.stream(strArraySingle)
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
        .forEach(ele -> System.out.println(ele));

Actual Output:

Sample3

Here the actual output matches with the expected output.

Disclaimer: These snippets are just for code practise and does not serve any valid usecases.

Update: Bug JDK-8193856: fix will be available as part of JDK 10. The change will be to correct whileOps Sink::accept

@Override 
public void accept(T t) {
    if (take = predicate.test(t)) {
        downstream.accept(t);
    }
}

Changed Implementation:

@Override
public void accept(T t) {
    if (take && (take = predicate.test(t))) {
        downstream.accept(t);
    }
}

4条回答
Juvenile、少年°
2楼-- · 2020-05-13 20:56

This is a bug in JDK 9 - from issue #8193856:

takeWhile is incorrectly assuming that an upstream operation supports and honors cancellation, which unfortunately is not the case for flatMap.

Explanation

If the stream is ordered, takeWhile should show the expected behavior. This is not entirely the case in your code because you use forEach, which waives order. If you care about it, which you do in this example, you should use forEachOrdered instead. Funny thing: That doesn't change anything.

查看更多
beautiful°
3楼-- · 2020-05-13 21:00

This is a bug no matter how I look at it - and thank you Holger for your comments. I did not want to put this answer in here (seriously!), but none of the answer clearly states that this is a bug.

People are saying that this has to with ordered/un-ordered, and this is not true as this will report true 3 times:

Stream<String[]> s1 = Arrays.stream(strArray);
System.out.println(s1.spliterator().hasCharacteristics(Spliterator.ORDERED));

Stream<String> s2 = Arrays.stream(strArray)
            .flatMap(indStream -> Arrays.stream(indStream));
System.out.println(s2.spliterator().hasCharacteristics(Spliterator.ORDERED));

Stream<String> s3 = Arrays.stream(strArray)
            .flatMap(indStream -> Arrays.stream(indStream))
            .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));
System.out.println(s3.spliterator().hasCharacteristics(Spliterator.ORDERED));

It's very interesting also that if you change it to:

String[][] strArray = { 
         { "Sample1", "Sample2" }, 
         { "Sample3", "Sample5", "Sample4" }, // Sample4 is the last one here
         { "Sample7", "Sample8" } 
};

then Sample7 and Sample8 will not be part of the output, otherwise they will. It seems that flatmap ignores a cancel flag that would be introduced by dropWhile.

查看更多
虎瘦雄心在
4楼-- · 2020-05-13 21:01

If you look at the documentation for takeWhile:

if this stream is ordered, [returns] a stream consisting of the longest prefix of elements taken from this stream that match the given predicate.

if this stream is unordered, [returns] a stream consisting of a subset of elements taken from this stream that match the given predicate.

Your stream is coincidentally ordered, but takeWhile doesn't know that it is. As such, it is returning 2nd condition - the subset. Your takeWhile is just acting like a filter.

If you add a call to sorted before takeWhile, you'll see the result you expect:

Arrays.stream(strArray)
      .flatMap(indStream -> Arrays.stream(indStream))
      .sorted()
      .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
      .forEach(ele -> System.out.println(ele));
查看更多
Deceive 欺骗
5楼-- · 2020-05-13 21:19

The reason for that is the flatMap operation also being an intermediate operations with which (one of) the stateful short-circuiting intermediate operation takeWhile is used.

The behavior of flatMap as pointed by Holger in this answer is certainly a reference one shouldn't miss out to understand the unexpected output for such short-circuiting operations.

Your expected result can be achieved by splitting these two intermediate operations by introducing a terminal operation to deterministically use an ordered stream further and performing them for a sample as :

List<String> sampleList = Arrays.stream(strArray).flatMap(Arrays::stream).collect(Collectors.toList());
sampleList.stream().takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
            .forEach(System.out::println);

Also, there seems to be a related Bug#JDK-8075939 to trace this behavior already registered.

Edit: This can be tracked further at JDK-8193856 accepted as a bug.

查看更多
登录 后发表回答