I am developing a Hadoop project. I want to find the customers on a certain day and then write out those with the max consumption on that day. In my reducer class, for some reason, the global variable max doesn't change its value after a for loop.
EDIT I want to find the customers with max consumption on a certain day. I have managed to find the customers on the date I want, but I am facing a problem in my Reducer class. Here is the code:
EDIT #2 I already know that the values (consumption) are natural numbers. So in my output file I want only the customers, for a certain day, with max consumption.
EDIT #3 My input file contains many records. It has three columns: the customer's id, the timestamp (yyyy-MM-dd HH:mm:ss) and the consumption.
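For reference, this is roughly how one line in that layout can be read in plain Java, without Hadoop. It is only a sketch: RecordParseSketch and consumptionOn are names made up for this example, and -1 is used as a "no match" marker on the assumption that consumption values are natural numbers.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class RecordParseSketch {
    // Pattern letters are case-sensitive: MM = month, dd = day of month, mm = minutes.
    static final String PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** Returns the consumption if the line's timestamp equals the target, otherwise -1. */
    static int consumptionOn(String line, String target) {
        SimpleDateFormat ft = new SimpleDateFormat(PATTERN);
        String[] fields = line.split(",");
        try {
            Date stamp = ft.parse(fields[1].trim());
            Date wanted = ft.parse(target);
            return stamp.equals(wanted) ? Integer.parseInt(fields[2].trim()) : -1;
        } catch (ParseException e) {
            // Hypothetical error handling for this sketch: fail loudly on a bad timestamp.
            throw new IllegalArgumentException("Bad timestamp in line: " + line, e);
        }
    }

    public static void main(String[] args) {
        String day = "2013-07-01 01:00:00";
        System.out.println(consumptionOn("C11FA586258,2013-07-01 01:00:00,5", day)); // 5
        System.out.println(consumptionOn("C11FA586152,2015-09-01 15:22:22,3", day)); // -1
    }
}
```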
Driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class alicanteDriver {
    public static void main(String[] args) throws Exception {
        long t_start = System.currentTimeMillis();
        long t_end;
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Alicante");
        job.setJarByClass(alicanteDriver.class);
        job.setMapperClass(alicanteMapperC.class);
        //job.setCombinerClass(alicanteCombiner.class);
        job.setPartitionerClass(alicantePartitioner.class);
        job.setNumReduceTasks(2);
        job.setReducerClass(alicanteReducerC.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/alicante_1y.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/alicante_output"));
        job.waitForCompletion(true);
        t_end = System.currentTimeMillis();
        System.out.println((t_end-t_start)/1000);
    }
}
Mapper class
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class alicanteMapperC extends
        Mapper<LongWritable, Text, Text, IntWritable> {
    String Customer = new String();
    SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    Date t = new Date();
    IntWritable Consumption = new IntWritable();
    int counter = 0;
    // new vars
    int max = 0;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        Date d2 = null;
        try {
            d2 = ft.parse("2013-07-01 01:00:00");
        } catch (ParseException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }
        if (counter > 0) {
            String line = value.toString();
            StringTokenizer itr = new StringTokenizer(line, ",");
            while (itr.hasMoreTokens()) {
                Customer = itr.nextToken();
                try {
                    t = ft.parse(itr.nextToken());
                } catch (ParseException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                Consumption.set(Integer.parseInt(itr.nextToken()));
                //sort out as many values as possible
                if (Consumption.get() > max) {
                    max = Consumption.get();
                }
                //find customers in a certain date
                if (t.compareTo(d2) == 0 && Consumption.get() == max) {
                    context.write(new Text(Customer), Consumption);
                }
            }
        }
        counter++;
    }
}
Reducer class
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.google.common.collect.Iterables;
public class alicanteReducerC extends
        Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int max = 0; //this var
        // declaration of Lists
        List<Text> l1 = new ArrayList<Text>();
        List<IntWritable> l2 = new ArrayList<IntWritable>();
        for (IntWritable val : values) {
            if (val.get() > max) {
                max = val.get();
            }
            l1.add(key);
            l2.add(val);
        }
        for (int i = 0; i < l1.size(); i++) {
            if (l2.get(i).get() == max) {
                context.write(key, new IntWritable(max));
            }
        }
    }
}
Some values of the Input file
C11FA586148,2013-07-01 01:00:00,3
C11FA586152,2015-09-01 15:22:22,3
C11FA586168,2015-02-01 15:22:22,1
C11FA586258,2013-07-01 01:00:00,5
C11FA586413,2013-07-01 01:00:00,5
C11UA487446,2013-09-01 15:22:22,3
C11UA487446,2013-07-01 01:00:00,3
C11FA586148,2013-07-01 01:00:00,4
Output should be
C11FA586258 5
C11FA586413 5
I have searched the forums for a couple of hours, and still can't find the issue. Any ideas?
Here is the refactored code. You can pass or change the consumption date on the command line. In this case you don't need a reducer. My first answer queried the max consumption from the input; this answer queries the consumption for a user-provided date.
The setup method reads the user-provided value of mapper.maxConsumption.date and passes it to the map method.
The cleanup method in the reducer scans the per-customer maxima and writes only the customers that reach the overall max (i.e., 5 in this case).
run as:
hadoop jar maxConsumption.jar -Dmapper.maxConsumption.date="2013-07-01 01:00:00" Data/input.txt output/maxConsupmtion5
#input:
C11FA586148,2013-07-01 01:00:00,3
C11FA586152,2015-09-01 15:22:22,3
C11FA586168,2015-02-01 15:22:22,1
C11FA586258,2013-07-01 01:00:00,5
C11FA586413,2013-07-01 01:00:00,5
C11UA487446,2013-09-01 15:22:22,3
C11UA487446,2013-07-01 01:00:00,3
C11FA586148,2013-07-01 01:00:00,4
#output:
C11FA586258 5
C11FA586413 5
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class maxConsumption extends Configured implements Tool {
    public static class DataMapper extends Mapper<Object, Text, Text, IntWritable> {
        SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date dateInFile, filterDate;
        int lineno = 0;
        private final static Text customer = new Text();
        private final static IntWritable consumption = new IntWritable();
        private final static Text maxConsumptionDate = new Text();

        //read the date filter passed with -Dmapper.maxConsumption.date
        public void setup(Context context) {
            Configuration config = context.getConfiguration();
            maxConsumptionDate.set(config.get("mapper.maxConsumption.date"));
        }

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            try {
                lineno++;
                filterDate = ft.parse(maxConsumptionDate.toString());
                //map data from line/file
                String[] fields = value.toString().split(",");
                customer.set(fields[0].trim());
                dateInFile = ft.parse(fields[1].trim());
                consumption.set(Integer.parseInt(fields[2].trim()));
                if (dateInFile.equals(filterDate)) //only send to reducer if date filter matches....
                    context.write(new Text(customer), consumption);
            } catch (Exception e) {
                System.err.println("Invalid data at line: " + lineno + " Error: " + e.getMessage());
            }
        }
    }

    public static class DataReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        //per-customer max, kept until cleanup() runs
        LinkedHashMap<String, Integer> maxConsumption = new LinkedHashMap<String, Integer>();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int max = 0;
            System.out.print("reducer received: " + key + " [ ");
            for (IntWritable value : values) {
                System.out.print(value.get() + " ");
                if (value.get() > max)
                    max = value.get();
            }
            System.out.println(" ]");
            System.out.println(key.toString() + " max is " + max);
            maxConsumption.put(key.toString(), max);
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            int max = 0;
            //first find the max from reducer
            for (String key : maxConsumption.keySet()) {
                System.out.println("cleanup customer : " + key.toString() + " consumption : " + maxConsumption.get(key)
                        + " max: " + max);
                if (maxConsumption.get(key) > max)
                    max = maxConsumption.get(key);
            }
            System.out.println("final max is: " + max);
            //write only the max value from map
            for (String key : maxConsumption.keySet()) {
                if (maxConsumption.get(key) == max)
                    context.write(new Text(key), new IntWritable(maxConsumption.get(key)));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new maxConsumption(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: -Dmapper.maxConsumption.date=\"2013-07-01 01:00:00\" <in> <out>");
            return 2;
        }
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "get-max-consumption");
        job.setJarByClass(maxConsumption.class);
        job.setMapperClass(DataMapper.class);
        job.setReducerClass(DataReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //delete a stale output directory so the job can be re-run
        FileSystem fs = null;
        Path dstFilePath = new Path(args[1]);
        try {
            fs = dstFilePath.getFileSystem(conf);
            if (fs.exists(dstFilePath))
                fs.delete(dstFilePath, true);
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
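If you want to check the reduce/cleanup idea without a cluster, the same two steps can be sketched in plain Java. This is not the Hadoop job itself: CleanupSketch and customersWithMax are made-up names, and the rows are the sample records that already passed the date filter. It also assumes everything ends up in one place, as with a single reduce task. With several reduce tasks, each one would compute its own max in cleanup.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CleanupSketch {
    /** rows: {customer, consumption} pairs that already passed the date filter. */
    static List<String> customersWithMax(String[][] rows) {
        // Step 1 (the role of reduce): remember the largest value seen per customer.
        Map<String, Integer> perCustomer = new LinkedHashMap<>();
        for (String[] row : rows) {
            perCustomer.merge(row[0], Integer.parseInt(row[1]), Integer::max);
        }
        // Step 2 (the role of cleanup): find the overall max, then keep only the ties.
        int max = 0;
        for (int v : perCustomer.values()) {
            max = Math.max(max, v);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : perCustomer.entrySet()) {
            if (e.getValue() == max) {
                out.add(e.getKey() + " " + e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[][] rows = {
            {"C11FA586148", "3"}, {"C11FA586258", "5"}, {"C11FA586413", "5"},
            {"C11UA487446", "3"}, {"C11FA586148", "4"}
        };
        // Prints the two customers that reach 5.
        customersWithMax(rows).forEach(System.out::println);
    }
}
```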
Probably all the values that reach your reducer are below 0. Try initialising max with the minimum int value to check whether your variable changes.
int max = Integer.MIN_VALUE;
Based on what you describe, the output should be either only 0's (in this case the max value in the reducer stays 0) or empty (all values are less than 0). Also, look at this:
context.write(key, new IntWritable());
It should be:
context.write(key, new IntWritable(max));
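To see why the starting value matters, here is a small plain-Java sketch (MaxInitSketch and maxFrom are names made up for this example). With a start of 0, a list of negative values never updates max; with Integer.MIN_VALUE, it does.

```java
public class MaxInitSketch {
    /** Returns the largest of the values, or the start value if none exceeds it. */
    static int maxFrom(int start, int[] values) {
        int max = start;
        for (int v : values) {
            if (v > max) {
                max = v;
            }
        }
        return max;
    }

    public static void main(String[] args) {
        int[] negatives = {-7, -2, -9};
        System.out.println(maxFrom(0, negatives));                 // 0: max never changes
        System.out.println(maxFrom(Integer.MIN_VALUE, negatives)); // -2: the real max
    }
}
```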
EDIT: I just saw your Mapper class; it has a lot of problems. The following check skips the first record in every mapper. Why?
if (counter > 0) {
I guess you are getting something like this, right? "customer, 2013-07-01 01:00:00, 2,..." If that is the case and you are already filtering values, you should declare your max variable as a local variable, not in the mapper scope, where it would affect multiple customers.
There are a lot of open questions around this. You could explain the input of each mapper and what you want to do.
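To illustrate the problem with a max kept in the mapper scope, here is a simplified plain-Java sketch (RunningMaxSketch and emitWithRunningMax are made-up names; it ignores the date check and only mimics the "emit when the value equals the max seen so far" logic). The result depends on the order of the records, so values below the real max can slip through.

```java
import java.util.ArrayList;
import java.util.List;

public class RunningMaxSketch {
    /** Emits every value that equals the max seen so far, like a field shared across map() calls. */
    static List<Integer> emitWithRunningMax(int[] values) {
        int max = 0; // plays the role of the mapper-scope field
        List<Integer> emitted = new ArrayList<>();
        for (int v : values) {
            if (v > max) {
                max = v;
            }
            if (v == max) {
                emitted.add(v);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Sample consumptions for the chosen day, in file order.
        System.out.println(emitWithRunningMax(new int[] {3, 5, 5, 3, 4})); // [3, 5, 5]
    }
}
```

The first 3 is emitted only because no 5 had been seen yet, which is why the final comparison against the max has to happen after all values are known.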
EDIT2: Based on your answer, I would try this:
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class AlicanteMapperC extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final int max = 5;
    private SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        Date t = null;
        Date target = null;
        String[] line = value.toString().split(",");
        String customer = line[0];
        try {
            t = ft.parse(line[1]);
            //parse the target date inside the try block, since parse() throws a checked exception
            target = ft.parse("2013-07-01 01:00:00");
        } catch (ParseException e) {
            throw new RuntimeException("something wrong with the date! " + line[1], e);
        }
        int consumption = Integer.parseInt(line[2]);
        //find customers in a certain date
        if (t.compareTo(target) == 0 && consumption == max) {
            context.write(new Text(customer), new IntWritable(consumption));
        }
    }
}
And the reducer is pretty simple; it emits one record per customer:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class AlicanteReducerC extends
        Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        //We already know that it is 5
        context.write(key, new IntWritable(5));
        //If you want something different, for example to report a customer with different values, you could iterate over the values like this
        //for (IntWritable val : values) {
        //    context.write(key, val);
        //}
    }
}