WebFlux not sending data when using application/st

Posted 2019-09-02 16:10

Question:

I have a reactive WebClient that posts to a given endpoint. The payload is a Flux of Job objects and the content type is application/stream+json:

Flux<Job> jobFlux = Flux.just(new Job());

Mono<JsonNode> response = localEP.post()
    .uri("/dev/job")
    .contentType(MediaType.APPLICATION_STREAM_JSON)
    .body(BodyInserters.fromObject(jobFlux))
    .retrieve()
    .bodyToMono(JsonNode.class);

On the server side I have tried both a Spring controller and a Spring Web Reactive handler function to process the payload of the above call, which is a Flux.

@PostMapping(path = "/dev/job", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
@ResponseStatus(HttpStatus.CREATED)
public Mono<Void> loadJobs(@RequestBody Flux<Job> jobs) {
    return this.repository.create(jobs);
}

The domain class Job creates an id when a new object is instantiated:

  public Job() {
    UUID guid = UUID.randomUUID();
    id = guid.toString();
    title = "Random String";
  }

The repository currently is just a stub:

@Repository
public class DemoJobRepository implements ReactiveRepository<Job> {
    private static Logger logger = LoggerFactory.getLogger(DemoJobRepository.class);
    private final List<Job> jobs = Lists.newArrayList();

    @Override
    public Mono<Void> create(Publisher<Job> jobStream) {
        return Flux.from(jobStream).doOnNext(jobs::add).then();
    }

    @Override
    public Flux<Job> getAll() {
        return Flux.fromIterable(jobs);
    }

    @Override
    public Mono<Job> findById(String id) {
        return null;
    }
}

I don't see the client attempting to send the request body. When I call block on the client to get the result, I see the client send the request; however, the server endpoint always sees an empty Flux. Any help is much appreciated.

Answer 1:

With reactive types, nothing happens until you subscribe. Building the reactive pipeline does not execute what it is supposed to do; only subscribing to it starts the process.
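
To make the "nothing happens until you subscribe" point concrete, here is a minimal sketch. As an assumption made to keep it dependency-free, it uses the JDK's java.util.concurrent.Flow interfaces rather than Reactor's Flux and Mono (Flow mirrors the Reactive Streams interfaces that Reactor implements). The names coldSource, collector, and run are hypothetical, and the publisher is deliberately simplified rather than spec-complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class LazyPipelineDemo {

    /** Minimal cold publisher (not spec-complete): does no work until a subscriber requests items. */
    static Flow.Publisher<String> coldSource(List<String> log, String... items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean finished = false;

            @Override
            public void request(long n) {
                // Emit up to n items, then signal completion once the source is drained.
                while (n-- > 0 && next < items.length && !finished) {
                    log.add("emit " + items[next]);
                    subscriber.onNext(items[next++]);
                }
                if (next == items.length && !finished) {
                    finished = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                finished = true;
            }
        });
    }

    /** Subscriber that requests everything and records each signal it receives. */
    static Flow.Subscriber<String> collector(List<String> log) {
        return new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { log.add("received " + item); }
            @Override public void onError(Throwable t) { log.add("error " + t); }
            @Override public void onComplete() { log.add("complete"); }
        };
    }

    /** Builds the pipeline first, subscribes second, and returns the order of events. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flow.Publisher<String> pipeline = coldSource(log, "job-1", "job-2");
        log.add("pipeline built");          // nothing has been emitted at this point
        pipeline.subscribe(collector(log)); // emission starts only here
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log starts with "pipeline built" and only then shows the emitted items, which is the same ordering you would expect from a Flux that is assembled but not yet subscribed.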

There are several ways to subscribe:

  • calling any of the subscribe variants. Many of them take lambdas as parameters that are executed when processing completes or ends with an error. Using the empty variant is a bit risky because it just launches the execution and you get no callback whatsoever. Technically, nothing is waiting on it, so the JVM could exit before the processing is done.
  • calling any of the block/collect methods. This not only subscribes but also returns the expected value(s).

Neither of these should be done within a method that returns a reactive type; doing so will lead to serious problems in your application.
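
As a rough illustration of that last rule, the sketch below shows a method that returns a reactive type by composing on its input and handing the result back, leaving the subscription to its caller (in a WebFlux application that caller would be the framework). It again uses the JDK's java.util.concurrent.Flow interfaces instead of Reactor as a dependency-free assumption; source, create, and run are hypothetical names, and the publisher is simplified rather than spec-complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class ReturnThePipelineDemo {

    /** Minimal cold source (not spec-complete): emits the items once a subscriber requests them. */
    static Flow.Publisher<String> source(String... items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean finished = false;

            @Override
            public void request(long n) {
                while (n-- > 0 && next < items.length && !finished) {
                    subscriber.onNext(items[next++]);
                }
                if (next == items.length && !finished) {
                    finished = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                finished = true;
            }
        });
    }

    /**
     * Hypothetical repository method: decorates the incoming stream so that each job
     * is stored as it passes through, and returns the decorated stream. It does not
     * block, and it subscribes upstream only when someone subscribes to the result.
     */
    static Flow.Publisher<String> create(Flow.Publisher<String> jobs, List<String> store) {
        return downstream -> jobs.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
            @Override public void onNext(String job) { store.add(job); downstream.onNext(job); }
            @Override public void onError(Throwable t) { downstream.onError(t); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    /** Returns the store size right after create(...) and again after the caller subscribes. */
    static List<Integer> run() {
        List<String> store = new ArrayList<>();
        Flow.Publisher<String> result = create(source("job-1", "job-2"), store);
        int beforeSubscribe = store.size(); // only the pipeline exists so far

        // The caller, not create(...), decides when execution starts.
        result.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String job) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        int afterSubscribe = store.size(); // the caller's subscription drove the work

        return List.of(beforeSubscribe, afterSubscribe);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [0, 2]
    }
}
```

The store is empty right after create(...) returns and is filled only once the caller subscribes, so the method never needs to call subscribe or block itself.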