Is it possible to run multiple mappers on one node

Posted 2019-07-15 00:28

I have KMeans code, and my task is to calculate the speedup. I've done that by running it on different numbers of nodes in my university's cluster. But is it possible to change the number of mappers and/or reducers, so that I can check the change in speedup while running it on a single node?

While googling, I found that by using conf.setNumReduceTasks(2); I can change the number of reducers, but I haven't seen any change in my output. (My output is the time in ms.)

The code I am using is from GitHub: https://github.com/himank/K-Means/blob/master/src/KMeans.java. I've made some changes according to my requirements, but the main functionality is the same.

Here is what the main function looks like:

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        IN = args[0];
        OUT = args[1];
        String input = IN;
        String output = OUT + System.nanoTime();
        String again_input = output;
        int iteration = 0;
        boolean isdone = false;
        // Re-run the clustering job until the centroids stop moving.
        while (isdone == false) {
            JobConf conf = new JobConf(KMeans.class);
            // Ship the current centroids to every mapper via the distributed cache.
            if (iteration == 0) {
                Path hdfsPath = new Path(input + CENTROID_FILE_NAME);
                DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
            } else {
                Path hdfsPath = new Path(again_input + OUTPUT_FILE_NAME);
                DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
            }
            conf.setJobName(JOB_NAME);
            //conf.setNumReduceTasks(2);
            conf.setMapOutputKeyClass(DoubleWritable.class);
            conf.setMapOutputValueClass(DoubleWritable.class);
            conf.setOutputKeyClass(DoubleWritable.class);
            conf.setOutputValueClass(Text.class);
            conf.setMapperClass(Map.class);
            conf.setNumMapTasks(4);
            conf.setReducerClass(Reduce.class);
            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);
            FileInputFormat.setInputPaths(conf, new Path(input + DATA_FILE_NAME));
            FileOutputFormat.setOutputPath(conf, new Path(output));
            JobClient.runJob(conf);
            Path ofile = new Path(output + OUTPUT_FILE_NAME);

            // Read the centroids produced by this iteration back from HDFS.
            Configuration configuration = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://127.0.0.1:9000"), configuration);
            Path filePath = new Path(output + OUTPUT_FILE_NAME);
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(filePath)));
            List<Double> centers_next = new ArrayList<Double>();
            String line = br.readLine();
            while (line != null) {
                String[] sp = line.split("\t| ");
                double c = Double.parseDouble(sp[0]);
                centers_next.add(c);
                line = br.readLine();
            }
            br.close();

            // Read the centroids that were fed into this iteration.
            String prev;
            if (iteration == 0) {
                prev = input + CENTROID_FILE_NAME;
            } else {
                prev = again_input + OUTPUT_FILE_NAME;
            }
            Path prevfile = new Path(prev);
            FileSystem fs1 = FileSystem.get(new URI("hdfs://127.0.0.1:9000"), configuration);
            BufferedReader br1 = new BufferedReader(new InputStreamReader(fs1.open(prevfile)));
            List<Double> centers_prev = new ArrayList<Double>();
            String l = br1.readLine();
            while (l != null) {
                String[] sp1 = l.split(SPLITTER);
                double d = Double.parseDouble(sp1[0]);
                centers_prev.add(d);
                l = br1.readLine();
            }
            br1.close();

            // Converged once every centroid has moved by at most 0.1.
            Collections.sort(centers_next);
            Collections.sort(centers_prev);
            Iterator<Double> it = centers_prev.iterator();
            for (double d : centers_next) {
                double temp = it.next();
                if (Math.abs(temp - d) <= 0.1) {
                    isdone = true;
                } else {
                    isdone = false;
                    break;
                }
            }
            ++iteration;
            again_input = output;
            output = OUT + System.nanoTime();
        }
        long endTime = System.currentTimeMillis();
        long totalTime = endTime - startTime;
        System.out.println(totalTime);
    }

PS. I am new to Hadoop and MapReduce.

3 Answers
Answer 1 · 2019-07-15 00:59

The Apache MapReduce Tutorial provides more info.

How Many Maps?

The number of maps is usually driven by the total size of the inputs, that is, the total number of blocks of the input files.

The right level of parallelism for maps seems to be around 10-100 maps per-node, although it has been set up to 300 maps for very cpu-light map tasks. Task setup takes a while, so it is best if the maps take at least a minute to execute.

Thus, if you expect 10TB of input data and have a blocksize of 128MB, you’ll end up with 82,000 maps, unless Configuration.set(MRJobConfig.NUM_MAPS, int) (which only provides a hint to the framework) is used to set it even higher.
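As a rough sanity check of that arithmetic (plain Java; the commented-out line is the hint mentioned in the quote and assumes the newer org.apache.hadoop.mapreduce API):

    // Number of maps is roughly total input size divided by the HDFS block size.
    long inputBytes = 10L * 1024 * 1024 * 1024 * 1024; // 10 TB of input
    long blockBytes = 128L * 1024 * 1024;              // 128 MB block size
    System.out.println(inputBytes / blockBytes);       // 81920, i.e. roughly 82,000 maps

    // The hint quoted above; it is still only a hint to the framework:
    // job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, 100000);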

Answer 2 · 2019-07-15 01:17

The number of maps for a given job is usually driven by the number of input splits in the input files, not by setNumMapTasks() or the mapred.map.tasks parameter. A map task is spawned for each input split. The mapred.map.tasks parameter is just a hint to the InputFormat for the number of maps. The number of map tasks can be increased manually using setNumMapTasks(); it can raise the number, but it will not set it below what Hadoop determines by splitting the input data.
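Since the split count is what actually decides the mapper count, a practical way to get more map tasks on a single node is to create more splits. One option, not mentioned in this answer and offered only as a sketch to verify, is the old-API NLineInputFormat, which creates one split (and therefore one map task) per N input lines; the 5000 and the property name are assumptions to check against your Hadoop version:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.NLineInputFormat;

    // One input split (and therefore one map task) per N lines of input,
    // which gives a predictable number of mappers even on a single node.
    JobConf conf = new JobConf(KMeans.class);
    conf.setInputFormat(NLineInputFormat.class);
    conf.setInt("mapred.line.input.format.linespermap", 5000); // lower this for more mappers

The old-API NLineInputFormat hands the mapper the same LongWritable/Text pairs as TextInputFormat, so the Map class from the question should not need changes.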

http://wiki.apache.org/hadoop/HowManyMapsAndReduces

Answer 3 (by 手持菜刀，她持情操) · 2019-07-15 01:24

Yes.

You can change the number of mappers using setNumMapTasks() or conf.set("mapred.map.tasks", "numberofmappersyouwanttoset"), but this is only a suggestion to the configuration; there is no guarantee that that many mapper instances will actually run. Moreover, it depends on the input splits.

You can change the number of reducers as well, using the kind of code you have already written.

Conclusion:

Setting the number of maps - a suggestion (the actual number is based on the input splits, i.e. the total number of blocks of the input files).

Setting the number of reducers - a demand.
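Applied to the JobConf already in the question's main(), a minimal sketch of that difference (the 8 is just an example value):

    conf.setNumReduceTasks(2);         // demand: exactly this many reduce tasks will run
    conf.set("mapred.map.tasks", "8"); // suggestion only; the input splits still decide
                                       // (same effect as conf.setNumMapTasks(8))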

Apart from the points about the number of mappers and reducers in the answer above from @radkris, please have a look at this.
