
I'm used to programming in Python. My company now has a Hadoop cluster with Jupyter installed. Until now I have never used Spark / PySpark for anything.

I am able to load files from HDFS as easily as this:

text_file = sc.textFile("/user/myname/student_grades.txt")

And I'm able to write output like this:

text_file.saveAsTextFile("/user/myname/student_grades2.txt")

What I'm trying to achieve is to use a simple for loop to read the text files one by one and write their content into one HDFS file. So I tried this:

list = ['text1.txt', 'text2.txt', 'text3.txt', 'text4.txt']

for i in list:
    text_file = sc.textFile("/user/myname/" + i)
    text_file.saveAsTextFile("/user/myname/all.txt")

So this works for the first element of the list, but then gives me this error message:

Py4JJavaError: An error occurred while calling o714.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory 
XXXXXXXX/user/myname/all.txt already exists

To avoid confusion, I blurred out the IP address with XXXXXXXX.


What is the right way to do this? I will have tons of datasets (like 'text1', 'text2', ...) and want to apply a Python function to each of them before saving them to HDFS, but I would like to have the results all together in one output file.

Thanks a lot!
MG

EDIT: It seems my final goal was not really clear. I need to apply a function to each text file separately, and then I want to append the output to the existing output directory. Something like this:

for i in list:
    text_file = sc.textFile("/user/myname/" + i)
    text_file = really_cool_python_function(text_file)
    text_file.saveAsTextFile("/user/myname/all.txt")

4 Answers


I wanted to post this as a comment but could not do so, as I do not have enough reputation.

You have to convert your RDD to a DataFrame and then write it in append mode. To convert an RDD to a DataFrame, please look into this answer:
https://stackoverflow.com/a/39705464/3287419
or this link: http://spark.apache.org/docs/latest/sql-programming-guide.html
To save a DataFrame in append mode, the link below may be useful:
http://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes
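
A minimal sketch of that approach, assuming a SparkContext sc and a SQLContext/SparkSession are already available in the notebook; the column name and the output path all_out are placeholders:

from pyspark.sql import Row

for name in ['text1.txt', 'text2.txt', 'text3.txt', 'text4.txt']:
    rdd = sc.textFile("/user/myname/" + name)
    # wrap each line in a Row so the RDD can become a single-column DataFrame
    df = rdd.map(lambda line: Row(value=line)).toDF()
    # append mode adds new part files instead of failing because the directory already exists
    df.write.mode("append").text("/user/myname/all_out")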

Almost the same question is here as well: Spark: Saving RDD in an already existing path in HDFS. But the answer provided is for Scala; I hope something similar can be done in Python as well.

There is yet another (but ugly) approach. Convert your RDD to a string; let the resulting string be resultString. Then use subprocess to append that string to the destination file, i.e.:

import subprocess

# pipe the string through stdin to `hdfs dfs -appendToFile`, which appends it to the destination file
subprocess.call("echo " + resultString + " | hdfs dfs -appendToFile - <destination>", shell=True)



You can read multiple files and save them like this:

textfile = sc.textFile(','.join(['/user/myname/'+f for f in list]))
textfile.saveAsTextFile('/user/myname/all')

You will get all the part files within the output directory.

Comments

It seems my final goal was not really clear. I need to apply a function to each text file separately, and then I want to append the output to the existing output directory. See the EDIT.
The same function for all text files?
Yes, the same function for all files, but I can't join the text files beforehand because each file needs to be treated separately.
Will the columns in all the files be similar or different?
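
To connect this exchange back to the edited question, a minimal sketch of that idea: apply the per-file function (the question's really_cool_python_function placeholder) to each file's RDD separately, union the results, and save once. The output path all_out is a placeholder and must not exist yet.

files = ['text1.txt', 'text2.txt', 'text3.txt', 'text4.txt']

# apply the per-file function to each file's RDD separately
processed = [really_cool_python_function(sc.textFile("/user/myname/" + f)) for f in files]

# union the processed RDDs into one and write them out in a single save
sc.union(processed).saveAsTextFile("/user/myname/all_out")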

If the text files all have the same schema, you could use Hive to read the whole folder as a single table, and directly write that output.
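
A rough sketch of that idea, assuming a HiveContext is available as sqlContext; the table name, column name, and paths below are made up for illustration:

# register the whole input folder as one external table; every file in it contributes rows
sqlContext.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS student_grades (line STRING)
    ROW FORMAT DELIMITED
    LOCATION '/user/myname/grades_input/'
""")

# read the folder as a single table and write the combined result in one pass
sqlContext.sql("SELECT line FROM student_grades") \
    .write.mode("overwrite").text("/user/myname/all_out")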



I would try this; it should be fine:

list = ['text1.txt', 'text2.txt', 'text3.txt', 'text4.txt']

for i in list:
    text_file = sc.textFile("/user/myname/" + i)
    # write each input to its own output directory (a suffix is added so it does not collide with the input file)
    text_file.saveAsTextFile(f"/user/myname/{i}_out")

