Merging small files in hadoop

2020-03-02 07:03发布

I have a directory (Final Dir) in HDFS in which some files(ex :10 mb) are loading every minute. After some time i want to combine all the small files to a large file(ex :100 mb). But the user is continuously pushing files to Final Dir. it is a continuous process.

So for the first time i need to combine the first 10 files to a large file (ex : large.txt) and save file to Finaldir.

Now my question is how i will get the next 10 files excluding the first 10 files?

can some please help me

2条回答
劫难
2楼-- · 2020-03-02 07:25

Here is one more alternate, this is still the legacy approach pointed out by @Andrew in his comments but with extra steps of making your input folder as a buffer to receive small files pushing them to a tmp directory in a timely fashion and merging them and pushing the result back to input.

step 1 : create a tmp directory

hadoop fs -mkdir tmp

step 2 : move all the small files to the tmp directory at a point of time

hadoop fs -mv input/*.txt tmp

step 3 -merge the small files with the help of hadoop-streaming jar

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
                   -Dmapred.reduce.tasks=1 \
                   -input "/user/abc/input" \
                   -output "/user/abc/output" \
                   -mapper cat \
                   -reducer cat

step 4- move the output to the input folder

hadoop fs -mv output/part-00000 input/large_file.txt

step 5 - remove output

 hadoop fs -rm -R output/

step 6 - remove all the files from tmp

hadoop fs -rm tmp/*.txt

Create a shell script from step 2 till step 6 and schedule it to run at regular intervals to merge the smaller files at regular intervals (may be for every minute based on your need)

Steps to schedule a cron job for merging small files

step 1: create a shell script /home/abc/mergejob.sh with the help of above steps (2 to 6)

important note: you need to specify the absolute path of hadoop in the script to be understood by cron

#!/bin/bash
/home/abc/hadoop-2.6.0/bin/hadoop fs -mv input/*.txt tmp
wait
/home/abc/hadoop-2.6.0/bin/hadoop jar /home/abc/hadoop-2.6.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
                   -Dmapred.reduce.tasks=1 \
                   -input "/user/abc/input" \
                   -output "/user/abc/output" \
                   -mapper cat \
                   -reducer cat
wait
/home/abc/hadoop-2.6.0/bin/hadoop fs -mv output/part-00000 input/large_file.txt
wait
/home/abc/hadoop-2.6.0/bin/hadoop fs -rm -R output/
wait
/home/abc/hadoop-2.6.0/bin/hadoop fs -rm tmp/*.txt

step 2: schedule the script using cron to run every minute using cron expression

a) edit crontab by choosing an editor

>crontab -e

b) add the following line at the end and exit from the editor

* * * * * /bin/bash /home/abc/mergejob.sh > /dev/null 2>&1

The merge job will be scheduled to run for every minute.

Hope this was helpful.

查看更多
不美不萌又怎样
3楼-- · 2020-03-02 07:36

@Andrew pointed you to a solution that was appropriate 6 years ago, in a batch-oriented world.
But it's 2016, you have a micro-batch data flow running and require a non-blocking solution.

That's how I would do it:

  • create an EXTERNAL table with 3 partitions, mapped on 3 directories e.g. new_data, reorg and history
  • feed the new files into new_data
  • implement a job to run the batch compaction, and run it periodically

Now the batch compaction logic:

  1. make sure that no SELECT query will be executed while the compaction is running, else it would return duplicates
  2. select all files that are ripe for compaction (define your own criteria) and move them from new_data directory to reorg
  3. merge the content of all these reorg files, into a new file in history dir (feel free to GZip it on the fly, Hive will recognize the .gz extension)
  4. drop the files in reorg

So it's basically the old 2010 story, except that your existing data flow can continue dumping new files into new_data while the compaction is safely running in separate directories. And in case the compaction job crashes, you can safely investigate / clean-up / resume the compaction without compromising the data flow.


By the way, I am not a big fan of the 2010 solution based on a "Hadoop Streaming" job -- on one hand, "streaming" has a very different meaning now; on the second hand, "Hadoop streaming" was useful in the old days but is now out of the radar; on the gripping hand [*] you can do it quite simply with a Hive query e.g.

INSERT INTO TABLE blahblah PARTITION (stage='history')
SELECT a, b, c, d
FROM blahblah
WHERE stage='reorg'
;

With a couple of SET some.property = somevalue before that query, you can define what compression codec will be applied on the result file(s), how many file(s) you want (or more precisely, how big you want the files to be - Hive will run the merge accordingly), etc.

Look into https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties under hive.merge.mapfiles and hive.merge.mapredfiles (or hive.merge.tezfiles if you use TEZ) and hive.merge.smallfiles.avgsize and then hive.exec.compress.output and mapreduce.output.fileoutputformat.compress.codec -- plus hive.hadoop.supports.splittable.combineinputformat to reduce the number of Map containers since your input files are quite small.


[*] very old SF reference here :-)

查看更多
登录 后发表回答