By default, MapReduce uses TextInputFormat, in which each record is a single line of input (it assumes records are delimited by a newline, "\n").
To meet your requirements, you need to write your own InputFormat and RecordReader classes. For example, Mahout has an XmlInputFormat that reads the XML content between a configured start tag and end tag as one record. Check the code here: https://github.com/apache/mahout/blob/master/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java
I took the XmlInputFormat code and modified it to meet your requirements. Here is the code (I call the classes MultiLineInputFormat and MultiLineRecordReader):
package com.myorg.hadooptests;
import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;

/**
 * Reads multi-line records: fields enclosed in double quotes may contain newlines,
 * and a newline outside the quotes marks the end of a record.
 */
public class MultiLineInputFormat extends TextInputFormat {

    private static final Logger log = LoggerFactory.getLogger(MultiLineInputFormat.class);

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        try {
            return new MultiLineRecordReader((FileSplit) split, context.getConfiguration());
        } catch (IOException ioe) {
            log.warn("Error while creating MultiLineRecordReader", ioe);
            return null;
        }
    }

    /**
     * MultiLineRecordReader reads through a given text document and emits records that
     * span multiple lines as a single record.
     */
    public static class MultiLineRecordReader extends RecordReader<LongWritable, Text> {

        private static final Logger log = LoggerFactory.getLogger(MultiLineRecordReader.class);

        private final long start;
        private final long end;
        private final FSDataInputStream fsin;
        private final DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable currentKey;
        private Text currentValue;

        public MultiLineRecordReader(FileSplit split, Configuration conf) throws IOException {
            // open the file and seek to the start of the split
            start = split.getStart();
            end = start + split.getLength();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(conf);
            fsin = fs.open(split.getPath());
            fsin.seek(start);
            log.info("start: " + start + " end: " + end);
        }

        private boolean next(LongWritable key, Text value) throws IOException {
            if (fsin.getPos() < end) {
                try {
                    log.info("Started reading");
                    if (readUntilEnd()) {
                        key.set(fsin.getPos());
                        value.set(buffer.getData(), 0, buffer.getLength());
                        return true;
                    }
                } finally {
                    buffer.reset();
                }
            }
            return false;
        }

        @Override
        public void close() throws IOException {
            Closeables.closeQuietly(fsin);
        }

        @Override
        public float getProgress() throws IOException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        private boolean readUntilEnd() throws IOException {
            boolean insideColumn = false;
            byte[] delimiterBytes = "\"".getBytes("utf-8");
            byte[] newLineBytes = "\n".getBytes("utf-8");

            while (true) {
                int b = fsin.read();

                // end of file:
                if (b == -1) return false;

                log.info("Read: " + b);

                // we encountered a double quote: toggle the "inside a quoted column" flag
                if (b == delimiterBytes[0]) {
                    insideColumn = !insideColumn;
                }

                // a newline outside a quoted column marks the end of the record
                if (b == newLineBytes[0] && !insideColumn) return true;

                // save to buffer:
                buffer.write(b);

                // see if we've passed the stop point:
                if (fsin.getPos() >= end) {
                    // return whatever has been buffered so far, if anything
                    return buffer.getLength() > 0;
                }
            }
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return currentKey;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return currentValue;
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            currentKey = new LongWritable();
            currentValue = new Text();
            return next(currentKey, currentValue);
        }
    }
}
Logic:
- I have assumed that fields containing newlines ("\n") are enclosed in double quotes (").
- The record-reading logic is in the readUntilEnd() method.
- In this method, if a newline appears while we are in the middle of a quoted field, we do not treat it as the end of the record (see the standalone sketch after this list).
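If it helps to see that delimiter logic in isolation, here is a minimal standalone sketch (my own illustration, not part of the job; the class name QuoteToggleDemo and the sample string are made up) of the same quote-toggling state machine: a newline inside double quotes stays part of the record, while a newline outside quotes ends it.

public class QuoteToggleDemo {

    public static void main(String[] args) {
        // Two records; both have a quoted field with embedded newlines.
        String data = "1 \"post1 \n\" 3\n1 \"post2 \n post2 \n\" 3\n";

        StringBuilder record = new StringBuilder();
        boolean insideColumn = false;

        for (char c : data.toCharArray()) {
            if (c == '"') {
                insideColumn = !insideColumn; // toggle on every double quote
            }
            if (c == '\n' && !insideColumn) { // newline outside quotes = end of record
                System.out.println("RECORD: [" + record + "]");
                record.setLength(0);
                continue;
            }
            record.append(c);
        }
    }
}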
To test this, I wrote an identity Mapper (which writes the input as-is to the output). In the driver, you explicitly specify your custom class as the input format.
For example, I have specified the input format as:
job.setInputFormatClass(MultiLineInputFormat.class); // This is my custom class for InputFormat and RecordReader
Following is the code:
package com.myorg.hadooptests;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MultiLineDemo {

    public static class MultiLineMapper
            extends Mapper<LongWritable, Text, Text, NullWritable> {

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "MultiLineMapper");
        job.setInputFormatClass(MultiLineInputFormat.class);
        job.setJarByClass(MultiLineDemo.class);
        job.setMapperClass(MultiLineMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path("/in/in8.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/out/"));
        job.waitForCompletion(true);
    }
}
I ran this on the following input. The output contains exactly the same records as the input (the order differs only because the records, emitted as map output keys, are sorted by the single default reducer). You can see that the 2nd field in each record contains newlines ("\n"), yet each record is still returned whole in the output.
E:\HadoopTests\target>hadoop fs -cat /in/in8.txt
1 "post1 \n" 3
1 "post2 \n post2 \n" 3
4 "post3 \n post3 \n post3 \n" 6
1 "post4 \n post4 \n post4 \n post4 \n" 6
E:\HadoopTests\target>hadoop fs -cat /out/*
1 "post1 \n" 3
1 "post2 \n post2 \n" 3
1 "post4 \n post4 \n post4 \n post4 \n" 6
4 "post3 \n post3 \n post3 \n" 6
Note: I wrote this code for demo purposes. You need to handle corner cases (if any) and optimize the code (if there is scope for optimization).
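One corner case worth calling out explicitly: if a quoted field containing newlines straddles an input-split boundary, a record reader that starts in the middle of that record cannot tell whether it is inside or outside the quotes. A simple (though less parallel) safeguard, which I am only sketching here as a suggestion and have not tested, is to make the format non-splittable so each file is read by a single record reader. The class name NonSplittableMultiLineInputFormat is hypothetical.

package com.myorg.hadooptests;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;

public class NonSplittableMultiLineInputFormat extends MultiLineInputFormat {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // One mapper reads the whole file, so the quote state is never lost at a split boundary.
        return false;
    }
}

You would then register it in the driver with job.setInputFormatClass(NonSplittableMultiLineInputFormat.class) instead of MultiLineInputFormat.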