I have developed asynchronous Spring Cloud Stream services, and I am trying to develop an edge service that uses @MessagingGateway to provide synchronous access to services that are async by nature.
I am currently getting the following stack trace:
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:355)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 47 common frames omitted
My @MessagingGateway:
@EnableBinding(AccountChannels.class)
@MessagingGateway
public interface AccountService {
@Gateway(requestChannel = AccountChannels.CREATE_ACCOUNT_REQUEST,replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}
If I consume the message on the reply channel via a @StreamListener, it works just fine:
@HystrixCommand(commandKey = "acounts-edge:accountCreated", fallbackMethod = "accountCreatedFallback", commandProperties = {@HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE")}, ignoreExceptions = {ClientException.class})
@StreamListener(AccountChannels.ACCOUNT_CREATED)
public void accountCreated(Account account, @Header(name = "spanTraceId", required = false) String traceId) {
try {
if (log.isInfoEnabled()) {
log.info(new StringBuilder("Account created: ").append(objectMapper.writeValueAsString(account)).toString());
}
} catch (JsonProcessingException e) {
log.error(e.getMessage(), e);
}
}
On the producer side, I am configuring requiredGroups
to ensure that multiple consumers can process the message, and correspondingly, the consumers have matching group
configurations.
Consumer:
spring:
cloud:
stream:
bindings:
create-account-request:
binder: rabbit1
contentType: application/json
destination: create-account-request
requiredGroups: accounts-service-create-account-request
account-created:
binder: rabbit1
contentType: application/json
destination: account-created
group: accounts-edge-account-created
Producer:
spring:
cloud:
stream:
bindings:
create-account-request:
binder: rabbit1
contentType: application/json
destination: create-account-request
group: accounts-service-create-account-request
account-created:
binder: rabbit1
contentType: application/json
destination: account-created
requiredGroups: accounts-edge-account-created
The bit of code on the producer side that processes the request and sends the response:
accountChannels.accountCreated().send(MessageBuilder.withPayload(accountService.createAccount(account)).build());
I can debug and see that the request is received and processed, but when the response is sent to the reply channel, that's when the error occurs.
To get the @MessagingGateway working, what configurations and/or code am I missing? I know I'm combining Spring Integration and Spring Cloud Gateway, so I'm not sure if using them together is causing the issues.
With Artem's help, I've found the solution I was looking for. I have taken the code Artem posted and split it into two services, a Gateway service and a CloudStream service. I also added a
@RestController
for testing purposes. This essentially mimics what I was wanting to do with durable queues. Thanks Artem for your assistance! I really appreciate your time! I hope this helps others who want to do the same thing.Gateway Code
Gateway Config (application.yml)
CloudStream Code
CloudStream Config (application.yml)
Hmm, I am a bit confused as well as to what you are trying to accomplish, but let's se if we can figure this out. Mixing SI and SCSt is definitely natural as one builds on another so all should work: Here is an example code snippet I just dug up from an old sample that exposes REST endpoint yet delegates (via Gateway) to Source's output channel. See if that helps:
It's good question and really good idea. But it isn't going to work so easy.
First of all we have to determine for ourselves that
gateway
meansrequest/reply
, thereforecorrelation
. And this available in@MessagingGateway
viareplyChannel
header in face ofTemporaryReplyChannel
instance. Even if you have an explicitreplyChannel = AccountChannels.ACCOUNT_CREATED
, the correlation is done only via the mentioned header and its value. The fact that thisTemporaryReplyChannel
is not serializable and can't be transferred over the network to the consumer on another side.Luckily Spring Integration provide some solution for us. It is a part of the
HeaderEnricher
and itsheaderChannelsToString
option behindHeaderChannelRegistry
:https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-transformation-chapter.html#header-enricher
But in this case you have to introduce an internal channel from the gateway to the
HeaderEnricher
and only the last one will send the message to theAccountChannels.CREATE_ACCOUNT_REQUEST
. So, thereplyChannel
header will be converted to a string representation and be able to travel over the network. On the consumer side when you send a reply you should ensure that you transfer thatreplyChannel
header as well, as it is. So, when the message will arrive to theAccountChannels.ACCOUNT_CREATED
on the producer side, where we have that@MessagingGateway
, the correlation mechanism is able to convert a channel identificator to the properTemporaryReplyChannel
and correlate the reply to the waiting gateway call.Only the problem here that your producer application must be as single consumer in the group for the
AccountChannels.ACCOUNT_CREATED
- we have to ensure that only one instance in the cloud is operating at a time. Just because only one instance has thatTemporaryReplyChannel
in its memory.More info about gateway: https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-endpoints-chapter.html#gateway
UPDATE
Some code for help:
UPDATE
Some simple application to demonstrate the PoC:
The
application.yml
:I use
spring-cloud-starter-stream-rabbit
.The
Does the trick copying request headers to the reply message. So, the gateway is able on the reply side to convert channel identifier in the headers to the appropriate
TemporaryReplyChannel
to convey the reply properly to the caller of gateway.The SCSt issue on the matter: https://github.com/spring-cloud/spring-cloud-stream/issues/815