Can I use block() method of Flux returned from Spr

Posted 2020-03-18 06:31

I created a Spring Boot 2.0 demo that consists of two applications communicating via WebClient. They often stop communicating when I call block() on the Flux obtained from the WebClient response. For various reasons, I want to work with a List rather than a Flux.

The server-side application looks like this. It just returns a Flux.

@GetMapping
public Flux<Item> findAll() {
    return Flux.fromIterable(items);
}

The client-side (or BFF-side) application looks like this. I get a Flux from the server and convert it to a List by calling block().

@GetMapping
public List<Item> findBlock() {
    return webClient.get()
        .retrieve()
        .bodyToFlux(Item.class)
        .collectList()
        .block(Duration.ofSeconds(10L));
}

It works at first, but after several requests findBlock() stops responding and times out. When I change findBlock() to return a Flux, removing collectList() and block(), it works fine, so I assume that block() causes the problem.
Changing findAll() on the server to return a List makes no difference.

The source code of the entire example application is here:
https://github.com/cero-t/webclient-example

"resource" is the server application, and "front" is the client application. With both applications running, localhost:8080 works and I can reload it any number of times. localhost:8080/block seems to work at first, but it stops responding after several reloads.


By the way, when I add the "spring-boot-starter-web" dependency to the "front" application's pom.xml (not the resource application's), which means it runs on Tomcat, the problem never occurs. Is this problem caused by the Netty server?

Any guidance would be greatly appreciated.

1 answer
我欲成王,谁敢阻挡
#2 · 2020-03-18 07:16

First, let me point out that Flux.fromIterable(items) is advisable only if items is already in memory, with no I/O involved. Otherwise, chances are you'd be using a blocking API to fetch it, and that can break your reactive application. In this case it is an in-memory list, so there is no problem. Note that you could also write Flux.just(item1, item2, item3).

Using the following is the most efficient:

@GetMapping("/")
public Flux<Item> findFlux() {
  return webClient.get()
    .retrieve()
    .bodyToFlux(Item.class);
}

Item instances will be read/written, decoded/encoded on the fly in a very efficient way.

On the other hand, this is not the preferred way:

@GetMapping("/block")
public List<Item> findBlock() {
  return webClient.get()
    .retrieve()
    .bodyToFlux(Item.class)
    .collectList()
    .block(Duration.ofSeconds(10L));
}

In this case, your front application buffers the whole list of items in memory with collectList, and it also blocks one of the few server threads available. This can cause very poor performance, because a blocked server thread sits waiting for that data and cannot serve other requests in the meantime.
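To make the cost of blocking a scarce thread concrete, here is a minimal sketch that uses only the JDK. It is an analogy, not Reactor Netty's actual behavior: a single-thread executor stands in for an event loop, and the class, the method names, and fetchItems are hypothetical. The first handler waits on the loop thread for a result that needs that same thread, so it can only time out. The second composes the future instead of waiting on it.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class EventLoopStarvation {

    // Hypothetical stand-in for the remote call: the result is produced on the given loop.
    static CompletableFuture<List<String>> fetchItems(ExecutorService loop) {
        return CompletableFuture.supplyAsync(() -> List.of("item1", "item2"), loop);
    }

    // Blocking style: the handler occupies the only loop thread while it waits
    // for work queued on that same thread, so the wait ends in a timeout.
    static boolean blockingHandlerTimesOut() throws Exception {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Boolean> handler = CompletableFuture.supplyAsync(() -> {
                try {
                    fetchItems(loop).get(200, TimeUnit.MILLISECONDS);
                    return false; // not reached: the loop thread is busy running this handler
                } catch (TimeoutException e) {
                    return true;
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            }, loop);
            return handler.get(5, TimeUnit.SECONDS);
        } finally {
            loop.shutdownNow();
        }
    }

    // Composed style: the handler returns the future immediately, which frees
    // the loop thread to run the fetch.
    static List<String> composedHandler() throws Exception {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<List<String>> handler = CompletableFuture
                    .supplyAsync(() -> fetchItems(loop), loop)
                    .thenCompose(future -> future);
            return handler.get(5, TimeUnit.SECONDS);
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("blocking handler timed out: " + blockingHandlerTimesOut());
        System.out.println("composed handler returned: " + composedHandler());
    }
}
```

In Reactor terms, the composed variant roughly corresponds to returning the Mono<List<Item>> that collectList() produces rather than calling block() on it, assuming the caller can accept a reactive return type.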

In this particular case it's worse, since the application totally breaks. Looking at the console, we can see the following:

WARN 3075 --- [ctor-http-nio-7] io.netty.util.concurrent.DefaultPromise  : An exception was thrown by reactor.ipc.netty.channel.PooledClientContextHandler$$Lambda$532/356589024.operationComplete()

reactor.core.Exceptions$BubblingException: java.lang.IllegalArgumentException: Channel [id: 0xab15f050, L:/127.0.0.1:59350 - R:localhost/127.0.0.1:8081] was not acquired from this ChannelPool
    at reactor.core.Exceptions.bubble(Exceptions.java:154) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]

This is probably linked to a reactor-netty client connection pool issue that should be fixed in 0.7.4.RELEASE. I don't know the specifics of this, but I suspect the whole connection pool gets corrupted as HTTP responses aren't properly read from the client connections.

Adding spring-boot-starter-web does make your application use Tomcat, but it mainly turns your Spring WebFlux application into a Spring MVC application (which now supports some reactive return types, but has a different runtime model). If you wish to test your application with Tomcat, you can add spring-boot-starter-tomcat to your POM and this will use Tomcat with Spring WebFlux.
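As a sketch of the POM change described above, the dependency entry could look like the following. It assumes the project inherits dependency versions from the Spring Boot parent POM or BOM, which is why no version is given; whether the Netty starter also needs to be excluded is not covered here.

```xml
<!-- Runs Spring WebFlux on Tomcat instead of Reactor Netty.
     Assumption: the version is managed by the Spring Boot parent/BOM. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
```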
