
I have a MapReduce job written in Python. The program was tested successfully in a local Linux environment, but it fails when I run it under Hadoop.

Here is the job command:

hadoop  jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.1+169.127-streaming.jar \
   -input /data/omni/20110115/exp6-10122 -output /home/yan/visitorpy.out \
   -mapper SessionMap.py   -reducer  SessionRed.py  -file SessionMap.py \
   -file  SessionRed.py

The mode of Session*.py is 755, and #!/usr/bin/env python is the first line of each *.py file. The mapper, SessionMap.py, is:

#!/usr/bin/env python
import sys
for line in sys.stdin:
    val = line.split("\t")
    (visidH, visidL, sessionID) = (val[4], val[5], val[108])
    print "%s%s\t%s" % (visidH, visidL, sessionID)

Error from the log:

java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:260)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
    at java.io.DataOutputStream.write(DataOutputStream.java:90)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:110)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:126)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)

8 Answers


I ran into the same problem and was puzzled: my mapper and reducer worked when I tested them on sample data locally, but the same test set failed when run through Hadoop MapReduce.

How to test your code locally (the sort step stands in for Hadoop's shuffle-and-sort phase):

cat <test file> | python mapper.py | sort | python reducer.py

On further investigation, I found that I hadn't included the shebang line in my mapper.py script.

#!/usr/bin/python

Add the line above as the first line of your Python script, and leave a blank line after it.

If you want to know more about the shebang line, read Why do people write #!/usr/bin/env python on the first line of a Python script?



You can find the Python error messages (a traceback, for example) and anything else your script writes to stderr in the Hadoop web interface. It is a little hidden, but you will find it through the link that streaming prints for you: click 'Map' or 'Reduce', then click any task, and then click 'All' in the Task Logs column.
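
To make sure a crash in your script actually shows up in those stderr logs, you can print the traceback yourself before exiting. A minimal sketch, reusing the field indexes from the question's mapper (the wrapper structure is illustrative, not from this answer):

#!/usr/bin/env python
# Hypothetical wrapper: dump a full traceback to stderr so it
# shows up under Task Logs -> stderr in the Hadoop web interface.
import sys
import traceback

def main():
    for line in sys.stdin:
        val = line.split("\t")
        print "%s%s\t%s" % (val[4], val[5], val[108])

if __name__ == "__main__":
    try:
        main()
    except Exception:
        traceback.print_exc(file=sys.stderr)  # lands in the task's stderr log
        sys.exit(1)  # a non-zero exit marks the task attempt as failed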



Finally I fixed the bug, and here are the lessons I learned:

1) The original code had no error handling for bad data, and I didn't spot the problem when I tested the code on a small dataset.

2) To handle empty fields/variables, it is a bit tricky in Python to distinguish None from empty strings. Personally I like len(strVar), which is easy to read and effective (see the sketch below).

3) The hadoop command in this case is correct. Somehow a *.py with mode 644 runs successfully in the environment I use.
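
A minimal sketch of that kind of guard, assuming the same tab-separated layout as the question (the minimum field count and the len() checks are illustrative):

#!/usr/bin/env python
import sys

for line in sys.stdin:
    val = line.rstrip("\n").split("\t")
    # Skip records too short to contain the fields we need.
    if len(val) < 109:
        continue
    visidH, visidL, sessionID = val[4], val[5], val[108]
    # len() reads clearly and catches empty strings in one test.
    if len(visidH) == 0 or len(visidL) == 0 or len(sessionID) == 0:
        continue
    print "%s%s\t%s" % (visidH, visidL, sessionID)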


Hadoop Streaming - Hadoop 1.0.x

I had the same "Broken pipe" problem. The cause was a "break" statement in my reducer: everything went fine until the "break", after which the running reducer stopped and printed the "Broken pipe" error. Then another reducer attempt started and met the same fate as the previous one. This cycle went on and on.

If I understood it correctly, once a reducer starts reading from stdin (in my case, in a for loop), it has to read everything; you cannot "break" out of that read, even by closing stdin (os.close(0), as I tried). A sketch of a safe alternative follows.
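
A minimal sketch of the workaround, keeping the loop draining stdin instead of breaking out (the LIMIT cutoff and pass-through logic are illustrative, not from this answer):

#!/usr/bin/env python
# Stop emitting after a limit but keep consuming stdin, which
# avoids the "Broken pipe" error that a bare `break` causes.
import sys

LIMIT = 1000  # illustrative cutoff
emitted = 0
for line in sys.stdin:
    if emitted >= LIMIT:
        continue  # drain the rest of stdin without emitting
    sys.stdout.write(line)
    emitted += 1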



I had the same problem today playing with Hadoop 1.0.1. Fortunately, I solved it by:

hadoop ... -mapper $cwd/mapper.py -reducer $cwd/reducer.py ...

(My Python scripts were in the current directory.) It looks like absolute paths are necessary now.

Best!



Dirty input could cause this problem.

Try a try/except block to skip the bad records:

#!/usr/bin/env python
import sys

for line in sys.stdin:
    try:
        val = line.split("\t")
        (visidH, visidL, sessionID) = (val[4], val[5], val[108])
        print "%s%s\t%s" % (visidH, visidL, sessionID)
    except Exception:
        # Skip malformed lines (too few fields, etc.) instead of crashing.
        pass
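
If you would rather not drop records silently, Hadoop Streaming lets a script increment a job counter by writing a reporter:counter:<group>,<counter>,<amount> line to stderr. A variant of the except branch (the group and counter names here are arbitrary):

#!/usr/bin/env python
import sys

for line in sys.stdin:
    try:
        val = line.split("\t")
        (visidH, visidL, sessionID) = (val[4], val[5], val[108])
        print "%s%s\t%s" % (visidH, visidL, sessionID)
    except Exception:
        # Make the skips visible in the job's counters instead of silent.
        sys.stderr.write("reporter:counter:SessionMap,BadRecords,1\n")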



Python + Hadoop is tricky in some details that it shouldn't be. Take a look here.

Try enclosing your input path in double quotes: -input "/data/omni/20110115/exp6-10122"



One possible solution is to invoke the interpreter explicitly, which avoids relying on the shebang line and the execute bit:

-mapper  "python ./mapper.py"
-reducer "python ./reducer.py" 

