Generating a reply when using an outbound channel

Posted 2019-07-14 05:54

Question:

I have the following simplified spring integration flow:

int-ws:inbound-gateway ----> int:transformer ----> int-kafka:outbound-channel-adapter

Basically:

  1. A web service endpoint is exposed using the int-ws:inbound-gateway
  2. Messages from this endpoint are put onto the input channel (the first --->)
  3. A custom transformer translates the payload into JSON format and adds the MESSAGE_KEY header (required for Kafka)
  4. The message is placed onto the inputToKafka channel (the second --->)
  5. The int-kafka:outbound-channel-adapter pushes the message to a Kafka topic

The web service operation has a request and a response payload.
The request payload is what I'm transforming into the JSON message.
I would like to return a response payload (which will be marshalled, etc.) once the message has been placed on the Kafka topic by the int-kafka:outbound-channel-adapter.

How do I do this?

At the moment, when I invoke the web service everything works as expected, but I have to set a reply-timeout on the int-ws:inbound-gateway so that it doesn't hang. When I do this, I simply get an empty response back in SoapUI.

I understand the concepts in the section "Gateway behavior when no response arrives", but in my case I do want to generate a response.

This is my integration context (without the kafka broker config etc):

<int-ws:inbound-gateway id="ws-inbound-gateway" request-channel="input"
                        marshaller="marshaller" unmarshaller="marshaller" reply-timeout="100"/>

<int:channel id="input"/>

<int:transformer input-channel="input" output-channel="inputToKafka" method="transform">
    <bean class="com.test.InputToJSONTransformer"/>
</int:transformer>

<int:channel id="inputToKafka"/>

<int-kafka:outbound-channel-adapter kafka-producer-context-ref="kafkaProducerContext"
                                    auto-startup="true"
                                    channel="inputToKafka"
                                    order="1">
</int-kafka:outbound-channel-adapter>

Answer 1:

Change

<int:channel id="inputToKafka"/>

to

<int:publish-subscribe-channel id="inputToKafka"/>

Add a second subscriber to the channel.

<service-activator input-channel="inputToKafka" ... order="2" />

The service generates the response. Because the Kafka adapter has order="1" and the service activator has order="2", the service is invoked only after a successful send to Kafka.

Do not include an output-channel; the framework will take care of routing the service output back to the ws gateway.
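
Put together with the configuration from the question, the change might look like the sketch below. The class com.test.ResponseGenerator and its generateResponse method are hypothetical names used only for illustration; namespace declarations and the Kafka producer context are omitted, as in the question.

```xml
<!-- Publish-subscribe channel: every subscriber receives the message,
     one after the other, in ascending "order" -->
<int:publish-subscribe-channel id="inputToKafka"/>

<!-- Subscriber 1: sends the message to Kafka (unchanged from the question) -->
<int-kafka:outbound-channel-adapter kafka-producer-context-ref="kafkaProducerContext"
                                    auto-startup="true"
                                    channel="inputToKafka"
                                    order="1"/>

<!-- Subscriber 2: builds the web service response.
     No output-channel is set, so the return value is routed back to the
     ws gateway that is waiting for a reply.
     com.test.ResponseGenerator is a hypothetical class, not part of the question. -->
<int:service-activator input-channel="inputToKafka"
                       method="generateResponse"
                       order="2">
    <bean class="com.test.ResponseGenerator"/>
</int:service-activator>
```

The value returned by the service becomes the reply payload, which the gateway's marshaller then turns into the SOAP response.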



Answer 2:

What kind of response are you expecting from the Kafka outbound channel adapter? Remember, it is an adapter, not a gateway.

You could, however, introduce another component: a ServiceActivator with an input channel sendToInputToKafka. Your transformer would then output to this channel.

Your ServiceActivator should have an @Autowired MessageChannel inputToKafka and should send the message to this channel programmatically. After sending the message, you construct your desired response as the return value of the ServiceActivator, which becomes the response to your ws gateway.
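
A rough sketch of the wiring for this approach follows. The class com.test.KafkaSendingService and its sendAndReply method are hypothetical names chosen for illustration; the Java class itself is not shown, and its expected behaviour is described in the comments.

```xml
<!-- The transformer now outputs to the new intermediate channel -->
<int:transformer input-channel="input" output-channel="sendToInputToKafka" method="transform">
    <bean class="com.test.InputToJSONTransformer"/>
</int:transformer>

<int:channel id="sendToInputToKafka"/>

<!-- Hypothetical service: it would hold an @Autowired MessageChannel inputToKafka,
     call inputToKafka.send(message) itself, and then return the response object.
     @Autowired is only processed when annotation support is enabled in the
     context, for example with <context:annotation-config/>.
     No output-channel is set, so the return value goes back to the ws gateway. -->
<int:service-activator input-channel="sendToInputToKafka" method="sendAndReply">
    <bean class="com.test.KafkaSendingService"/>
</int:service-activator>

<!-- Left as a plain direct channel, so send() runs the Kafka adapter
     on the calling thread before the service builds its response -->
<int:channel id="inputToKafka"/>

<int-kafka:outbound-channel-adapter kafka-producer-context-ref="kafkaProducerContext"
                                    auto-startup="true"
                                    channel="inputToKafka"/>
```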