Multiple messages processed

Posted 2019-08-06 06:22

Question:

I have a Spring XD source module that pulls files from S3 and splits them line by line; my Spring config is below. I run 3 containers and 1 admin server. I now see duplicate messages being processed, because each container downloads its own copy of every file. I can avoid this by setting the deployment count of the S3 source module to 1, but then message processing becomes slow. Any suggestions on how to solve this?

    <int:poller fixed-delay="${fixedDelay}" default="true">
        <int:advice-chain>
            <ref bean="pollAdvise"/>
        </int:advice-chain>
    </int:poller>


    <bean id="pollAdvise" 

    </bean>






    <bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials">
        <property name="accessKey" value="#{encryptedDatum.decryptBase64Encoded('${accessKey}')}"/>
        <property name="secretKey" value="${secretKey}"/>
    </bean>


    <bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration">
        <property name="proxyHost" value="${proxyHost}"/>
        <property name="proxyPort" value="${proxyPort}"/>
        <property name="preemptiveBasicProxyAuth" value="false"/>
    </bean>

    <bean id="s3Operations" class="org.springframework.integration.aws.s3.core.CustomC1AmazonS3Operations">
        <constructor-arg index="0" ref="credentials"/>
        <constructor-arg index="1" ref="clientConfiguration"/>
        <property name="awsEndpoint" value="s3.amazonaws.com"/>
        <property name="temporaryDirectory" value="${temporaryDirectory}"/>
        <property name="awsSecurityKey"  value="${awsSecurityKey}"/>
    </bean>

    <bean id="encryptedDatum" class="abc"/>

    <!-- aws-endpoint="https://s3.amazonaws.com"  -->
    <int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com"
                                        bucket="${bucket}"
                                        s3-operations="s3Operations"
                                        credentials-ref="credentials"
                                        file-name-wildcard="${fileNameWildcard}"
                                        remote-directory="${remoteDirectory}"
                                        channel="splitChannel"
                                        local-directory="${localDirectory}"
                                        accept-sub-folders="false"
                                        delete-source-files="true"
                                        archive-bucket="${archiveBucket}"
                                        archive-directory="${archiveDirectory}">
    </int-aws:s3-inbound-channel-adapter>

    <int-file:splitter input-channel="splitChannel" output-channel="output" markers="false" charset="UTF-8">
        <int-file:request-handler-advice-chain>
            <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
                <property name="onSuccessExpression" value="payload.delete()"/>
            </bean>
        </int-file:request-handler-advice-chain>
    </int-file:splitter>

    <int:channel id="output"/>

[Updated] I added idempotency with a metadata store, as you suggested. But since my XD runs as a 3-container cluster with Rabbit, will SimpleMetadataStore work? I think I should use a Redis/Mongo metadata store instead. If I use a Mongo/Redis metadata store, how can I evict/remove the entries, given that they will pile up over time?

<int:idempotent-receiver id="expressionInterceptor" endpoint="output"
                              metadata-store="store"
                             discard-channel="nullChannel"
                             throw-exception-on-rejection="false"
                              key-expression="payload"/>

    <bean id="store" class="org.springframework.integration.metadata.SimpleMetadataStore"/>

Answer 1:

I suggest taking a look at the Idempotent Receiver.

With it you can use a shared MetadataStore and reject duplicate files.

The <idempotent-receiver> should be configured for your <int-file:splitter>, with discard logic so that duplicate messages are dropped.
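
As a rough sketch of that wiring, assuming the splitter is given an id (`fileSplitter` is a made-up name) and that the payload arriving on `splitChannel` is a `java.io.File`, the interceptor could target the splitter and use the file name as the key. The `store` reference is the bean from the question; for a multi-container deployment it would have to point at a shared implementation.

```xml
<!-- Hypothetical id "fileSplitter", added so the interceptor can target this endpoint -->
<int-file:splitter id="fileSplitter" input-channel="splitChannel" output-channel="output"
                   markers="false" charset="UTF-8"/>

<!-- "endpoint" takes a consumer endpoint id (or pattern), not a channel name -->
<int:idempotent-receiver endpoint="fileSplitter"
                         metadata-store="store"
                         discard-channel="nullChannel"
                         key-expression="payload.name"/>
```

Keying on the file name is only one option: it means a file uploaded later under the same name would also be discarded, so a key that includes a timestamp or checksum may fit better if names can repeat.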

UPDATE

"But since my XD is running in a 3-container cluster with Rabbit, will a simple metadata store work?"

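The Rabbit transport doesn't matter here: the stream starts at the S3 MessageSource, so files that have already been picked up have to be filtered out there. Since SimpleMetadataStore keeps its state in memory inside each container, you need an external, shared MetadataStore.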
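
One way such a shared store might look, assuming Redis is reachable from all containers and `spring-integration-redis` is on the module classpath, is sketched below. The `${redisHost}` and `${redisPort}` properties and the `s3-idempotent-keys` key name are placeholders, not values from the original setup.

```xml
<!-- Assumed connection settings; redisHost and redisPort are placeholder properties -->
<bean id="redisConnectionFactory"
      class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
    <property name="hostName" value="${redisHost}"/>
    <property name="port" value="${redisPort}"/>
</bean>

<!-- Replaces SimpleMetadataStore; entries are kept in Redis under the given key -->
<bean id="store" class="org.springframework.integration.redis.metadata.RedisMetadataStore">
    <constructor-arg index="0" ref="redisConnectionFactory"/>
    <constructor-arg index="1" value="s3-idempotent-keys"/>
</bean>
```

Because every container talks to the same Redis instance, a key recorded by one container is visible to the other two.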

"If I use a Mongo/Redis metadata store, how can I evict/remove the messages, since they will pile up over time?"

That's correct. It is a side effect of the Idempotent Receiver logic. I'm not sure why that would be a problem if you use a database, though...

You can clean up the collection/keys with a periodic task, maybe once a week.
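
A minimal sketch of such a periodic task, assuming a Redis-backed store whose entries live under a single key (`s3-idempotent-keys` is a hypothetical name) and an existing `redisConnectionFactory` bean, could use a cron-polled adapter that simply deletes that key. This is only one possible approach, and it wipes all recorded entries at once rather than expiring them individually.

```xml
<!-- Assumes a RedisConnectionFactory bean named "redisConnectionFactory" is already defined -->
<bean id="redisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
    <constructor-arg ref="redisConnectionFactory"/>
</bean>

<!-- Runs every Sunday at 03:00 and removes the key that holds the idempotency entries -->
<int:inbound-channel-adapter channel="nullChannel"
                             expression="@redisTemplate.delete('s3-idempotent-keys')">
    <int:poller cron="0 0 3 * * SUN"/>
</int:inbound-channel-adapter>
```

Any file still sitting in the bucket when the key is deleted would be treated as new on the next poll, so the schedule should be chosen with the archive/delete behaviour of the inbound adapter in mind.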