java.lang.IllegalArgumentException: Wrong FS: , ex

Posted 2019-05-02 23:44

Question:

I am trying to implement a reduce-side join and am using a MapFile reader to look up the distributed cache, but it is not looking up the values. When I checked stderr it showed the following error. The lookup file is already present in HDFS and seems to be loaded into the cache correctly, as seen in stdout.

java.lang.IllegalArgumentException: Wrong FS: file:/app/hadoop/tmp/mapred/local/taskTracker/distcache/-8118663285704962921_-1196516983_170706299/localhost/input/delivery_status/DeliveryStatusCodes/data, expected: hdfs://localhost:9000
    at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:390)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:140)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:554)
    at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:816)
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1479)
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1474)
    at org.apache.hadoop.io.MapFile$Reader.createDataFileReader(MapFile.java:302)
    at org.apache.hadoop.io.MapFile$Reader.open(MapFile.java:284)
    at org.apache.hadoop.io.MapFile$Reader.<init>(MapFile.java:273)
    at org.apache.hadoop.io.MapFile$Reader.<init>(MapFile.java:260)
    at org.apache.hadoop.io.MapFile$Reader.<init>(MapFile.java:253)
    at mr_poc.reducerrsj.initializeDepartmentsMap(reducerrsj.java:59)
    at mr_poc.reducerrsj.setup(reducerrsj.java:42)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:174)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
java.lang.NullPointerException
    at mr_poc.reducerrsj.buildOutputValue(reducerrsj.java:83)
    at mr_poc.reducerrsj.reduce(reducerrsj.java:127)
    at mr_poc.reducerrsj.reduce(reducerrsj.java:1)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.

This is my driver code:

package mr_poc;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class driverrsj extends Configured implements Tool{

    @Override
    public int run(String[] arg) throws Exception {
        if (arg.length != 3) {
            System.out.printf("3 parameters are required for DriverRSJ- <Input Dir1> <Input Dir2> <Output Dir> \n");
            return -1;
        }
        Job job = new Job(getConf());
        Configuration conf = job.getConfiguration();
        DistributedCache.addCacheFile(new URI("/input/delivery_status"), conf);
        System.out.println("Cache : " + job.getConfiguration().get("mapred.cache.files"));
        job.setJarByClass(driverrsj.class);
        conf.setInt("cust_info", 1);
        conf.setInt("status", 2);
        StringBuilder inputPaths = new StringBuilder();
        inputPaths.append(arg[0].toString()).append(",").append(arg[1].toString());
        FileInputFormat.setInputPaths(job, inputPaths.toString());
        FileOutputFormat.setOutputPath(job, new Path(arg[2]));
        job.setJarByClass(driverrsj.class);
        job.setMapperClass(mappperRSJ.class);
        job.setReducerClass(reducerrsj.class);
        job.setMapOutputKeyClass(CompositeKeyWritableRSJ.class);
        job.setMapOutputValueClass(Text.class);
        //job.setPartitionerClass(partinonrsj.class);
        job.setSortComparatorClass(secondarysortcomp.class);
        job.setGroupingComparatorClass(GroupingComparatorRSJ.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception{
        int exitCode = ToolRunner.run(new Configuration(), new driverrsj(),args);
        System.exit(exitCode);

    }


}

This is my reducer code:

package mr_poc;

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class reducerrsj extends Reducer<CompositeKeyWritableRSJ, Text, NullWritable, Text>{
    StringBuilder reduceValueBuilder = new StringBuilder("");
    NullWritable nullWritableKey = NullWritable.get();
    Text reduceOutputValue = new Text("");
    String strSeparator = ",";
    private MapFile.Reader deptMapReader = null;
    Text txtMapFileLookupKey = new Text();
    Text txtMapFileLookupValue = new Text();
    //Path[] cacheFilesLocal;
    //Path[] eachPath;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());

        for (Path eachPath : cacheFiles) {
            System.out.println(eachPath.toString());
            System.out.println(eachPath.getName());
            if (eachPath.getName().toString().contains("delivery_status")) {
                URI uriUncompressedFile = new File(eachPath.toString() + "/DeliveryStatusCodes").toURI();
                initializeDepartmentsMap(uriUncompressedFile, context);
            }
        }
    }

    //@SuppressWarnings("deprecation")
    private void initializeDepartmentsMap(URI uriUncompressedFile, Context context)
            throws IOException {
        // Initialize the reader of the map file (side data)
        Configuration conf = context.getConfiguration();
        conf.addResource(new Path("/usr/local/hadoop-1.2.1/conf/core-site.xml"));
        FileSystem dfs = FileSystem.get(conf);
        try {
            deptMapReader = new MapFile.Reader(dfs, uriUncompressedFile.toString(), context.getConfiguration());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private StringBuilder buildOutputValue(CompositeKeyWritableRSJ key,
            StringBuilder reduceValueBuilder, Text value) {

        if (key.getsourceindex() == 2) {
            String arrSalAttributes[] = value.toString().split(",");
            txtMapFileLookupKey.set(arrSalAttributes[0].toString());
            System.out.println("key=" + txtMapFileLookupKey);

            try {
                deptMapReader.get(txtMapFileLookupKey, txtMapFileLookupValue);
            } catch (Exception e) {
                txtMapFileLookupValue.set("");
                e.printStackTrace();
            } finally {
                txtMapFileLookupValue.set(
                        (txtMapFileLookupValue.equals(null) || txtMapFileLookupValue.equals(""))
                                ? "NOT-FOUND"
                                : txtMapFileLookupValue.toString());
            }

            reduceValueBuilder.append(txtMapFileLookupValue.toString());

        } else if (key.getsourceindex() == 1) {
            String arrEmpAttributes[] = value.toString().split(",");
            reduceValueBuilder.append(arrEmpAttributes[0].toString()).append(strSeparator);
        }

        txtMapFileLookupKey.set("");
        txtMapFileLookupValue.set("");

        return reduceValueBuilder;
    }

    @Override
    public void reduce(CompositeKeyWritableRSJ key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {

        for (Text value : values) {
            buildOutputValue(key, reduceValueBuilder, value);
        }

        // Drop last comma, set value, and emit output
        if (reduceValueBuilder.length() > 1) {
            //reduceValueBuilder.setLength(reduceValueBuilder.length() - 1);
            // Emit output
            reduceOutputValue.set(reduceValueBuilder.toString());
            context.write(nullWritableKey, reduceOutputValue);
        } else {
            System.out.println("Key=" + key.getjoinkey() + "src=" + key.getsourceindex());
        }
        // Reset variables
        reduceValueBuilder.setLength(0);
        reduceOutputValue.set("");
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (deptMapReader != null) {
            deptMapReader.close();
        }
    }
}

This is my core-site.xml:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/app/hadoop/tmp</value>
  <description>A base for other temporary directories.</description>
</property>
<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:9000</value>
  <description>The name of the default file system.  A URI whose
  scheme and authority determine the FileSystem implementation.  The
  uri's scheme determines the config property (fs.SCHEME.impl) naming
  the FileSystem implementation class.  The uri's authority is used to
  determine the host, port, etc. for a filesystem.</description>
</property>
</configuration>

Any help would be highly appreciated. Thanks in advance!!!

Answer 1:

I had the same issue; I resolved it by adding

FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);

in the driver class.

You have to import URI from java.net.URI.
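
For reference, a minimal sketch of how this could be placed in the run() method of the driver above (the placement is an assumption from this answer, not from the original poster; the URI matches fs.default.name in the question's core-site.xml):

// Sketch only: requires java.net.URI and org.apache.hadoop.fs.FileSystem imports
Job job = new Job(getConf());
Configuration conf = job.getConfiguration();
// Resolve the default FileSystem explicitly against HDFS before adding cache files
FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);
DistributedCache.addCacheFile(new URI("/input/delivery_status"), conf);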



Answer 2:

I think I have encountered a similar problem. The key point is that you are operating on a SequenceFile from the DistributedCache, which lives on your local file system. From your logs, there is a line

"org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:140)" 

If you check the source code of SequenceFile.Reader, you will find that this log is caused by the code

fs.getFileStatus(filename).getLen()    

and this "fs" here should be LocalFileSystem instead of DistributedFileSystem.

My solution is to change

deptMapReader = new MapFile.Reader(dfs,uriUncompressedFile.toString(), context.getConfiguration());

to

Configuration conf = context.getConfiguration();
String originalFS = conf.get("fs.default.name");   // back up the original configuration
conf.set("fs.default.name", "file:///");           // point the configuration at the local file system
deptMapReader = new MapFile.Reader(dfs, uriUncompressedFile.toString(), conf);
conf.set("fs.default.name", originalFS);           // restore the original configuration

After doing this, the SequenceFile.Reader object can access the cached files on the local file system.

I think this problem happens because the SequenceFile API has changed, so some of the SequenceFile.Reader APIs, such as MapFile.Reader(fs, path, conf) used here, have been deprecated.

This solution works fine for me.
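
Put together, the reducer's initializeDepartmentsMap from the question could look roughly like this (a sketch based on the snippet above; it assumes the cached MapFile directory is passed in exactly as in the question's setup method):

private void initializeDepartmentsMap(URI uriUncompressedFile, Context context)
        throws IOException {
    Configuration conf = context.getConfiguration();
    String originalFS = conf.get("fs.default.name");   // back up the original default FS
    conf.set("fs.default.name", "file:///");           // resolve the cached path against the local FS
    FileSystem localFs = FileSystem.get(conf);
    try {
        deptMapReader = new MapFile.Reader(localFs, uriUncompressedFile.getPath(), conf);
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        conf.set("fs.default.name", originalFS);       // restore the original default FS
    }
}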



Answer 3:

You should set the conf properties according to your core-site.xml file, like this:

conf.set("fs.defaultFS", "hdfs://host:port");
conf.set("mapreduce.jobtracker.address", "host:port");


Answer 4:

Include the line below in the job runner:

DistributedCache.addCacheFile(new URI(""), conf);

and the code below in the setup method of the mapper:

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    Configuration configuration = new Configuration();
    FileSystem fileSystem = null;
    try {
        fileSystem = FileSystem.get(new URI("<File location>"), configuration);
    } catch (URISyntaxException e) {
        e.printStackTrace();
    }

    String location = <S3 file location>;
    FSDataInputStream fsDataInputStream = fileSystem.open(new Path(location));
    Scanner scanner = new Scanner(fsDataInputStream);
    int i = 1;
    while (scanner.hasNextLine()) {
        // LOG and stickerMap are fields of the mapper (not shown here)
        String str[] = scanner.nextLine().split(",");
        LOG.info("keys are \t" + str[0] + str[1]);
        stickerMap.put(str[0] + str[1], i);
        ++i;
    }
}