Here is my Reducer. It takes an EdgeWritable key and a NullWritable value.

EdgeWritable holds 4 integers, for example <71, 74, 7, 2000>: a communication from 71 (FromID) to 74 (ToID) in 7 (July) 2000 (Year).

The Mapper emits 10787 such records to the Reducer, but the Reducer outputs only 1.

I need to output 44 files, one for each of the 44 months in the period Oct-1998 to July 2002. Each file name should have the format "out"+month+year. For example, July 2002 records will be in file out72002.
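
A minimal plain-Java sketch of that naming rule and of a month-window check. The class and method names are hypothetical, and it assumes 1-based months (7 = July) and that both end months are included, which is not stated above.

```java
// Hypothetical helper; months are assumed to be 1-based (7 = July).
class OutputNaming {

    /** Builds the base name described above: "out" + month + year. */
    static String outputName(int month, int year) {
        return "out" + month + year;
    }

    /** True if (month, year) lies between Oct 1998 and Jul 2002, both inclusive (assumed). */
    static boolean inWindow(int month, int year) {
        int index = year * 12 + month; // a running month count, used only for comparison
        return index >= 1998 * 12 + 10 && index <= 2002 * 12 + 7;
    }

    public static void main(String[] args) {
        System.out.println(outputName(7, 2002)); // out72002
        System.out.println(inWindow(9, 1998));   // false
        System.out.println(inWindow(10, 1998));  // true
    }
}
```

Comparing a single running month count avoids combining separate month and year conditions, which is easy to get wrong at the edges of the period.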

I have debugged the code. After one iteration, it writes one file and stops without taking the next record. Please suggest how I should use MultipleOutputs. Thanks.

public class MultipleOutputReducer extends Reducer<EdgeWritable, NullWritable, IntWritable, IntWritable>{
private MultipleOutputs<IntWritable,IntWritable> multipleOutputs;

protected void setup(Context context) throws IOException, InterruptedException{
    multipleOutputs = new MultipleOutputs<IntWritable, IntWritable>(context);

}

@Override
public void reduce(EdgeWritable key, Iterable val, Context context) throws IOException, InterruptedException {
    int year = key.get(3).get();
    int month = key.get(2).get();
    int to = key.get(1).get();
    int from = key.get(0).get();

    //if(year >= 1997 && year <= 2001){
        if((month >= 9 && year >= 1997) || (month <= 6 && year <= 2001)){

            multipleOutputs.write(new IntWritable(from), new IntWritable(to), "out"+month+year );
        }
    //}

}
@Override
public void cleanup(Context context) throws IOException, InterruptedException{
    multipleOutputs.close();
}
}

Driver

public class TimeSlicingDriver extends Configured implements Tool{

static final SimpleDateFormat sdf = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss Z");
public int run(String[] args) throws Exception {


    if(args.length != 2){
        System.out.println("Enter <input path> <output path>");
        System.exit(-1);
    }

    Configuration setup = new Configuration();
    //setup.set("Input Path", args[0]);
    Job job = new Job(setup, "Time Slicing");
    //job.setJobName("Time Slicing");
    job.setJarByClass(TimeSlicingDriver.class);

    job.setMapperClass(TimeSlicingMapper.class);
    job.setReducerClass(MultipleOutputReducer.class);

    //MultipleOutputs.addNamedOutput(setup, "output", org.apache.hadoop.mapred.TextOutputFormat.class, EdgeWritable.class, NullWritable.class);


    job.setMapOutputKeyClass(EdgeWritable.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);

    /**Set the Input File Path and output file path*/
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));



    return job.waitForCompletion(true)?0:1;
}
}

1 Answer

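You are not iterating over the Iterable "val", so the logic in your reduce() runs only once per group of keys rather than once per record. Hadoop calls reduce() once for each group of keys that compare as equal, and as you step through the values it refills the key object with the key of the current record. Loop over "val" and do the key reads and the write inside that loop.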
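
To illustrate, here is a minimal plain-Java sketch of that behaviour. It is not Hadoop code: the class GroupIterationSketch, the GroupReducer interface and the run method are hypothetical stand-ins for the framework, and grouping by (month, year) is an assumption made for the example. How your real records are grouped depends on EdgeWritable's comparison logic, which the question does not show. The sketch only demonstrates that a reducer which never walks the values emits once per group, while one that loops over them sees every record through the refilled key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Plain-Java stand-in for the framework; none of these names are Hadoop APIs.
class GroupIterationSketch {

    /** Hypothetical reducer callback: invoked once per group, like reduce(). */
    interface GroupReducer {
        void reduce(int[] key, Iterable<Object> values, List<String> out);
    }

    /** Never touches the values, so it emits one line per group. */
    static final GroupReducer IGNORES_VALUES =
            (key, values, out) -> out.add(key[0] + "\t" + key[1]);

    /** Loops over the values and reads the refilled key on every step. */
    static final GroupReducer ITERATES_VALUES = (key, values, out) -> {
        for (Object ignored : values) {
            out.add(key[0] + "\t" + key[1]);
        }
    };

    /**
     * Edges are {from, to, month, year}, already sorted. Consecutive edges with
     * the same month and year form one group (an assumption for this example).
     */
    static List<String> run(List<int[]> sortedEdges, GroupReducer reducer) {
        List<String> out = new ArrayList<>();
        int start = 0;
        while (start < sortedEdges.size()) {
            int[] first = sortedEdges.get(start);
            int end = start;
            while (end < sortedEdges.size()
                    && sortedEdges.get(end)[2] == first[2]
                    && sortedEdges.get(end)[3] == first[3]) {
                end++;
            }
            List<int[]> group = sortedEdges.subList(start, end);
            int[] key = first.clone(); // one key object, reused for the whole group
            Iterable<Object> values = () -> new Iterator<Object>() {
                private int index = 0;

                @Override
                public boolean hasNext() {
                    return index < group.size();
                }

                @Override
                public Object next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    // Advancing the iterator overwrites the key with the current record.
                    System.arraycopy(group.get(index++), 0, key, 0, key.length);
                    return null; // stands in for NullWritable
                }
            };
            reducer.reduce(key, values, out);
            start = end;
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> edges = Arrays.asList(
                new int[] {71, 74, 7, 2000},
                new int[] {71, 80, 7, 2000},
                new int[] {12, 13, 8, 2000});
        System.out.println(run(edges, IGNORES_VALUES).size());  // 2: one per group
        System.out.println(run(edges, ITERATES_VALUES).size()); // 3: one per record
    }
}
```

Applied to the reducer in the question, this would mean moving the key.get(...) reads and the multipleOutputs.write(...) call inside a loop over val. Each base name such as out72002 should then appear in the job's output directory as a file like out72002-r-00000.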
