Spring Integration manually start/stop channel adapter

Posted 2019-01-15 08:39

Is there any way to manually start/init a channel adapter?

I have two pairs of inbound/outbound adapters in my context.xml and would like to decide at runtime which pair gets started.

EDIT:

The concrete scenario:
I have a client that can be configured at runtime to be an MQTT publisher or subscriber.
My context.xml looks like this:

<int-mqtt:message-driven-channel-adapter 
    client-id="foo"
    auto-startup="true"
    url="tcp://192.168.97.164:1883"
    topics="testtopic/#"
    channel="writeToFile" />

<file:outbound-channel-adapter
    id="writeToFile"
    auto-startup="true"
    directory="./test/out"
    delete-source-files="false"/>

<int:transformer id="Transformer"
    ref="MessageTransformer"
    input-channel="readFromFile"
    output-channel="mqttOut"
    method="bytesFromFile" />

<bean id="MessageTransformer" class="MessageTransformer"/>

<int-mqtt:outbound-channel-adapter 
    id="mqttOut"
    client-id="foo"
    url="tcp://192.168.97.164:1883"
    auto-startup="false"
    default-qos="1"
    default-retained="true"
    default-topic="testtopic/bla"
    />

<file:inbound-channel-adapter
    auto-startup="false"
    id="readFromFile"
    directory="./test/in"
    filename-pattern="myFile*">
    <int:poller id="poller"
        fixed-rate="5000" />
</file:inbound-channel-adapter>


As you can see, I have two settings:
1. Subscriber case: Read mqtt message -> Write to file
2. Publisher case: Poll a file from directory -> Send via mqtt

I decide at runtime what setting is to be applied.

So can you kindly tell me how this control-bus thing would fit here exactly?

3 answers
我只想做你的唯一
Reply #2 · 2019-01-15 08:59

I was looking for the same example using the Spring Integration Java DSL but couldn't find one, so I created my own. It turns out to be pretty simple to configure.

@Bean
public IntegrationFlow controlBus() {
    return IntegrationFlows.from(controlChannel())
            .controlBus()
            .get();
}

@Bean
public MessageChannel controlChannel() {
    return MessageChannels.direct().get();
}

To stop the adapter:

controlChannel.send(new GenericMessage<>("@myInboundAdapter.stop()"));

https://github.com/CauchyPeano/sftp-poller-control-bus

Root(大扎)
Reply #3 · 2019-01-15 09:00

To achieve this, first set auto-startup="false" on the channel adapter, then start and stop the adapter through a control bus.

See the control bus sample: https://github.com/spring-projects/spring-integration-samples/tree/master/basic/control-bus

放我归山
Reply #4 · 2019-01-15 09:12

Set autoStartup="false" on the adapters and either call start()/stop() on them directly, or use a <control-bus/> (send it @myAdapter.start()).

How you get a direct reference (autowiring, etc.) depends on the endpoint type. For a polled endpoint, inject a SourcePollingChannelAdapter; message-driven adapters vary, but are generally a MessageProducerSupport or MessagingGatewaySupport.

EDIT:

Read about the control-bus here.

Give the inbound adapter an id attribute.

Add <control-bus input-channel="control"/>

Add <int:gateway service-interface="foo.Controller" default-request-channel="control"/>

Create a gateway interface

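package foo;

public interface Controller {

    // Sends a control-bus command, e.g. "@mqttOut.start()", to the default request channel.
    void control(String command);

}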

Inject the gateway with @Autowired (or use context.getBean(Controller.class)).

Then, when you are ready to start the adapter, call, e.g. gateway.control("@mqttOut.start()").

You don't need auto-startup="false" on the outbound adapters.
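
Put together, the steps above might look roughly like the fragment below. This is only a sketch: the ids "mqttIn" and "control" are names chosen here for illustration and do not appear in the question, and it assumes the SpEL-style control bus commands shown above are supported by your Spring Integration version.

```xml
<!-- Sketch only: "mqttIn" and "control" are illustrative names. -->

<!-- Channel that carries the control commands -->
<int:channel id="control"/>

<!-- Evaluates each message payload sent to "control", e.g. "@mqttIn.start()" -->
<int:control-bus input-channel="control"/>

<!-- Proxy for foo.Controller; each call becomes a message on "control" -->
<int:gateway service-interface="foo.Controller"
    default-request-channel="control"/>

<!-- The subscriber-side adapter from the question, now with an id
     and not started until the control bus is told to start it -->
<int-mqtt:message-driven-channel-adapter
    id="mqttIn"
    client-id="foo"
    auto-startup="false"
    url="tcp://192.168.97.164:1883"
    topics="testtopic/#"
    channel="writeToFile" />
```

With this in place, gateway.control("@mqttIn.start()") would start the MQTT subscriber and gateway.control("@mqttIn.stop()") would stop it again.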

However, for a simple use case like this, you might want to investigate using Spring profiles instead (put the adapters in a profile and enable the profile at runtime).
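
A minimal sketch of the profile approach, assuming the root element of context.xml is a Spring <beans> element in the default namespace. The profile names "subscriber" and "publisher" are made up for illustration. Because only one profile's beans are created, auto-startup can stay at its default.

```xml
<!-- Sketch only: the profile names "subscriber" and "publisher" are illustrative. -->
<!-- Nested <beans> elements must be the last children of the root <beans> element. -->

<beans profile="subscriber">
    <!-- Subscriber case: read MQTT message, write to file -->
    <int-mqtt:message-driven-channel-adapter
        client-id="foo"
        url="tcp://192.168.97.164:1883"
        topics="testtopic/#"
        channel="writeToFile" />

    <file:outbound-channel-adapter
        id="writeToFile"
        directory="./test/out"
        delete-source-files="false"/>
</beans>

<beans profile="publisher">
    <!-- Publisher case: poll a file from the directory, send via MQTT -->
    <file:inbound-channel-adapter
        id="readFromFile"
        directory="./test/in"
        filename-pattern="myFile*">
        <int:poller fixed-rate="5000" />
    </file:inbound-channel-adapter>

    <int:transformer id="Transformer"
        ref="MessageTransformer"
        input-channel="readFromFile"
        output-channel="mqttOut"
        method="bytesFromFile" />

    <bean id="MessageTransformer" class="MessageTransformer"/>

    <int-mqtt:outbound-channel-adapter
        id="mqttOut"
        client-id="foo"
        url="tcp://192.168.97.164:1883"
        default-qos="1"
        default-retained="true"
        default-topic="testtopic/bla" />
</beans>
```

The active profile can then be selected when the application starts, for example with -Dspring.profiles.active=subscriber, or by calling setActiveProfiles(...) on the context's environment before the context is refreshed. Note that a profile is chosen when the context is created; switching roles in a running context would still need the control bus or direct start()/stop() calls.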
