Regarding the answer posted for How can @MessagingGateway be configured with Spring Cloud Stream MessageChannels?, what is the correct way to handle errors at the @MessagingGateway that can be returned from Spring Cloud Stream services?
To recap, I have a @MessagingGateway that provides synchronous access to asynchronous services built using Spring Cloud Stream. When an error occurs within my Spring Cloud Stream service layer, I create an error response and send it through a SubscribableChannel to other @StreamListener services that process the errors.
For example, when an account is created, I send a message to the accountCreated
channel. When an error occurs I send an error response to the accountNotCreated
channel.
This works fine, but I also want send an error response to the client of the @MessagingGateway so they receive the error response synchronously. The @MessagingGateway annotation has an errorChannel
attribute, but the @Gateway annotation does not. So, the client of the @MessagingGateway should be able to block and wait for either 1) an account to be created or 2) an error response.
Again, the goal here is to build "backend" services that utilize Spring Cloud Stream for transactional services (i.e., those that create, update, or delete data) while at the same time provide our clients "gateway" access that block and wait for the responses to be returned. The solution Artem Bilan provided me works for the happy path, but when an error occurs, that's where I am not clear on how Spring Integration is best suited to handle this.
UPDATE with code example
GatewayApplication.java
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@EnableBinding({GatewayApplication.GatewayChannels.class})
@SpringBootApplication
public class GatewayApplication {
@Component
public interface GatewayChannels {
String TO_UPPERCASE_REPLY = "to-uppercase-reply";
String TO_UPPERCASE_REQUEST = "to-uppercase-request";
@Input(TO_UPPERCASE_REPLY)
SubscribableChannel toUppercaseReply();
@Output(TO_UPPERCASE_REQUEST)
SubscribableChannel toUppercaseRequest();
}
@MessagingGateway
public interface StreamGateway {
public static final String ENRICH = "enrich";
@Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
StringWrapper process(StringWrapper payload) throws MyException;
}
@RestController
public class UppercaseController {
@Autowired
StreamGateway gateway;
@GetMapping(value = "/string/{string}",
produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE})
public ResponseEntity<StringWrapper> getUser(@PathVariable("string") String string) {
try {
StringWrapper result = gateway.process(new StringWrapper(string));
// Instead of catching the exception in the below catch clause, here we have just a string
// representation of the stack trace when an exception occurs.
return new ResponseEntity<StringWrapper>(result, HttpStatus.OK);
} catch (MyException e) {
// Why is the exception not caught here?
return new ResponseEntity<StringWrapper>(new StringWrapper("An error has occurred"),
HttpStatus.INTERNAL_SERVER_ERROR);
}
}
}
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from(StreamGateway.ENRICH)
.enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
.channel(GatewayChannels.TO_UPPERCASE_REQUEST).get();
}
}
application.yml
spring:
cloud:
stream:
bindings:
to-uppercase-request:
destination: to-uppercase-request
content-type: application/json
group: stream-to-uppercase-request
to-uppercase-reply:
destination: to-uppercase-reply
content-type: application/json
producer:
required-groups: gateway-to-uppercase-reply, stream-listener-to-uppercase-reply
server:
port: 8088
StringWrapper.java (use across all three projects)
package com.example.demo;
import com.fasterxml.jackson.annotation.JsonProperty;
public class StringWrapper {
@JsonProperty
private String string;
@JsonProperty
private long time = System.currentTimeMillis();
public StringWrapper() {
super();
}
public StringWrapper(String string) {
this.string = string;
}
public String getString() {
return string;
}
public long getTime() {
return time;
}
public void setString(String string) {
this.string = string;
}
@Override
public String toString() {
return "StringWrapper [string=" + string + ", time=" + time + "]";
}
}
CloudStreamApplication.java
package com.example.demo;
import java.util.Random;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@EnableBinding({CloudStreamApplication.CloudStreamChannels.class})
@SpringBootApplication
public class CloudStreamApplication {
@Component
interface CloudStreamChannels {
String TO_UPPERCASE_REPLY = "to-uppercase-reply";
String TO_UPPERCASE_REQUEST = "to-uppercase-request";
@Output(TO_UPPERCASE_REPLY)
SubscribableChannel toUppercaseReply();
@Input(TO_UPPERCASE_REQUEST)
SubscribableChannel toUppercaseRequest();
}
@Component
public class Processor {
@Autowired
CloudStreamChannels channels;
@StreamListener(CloudStreamChannels.TO_UPPERCASE_REQUEST)
public void process(Message<StringWrapper> request) {
StringWrapper uppercase = null;
try {
uppercase = toUppercase(request);
} catch (MyException e) {
channels.toUppercaseReply()
.send(MessageBuilder.withPayload(e).setHeader("__TypeId__", e.getClass().getName())
.copyHeaders(request.getHeaders()).build());
}
if (uppercase != null) {
channels.toUppercaseReply()
.send(MessageBuilder.withPayload(uppercase)
.setHeader("__TypeId__", StringWrapper.class.getName())
.copyHeaders(request.getHeaders()).build());
}
}
private StringWrapper toUppercase(Message<StringWrapper> request) throws MyException {
Random random = new Random();
int number = random.nextInt(50) + 1;
if (number > 25) {
throw new MyException("An error occurred.");
}
return new StringWrapper(request.getPayload().getString().toUpperCase());
}
}
public static void main(String[] args) {
SpringApplication.run(CloudStreamApplication.class, args);
}
}
application.yml
spring:
cloud:
stream:
bindings:
to-uppercase-request:
destination: to-uppercase-request
content-type: application/json
group: stream-to-uppercase-request
to-uppercase-reply:
destination: to-uppercase-reply
content-type: application/json
producer:
required-groups: gateway-to-uppercase-reply, stream-listener-to-uppercase-reply
server:
port: 8088
StreamListenerApplication.java
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
@EnableBinding({StreamListenerApplication.CloudStreamChannels.class})
@SpringBootApplication
public class StreamListenerApplication {
@Component
interface CloudStreamChannels {
String TO_UPPERCASE_REPLY = "to-uppercase-reply";
@Input(TO_UPPERCASE_REPLY)
SubscribableChannel toUppercaseReply();
}
public static void main(String[] args) {
SpringApplication.run(StreamListenerApplication.class, args);
}
@Autowired
CloudStreamChannels channels;
@StreamListener(CloudStreamChannels.TO_UPPERCASE_REPLY)
public void processToUppercaseReply(Message<StringWrapper> message) {
System.out.println("Processing message: " + message.getPayload());
}
}
application.yml
spring:
cloud:
stream:
bindings:
to-uppercase-reply:
destination: to-uppercase-reply
content-type: application/json
group: stream-listener-to-uppercase-reply
server:
port: 8089