How can I force Flume-NG to process the backlog of

2019-03-11 15:07发布

问题:

I'm trying to setup Flume-NG to collect various kinds of logs from a bunch of servers (mostly running Tomcat instances and Apache Httpd) and dump them into HDFS on a 5-node Hadoop cluster. The setup looks like this:

Each application server tails the relevant logs into a one of the Exec Sources (one for each log type: java, httpd, syslog), which outs them through a FileChannel to an Avro sink. On each server the different sources, channels and sinks are managed by one Agent. Events get picked up by an AvroSource which resides on the Hadoop Cluster (the node that also hosts the SecondaryNameNode and the Jobtracker). For each logtype there is an AvroSource listening on a different port. The events go through the FileChannel into the HDFS Sink, which saves the events using the FlumeEventAvro EventSerializer and Snappy compression.

The problem: The agent on the Hadoop node that manages the HDFS Sinks (again, one for each logtype) failed after some hours because we didn't change the Heap size of the JVM. From then on lots of events were collected in the FileChannel on that node and after that also on the FileChannels on the Application Servers, because the FileChannel on the Hadoop node reached it's maximum capacity. When I fixed the problem, I couldn't get the agent on the Hadoop node to process the backlog speedily enough so it could resume normal operation. The size of the tmp dir where the FileChannel saves the events before sinking them, keeps growing all the time. Also, HDFS writes seem to be real slow. Is there a way to force Flume to process the backlog first before ingesting new events? Is the following configuration optimal? Maybe related: The files that get written to HDFS are really small, around 1 - 3 MB or so. That's certainly not optimal with the HDFS default blocksize of 64MB and with regards to future MR operations. What settings should I use to collect the events in files large enough for the HDFS blocksize? I have a feeling the config on the Hadoop node is not right, I'm suspecting the values for BatchSize, RollCount and related params are off, but I'm not sure what the optimal values should be.

Example config on Application Servers:

agent.sources=syslogtail httpdtail javatail
agent.channels=tmpfile-syslog tmpfile-httpd tmpfile-java
agent.sinks=avrosink-syslog avrosink-httpd avrosink-java

agent.sources.syslogtail.type=exec
agent.sources.syslogtail.command=tail -F /var/log/messages
agent.sources.syslogtail.interceptors=ts
agent.sources.syslogtail.interceptors.ts.type=timestamp
agent.sources.syslogtail.channels=tmpfile-syslog
agent.sources.syslogtail.batchSize=1

...

agent.channels.tmpfile-syslog.type=file
agent.channels.tmpfile-syslog.checkpointDir=/tmp/flume/syslog/checkpoint
agent.channels.tmpfile-syslog.dataDirs=/tmp/flume/syslog/data

...

agent.sinks.avrosink-syslog.type=avro
agent.sinks.avrosink-syslog.channel=tmpfile-syslog
agent.sinks.avrosink-syslog.hostname=somehost
agent.sinks.avrosink-syslog.port=XXXXX
agent.sinks.avrosink-syslog.batch-size=1

Example config on Hadoop node

agent.sources=avrosource-httpd avrosource-syslog avrosource-java
agent.channels=tmpfile-httpd tmpfile-syslog tmpfile-java
agent.sinks=hdfssink-httpd hdfssink-syslog hdfssink-java

agent.sources.avrosource-java.type=avro
agent.sources.avrosource-java.channels=tmpfile-java
agent.sources.avrosource-java.bind=0.0.0.0
agent.sources.avrosource-java.port=XXXXX

...

agent.channels.tmpfile-java.type=file
agent.channels.tmpfile-java.checkpointDir=/tmp/flume/java/checkpoint
agent.channels.tmpfile-java.dataDirs=/tmp/flume/java/data
agent.channels.tmpfile-java.write-timeout=10
agent.channels.tmpfile-java.keepalive=5
agent.channels.tmpfile-java.capacity=2000000

...

agent.sinks.hdfssink-java.type=hdfs
agent.sinks.hdfssink-java.channel=tmpfile-java
agent.sinks.hdfssink-java.hdfs.path=/logs/java/avro/%Y%m%d/%H
agent.sinks.hdfssink-java.hdfs.filePrefix=java-
agent.sinks.hdfssink-java.hdfs.fileType=DataStream
agent.sinks.hdfssink-java.hdfs.rollInterval=300
agent.sinks.hdfssink-java.hdfs.rollSize=0
agent.sinks.hdfssink-java.hdfs.rollCount=40000
agent.sinks.hdfssink-java.hdfs.batchSize=20000
agent.sinks.hdfssink-java.hdfs.txnEventMax=20000
agent.sinks.hdfssink-java.hdfs.threadsPoolSize=100
agent.sinks.hdfssink-java.hdfs.rollTimerPoolSize=10

回答1:

There are a couple of things I see in your configuration that can cause issues:

  1. Your first agent seems to have an avro sink with batch size of 1. You should bump this up to at least 100 or more. This is because the avro source on the second agent would be committing to the channel with batch size of 1. Each commit causes an fsync, causing the file channel performance to be poor. The batch size on the exec source is also 1, causing that channel to be slow as well. You can increase the batch size (or use the Spool Directory Source - more on that later).

  2. You can have multiple HDFS sinks reading from the same channel to improve performance. You should just make sure that each sink writes to a different directory or have different "hdfs.filePrefix", so that multiple HDFS sinks don't try to write to the same files.

  3. Your batch size for the HDFS sink is 20000, which is quite high, and your callTimeout is the default of 10 seconds. You should increase "hdfs.callTimeout" if you want to keep such a huge batch size. I'd recommend reducing the batch size to 1000 or so, and having timeout of about 15-20 seconds. (Note that at the current batch size, each file holds only 2 batches - so reduce the batch size, increase the rollInterval and timeOut)

If you are using tail -F, I'd recommend trying out the new Spool Directory Source. To use this source, rotate out your log files to a directory, which the Spool Directory Source processes. This source will only process files which are immutable, so you need to rotate the log files out. Using tail -F with exec source has issues, as documented in the Flume User Guide.