
I am using one mapper, one reducer, and one combiner class, but I am getting the error below:

java.io.IOException: wrong value class: class org.apache.hadoop.io.Text is not class org.apache.hadoop.io.IntWritable
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:199)
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1307)
at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1623)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at BookPublished1$Combine.reduce(BookPublished1.java:47)
at BookPublished1$Combine.reduce(BookPublished1.java:1)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1644)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1618)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1467)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:699)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:769)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

My entire program is shown below:

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;


public class BookPublished1 {

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String line = value.toString();
            String[] strYear = line.split(";");
            context.write(new Text(strYear[3]), new IntWritable(1));
        }
    }


    public static class Combine extends Reducer<Text, IntWritable, Text, Text> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable x : values) {
                sum += x.get();
            }
            context.write(new Text("BookSummary"), new Text(key + "_" + sum));
        }
    }
    public static class Reduce extends Reducer<Text, Text, Text, FloatWritable> {

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Long publishYear = 0L, max = Long.MAX_VALUE;
            Text publishYear1 = null, maxYear = null;
            Long publishValue = 0L;
            String compositeString;
            String[] compositeStringArray;
            for (Text x : values) {
                compositeString = x.toString();
                compositeStringArray = compositeString.split("_");
                publishYear1 = new Text(compositeStringArray[0]);
                publishValue = new Long(compositeStringArray[1]);
                if (publishValue > max) {
                    max = publishValue;
                    maxYear = publishYear1;
                }
            }
            Text keyText = new Text("max" + " ( " + maxYear.toString() + ") : ");
            context.write(keyText, new FloatWritable(max));
        }
    }


    public static void main(String[] args) throws Exception {
        Configuration conf= new Configuration();
        Job job = new Job(conf,"BookPublished");

        job.setJarByClass(BookPublished1.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setCombinerClass(Combine.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);



        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);

        outputPath.getFileSystem(conf).delete(outputPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

Please help me with a resolution.

1 Answer

A combiner's output types must match the mapper's output types. Hadoop makes no guarantee about how many times the combiner is applied, or whether it is applied at all, so a combiner cannot change the key/value types: for a mapper declared Mapper<K1, V1, K2, V2>, the combiner must be a Reducer<K2, V2, K2, V2>. Your Combine class emits <Text, Text> into a map output buffer that setMapOutputValueClass has declared to hold IntWritable values, which is exactly what the "wrong value class" exception reports.

The same contract breaks in the other direction too: whenever the combiner is skipped, values from map (<Text, IntWritable>) go directly to the reduce, where types <Text, Text> are expected.
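One way out is to make the combiner type-preserving and move the "find the maximum year" logic into the reducer. Below is a minimal sketch, not the only possible fix, and it rests on two assumptions that are not in your original code: the job runs with a single reducer (job.setNumReduceTasks(1)) so one reducer sees every year, and the reducer tracks a running maximum across keys and emits it once from cleanup(). It reuses the imports already at the top of your file.

    // Combiner with the same key/value types as the map output, so Hadoop
    // is free to apply it zero, one, or many times without breaking the job.
    public static class Combine extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable x : values) {
                sum += x.get();
            }
            context.write(key, new IntWritable(sum));   // partial count per year
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, FloatWritable> {

        private String maxYear = null;
        private long max = Long.MIN_VALUE;   // so the first year always wins

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (IntWritable x : values) {
                sum += x.get();
            }
            if (sum > max) {                 // track the best year seen so far
                max = sum;
                maxYear = key.toString();
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            // Runs once per reducer; with a single reducer this is the global maximum.
            if (maxYear != null) {
                context.write(new Text("max (" + maxYear + ") : "), new FloatWritable(max));
            }
        }
    }

In the driver, keep setMapOutputValueClass(IntWritable.class) and setOutputValueClass(FloatWritable.class) as they are, and add job.setNumReduceTasks(1). Note that the sketch also initializes max to Long.MIN_VALUE: your original reducer starts it at Long.MAX_VALUE, so publishValue > max can never be true and maxYear stays null. If you would rather keep the composite "year_sum" re-keying under a single "BookSummary" key, do that transformation in a second MapReduce job rather than in a combiner, because a combiner is never allowed to change the types.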
