Spring Integration jdbc:inbound-channel-adapter -

2019-03-06 14:44发布

问题:

I have a JDBC:inbound-channel-adapter : To set the 'max-rows-per-poll' dynamic to throttle the messages getting passed on the channel.

I have a QueueChannel which has a capacity of 200. The inbound-channel-adapter would be sending the message to this QueueChannel. I would like to set the 'max-rows-per-poll' value depending on the RemainingCapacity of the QueueChannel.

For this I tried to Inject the QueueChannel in a Bean but I get the error when deploying the war file.

Error: Cannot Inject the QueueChannel due to StateConversionError.

Is there any other way I could achieve this.

Update : I am using Spring-Integration-2.2.0.RC2

This is the config for jdbc-inbound-adapter:

<si-jdbc:inbound-channel-adapter id ="jdbcInboundAdapter" channel="queueChannel" data-source="myDataSource" auto-startup="true" query="${select.query}"
update="${update.query}" max-rows-per-poll="100"  row-mapper="rowMapper" update-per-row="true">
 <si:poller fixed-rate="5000">
    <si:transactional/>
     <si:advice-chain>
           <bean class="foo.bar.InboundAdapterPollingConfigurationImpl"/>
     </si:advice-chain>
  </si:poller>
</si-jdbc:inbound-channel-adapter>

Bean:

    @Service
public class InboundAdapterPollingConfigurationImpl implements InboundAdapterPollingConfiguration{

    private static final Logger logger = LoggerFactory.getLogger(InboundAdapterPollingConfigurationImpl.class);

    @Autowired
    QueueChannel queueChannel;
    @Autowired
    SourcePollingChannelAdapter jdbcInboundAdapter;

    public void setJdbcInboundAdapterMaxRowsPerPoll(){
        String size = String.valueOf(queueChannel.getRemainingCapacity());
        DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(jdbcInboundAdapter);      
        directFieldAccessor.setPropertyValue("setMaxRowsPerPoll", size);        
        String maxRowsPerPollSize = (String)directFieldAccessor.getPropertyValue("setMaxRowsPerPoll");
        System.out.println(maxRowsPerPollSize);
    }
}

The question is how to call the InboundAdapterPollingConfigurationImpl.setJdbcInboundAdapterMaxRowsPerPoll() method from the advice chain. Sorry for the naive question but t is my first time using the advice-chain. Also I am searching for an example but was not lucky yet.

Update2: Got the below error when this is executed:

JdbcPollingChannelAdapter source = (JdbcPollingChannelAdapter)dfa.getPropertyValue("source"); 

Error:

 java.lang.ClassCastException: $Proxy547 cannot be cast to org.springframework.integration.jdbc.JdbcPollingChannelAdapter – 

I have the JDK1.6_26. I read in one of the posts that this is happening in the early versions of JDK1.6.

回答1:

Well, let's try to investigate!

  1. max-rows-per-poll is a volatile property of JdbcPollingChannelAdapter with appropriate setter.
  2. As far as JdbcPollingChannelAdapter does the stuff within its receive() method just on the initiative of TaskScheduler.schedule() looks like changing that property at runtime is safe. It is a first point for our task
  3. QueueChannel has property getQueueSize(). As far as a capacity is your configuration option, so you can simply calculate a value for max-rows-per-poll
  4. And now how to get it worked? Actually you are interested in the value for max-rows-per-poll just on each poll. So, we should somehow to wedge into poller or polling task. Well, <poller> has advice-chain sub-element and we can write some Advice, which should change JdbcPollingChannelAdapter#setMaxRowsPerPoll before invoking receive() and the value should be based on QueueChannel#getQueueSize()
  5. Inject your QueueChannel to the bean of your Advice
  6. And now some bad point: how to inject JdbcPollingChannelAdapter bean? We provide a hook to register MessageSources as beans just only since Spring Integration 3.0. From here it's just enough to write this code:

    @Autowired @Qualifier("jdbcAdapter.source") private JdbcPollingChannelAdapter messageSource;

We are going to release 3.0.GA this week. So, let me do not consider the reflection 'forest' prior to Sring Integration 3.0. However you can do it using DirectFieldAccessor on injected SourcePollingChannelAdapter bean.

UPDATE

Your Advice may look like this:

public class MyAdvice implements MethodInterceptor {
       @Autowired
       QueueChannel queueChannel;

       @Autowired
       SourcePollingChannelAdapter jdbcInboundAdapter; 

      Object invoke(MethodInvocation invocation) throws Throwable {
            DirectFieldAccessor dfa = new DirectFieldAccessor(jdbcInboundAdapter);
            JdbcPollingChannelAdapter source = (JdbcPollingChannelAdapter) dfa.getPropertyValue("source");
            source.setMaxRowsPerPoll(queueChannel.getRemainingCapacity());
            return invocation.proceed();
      }
}

The theory is here: http://docs.spring.io/spring/docs/3.2.5.RELEASE/spring-framework-reference/htmlsingle/#aop