How do I convert this spring-integration configura

Posted 2019-08-28 19:07

Question:

This particular piece makes sense to implement in the application rather than XML because it is a constant across the entire cluster, not localized to a single job.

From dissecting the XSD, it looks to me like the XML for int-kafka:outbound-channel-adapter constructs a KafkaProducerMessageHandler.

There is no visible way to set the channel, the topic, or most of the other attributes.

Note to potential downvoters - (rant on) I have been RTFM'ing for a week and am more confused than when I started. My choice of language has graduated from adjectives through adverbs, and I'm starting to borrow words from other languages. The answer may be in there. But if it is, it is not locatable by mere mortals. (rant off)

XML configuration:

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="kafkaTemplate"
                                    auto-startup="false"
                                    channel="outbound-staging"
                                    topic="foo"
                                    sync="false"
                                    message-key-expression="'bar'"
                                    send-failure-channel="failures"
                                    send-success-channel="successes"
                                    partition-id-expression="2">
</int-kafka:outbound-channel-adapter>

If the XML does construct a KafkaProducerMessageHandler, then I would expect the Java config to look something like this:

@Bean
public KafkaProducerMessageHandler kafkaOutboundChannelAdapter () {
    KafkaProducerMessageHandler result = new KafkaProducerMessageHandler(kafkaTemplate());

    result.set????? ();    // WTH?? No methods for most of the attributes?!!!

    return result;
}

EDIT: Additional information about the high level problem being solved

As a part of a larger project, I am trying to implement the textbook example from https://docs.spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html#remote-partitioning , with Kafka backing instead of JMS backing.

I believe the final integration flow should be something like this:

partitionHandler -> messagingTemplate -> outbound-requests (DirectChannel) -> outbound-staging (KafkaProducerMessageHandler) -> kafka

kafka -> executionContainer (KafkaMessageListenerContainer) -> inboundKafkaRequests (KafkaMessageDrivenChannelAdapter) -> inbound-requests (DirectChannel) -> serviceActivator (StepExecutionRequestHandler)

serviceActivator (StepExecutionRequestHandler) -> reply-staging (KafkaProducerMessageHandler) -> kafka

kafka -> replyContainer (KafkaMessageListenerContainer) -> inboundKafkaReplies (KafkaMessageDrivenChannelAdapter) -> inbound-replies (DirectChannel) -> partitionHandler

Answer 1:

I'm not sure what you mean by the setters being missing; this is what I see in the source code of KafkaProducerMessageHandler:

public void setTopicExpression(Expression topicExpression) {
    this.topicExpression = topicExpression;
}

public void setMessageKeyExpression(Expression messageKeyExpression) {
    this.messageKeyExpression = messageKeyExpression;
}

public void setPartitionIdExpression(Expression partitionIdExpression) {
    this.partitionIdExpression = partitionIdExpression;
}

/**
 * Specify a SpEL expression to evaluate a timestamp that will be added in the Kafka record.
 * The resulting value should be a {@link Long} type representing epoch time in milliseconds.
 * @param timestampExpression the {@link Expression} that evaluates the record timestamp.
 * @since 2.3
 */
public void setTimestampExpression(Expression timestampExpression) {
    this.timestampExpression = timestampExpression;
}

and so on.

You also have access to the superclass setters, for example setSync() for the sync attribute in your XML variant.

The input channel is not a MessageHandler responsibility. It belongs to the endpoint and can be configured via @ServiceActivator alongside that @Bean.

See more info in the Core Spring Integration Reference Manual: https://docs.spring.io/spring-integration/reference/html/#annotations_on_beans
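
As a rough sketch of how the XML from the question might map onto that @Bean plus @ServiceActivator combination: the class name and the String key/value types are hypothetical, the KafkaTemplate bean is assumed to be defined elsewhere, and the setter names are those I believe exist in spring-integration-kafka 3.x, so check them against the version you use. This is a configuration fragment, not a standalone program.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessageHandler;

@Configuration
public class KafkaOutboundConfig { // hypothetical class name

    // Assumption: a KafkaTemplate<String, String> bean is declared elsewhere.
    @Bean
    // channel="outbound-staging" and auto-startup="false" belong to the endpoint,
    // so they are set on the annotation rather than on the handler.
    @ServiceActivator(inputChannel = "outbound-staging", autoStartup = "false")
    public MessageHandler kafkaOutboundChannelAdapter(KafkaTemplate<String, String> kafkaTemplate) {
        KafkaProducerMessageHandler<String, String> handler =
                new KafkaProducerMessageHandler<>(kafkaTemplate);

        // topic="foo" is a literal value in the XML
        handler.setTopicExpression(new LiteralExpression("foo"));
        // message-key-expression="'bar'" always evaluates to the string "bar"
        handler.setMessageKeyExpression(new LiteralExpression("bar"));
        // partition-id-expression="2" always evaluates to the integer 2
        handler.setPartitionIdExpression(new ValueExpression<>(2));
        // sync="false"
        handler.setSync(false);
        // send-failure-channel / send-success-channel, resolved by bean name
        // (assumed to be available in your spring-integration-kafka version)
        handler.setSendFailureChannelName("failures");
        handler.setSendSuccessChannelName("successes");
        return handler;
    }
}
```

The bean name comes from the method name, so it matches the id attribute of the XML element.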

There is also a very important chapter near the beginning: https://docs.spring.io/spring-integration/reference/html/#programming-tips

In addition, consider using the Java DSL instead of configuring the MessageHandler directly:

             Kafka
                .outboundChannelAdapter(producerFactory)
                .sync(true)
                .messageKey(m -> m
                        .getHeaders()
                        .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                .headerMapper(mapper())
                .partitionId(m -> 0)
                .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
                .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic))
                .get();

See more about the Java DSL in the Spring Integration docs mentioned above: https://docs.spring.io/spring-integration/reference/html/#java-dsl
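
For the XML in the question specifically, a DSL flow could look roughly like the sketch below. It assumes Spring Integration 5.x (where IntegrationFlows is available) with spring-integration-kafka 3.x; the class name, the flow bean name, and the String key/value types are hypothetical. The send-success and send-failure channels are left out here. This is a configuration fragment, not a standalone program.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class KafkaOutboundFlowConfig { // hypothetical class name

    // Assumption: a KafkaTemplate<String, String> bean is declared elsewhere.
    @Bean
    public IntegrationFlow kafkaOutboundFlow(KafkaTemplate<String, String> kafkaTemplate) {
        return IntegrationFlows
                // channel="outbound-staging": the flow subscribes to this channel
                .from("outbound-staging")
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                                .topic("foo")        // topic="foo"
                                .messageKey("bar")   // message-key-expression="'bar'"
                                .partitionId(2)      // partition-id-expression="2"
                                .sync(false),        // sync="false"
                        // auto-startup="false" is an endpoint option
                        e -> e.autoStartup(false))
                .get();
    }
}
```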