Apache Flume - send only new file contents

Posted 2019-09-15 06:01

Question:

I am a very new user to Flume, please treat me as an absolute noob. I am having a minor issue configuring Flume for a particular use case and was hoping you could assist. Note that I am not using HDFS, which is why this question is different from others you may have seen on forums.

I have two virtual machines (VMs) connected to each other through an internal network on Oracle VirtualBox. My goal is to have one VM watch a particular directory that will only ever contain one file. When the file changes, I want Flume to send only the new lines/data. I want the other VM to receive this data and append it to a single file in a particular directory on that VM.

So far, I have this process very close to working. Whenever changes are made on VM1, they show up on VM2. However, the entire file on VM1 is sent to VM2 every time, not just the new lines. For example, if I wrote "Test1" to the file on VM1 and a while later wrote "Test2" on the line underneath, the output on VM2 would be:

Test1

Test1

Test2

What I want to see is:

Test1

Test2

I am not sure how to implement this, and I am sending this email after thoroughly examining the Flume User Guide and the most relevant posts on Stack Overflow/Stack Exchange. For your reference, my current configurations are below (they work in the manner described above).

VM1 configuration

VM2 configuration

I realize another solution would be to keep the configuration on VM1 as it is and overwrite the file on VM2 every time new contents are detected. However, I am also unsure how to implement this.

Any assistance you could provide is greatly appreciated!

Answer 1:

Use the TAILDIR source provided by Flume. It periodically writes the last position it has read to a position file, which makes it more reliable than the exec source: if the agent crashes or is stopped for any reason, it resumes reading from the last position saved in the position file.

agent1.sources.src1.type = TAILDIR
agent1.sources.src1.channels = ch1
agent1.sources.src1.filegroups = f1
# Replace the value below with the absolute path to your log file.
agent1.sources.src1.filegroups.f1 = /path/to/log/file
agent1.sources.src1.maxBackoffSleep = 10000

Set maxBackoffSleep to suit your needs. It is the maximum time (in milliseconds) the agent waits before polling the log file for changes again when its last attempt found none.
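
For context, here is a rough sketch of how the TAILDIR source might fit into a full two-agent setup. The question's actual VM1 and VM2 configurations are not shown, so everything beyond the source lines above is an assumption: the agent names, the memory channels, the Avro sink/source pair used as the transport, the IP address and port, and all file paths are placeholders to adapt, not the asker's real values.

```properties
# ---- VM1 agent (assumed name: agent1): tail the file, forward new lines ----
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sink1

agent1.sources.src1.type = TAILDIR
agent1.sources.src1.channels = ch1
# Where the last read offset is stored (hypothetical path).
agent1.sources.src1.positionFile = /var/lib/flume/taildir_position.json
agent1.sources.src1.filegroups = f1
# Hypothetical path to the single watched file.
agent1.sources.src1.filegroups.f1 = /path/to/log/file
agent1.sources.src1.maxBackoffSleep = 10000

agent1.channels.ch1.type = memory

# Assumption: events are shipped to VM2 over Avro.
agent1.sinks.sink1.type = avro
agent1.sinks.sink1.channel = ch1
# Hypothetical address and port of VM2 on the internal network.
agent1.sinks.sink1.hostname = 192.168.56.102
agent1.sinks.sink1.port = 4545

# ---- VM2 agent (assumed name: agent2): receive events, append to one file ----
agent2.sources = src2
agent2.channels = ch2
agent2.sinks = sink2

agent2.sources.src2.type = avro
agent2.sources.src2.channels = ch2
agent2.sources.src2.bind = 0.0.0.0
agent2.sources.src2.port = 4545

agent2.channels.ch2.type = memory

agent2.sinks.sink2.type = file_roll
agent2.sinks.sink2.channel = ch2
# Hypothetical output directory on VM2.
agent2.sinks.sink2.sink.directory = /path/to/output/dir
# 0 disables rolling, so all events go to a single file.
agent2.sinks.sink2.sink.rollInterval = 0
```

Each agent section would go in the configuration file on its own VM. With the position file in place, only lines appended since the last recorded offset should be forwarded, which is the behaviour the question asks for.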