spring integration ftp inbound adapter in distributed mode

Published 2019-08-13 01:01

Question:

I am using the Spring Integration FTP Inbound Channel Adapter to read files from a remote FTP server. My problem is: will it be able to handle around 5 million files per day?

If I deploy my project's war on 2 different servers in distributed mode, will that be a problem? The FTP Inbound Channel Adapter will be running on both servers, so both adapters may read the same file twice. Please help me set up this system in distributed mode.

EDIT:
I have deployed my Spring Integration project war on 2 servers, each using the FTP Inbound Channel Adapter. Both adapters' remote-directory points to the same FTP location. When I start both servers, they start transferring the same files and generate messages multiple times. I am using a Redis metadata store as per Gary's suggestion. My FTP Inbound Channel Adapter on both servers looks like this:

<bean id="redisMessageStore" class="org.springframework.integration.redis.store.RedisMessageStore">
    <constructor-arg ref="redisConnectionFactory"/>
</bean>

<bean name="metadataStore" class="org.springframework.integration.redis.metadata.RedisMetadataStore">
    <constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

<bean id="fileSystemPersistantFilter" class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter"> 
    <constructor-arg name="store" ref="metadataStore"/> <constructor-arg name="prefix" value="" />
</bean>

<bean id="ftpPersistantFilter" class="org.springframework.integration.ftp.filters.FtpPersistentAcceptOnceFileListFilter"> 
    <constructor-arg name="store" ref="metadataStore"/> <constructor-arg name="prefix" value="" />
</bean>
<int-ftp:inbound-channel-adapter id="ftpInboundAdapter"
    session-factory="ftpClientFactory" channel="ftpChannel"
    filter="ftpPersistantFilter"
    local-filter="fileSystemPersistantFilter" delete-remote-files="false"
    remote-directory="${ftp.remote_directory}/test/" local-directory="${ftp.local_directory}/test/"
    temporary-file-suffix=".writing" auto-create-local-directory="true">
    <int:poller fixed-rate="1000" max-messages-per-poll="-1" />
</int-ftp:inbound-channel-adapter>

The output log of the 1st server is:

19-Feb-2016 10:34:41.634 INFO [task-scheduler-1] org.springframework.integration.file.FileReadingMessageSource.receive Created message: [GenericMessage [payload=/home/harsh/test/test_input_file1.txt, headers={id=1793c207-2d8a-542c-c5a7-eac9165e4cc5, timestamp=1455858281634}]]
19-Feb-2016 10:34:42.886 INFO [task-scheduler-4] org.springframework.integration.file.FileReadingMessageSource.receive Created message: [GenericMessage [payload=/home/harsh/test/test_input_file1.txt, headers={id=c909b6cc-9f78-2f6f-2a27-036f0186b959, timestamp=1455858282886}]]
File /home/harsh/test/test_input_file1.txt transformed by 1st war 1793c207-2d8a-542c-c5a7-eac9165e4cc5
File /home/harsh/test/test_input_file1.txt transformed by 1st war c909b6cc-9f78-2f6f-2a27-036f0186b959
19-Feb-2016 10:34:47.892 INFO [task-scheduler-4] org.springframework.integration.file.FileReadingMessageSource.receive Created message: [GenericMessage [payload=/home/harsh/test/test_input_file1.txt, headers={id=8c5c8941-fbfd-91d8-9a25-75d46e450930, timestamp=1455858287892}]]
19-Feb-2016 10:34:49.325 INFO [task-scheduler-2] org.springframework.integration.file.FileReadingMessageSource.receive Created message: [GenericMessage [payload=/home/harsh/test/test_input_file1.txt, headers={id=dbdddd0f-1ac5-0753-8873-f0f9c77cb48b, timestamp=1455858289325}]]
Service Activator /home/harsh/test/test_input_file1.txt 1st war 24632436-d297-db0c-c9ea-ac596c57a91e
19-Feb-2016 10:34:50.372 INFO [task-scheduler-2] org.springframework.integration.file.FileReadingMessageSource.receive Created message: [GenericMessage [payload=/home/harsh/test/test_input_file1.txt, headers={id=5cc843ae-c1d7-814f-b9fd-a7c5c2515674, timestamp=1455858290372}]]
19-Feb-2016 10:34:51.759 INFO [task-scheduler-2] org.springframework.integration.file.FileReadingMessageSource.receive Created message: [GenericMessage [payload=/home/harsh/test/test_input_file1.txt, headers={id=428ba015-e2f3-6948-fc13-ca0df31ee9c0, timestamp=1455858291759}]]
19-Feb-2016 10:34:53.670 INFO [task-scheduler-2] org.springframework.integration.file.FileReadingMessageSource.receive Created message: [GenericMessage [payload=/home/harsh/test/test_input_file1.txt, headers={id=ac1fca37-838f-39fc-f9ed-cc373f8f8b12, timestamp=1455858293670}]]
19-Feb-2016 10:34:55.648 INFO [task-scheduler-8] org.springframework.integration.file.FileReadingMessageSource.receive Created message: [GenericMessage [payload=/home/harsh/test/test_input_file1.txt, headers={id=f9197ec2-e73a-19be-e94b-94bffe515569, timestamp=1455858295647}]]
File /home/harsh/test/test_input_file1.txt transformed by 1st war 45718961-2a99-d368-d88a-9bc2ceb955cd

The 2nd server generates the same log with different message ids. Am I missing something here?
Do I need to write a custom filter for this?

Answer 1:

My problem is: will it be able to handle around 5 million files per day?

That depends on the size of the files and the bandwidth of the network; the use of Spring Integration is unlikely to be a factor.

You should probably remove files locally after processing, though, to avoid large directory scans.
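One way to do that (a sketch, not from the original answer; the syncFactory and pseudoTxManager bean names are hypothetical) is a transaction-synchronization-factory on the poller, which deletes the local copy once the downstream flow has completed successfully on the poller thread:

<int:transaction-synchronization-factory id="syncFactory">
    <!-- payload is the local java.io.File; remove it after a successful flow -->
    <int:after-commit expression="payload.delete()"/>
</int:transaction-synchronization-factory>

<bean id="pseudoTxManager"
      class="org.springframework.integration.transaction.PseudoTransactionManager"/>

<int:poller fixed-rate="1000" max-messages-per-poll="-1">
    <int:transactional transaction-manager="pseudoTxManager"
                       synchronization-factory="syncFactory"/>
</int:poller>

Note this only behaves as intended when the whole flow runs on the poller thread (direct channels); with a real transaction manager the after-commit expression works the same way.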

To avoid duplicates in a cluster, you need an FtpPersistentAcceptOnceFileListFilter backed by a shared metadata store, so that each instance skips files already handled by another instance.

See the documentation for more information.

EDIT:

I just tested with your configuration and see no problems. Are you sure both instances are using the same Redis server?
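As a quick sanity check (a minimal sketch, assuming the Jedis driver; ${redis.host} and ${redis.port} are placeholder properties), both wars must construct their redisConnectionFactory against the same host:

<bean id="redisConnectionFactory"
      class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
    <!-- both servers must resolve these properties to the same Redis instance -->
    <property name="hostName" value="${redis.host}"/>
    <property name="port" value="${redis.port}"/>
</bean>

If each server silently falls back to a localhost Redis, each one has its own metadata store and the filters cannot see each other's entries.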

If you run redis-cli and then monitor, you should see something like:

1459258131.934949 [0 127.0.0.1:55237] "HSETNX" "MetaData" "bar.txt" "1384837200000"
1459258131.935129 [0 127.0.0.1:55237] "HSETNX" "MetaData" "baz.txt" "1384837200000"
1459258131.940125 [0 127.0.0.1:55237] "HSETNX" "MetaData" "/tmp/test/bar.txt" "1459258131000"
1459258131.940353 [0 127.0.0.1:55237] "HSETNX" "MetaData" "/tmp/test/baz.txt" "1459258131000"

In this case, the remote directory had 2 files; the first 2 lines are from the remote filter, the last two are from the local filter (setting the initial values).

You should then see a bunch of

1459258142.073316 [0 127.0.0.1:55237] "HSETNX" "MetaData" "bar.txt" "1384837200000"
1459258142.073506 [0 127.0.0.1:55237] "HGET" "MetaData" "bar.txt"

(once per poll - checking to see if the timestamp changed).
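The duplicate protection comes from HSETNX semantics, which you can verify by hand in redis-cli (an illustration, not output captured from this thread):

127.0.0.1:6379> HSETNX MetaData bar.txt 1384837200000
(integer) 1
127.0.0.1:6379> HSETNX MetaData bar.txt 1384837200000
(integer) 0

The first instance to claim a file gets 1; every other instance gets 0, and the filter then rejects the file unless the stored timestamp differs from the file's current modified time (i.e. the file changed and should be picked up again).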