Merging HDFS files

Posted 2019-03-08 11:58

Question:

I have 1000+ files in HDFS, named 1_fileName.txt through N_fileName.txt. Each file is 1024 MB. I need to merge them into a single HDFS file while preserving the file order; for example, 5_fileName.txt should be appended only after 4_fileName.txt.

What is the best and fastest way to perform this operation?

Is there any way to perform this merge without copying the actual data between data nodes? For example, could I get the block locations of these files and create a new entry (file name) in the NameNode that points to those block locations?

Answer 1:

There is no efficient way of doing this: you'll need to move all the data to one node, then back to HDFS.

A command line scriptlet to do this could be as follows:

hadoop fs -text *_fileName.txt | hadoop fs -put - targetFilename.txt

This will cat all files that match the glob to standard output; that stream is then piped to the put command, which writes it to an HDFS file named targetFilename.txt.

The only problem is the filename structure you have chosen. If the number part were fixed-width and zero-padded, this would be easier, but in its current state you'll get an unexpected lexicographic order (1, 10, 100, 1000, 11, 110, etc.) rather than numeric order (1, 2, 3, 4, etc.). You could work around this by listing one glob per number width, shortest first, so the scriptlet becomes:

hadoop fs -text [0-9]_fileName.txt [0-9][0-9]_fileName.txt \
    [0-9][0-9][0-9]_fileName.txt [0-9][0-9][0-9][0-9]_fileName.txt \
    | hadoop fs -put - targetFilename.txt
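
As an alternative to enumerating one glob per number width, the file names can be sorted by their numeric prefix and passed to the same pipeline explicitly. The following is a minimal sketch, assuming the list of names has already been obtained by some other means; the helper names numeric_prefix and build_merge_command are hypothetical and not part of Hadoop.

```python
import re
import shlex


def numeric_prefix(name):
    """Return the leading integer of a name such as '12_fileName.txt'."""
    match = re.match(r"(\d+)_", name)
    if match is None:
        raise ValueError(f"no numeric prefix in {name!r}")
    return int(match.group(1))


def build_merge_command(names, target):
    """Build the 'hadoop fs -text ... | hadoop fs -put - target' pipeline
    with the source files listed in numeric (not lexicographic) order."""
    ordered = sorted(names, key=numeric_prefix)
    sources = " ".join(shlex.quote(n) for n in ordered)
    return f"hadoop fs -text {sources} | hadoop fs -put - {shlex.quote(target)}"


if __name__ == "__main__":
    # Hypothetical sample names following the N_fileName.txt convention.
    sample = ["10_fileName.txt", "2_fileName.txt", "1_fileName.txt"]
    print(build_merge_command(sample, "targetFilename.txt"))
```

The generated string is only the command line; it still streams all data through the node that runs it, exactly like the hand-written scriptlet.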


Answer 2:

There is an API method org.apache.hadoop.fs.FileUtil.copyMerge that performs this operation:

public static boolean copyMerge(
                    FileSystem srcFS,
                    Path srcDir,
                    FileSystem dstFS,
                    Path dstFile,
                    boolean deleteSource,
                    Configuration conf,
                    String addString)

It reads all files in srcDir in alphabetical order and appends their content to dstFile.
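
To make the ordering behaviour concrete, here is a local-filesystem analogy of that merge, written in Python. It is not the Hadoop API: copy_merge_local and its sort_key parameter are hypothetical names used only for illustration, and the destination is assumed to live outside the source directory. As far as I know, copyMerge itself is no longer available in Hadoop 3.x, so check the version you are running.

```python
import shutil
from pathlib import Path


def copy_merge_local(src_dir, dst_file, add_string=None, sort_key=None):
    """Append every file in src_dir to dst_file, sorted by file name
    unless a different sort_key is supplied."""
    if sort_key is None:
        sort_key = lambda p: p.name  # plain name order, like the description above
    sources = sorted((p for p in Path(src_dir).iterdir() if p.is_file()), key=sort_key)
    with open(dst_file, "wb") as out:
        for src in sources:
            with open(src, "rb") as inp:
                shutil.copyfileobj(inp, out)  # buffered copy, never loads a whole file
            if add_string is not None:
                out.write(add_string.encode("utf-8"))  # optional separator after each file
    return sources
```

With the naming scheme from the question, the default name order puts 10_fileName.txt before 1_fileName.txt, so a numeric key such as lambda p: int(p.name.split("_")[0]) is needed to get the order the question asks for.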



Answer 3:

If you can use Spark, it can be done like this:

sc.textFile("hdfs://...../part*").coalesce(1).saveAsTextFile("hdfs://...../filename")

Hope this works. Since Spark works in a distributed fashion, you won't have to copy the files onto one node. One caution, though: coalescing files in Spark can be slow if the files are very large.



Answer 4:

Since the file order is important and lexicographic order does not serve the purpose, this looks like a good candidate for a mapper program, which can probably run periodically. Of course, there is no reducer. Writing this as a map task over HDFS is efficient because it can merge the files into one output file without much data movement across data nodes: the source files are already in HDFS, and mapper tasks are scheduled with data affinity where possible.

The mapper program will need a custom InputSplit (taking the file names in the input directory and ordering them as required) and a custom InputFormat.

The mapper can either use HDFS append or a raw output stream to which it writes byte[] buffers.

A rough sketch of the Mapper program I am thinking of is something like:

public class MergeOrderedFileMapper extends MapReduceBase implements Mapper<ArrayWritable, Text, ??, ??>
{
    FileSystem fs;  // must be initialised (e.g. in configure()) before map() runs

    public void map(ArrayWritable sourceFiles, Text destFile, OutputCollector<??, ??> output, Reporter reporter) throws IOException
    {
        // Convert the destFile to a Path.
        ...
        // Make sure the parent directory of destFile is created first.
        // Note: append() requires destFilePath to exist already.
        FSDataOutputStream destOS = fs.append(destFilePath);
        // Convert the sourceFiles to Paths, in the required order.
        List<Path> sourcePaths;
        ....
        ....
        for (Path p : sourcePaths) {
            FSDataInputStream srcIS = fs.open(p);
            // Stream in small buffers (org.apache.hadoop.io.IOUtils) rather than
            // loading a whole 1024 MB file into a byte[].
            IOUtils.copyBytes(srcIS, destOS, 4096, false);
            srcIS.close();
            reporter.progress();  // Important, else the mapper task may time out.
        }
        destOS.close();

        // Delete source files.
        for (Path p : sourcePaths) {
            fs.delete(p, false);
            reporter.progress();
        }
    }
}
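
The merge loop in the sketch above boils down to copying each source stream into the destination in fixed-size chunks and signalling progress after every file. The following Python sketch shows just that loop on generic file-like objects; merge_streams, chunk_size and on_progress are hypothetical names, and the in-memory streams stand in for the HDFS input and output streams.

```python
import io


def merge_streams(sources, dest, chunk_size=64 * 1024, on_progress=None):
    """Copy each source stream into dest, in order, chunk by chunk.

    Returns the total number of bytes written. on_progress, if given,
    is called with the running total after each source is finished.
    """
    total = 0
    for src in sources:
        while True:
            chunk = src.read(chunk_size)
            if not chunk:  # empty read means end of this source
                break
            dest.write(chunk)
            total += len(chunk)
        if on_progress is not None:
            on_progress(total)
    return total


if __name__ == "__main__":
    out = io.BytesIO()
    merge_streams([io.BytesIO(b"first\n"), io.BytesIO(b"second\n")], out)
    print(out.getvalue())
```

Copying in chunks keeps memory use bounded by chunk_size, which matters when each source is around 1024 MB.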


Answer 5:

I wrote an implementation for PySpark, as we use this quite often.

It is modeled after Hadoop's copyMerge() and uses the same lower-level Hadoop APIs to achieve this.

https://github.com/Tagar/abalon/blob/v2.3.3/abalon/spark/sparkutils.py#L335

It keeps the alphabetical order of the file names.



Tags: hadoop hdfs