I have the following simplified Spring Integration flow:
int-ws:inbound-gateway ----> int:transformer ----> int-kafka:outbound-channel-adapter
Basically:
- A web service endpoint is exposed using the int-ws:inbound-gateway
- Messages from this endpoint are put onto the input channel (the first --->)
- A custom transformer translates the payload to JSON format and adds the MESSAGE_KEY header (required for Kafka)
- The message is placed onto the inputToKafka channel (the second --->)
- The int-kafka:outbound-channel-adapter pushes the message to a Kafka topic
The web service operation has a request and a response payload.
The request payload is what I'm transforming into the JSON message.
I would like to return a response payload (which will be marshalled etc.) once the message has been placed on the Kafka topic by the int-kafka:outbound-channel-adapter.
How do I do this?
At the moment, when I invoke the web service everything works as expected, but I have to set a reply-timeout on the int-ws:inbound-gateway so that it doesn't hang. When I do this, I simply get an empty response back in SoapUI.
I understand the concepts in the section Gateway behavior when no response arrives - but in my case I do want to generate a response.
This is my integration context (without the kafka broker config etc):
<int-ws:inbound-gateway id="ws-inbound-gateway" request-channel="input"
    marshaller="marshaller" unmarshaller="marshaller" reply-timeout="100"/>
<int:channel id="input"/>
<int:transformer input-channel="input" output-channel="inputToKafka" method="transform">
    <bean class="com.test.InputToJSONTransformer"/>
</int:transformer>
<int:channel id="inputToKafka"/>
<int-kafka:outbound-channel-adapter kafka-producer-context-ref="kafkaProducerContext"
    auto-startup="true"
    channel="inputToKafka"
    order="1">
</int-kafka:outbound-channel-adapter>
Change
<int:channel id="inputToKafka"/>
to
<int:publish-subscribe-channel id="inputToKafka"/>
Add a second subscriber to the channel.
<service-activator input-channel="inputToKafka" ... order="2" />
Where the service generates the response; it will be invoked after a successful send to kafka.
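Put together, the two changes above might look like the following sketch. The responseService bean, its class com.test.ResponseService, and the createResponse method are hypothetical names used only for illustration; they are not part of the original configuration.

```xml
<!-- Sketch only: responseService, com.test.ResponseService and createResponse are hypothetical names -->
<int:publish-subscribe-channel id="inputToKafka"/>

<!-- First subscriber (order 1): sends the message to Kafka -->
<int-kafka:outbound-channel-adapter kafka-producer-context-ref="kafkaProducerContext"
    auto-startup="true"
    channel="inputToKafka"
    order="1"/>

<!-- Second subscriber (order 2): invoked only if the first subscriber did not throw -->
<int:service-activator input-channel="inputToKafka"
    ref="responseService" method="createResponse"
    order="2"/>

<bean id="responseService" class="com.test.ResponseService"/>
```

Because the subscribers of a publish-subscribe channel without a task executor are called one after another on the same thread, the order attributes determine that the Kafka send happens before the response is built.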
Do not include an output-channel; the framework will take care of routing the service output back to the ws gateway.
What kind of response are you expecting from the Kafka outbound channel adapter? Remember, it is an Adapter, not a Gateway.
You could, however, introduce another component, a ServiceActivator, with an input channel sendToInputToKafka; your transformer will now output to this channel. Your ServiceActivator should have an @Autowired MessageChannel inputToKafka and should send a message to this channel programmatically. After sending that message, construct your desired response as the return value of the ServiceActivator; that return value becomes the response to your ws gateway.
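The wiring for this second approach could be sketched as follows. The kafkaSendingService bean, its class com.test.KafkaSendingService, and the sendAndReply method are hypothetical names, and the sketch assumes annotation-driven injection is enabled in the context so that the bean can receive the inputToKafka channel.

```xml
<!-- Sketch only: kafkaSendingService, com.test.KafkaSendingService and sendAndReply are hypothetical names -->
<int:transformer input-channel="input" output-channel="sendToInputToKafka" method="transform">
    <bean class="com.test.InputToJSONTransformer"/>
</int:transformer>

<int:channel id="sendToInputToKafka"/>

<!-- No output-channel: the method's return value goes back to the ws gateway as the reply -->
<int:service-activator input-channel="sendToInputToKafka"
    ref="kafkaSendingService" method="sendAndReply"/>

<!-- The bean sends to inputToKafka itself, then returns the response payload -->
<bean id="kafkaSendingService" class="com.test.KafkaSendingService"/>

<!-- Unchanged: the Kafka outbound adapter still consumes from this channel -->
<int:channel id="inputToKafka"/>
```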