
I have written a MapReduce program to process logs. In addition to the actual output, the job writes side data to a location external to the output path set in the driver code. With speculative execution enabled, however, the output of killed task attempts is not removed. Is there a way to avoid this problem? Is it possible to solve the issue other than by writing to the normal output location and copying to the external location after job completion?

Is it possible to solve this issue using 'OutputCommitter' ?

Has anyone tried this ? Any help would be appreciated.

2 Answers


Yes, FileOutputCommitter can be used: it moves the contents of the temporary task attempt directory to the final output directory when a task succeeds, and then deletes the task attempt directory.

I believe most of the built-in output formats in Hadoop extend FileOutputFormat and use an OutputCommitter, which by default is FileOutputCommitter.

This is the corresponding code from FileOutputFormat:

    public synchronized OutputCommitter getOutputCommitter(
        TaskAttemptContext context) throws IOException {
      if (committer == null) {
        Path output = getOutputPath(context);
        committer = new FileOutputCommitter(output, context);
      }
      return committer;
    }

To write to multiple paths, you can look into MultipleOutputs, which by default uses the job's OutputCommitter.
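As a rough sketch of that approach (the named output "sidedata", the base path "side/part", and the class names are illustrative, not from the question), side data can be routed through MultipleOutputs so that it lives under the job output directory and is promoted or discarded by the same FileOutputCommitter:

```java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

// Sketch: side data written via MultipleOutputs goes into the task attempt's
// temporary directory, so killed speculative attempts are cleaned up for free.
public class SideDataReducer extends Reducer<Text, Text, Text, Text> {
  private MultipleOutputs<Text, Text> mos;

  @Override
  protected void setup(Context context) {
    mos = new MultipleOutputs<Text, Text>(context);
  }

  @Override
  protected void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    for (Text value : values) {
      context.write(key, value);                       // normal job output
      mos.write("sidedata", key, value, "side/part");  // side data, under the job output dir
    }
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    mos.close();  // flush side files so the committer can move them on commit
  }
}
```

In the driver, the named output would be registered with something like `MultipleOutputs.addNamedOutput(job, "sidedata", TextOutputFormat.class, Text.class, Text.class);`. If the external location must stay outside the job output path, a post-job `DistCp` or `FileSystem.rename` of the committed `side/` subtree is still needed.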

Alternatively, you can create your own output format that extends FileOutputFormat, override the getOutputCommitter() function shown above, and supply your own OutputCommitter implementation, using the FileOutputCommitter code as a reference.
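A minimal sketch of such a committer, assuming the side data is written into one external subdirectory per task attempt (the `sideRoot` layout and class name are assumptions for illustration, not part of the question):

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

// Sketch: also clean up a hypothetical external side-data directory on abort,
// so killed speculative attempts leave nothing behind outside the output path.
public class SideDataOutputCommitter extends FileOutputCommitter {
  private final Path sideRoot;  // external location, one subdir per attempt (assumed layout)

  public SideDataOutputCommitter(Path output, Path sideRoot,
                                 TaskAttemptContext context) throws IOException {
    super(output, context);
    this.sideRoot = sideRoot;
  }

  private Path attemptSidePath(TaskAttemptContext context) {
    // One subdirectory per task attempt, so speculative attempts never collide.
    return new Path(sideRoot, context.getTaskAttemptID().toString());
  }

  @Override
  public void abortTask(TaskAttemptContext context) throws IOException {
    super.abortTask(context);  // deletes the normal temporary work directory
    Path side = attemptSidePath(context);
    FileSystem fs = side.getFileSystem(context.getConfiguration());
    fs.delete(side, true);     // discard this attempt's side data as well
  }

  @Override
  public void commitTask(TaskAttemptContext context) throws IOException {
    super.commitTask(context); // promotes the normal output
    Path side = attemptSidePath(context);
    FileSystem fs = side.getFileSystem(context.getConfiguration());
    if (fs.exists(side)) {
      // Promote the winning attempt's side data to a per-task name.
      fs.rename(side, new Path(sideRoot,
          context.getTaskAttemptID().getTaskID().toString()));
    }
  }
}
```

The key point is that both the normal and the side output are promoted or discarded per task attempt, which is exactly what speculative execution requires.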

In the FileOutputCommitter code you will find the function you might be interested in:

      /**
       * Delete the work directory
       */
      @Override
      public void abortTask(TaskAttemptContext context) {
        try {
          if (workPath != null) {
            context.progress();
            outputFileSystem.delete(workPath, true);
          }
        } catch (IOException ie) {
          LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
        }
      }

If a task succeeds, commitTask() is called; in the default implementation it moves the temporary task output directory (which has the task attempt ID in its name to avoid conflicts between task attempts) to the final output path, ${mapred.output.dir}. Otherwise, the framework calls abortTask(), which deletes the temporary task output directory.


7 Comments

I will give OutputCommitter a try. I have a query: how does MultipleOutputs work if I need to output data in both the map and reduce tasks of a MapReduce job (the key and value types differ between the multiple outputs and the normal output)? If I output data using MultipleOutputs in the map task, will it be written in the map task itself or forwarded to the reduce tasks? Do the key and value types need to be generic, i.e. WritableComparable and Writable?
It can go both ways: if you use mos.write(K key, V value, String baseOutputPath) from the mapper, where baseOutputPath is something like path/keyname, you will get output as path/keyname-m-0000x. This output won't be processed by reduce tasks; reduce tasks only process keys emitted using context.write(). You can use LazyOutputFormat to mimic the behavior of MultipleTextOutputFormat, as mentioned in the same link for MultipleOutputs. And yes, the key and value types used in MultipleOutputs need to be WritableComparable/Writable, as the API mentions.
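To sketch the map-side behavior described above (the class name, key derivation, and "path/" prefix are illustrative assumptions): records written through MultipleOutputs in the mapper land directly in map-side files, while only context.write() feeds the shuffle.

```java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

// Sketch: map-side MultipleOutputs writes bypass the reduce phase entirely.
public class TaggingMapper extends Mapper<LongWritable, Text, Text, Text> {
  private MultipleOutputs<Text, Text> mos;

  @Override
  protected void setup(Context context) {
    mos = new MultipleOutputs<Text, Text>(context);
  }

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    // Hypothetical key derivation: first tab-separated field of the log line.
    Text key = new Text(line.toString().split("\t", 2)[0]);
    // Goes straight to files like path/<key>-m-0000x; reducers never see it.
    mos.write(key, line, "path/" + key.toString());
    // Only this record flows through the shuffle to the reducers.
    context.write(key, line);
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    mos.close();
  }
}
```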
I have read that the issue with multiple outputs is taken care of if the multiple-output location is set to the job output location. Suppose I write multiple outputs in both the mapper and the reducer using the same location as the job output; how will it behave? Would I miss data written in the mapper phase?
@arjun: The above function deletes the working directory of the task, which as far as I know is unique across all tasks (map or reduce). You need to override that function with your own implementation to remove your custom directories or stale files when a task fails. So it's entirely up to you how you do that.
@SSaikia_JtheRocker I have a simple map-only job with a single output and the default FileOutputFormat as you mention above, but unless I turn off speculative execution, some tasks fail because the output directory already exists. I call FileOutputFormat.setOutputPath(job, new Path(args[1]));. Any idea how this can happen?

To avoid _logs and _SUCCESS files being created in the MapReduce output folder, you may use the settings below:

    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
    conf.set("hadoop.job.history.user.location", "none");

