Spring Reactive WebFlux reports empty flux when us

2019-08-27 03:29发布

问题:

I have a reactive core WebClient to post to a given endpoint. The payload is a flux of JsonNodes and the content-type is application/stream+json

JsonNode response = localEP.post().uri( "/createItem" )
    .contentType(MediaType.APPLICATION_STREAM_JSON)
    .body( BodyInserters.fromPublisher(itemData, JsonNode.class ))
    .retrieve()
    .bodyToMono( JsonNode.class )
    .block();

On the server end I have tried both a Spring Controller style and Spring Web Reactive FunctionHandler to process the payload of the above call with a payload that is a Flux.

  @PostMapping(path = "/dev/jobad/dynamo", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
  @ResponseStatus(HttpStatus.CREATED)
    public Flux<JsonNode> loadItems (@RequestBody Flux<JsonNode> items) {
       items.subscribe(storage::add);
       JsonNode response = new ObjectMapper().createObjectNode().put( "shady", "shade" );
      return Flux.just( response );
    }

The return to the client is always ok, however the server reports that the content of the flux is empty. If I change (@RequestBody Flux<JsonNode> items to (@RequestBody JsonNode items The payload is received fine. The WebClient logs appear to indicate that it has written the data on the wire and processed the response. However the body seems to empty

Reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.web.server.ServerWebInputException: Response status 400 with reason "Request body is missing: public reactor.core.publisher.Flux<com.fasterxml.jackson.databind.JsonNode> com.talroo.rest.JobResource.loadJobs(reactor.core.publisher.Flux<com.fasterxml.jackson.databind.JsonNode>)"
Caused by: org.springframework.web.server.ServerWebInputException: 
Response status 400 with reason "Request body is missing: public 
reactor.core.publisher.Flux<com.fasterxml.jackson.databind.JsonNode>

What do I need to do to be able to handle the request body of a post as a Flux?

回答1:

First, I don't think Spring officially supports reading/writing Jackson JsonNode instances directly from Controllers. Your application is supposed to ask for a domain object or something like a Map<String, String>.

Now in Jackson's model, a JsonNode represents any node in the JSON tree - as it is a tree, you can expect to get a Flux of nodes, but you are apparently able to get the root node - which explains the behavior you're seeing.

So I think your application should rather rely on higher-level classes and let Jackson deserialize them for you.

Note that your controller implementation is also breaking a few rules:

  • you should not call blocking operators, such as block, within a method that returns a reactive type (your controller is not breaking this one, but close)
  • you should not break the reactive pipeline and decouple the reading of the request and the writing of the response; chances are the HTTP exchange could be closed before your controller has a chance to read the whole request. Calling subscribe just does that.