
I am following a Hadoop tutorial on a website and trying to implement it in Java. The provided file contains data about a forum. I want to parse that file and use the data.

The code to set my configurations is as follows:

    public class ForumAnalyser extends Configured implements Tool{
        public static void main(String[] args) {
            int exitCode = 0;
            try {
                exitCode = ToolRunner.run(new ForumAnalyser(), args);
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            finally {
                System.exit(exitCode);
            }
        }    

        @Override
        public int run(String[] args) throws Exception {
            JobConf conf = new JobConf(ForumAnalyser.class);
            setStudentHourPostJob(conf);
            JobClient.runJob(conf);
            return 0;
        }

        public static void setStudentHourPostJob(JobConf conf) {
            FileInputFormat.setInputPaths(conf, new Path("input2"));
            FileOutputFormat.setOutputPath(conf, new Path("output_forum_post"));
            conf.setJarByClass(ForumAnalyser.class);

            conf.setMapperClass(StudentHourPostMapper.class);
            conf.setOutputKeyClass(LongWritable.class);
            conf.setMapOutputKeyClass(LongWritable.class);

            conf.setReducerClass(StudentHourPostReducer.class); 
            conf.setOutputValueClass(IntWritable.class);
            conf.setMapOutputValueClass(IntWritable.class);
        }
    }

Each record in the file is separated by "\n", so in the mapper class each record is mostly returned correctly. The columns within a record are separated by tabs. The problem occurs with one specific column, "posts": it contains the posts written by people and therefore also contains "\n", so the mapper incorrectly treats a line inside the "posts" column as the start of a new record. The "posts" column is also enclosed in double quotes in the file. My question is: how can I tell the mapper to differentiate each record correctly? Can I somehow tell it to read each column by tab (I know how many columns each record has)?

Thanks in advance for the help.

  • Why not just use the NIO2 API? Commented Dec 23, 2015 at 21:51

1 Answer


By default, MapReduce uses TextInputFormat, in which each record is a single line of input (it assumes records are delimited by a newline, "\n").

To achieve your requirement, you need to write your own InputFormat and RecordReader classes. For example, Mahout has an XmlInputFormat for reading an entire XML document as one record. Check the code here: https://github.com/apache/mahout/blob/master/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java

I took the XmlInputFormat code and modified it to achieve your requirement. Here is the code (I call the classes MultiLineInputFormat and MultiLineRecordReader):

package com.myorg.hadooptests;

import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Reads records that are delimited by a specific begin/end tag.
 */
public class MultiLineInputFormat extends TextInputFormat {

    private static final Logger log = LoggerFactory.getLogger(MultiLineInputFormat.class);

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        try {
            return new MultiLineRecordReader((FileSplit) split, context.getConfiguration());
        } catch (IOException ioe) {
            log.warn("Error while creating MultiLineRecordReader", ioe);
            return null;
        }
    }

    /**
     * MultiLineRecordReader class to read through a given text document to output records containing multiple
     * lines as a single line
     *
     */
    public static class MultiLineRecordReader extends RecordReader<LongWritable, Text> {

        private final long start;
        private final long end;
        private final FSDataInputStream fsin;
        private final DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable currentKey;
        private Text currentValue;

        private static final Logger log = LoggerFactory.getLogger(MultiLineRecordReader.class);

        public MultiLineRecordReader(FileSplit split, Configuration conf) throws IOException {

            // open the file and seek to the start of the split
            start = split.getStart();
            end = start + split.getLength();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(conf);
            fsin = fs.open(split.getPath());
            fsin.seek(start);

            log.info("start: " + Long.toString(start) + " end: " + Long.toString(end));
        }

        private boolean next(LongWritable key, Text value) throws IOException {
            if (fsin.getPos() < end) {
                try {
                    log.info("Started reading");
                    if(readUntilEnd()) {
                        key.set(fsin.getPos());
                        value.set(buffer.getData(), 0, buffer.getLength());
                        return true;
                    }
                } finally {
                    buffer.reset();
                }
            }
            return false;
        }

        @Override
        public void close() throws IOException {
            Closeables.closeQuietly(fsin);
        }

        @Override
        public float getProgress() throws IOException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        private boolean readUntilEnd() throws IOException {
            boolean insideColumn = false;
            byte[] delimiterBytes = new String("\"").getBytes("utf-8");
            byte[] newLineBytes = new String("\n").getBytes("utf-8");

            while (true) {
                int b = fsin.read();

                // end of file:
                if (b == -1) return false;
                log.info("Read: " + b);

                // we encountered a double quote: toggle whether we are inside the quoted column
                if (b == delimiterBytes[0]) {
                    insideColumn = !insideColumn;
                }

                // if we encounter a newline and we are not inside a quoted column, it is the end of the record
                if (b == newLineBytes[0] && !insideColumn) return true;

                // save to buffer:
                buffer.write(b);

                // see if we've passed the stop point:
                if (fsin.getPos() >= end) {
                    if(buffer.getLength() > 0) // If buffer has some data, then return true
                        return true;
                    else
                        return false;
                }
            }
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return currentKey;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return currentValue;
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            currentKey = new LongWritable();
            currentValue = new Text();
            return next(currentKey, currentValue);
        }
    }
}

Logic:

  • I have assumed that the fields containing new lines ("\n") are delimited by double quotes (").
  • The record reading logic is in readUntilEnd() method.
  • In this method, if a newline appears while we are in the middle of reading a field (which is delimited by double quotes), we do not treat it as the end of the record; a standalone sketch of this quote-toggling logic follows this list.
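
For illustration, here is a minimal, standalone sketch of the same quote-toggling idea outside of Hadoop (the class and method names are made up for the demo; the actual record-reading logic is the readUntilEnd() method above):

import java.util.ArrayList;
import java.util.List;

/**
 * Standalone (non-Hadoop) demo of the quote-toggling idea used in readUntilEnd():
 * a newline ends a record only when we are not inside a double-quoted column.
 * Class and method names here are hypothetical.
 */
public class QuoteAwareSplitDemo {

    static List<String> splitRecords(String text) {
        List<String> records = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean insideColumn = false;

        for (char c : text.toCharArray()) {
            if (c == '"') {
                insideColumn = !insideColumn;    // toggle on every double quote
            }
            if (c == '\n' && !insideColumn) {
                records.add(current.toString()); // end of record
                current.setLength(0);
                continue;                        // the record separator itself is not stored
            }
            current.append(c);
        }
        if (current.length() > 0) {
            records.add(current.toString());     // last record if the input has no trailing newline
        }
        return records;
    }

    public static void main(String[] args) {
        String data = "1\t\"post1 \n post1\"\t3\n4\t\"post2\"\t6\n";
        List<String> records = splitRecords(data);
        System.out.println(records.size() + " records"); // prints "2 records"
    }
}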

To test this, I wrote an identity mapper (which writes the input to the output as-is). In the driver, you explicitly specify your custom input format.

For example, I have specified the input format as:

job.setInputFormatClass(MultiLineInputFormat.class); // This is my custom class for InputFormat and RecordReader

Following is the code:

package com.myorg.hadooptests;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MultiLineDemo {

       public static class MultiLineMapper
                extends Mapper<LongWritable, Text , Text, NullWritable> {

            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                context.write(value, NullWritable.get());
            }
        }

        public static void main(String[] args) throws Exception {

            Configuration conf = new Configuration();

            Job job = Job.getInstance(conf, "MultiLineMapper");
            job.setInputFormatClass(MultiLineInputFormat.class);

            job.setJarByClass(MultiLineDemo.class);
            job.setMapperClass(MultiLineMapper.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            FileInputFormat.addInputPath(job, new Path("/in/in8.txt"));
            FileOutputFormat.setOutputPath(job, new Path("/out/"));

            job.waitForCompletion(true);

    }
}

I ran this on the following input. The output records match the input records exactly. You can see that the 2nd field in each record contains newlines ("\n"), but the entire record is still returned in the output.

E:\HadoopTests\target>hadoop fs -cat /in/in8.txt

1       "post1 \n"      3
1       "post2 \n post2 \n"     3
4       "post3 \n post3 \n post3 \n"    6
1       "post4 \n post4 \n post4 \n post4 \n"   6

E:\HadoopTests\target>hadoop fs -cat /out/*

1       "post1 \n"      3
1       "post2 \n post2 \n"     3
1       "post4 \n post4 \n post4 \n post4 \n"   6
4       "post3 \n post3 \n post3 \n"    6

Note: I wrote this code for demo purposes. You need to handle corner cases (if any) and optimize the code (if there is scope for optimization).
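
As for reading each column by tab: once MultiLineInputFormat delivers whole records, the mapper can simply split each value on tabs. Here is a minimal sketch, assuming the first column is a numeric id (the class name, column index, and what is emitted are all assumptions, since the actual schema of your forum file isn't shown):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper showing how to read the columns of a delivered record.
// The column index and its meaning are assumptions; adjust them to your file's schema.
public class TabColumnMapper
        extends Mapper<LongWritable, Text, LongWritable, IntWritable> {

    private static final int ID_COLUMN = 0; // assumption: first column holds a numeric id

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // split the whole record on tabs; -1 keeps trailing empty columns
        String[] columns = value.toString().split("\t", -1);
        if (columns.length == 0 || columns[ID_COLUMN].trim().isEmpty()) {
            return; // skip empty or malformed records
        }
        long id = Long.parseLong(columns[ID_COLUMN].trim()); // assumes a numeric first column
        context.write(new LongWritable(id), new IntWritable(1));
    }
}

The same split would go inside your StudentHourPostMapper. Note that this mapper uses the newer org.apache.hadoop.mapreduce API to match MultiLineInputFormat, not the older mapred API used in your driver.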
