Java 8: Lambda-Streams, Filter by Method with Exce

Posted 2019-01-02 17:24

Question:

I have a problem trying out the lambda expressions of Java 8. Usually it works fine, but now I have methods that throw IOExceptions. It's best if you look at the following code:

class Bank{
    ....
    public Set<String> getActiveAccountNumbers() throws IOException {
        Stream<Account> s =  accounts.values().stream();
        s = s.filter(a -> a.isActive());
        Stream<String> ss = s.map(a -> a.getNumber());
        return ss.collect(Collectors.toSet());
    }
    ....
}

interface Account{
    ....
    boolean isActive() throws IOException;
    String getNumber() throws IOException;
    ....
}

The problem is that it doesn't compile, because I have to catch the possible exceptions of the isActive and getNumber methods. But even if I explicitly use a try-catch block like below, it still doesn't compile, because the compiler says I don't catch the exception. So either there is a bug in the JDK, or I don't know how to catch these exceptions.

class Bank{
    ....
    //Doesn't compile either
    public Set<String> getActiveAccountNumbers() throws IOException {
        try{
            Stream<Account> s =  accounts.values().stream();
            s = s.filter(a -> a.isActive());
            Stream<String> ss = s.map(a -> a.getNumber());
            return ss.collect(Collectors.toSet());
        }catch(IOException ex){
        }
    }
    ....
}

How can I get it to work? Can someone point me to the right solution?

Answer 1:

You must catch the exception before it escapes the lambda:

s = s.filter(a -> { try { return a.isActive(); }
                    catch (IOException e) { throw new UncheckedIOException(e); }});

Consider the fact that the lambda isn't evaluated at the place you write it, but at some completely unrelated place, within a JDK class. So that would be the point where that checked exception would be thrown, and at that place it isn't declared.
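To make the point about "some completely unrelated place" concrete, here is a minimal sketch. The isActive helper and the class name are hypothetical stand-ins for the question's Account methods. It records how far execution got: filter returns without testing anything, and the wrapped exception only surfaces inside collect.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyLambdaDemo {
    // Hypothetical stand-in for Account.isActive(): fails for an empty account name.
    static boolean isActive(String account) throws IOException {
        if (account.isEmpty()) {
            throw new IOException("cannot read account");
        }
        return account.startsWith("a");
    }

    // Returns a trace of how far execution got before the exception surfaced.
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        Stream<String> s = Stream.of("alice", "", "bob").filter(a -> {
            try {
                return isActive(a);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        trace.add("filter returned");            // no element has been tested yet
        try {
            s.collect(Collectors.toList());      // the predicate actually runs in here
            trace.add("collect returned");
        } catch (UncheckedIOException e) {
            trace.add("collect threw: " + e.getCause().getMessage());
        }
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the predicate is called from inside the JDK's stream pipeline, whose Predicate.test signature declares no checked exceptions, the lambda body has nowhere to propagate an IOException to unless it is wrapped first.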

You can deal with it by using a wrapper of your lambda that translates checked exceptions to unchecked ones:

public static <T> T uncheckCall(Callable<T> callable) {
  try { return callable.call(); }
  catch (RuntimeException e) { throw e; }
  catch (Exception e) { throw new RuntimeException(e); }
}

Your example would be written as

return s.filter(a -> uncheckCall(a::isActive))
        .map(a -> uncheckCall(a::getNumber))
        .collect(toSet());

In my projects I deal with this issue without wrapping; instead I use a method which effectively defuses the compiler's checking of exceptions. Needless to say, this should be handled with care, and everybody on the project must be aware that a checked exception may appear where it is not declared. This is the plumbing code:

public static <T> T uncheckCall(Callable<T> callable) {
  try { return callable.call(); }
  // sneakyThrow always throws; the return only satisfies the compiler
  catch (Exception e) { sneakyThrow(e); return null; }
}
public static void uncheckRun(RunnableExc r) {
  try { r.run(); } catch (Exception e) { sneakyThrow(e); }
}
public interface RunnableExc { void run() throws Exception; }


@SuppressWarnings("unchecked")
private static <T extends Throwable> void sneakyThrow(Throwable t) throws T {
  throw (T) t;
}

With this in place you can expect to get an IOException thrown in your face, even though collect does not declare it. In most, but not all, real-life cases you would want to just rethrow the exception anyway and handle it as a generic failure. In all those cases, nothing is lost in clarity or correctness. Just beware of those other cases, where you would actually want to react to the exception on the spot. The compiler will not make the developer aware that there is an IOException to catch there, and it will in fact complain if you try to catch it, because we have fooled it into believing that no such exception can be thrown.
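For those other cases, one possible way to react on the spot is to catch the broader Exception type and test for IOException at run time. The sketch below is only an illustration of that idea: the class name and the always-failing isActive helper are made up for the demo, and the plumbing is a compact copy of the code above.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SneakyCatchDemo {
    @SuppressWarnings("unchecked")
    private static <T extends Throwable> void sneakyThrow(Throwable t) throws T {
        throw (T) t;  // unchecked cast: erased at run time, so t is thrown as-is
    }

    static <T> T uncheckCall(Callable<T> callable) {
        try {
            return callable.call();
        } catch (Exception e) {
            SneakyCatchDemo.<RuntimeException>sneakyThrow(e);
            return null;  // never reached
        }
    }

    // Hypothetical stand-in for Account.isActive() that always fails.
    static boolean isActive(String account) throws IOException {
        throw new IOException("disk failure for " + account);
    }

    static String describeFailure() {
        try {
            Stream.of("a1")
                  .filter(a -> uncheckCall(() -> isActive(a)))
                  .collect(Collectors.toList());
            return "no exception";
        } catch (Exception e) {
            // "catch (IOException e)" would be rejected by the compiler here,
            // so the type is checked at run time instead.
            if (e instanceof IOException) {
                return "IOException: " + e.getMessage();
            }
            return "other: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure());
    }
}
```

The price of this workaround is that the catch block also sees every unrelated runtime exception, which is part of why the technique needs a team-wide agreement.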



Answer 2:

You can also propagate your static pain with lambdas, so the whole thing looks readable:

s.filter(a -> propagate(a::isActive))

propagate here receives java.util.concurrent.Callable as a parameter and converts any exception caught during the call into RuntimeException. There is a similar conversion method Throwables#propagate(Throwable) in Guava.

This method seems essential for lambda method chaining, so I hope that one day it will be added to one of the popular libraries, or that this propagating behavior will become the default.

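import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.stream.Stream;

public class PropagateExceptionsSample {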
    // a simplified version of Throwables#propagate
    public static RuntimeException runtime(Throwable e) {
        if (e instanceof RuntimeException) {
            return (RuntimeException)e;
        }

        return new RuntimeException(e);
    }

    // this is a new one, n/a in public libs
    // Callable just suits as a functional interface in JDK throwing Exception 
    public static <V> V propagate(Callable<V> callable){
        try {
            return callable.call();
        } catch (Exception e) {
            throw runtime(e);
        }
    }

    public static void main(String[] args) {
        class Account{
            String name;    
            Account(String name) { this.name = name;}

            public boolean isActive() throws IOException {
                return name.startsWith("a");
            }
        }


        List<Account> accounts = new ArrayList<>(Arrays.asList(new Account("andrey"), new Account("angela"), new Account("pamela")));

        Stream<Account> s = accounts.stream();

        s
          .filter(a -> propagate(a::isActive))
          .map(a -> a.name)
          .forEach(System.out::println);
    }
}


Answer 3:

This UtilException helper class lets you use any checked exceptions in Java streams, like this:

Stream.of("java.lang.Object", "java.lang.Integer", "java.lang.String")
      .map(rethrowFunction(Class::forName))
      .collect(Collectors.toList());

Note that Class::forName throws ClassNotFoundException, which is checked. The stream itself also throws ClassNotFoundException, and NOT some wrapping unchecked exception.

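import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public final class UtilException {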

@FunctionalInterface
public interface Consumer_WithExceptions<T, E extends Exception> {
    void accept(T t) throws E;
    }

@FunctionalInterface
public interface BiConsumer_WithExceptions<T, U, E extends Exception> {
    void accept(T t, U u) throws E;
    }

@FunctionalInterface
public interface Function_WithExceptions<T, R, E extends Exception> {
    R apply(T t) throws E;
    }

@FunctionalInterface
public interface Supplier_WithExceptions<T, E extends Exception> {
    T get() throws E;
    }

@FunctionalInterface
public interface Runnable_WithExceptions<E extends Exception> {
    void run() throws E;
    }

/** .forEach(rethrowConsumer(name -> System.out.println(Class.forName(name)))); or .forEach(rethrowConsumer(ClassNameUtil::println)); */
public static <T, E extends Exception> Consumer<T> rethrowConsumer(Consumer_WithExceptions<T, E> consumer) throws E {
    return t -> {
        try { consumer.accept(t); }
        catch (Exception exception) { throwAsUnchecked(exception); }
        };
    }

public static <T, U, E extends Exception> BiConsumer<T, U> rethrowBiConsumer(BiConsumer_WithExceptions<T, U, E> biConsumer) throws E {
    return (t, u) -> {
        try { biConsumer.accept(t, u); }
        catch (Exception exception) { throwAsUnchecked(exception); }
        };
    }

/** .map(rethrowFunction(name -> Class.forName(name))) or .map(rethrowFunction(Class::forName)) */
public static <T, R, E extends Exception> Function<T, R> rethrowFunction(Function_WithExceptions<T, R, E> function) throws E {
    return t -> {
        try { return function.apply(t); }
        catch (Exception exception) { throwAsUnchecked(exception); return null; }
        };
    }

/** rethrowSupplier(() -> new StringJoiner(new String(new byte[]{77, 97, 114, 107}, "UTF-8"))), */
public static <T, E extends Exception> Supplier<T> rethrowSupplier(Supplier_WithExceptions<T, E> function) throws E {
    return () -> {
        try { return function.get(); }
        catch (Exception exception) { throwAsUnchecked(exception); return null; }
        };
    }

/** uncheck(() -> Class.forName("xxx")); */
public static void uncheck(Runnable_WithExceptions t)
    {
    try { t.run(); }
    catch (Exception exception) { throwAsUnchecked(exception); }
    }

/** uncheck(() -> Class.forName("xxx")); */
public static <R, E extends Exception> R uncheck(Supplier_WithExceptions<R, E> supplier)
    {
    try { return supplier.get(); }
    catch (Exception exception) { throwAsUnchecked(exception); return null; }
    }

/** uncheck(Class::forName, "xxx"); */
public static <T, R, E extends Exception> R uncheck(Function_WithExceptions<T, R, E> function, T t) {
    try { return function.apply(t); }
    catch (Exception exception) { throwAsUnchecked(exception); return null; }
    }

@SuppressWarnings ("unchecked")
private static <E extends Throwable> void throwAsUnchecked(Exception exception) throws E { throw (E)exception; }

}

Many other examples on how to use it (after statically importing UtilException):

@Test
public void test_Consumer_with_checked_exceptions() throws ClassNotFoundException {
    Stream.of("java.lang.Object", "java.lang.Integer", "java.lang.String")
          .forEach(rethrowConsumer(className -> System.out.println(Class.forName(className))));

    Stream.of("java.lang.Object", "java.lang.Integer", "java.lang.String")
          .forEach(rethrowConsumer(System.out::println));
    }

@Test
public void test_Function_with_checked_exceptions() throws ClassNotFoundException {
    List<Class> classes1
          = Stream.of("Object", "Integer", "String")
                  .map(rethrowFunction(className -> Class.forName("java.lang." + className)))
                  .collect(Collectors.toList());

    List<Class> classes2
          = Stream.of("java.lang.Object", "java.lang.Integer", "java.lang.String")
                  .map(rethrowFunction(Class::forName))
                  .collect(Collectors.toList());
    }

@Test
public void test_Supplier_with_checked_exceptions() throws UnsupportedEncodingException {
    Collector.of(
          rethrowSupplier(() -> new StringJoiner(new String(new byte[]{77, 97, 114, 107}, "UTF-8"))),
          StringJoiner::add, StringJoiner::merge, StringJoiner::toString);
    }

@Test    
public void test_uncheck_exception_thrown_by_method() {
    Class clazz1 = uncheck(() -> Class.forName("java.lang.String"));

    Class clazz2 = uncheck(Class::forName, "java.lang.String");
    }

@Test (expected = ClassNotFoundException.class)
public void test_if_correct_exception_is_still_thrown_by_method() {
    Class clazz3 = uncheck(Class::forName, "INVALID");
    }

But don't use it before understanding the following advantages, disadvantages, and limitations:

• If the calling-code is to handle the checked exception you MUST add it to the throws clause of the method that contains the stream. The compiler will not force you to add it anymore, so it's easier to forget it.

• If the calling-code already handles the checked exception, the compiler WILL remind you to add the throws clause to the method declaration that contains the stream (if you don't it will say: Exception is never thrown in body of corresponding try statement).

• In any case, you won't be able to surround the stream itself to catch the checked exception INSIDE the method that contains the stream (if you try, the compiler will say: Exception is never thrown in body of corresponding try statement).

• If you are calling a method which literally can never throw the exception that it declares, then you should not include the throws clause. For example: new String(byteArr, "UTF-8") throws UnsupportedEncodingException, but UTF-8 is guaranteed by the Java spec to always be present. Here, the throws declaration is a nuisance and any solution to silence it with minimal boilerplate is welcome.

• If you hate checked exceptions and feel they should never have been added to the Java language to begin with (a growing number of people think this way, and I am NOT one of them), then just don't add the checked exception to the throws clause of the method that contains the stream. The checked exception will then behave just like an UNchecked exception.

• If you are implementing a strict interface where you don't have the option for adding a throws declaration, and yet throwing an exception is entirely appropriate, then wrapping an exception just to gain the privilege of throwing it results in a stacktrace with spurious exceptions which contribute no information about what actually went wrong. A good example is Runnable.run(), which does not throw any checked exceptions. In this case, you may decide not to add the checked exception to the throws clause of the method that contains the stream.

• In any case, if you decide NOT to add (or forget to add) the checked exception to the throws clause of the method that contains the stream, be aware of these 2 consequences of throwing CHECKED exceptions:

1) The calling-code won't be able to catch it by name (if you try, the compiler will say: Exception is never thrown in body of corresponding try statement). It will bubble up and probably be caught in the main program loop by some "catch Exception" or "catch Throwable", which may be what you want anyway.

2) It violates the principle of least surprise: it will no longer be enough to catch RuntimeException to be able to guarantee catching all possible exceptions. For this reason, I believe this should not be done in framework code, but only in business code that you completely control.
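For the specific UTF-8 case mentioned in the list above, it may be worth noting that the JDK also offers a String constructor taking a java.nio.charset.Charset, which declares no checked exception, so no helper is needed at all. A minimal sketch (the class and method names are made up for the demo):

```java
import java.nio.charset.StandardCharsets;
import java.util.StringJoiner;
import java.util.stream.Collector;
import java.util.stream.Stream;

class Utf8WithoutCheckedException {
    static String join() {
        byte[] separatorBytes = {77, 97, 114, 107};  // the ASCII/UTF-8 bytes of "Mark"
        // String(byte[], Charset) does not declare UnsupportedEncodingException.
        String separator = new String(separatorBytes, StandardCharsets.UTF_8);
        Collector<String, StringJoiner, String> joiner = Collector.of(
                () -> new StringJoiner(separator),
                StringJoiner::add, StringJoiner::merge, StringJoiner::toString);
        return Stream.of("a", "b", "c").collect(joiner);
    }

    public static void main(String[] args) {
        System.out.println(join());  // prints aMarkbMarkc
    }
}
```

This only sidesteps the problem where an exception-free overload happens to exist; for methods such as Class.forName the trade-offs listed above still apply.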

In conclusion: I believe the limitations here are not serious, and the UtilException class may be used without fear. However, it's up to you!

  • References:
    • http://www.philandstuff.com/2012/04/28/sneakily-throwing-checked-exceptions.html
    • http://www.mail-archive.com/javaposse@googlegroups.com/msg05984.html
    • Project Lombok annotation: @SneakyThrows
    • Brian Goetz opinion (against) here: How can I throw CHECKED exceptions from inside Java 8 streams?
    • https://softwareengineering.stackexchange.com/questions/225931/workaround-for-java-checked-exceptions?newreg=ddf0dd15e8174af8ba52e091cf85688e


Answer 4:

You can potentially roll your own Stream variant by wrapping your lambda to throw an unchecked exception and then later unwrapping that unchecked exception on terminal operations:

@FunctionalInterface
public interface ThrowingPredicate<T, X extends Throwable> {
    public boolean test(T t) throws X;
}

@FunctionalInterface
public interface ThrowingFunction<T, R, X extends Throwable> {
    public R apply(T t) throws X;
}

@FunctionalInterface
public interface ThrowingSupplier<R, X extends Throwable> {
    public R get() throws X;
}

public interface ThrowingStream<T, X extends Throwable> {
    public ThrowingStream<T, X> filter(
            ThrowingPredicate<? super T, ? extends X> predicate);

    public <R> ThrowingStream<R, X> map(
            ThrowingFunction<? super T, ? extends R, ? extends X> mapper);

    public <A, R> R collect(Collector<? super T, A, R> collector) throws X;

    // etc
}

class StreamAdapter<T, X extends Throwable> implements ThrowingStream<T, X> {
    private static class AdapterException extends RuntimeException {
        public AdapterException(Throwable cause) {
            super(cause);
        }
    }

    private final Stream<T> delegate;
    private final Class<X> x;

    StreamAdapter(Stream<T> delegate, Class<X> x) {
        this.delegate = delegate;
        this.x = x;
    }

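    private <R> R maskException(ThrowingSupplier<R, X> method) {
        try {
            return method.get();
        } catch (Throwable t) {
            if (x.isInstance(t)) {
                // Hide the declared exception type so it can cross the Stream API.
                throw new AdapterException(t);
            } else if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else if (t instanceof Error) {
                throw (Error) t;
            } else {
                // get() declares only X, so no other checked exception can arrive here.
                throw new AssertionError(t);
            }
        }
    }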

    // The parameter types must match the interface declarations for @Override to compile.
    @Override
    public ThrowingStream<T, X> filter(
            ThrowingPredicate<? super T, ? extends X> predicate) {
        return new StreamAdapter<>(
                delegate.filter(t -> maskException(() -> predicate.test(t))), x);
    }

    @Override
    public <R> ThrowingStream<R, X> map(
            ThrowingFunction<? super T, ? extends R, ? extends X> mapper) {
        return new StreamAdapter<R, X>(
                delegate.<R>map(t -> maskException(() -> mapper.apply(t))), x);
    }

    private <R> R unmaskException(Supplier<R> method) throws X {
        try {
            return method.get();
        } catch (AdapterException e) {
            throw x.cast(e.getCause());
        }
    }

    @Override
    public <A, R> R collect(Collector<? super T, A, R> collector) throws X {
        return unmaskException(() -> delegate.collect(collector));
    }
}

Then you could use this in exactly the same way as a Stream:

Stream<Account> s = accounts.values().stream();
ThrowingStream<Account, IOException> ts = new StreamAdapter<>(s, IOException.class);
return ts.filter(Account::isActive).map(Account::getNumber).collect(toSet());

This solution would require quite a bit of boilerplate, so I suggest you take a look at the library I already made which does exactly what I have described here for the entire Stream class (and more!).



Answer 5:

Use #propagate() method. Sample non-Guava implementation from Java 8 Blog by Sam Beran:

import java.util.concurrent.Callable;

public class Throwables {
    public interface ExceptionWrapper<E> {
        E wrap(Exception e);
    }

    public static <T> T propagate(Callable<T> callable) throws RuntimeException {
        return propagate(callable, RuntimeException::new);
    }

    public static <T, E extends Throwable> T propagate(Callable<T> callable, ExceptionWrapper<E> wrapper) throws E {
        try {
            return callable.call();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw wrapper.wrap(e);
        }
    }
}
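As a usage illustration, the two-argument form lets the caller choose the wrapping exception type. The sketch below is self-contained, so it re-declares a compact propagate with java.util.function.Function standing in for ExceptionWrapper; that substitution, the class name, and the failing readNumber helper are assumptions made for the demo, not part of the original code.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.function.Function;

class PropagateUsageDemo {
    // Same shape as propagate(callable, wrapper) above; Function<Exception, E>
    // plays the role of ExceptionWrapper<E> to keep the sketch short.
    static <T, E extends Throwable> T propagate(Callable<T> callable,
                                                Function<Exception, E> wrapper) throws E {
        try {
            return callable.call();
        } catch (RuntimeException e) {
            throw e;                      // already unchecked: pass through untouched
        } catch (Exception e) {
            throw wrapper.apply(e);       // checked: wrap in the caller's chosen type
        }
    }

    // Hypothetical failing getter, standing in for Account.getNumber().
    static String readNumber() throws IOException {
        throw new IOException("unreadable");
    }

    // Returns the simple name of the wrapped cause.
    static String demo() {
        try {
            return propagate(PropagateUsageDemo::readNumber, IllegalStateException::new);
        } catch (IllegalStateException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());  // prints IOException
    }
}
```

Choosing a specific wrapper type makes it possible to catch only the failures that came through propagate, rather than every RuntimeException.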


Answer 6:

This can be solved with the simple code below, using Stream and Try from AbacusUtil:

Stream.of(accounts).filter(a -> Try.call(a::isActive)).map(a -> Try.call(a::getNumber)).toSet();

Disclosure: I'm the developer of AbacusUtil.



Answer 7:

With code added to properly convert the IOException to a RuntimeException, your method would look like this:

Stream<Account> s =  accounts.values().stream();

s = s.filter(a -> { try { return a.isActive(); } 
  catch (IOException e) { throw new RuntimeException(e); }});

Stream<String> ss = s.map(a -> { try { return a.getNumber(); }
  catch (IOException e) { throw new RuntimeException(e); }});

return ss.collect(Collectors.toSet());

The problem now is that the IOException will have to be caught as a RuntimeException and converted back to an IOException, which adds even more code to the above method.
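To show roughly how much extra code that conversion takes, here is a sketch that wraps in UncheckedIOException (rather than a plain RuntimeException, so the cause can be recovered without a cast) and unwraps around the terminal operation. The Account interface mirrors the one in the question; the account(...) factory and the class name are hypothetical helpers for the demo.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;

class UnwrapAtBoundaryDemo {
    interface Account {                        // same shape as in the question
        boolean isActive() throws IOException;
        String getNumber() throws IOException;
    }

    static Set<String> getActiveAccountNumbers(Collection<Account> accounts) throws IOException {
        try {
            return accounts.stream()
                    .filter(a -> {
                        try { return a.isActive(); }
                        catch (IOException e) { throw new UncheckedIOException(e); }
                    })
                    .map(a -> {
                        try { return a.getNumber(); }
                        catch (IOException e) { throw new UncheckedIOException(e); }
                    })
                    .collect(Collectors.toSet());
        } catch (UncheckedIOException e) {
            throw e.getCause();                // getCause() is declared to return IOException
        }
    }

    // Hypothetical in-memory account; 'broken' simulates an I/O failure in isActive().
    static Account account(String number, boolean active, boolean broken) {
        return new Account() {
            @Override public boolean isActive() throws IOException {
                if (broken) throw new IOException("cannot read " + number);
                return active;
            }
            @Override public String getNumber() { return number; }
        };
    }

    public static void main(String[] args) throws IOException {
        System.out.println(getActiveAccountNumbers(
                Arrays.asList(account("1", true, false), account("2", false, false))));
    }
}
```

The caller sees an ordinary IOException again, at the cost of two inline try-catch blocks plus the outer unwrapping.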

Why use a Stream when it can be done like this? The method already declares throws IOException, so no extra code is needed for that either:

Set<String> set = new HashSet<>();
for(Account a: accounts.values()){
  if(a.isActive()){
     set.add(a.getNumber());
  } 
}
return set;


Answer 8:

Extending @marcg's solution, you can throw and catch a checked exception in streams as you normally would; that is, the compiler will ask you to catch or rethrow it as if you were outside streams.

@FunctionalInterface
public interface Predicate_WithExceptions<T, E extends Exception> {
    boolean test(T t) throws E;
}

/**
 * .filter(rethrowPredicate(t -> t.isActive()))
 */
public static <T, E extends Exception> Predicate<T> rethrowPredicate(Predicate_WithExceptions<T, E> predicate) throws E {
    return t -> {
        try {
            return predicate.test(t);
        } catch (Exception exception) {
            return throwActualException(exception);
        }
    };
}

@SuppressWarnings("unchecked")
private static <T, E extends Exception> T throwActualException(Exception exception) throws E {
    throw (E) exception;
}

Then, your example would be written as follows (adding tests to show it more clearly):

@Test
public void testPredicate() throws MyTestException {
    List<String> nonEmptyStrings = Stream.of("ciao", "")
            .filter(rethrowPredicate(s -> notEmpty(s)))
            .collect(toList());
    assertEquals(1, nonEmptyStrings.size());
    assertEquals("ciao", nonEmptyStrings.get(0));
}

private class MyTestException extends Exception { }

private boolean notEmpty(String value) throws MyTestException {
    if(value==null) {
        throw new MyTestException();
    }
    return !value.isEmpty();
}

@Test
public void testPredicateRaisingException() throws MyTestException {
    try {
        Stream.of("ciao", null)
                .filter(rethrowPredicate(s -> notEmpty(s)))
                .collect(toList());
        fail();
    } catch (MyTestException e) {
        //OK
    }
}


Answer 9:

This does not directly answer the question (there are many other answers that do) but tries to avoid the problem in the first place:

In my experience, the need to handle exceptions in a Stream (or other lambda expression) often comes from the fact that the exceptions are declared to be thrown from methods where they should not be thrown. This often comes from mixing business logic with input and output. Your Account interface is a perfect example:

interface Account {
    boolean isActive() throws IOException;
    String getNumber() throws IOException;
}

Instead of throwing an IOException on each getter, consider this design:

interface AccountReader {
    Account readAccount(…) throws IOException;
}

interface Account {
    boolean isActive();
    String getNumber();
}

The method AccountReader.readAccount(…) could read an account from a database or a file or whatever and throw an exception if it does not succeed. It constructs an Account object that already contains all values, ready to be used. As the values have already been loaded by readAccount(…), the getters would not throw an exception. Thus you can use them freely in lambdas without the need of wrapping, masking or hiding the exceptions.
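A minimal sketch of that design follows. The "number,active" line format, the class names, and the static readAccount method are assumptions made for illustration; a real reader would load from a file or database.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class EagerAccountDemo {
    // Immutable value object: all values are loaded up front, so the getters cannot fail.
    static final class Account {
        private final String number;
        private final boolean active;

        Account(String number, boolean active) {
            this.number = number;
            this.active = active;
        }

        boolean isActive() { return active; }
        String getNumber() { return number; }
    }

    // Hypothetical reader: parses one "number,active" record and reports bad input as IOException.
    static Account readAccount(String line) throws IOException {
        String[] parts = line.split(",");
        if (parts.length != 2) {
            throw new IOException("malformed record: " + line);
        }
        return new Account(parts[0], Boolean.parseBoolean(parts[1]));
    }

    static Set<String> activeNumbers(List<String> lines) throws IOException {
        List<Account> accounts = new ArrayList<>();
        for (String line : lines) {
            accounts.add(readAccount(line));   // all I/O errors surface here
        }
        // From here on no checked exceptions are involved, so plain method references work.
        return accounts.stream()
                .filter(Account::isActive)
                .map(Account::getNumber)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) throws IOException {
        System.out.println(activeNumbers(Arrays.asList("1,true", "2,false", "3,true")));
    }
}
```

The loading step uses an ordinary loop, where checked exceptions propagate naturally, and the stream pipeline is left with pure in-memory logic.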

Of course it is not always possible to do it the way I described, but often it is and it leads to cleaner code altogether (IMHO):

  • Better separation of concerns and following single responsibility principle
  • Less boilerplate: You don't have to clutter your code with throws IOException for no use but to satisfy the compiler
  • Error handling: You handle the errors where they happen - when reading from a file or database - instead of somewhere in the middle of your business logic only because you want to get a field's value
  • You may be able to make Account immutable and profit from the advantages thereof (e.g. thread safety)
  • You don't need "dirty tricks" or workarounds to use Account in lambdas (e.g. in a Stream)


Answer 10:

Keeping this issue in mind I developed a small library for dealing with checked exceptions and lambdas. Custom adapters allow you to integrate with existing functional types:

stream().map(unchecked(URI::new)) //with a static import

https://github.com/TouK/ThrowingFunction/



Answer 11:

Your example can be written as:

import utils.stream.Unthrow;

class Bank{
   ....
   public Set<String> getActiveAccountNumbers() {
       return accounts.values().stream()
           .filter(a -> Unthrow.wrap(() -> a.isActive()))
           .map(a -> Unthrow.wrap(() -> a.getNumber()))
           .collect(Collectors.toSet());
   }
   ....
}

The Unthrow class can be found at https://github.com/SeregaLBN/StreamUnthrower



Answer 12:

If you don't mind using third-party libraries, AOL's cyclops-react library (disclosure: I am a contributor) has an ExceptionSoftener class that can help here.

s.filter(softenPredicate(a -> a.isActive()));


Answer 13:

The standard functional interfaces in java.util.function (Predicate, Function and so on) don't declare any checked exceptions, so a lambda passed to filter or map must not throw one. We need to change the signature of the methods from:

boolean isActive() throws IOException;
String getNumber() throws IOException;

To:

boolean isActive();
String getNumber();

Or handle it with a try-catch block inside each lambda:

public Set<String> getActiveAccountNumbers() {
  Stream<Account> s =  accounts.values().stream();
  s = s.filter(a -> {
    try {
      return a.isActive();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  });
  Stream<String> ss = s.map(a -> {
    try {
      return a.getNumber();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  });
  return ss.collect(Collectors.toSet());
}

Another option is to write a custom wrapper or use a library like ThrowingFunction. With the library we only need to add the dependency to our pom.xml:

<dependency>
    <groupId>pl.touk</groupId>
    <artifactId>throwing-function</artifactId>
    <version>1.3</version>
</dependency>

And use the specific classes like ThrowingFunction, ThrowingConsumer, ThrowingPredicate, ThrowingRunnable, ThrowingSupplier.

At the end the code looks like this:

public Set<String> getActiveAccountNumbers() {
  return accounts.values().stream()
    .filter(ThrowingPredicate.unchecked(Account::isActive))
    .map(ThrowingFunction.unchecked(Account::getNumber))
    .collect(Collectors.toSet());
}

