How to submit a MapReduce job with the YARN API in Java

Posted 2019-06-07 14:50

Question:

I want to submit my MR job using the YARN Java API. I tried to do it as described in WritingYarnApplications, but I don't know what to add to the amContainer. Below is the code I have written:

package org.apache.hadoop.examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.util.Records;
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class YarnJob {
    private static Logger logger = LoggerFactory.getLogger(YarnJob.class);

    public static void main(String[] args) throws Throwable {

        Configuration conf = new Configuration();
        YarnClient client = YarnClient.createYarnClient();
        client.init(conf);
        client.start();

        System.out.println(JSON.toString(client.getAllQueues()));
        System.out.println(JSON.toString(client.getConfig()));
        //System.out.println(JSON.toString(client.getApplications()));
        System.out.println(JSON.toString(client.getYarnClusterMetrics()));

        YarnClientApplication app = client.createApplication();
        GetNewApplicationResponse appResponse = app.getNewApplicationResponse();

        ApplicationId appId = appResponse.getApplicationId();

        // Create launch context for app master
        ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
        // set the application id
        appContext.setApplicationId(appId);
        // set the application name
        appContext.setApplicationName("test");
        // Set the queue to which this application is to be submitted in the RM
        appContext.setQueue("default");

        // Set up the container launch context for the application master
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
        //amContainer.setLocalResources();
        //amContainer.setCommands();
        //amContainer.setEnvironment();

        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(Resource.newInstance(1024, 1));

        appContext.setApplicationType("MAPREDUCE");

        // Submit the application to the applications manager
        client.submitApplication(appContext);
        //client.stop();
    }
}

I can run a mapreduce job properly with command interface:

hadoop jar wordcount.jar org.apache.hadoop.examples.WordCount /user/admin/input /user/admin/output/

But how can I submit this wordcount job in yarn java api?

Answer 1:

You do not use YarnClient to submit the job; instead, use the MapReduce APIs to submit it. See this link for an example.
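For reference, a minimal sketch of such a driver is shown below. The class name WordCountSubmitter is just illustrative, the input/output paths are taken from your command line, the mapper/reducer are the TokenizerMapper and IntSumReducer inner classes of the bundled WordCount example, and it assumes the Hadoop configuration files on your classpath point at the cluster:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordCount;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountSubmitter {

    public static void main(String[] args) throws Exception {

        // Picks up core-site.xml, mapred-site.xml and yarn-site.xml from the classpath,
        // so mapreduce.framework.name=yarn must be set for the job to go to YARN
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountSubmitter.class);

        // Reuse the mapper, combiner and reducer from the bundled WordCount example
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("/user/admin/input"));
        FileOutputFormat.setOutputPath(job, new Path("/user/admin/output/"));

        // Blocks until the job finishes; use job.submit() if you only want to launch it
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Packaged into a jar and run with hadoop jar (or yarn jar), this submits the same WordCount job to YARN as your command-line invocation does.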

However, if you need more control over the job, such as getting the completion status, the mapper phase status, the reducer phase status, etc., you can use

job.submit();

Instead of

job.waitForCompletion(true)

You can use the functions job.mapProgress() and job.reduceProgress() to get the status. There are lots of methods on the Job object which you can explore.
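For example, a minimal sketch with an illustrative helper method submitAndTrack (the Job is assumed to be configured elsewhere; isComplete() and isSuccessful() are further Job methods you can combine with the progress calls):

    // Sketch: submit without blocking, then report progress until the job finishes
    private static void submitAndTrack(org.apache.hadoop.mapreduce.Job job) throws Exception {
        job.submit();                    // returns immediately, unlike waitForCompletion(true)
        while (!job.isComplete()) {      // poll until the job reaches a terminal state
            System.out.println("Map: " + job.mapProgress() + " Reduce: " + job.reduceProgress());
            Thread.sleep(1000);
        }
        System.out.println(job.isSuccessful() ? "Job succeeded" : "Job failed");
    }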

As for your query about

hadoop jar wordcount.jar org.apache.hadoop.examples.WordCount /user/admin/input /user/admin/output/

What's happening here is that you are running the driver program contained in wordcount.jar. Instead of doing "java -jar wordcount.jar", you use "hadoop jar wordcount.jar"; you could just as well use "yarn jar wordcount.jar". Compared to the plain java -jar command, Hadoop/YARN sets up the necessary additional classpath entries. This executes the main() of your driver program, which lives in the class org.apache.hadoop.examples.WordCount as specified in the command.

You can check out the source here: Source for WordCount class

The only reason I can think of for wanting to submit the job via YARN is to integrate it with some kind of service that kicks off MapReduce2 jobs on certain events.

For this you can always have your driver's main() look something like this.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyMapReduceDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        /******/

        int errCode = ToolRunner.run(conf, new MyMapReduceDriver(), args);

        System.exit(errCode);
    }

    @Override
    public int run(String[] args) throws Exception {

        // Keep the driver alive and launch a job per iteration (event loop of the service)
        while (true) {

            try {
                runMapReduceJob();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void runMapReduceJob() throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");

        /******/

        // Submit without blocking, unlike waitForCompletion(true)
        job.submit();

        // Poll the job state and print map/reduce progress
        while (job.getJobState() == JobStatus.State.RUNNING
                || job.getJobState() == JobStatus.State.PREP) {
            Thread.sleep(1000);

            System.out.println(" Map: " + StringUtils.formatPercent(job.mapProgress(), 0)
                    + " Reducer: " + StringUtils.formatPercent(job.reduceProgress(), 0));
        }
    }
}

Hope this helps.



Tags: java hadoop yarn