I have a large Python project in which the driver program has a function that uses a for loop to traverse each file in my GCP (Google Cloud Platform) bucket. I submit the job to GCP through the CLI and let it run there.

For each file traversed in this for loop, I invoke a function parse_file(...) that parses the file and calls a series of other functions that process it.

The whole project takes a few minutes to run, which is slow, and the driver program hasn't used much PySpark yet. The issue is that each parse_file(...) call in the file-level for loop executes sequentially. Is it possible to use PySpark to parallelize that loop so that parse_file(...) runs in parallel across all the files, reducing execution time? If so, since the program isn't using PySpark, how much code would need to change to parallelize it?

The relevant function in the program looks like this:

# ... some other code
attributes_table = ....
for obj in gcp_bucket.objects(path):
    if obj.key.endswith('sys_data.txt'):
        # ... some other code
        file_data = (d for d in obj.download().decode('utf-8').split('\n'))
        parse_file(file_data, attributes_table)
        # ... some other code

How do I use PySpark to parallelize this part instead of using a for loop to traverse the files one at a time?

1 Answer

Thank you for asking your question.

I would recommend creating an RDD based on your gcp_bucket.objects(path).

You have your SparkContext, so creating the RDD should be as simple as:
my_rdd = sc.parallelize(gcp_bucket.objects(path))

For the uninitiated, the convention is to assign the SparkContext to the variable sc. The contents of your for loop will have to be put into a function; let's call it my_function. You now have all your pieces.

Your next step will be mapping your function as such:

results_dag = my_rdd.map(my_function)
results = results_dag.collect()

Recall that Spark performs lazy evaluation: map only records the transformation, and nothing runs until an action is called. This is why we need to perform the collect operation at the end.
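
As a rough sketch of the map-then-collect pattern, the following assumes the loop body has been moved into a function that takes an object key (a plain string) and returns a small result. The names parse_one and run_on_spark are hypothetical, and a real version would download and parse the file instead of measuring the key. The pyspark import sits inside run_on_spark only so that parse_one can be tried without a cluster.

```python
def parse_one(key):
    """Hypothetical stand-in for the loop body.

    Takes an object key (a string) and returns a small, picklable summary.
    A real version would download the object and call parse_file on it.
    """
    return (key, len(key))


def run_on_spark(keys):
    """Map parse_one over the keys in parallel and bring the results back."""
    # Imported lazily so parse_one can be tested without Spark installed.
    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()
    rdd = sc.parallelize(keys)       # distribute the keys across workers
    mapped = rdd.map(parse_one)      # transformation: nothing has run yet
    return mapped.collect()          # action: triggers the job
```

Any action triggers evaluation, not only collect; count and foreach are others, and foreach may be a better fit when the function is called only for its side effects and returns nothing useful to the driver.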

A few other recommendations. The first is to run your code on a small set of objects from your GCP bucket and get a sense of the timings. The other, to encourage good coding practices, is to consider breaking the operations inside your for loop down further into additional RDDs. You can always chain them together:

my_rdd = sc.parallelize(gcp_bucket.objects(path))
dag1 = my_rdd.map(function1)
dag2 = dag1.map(function2)
dag3 = dag2.map(function3)
results = dag3.collect()
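
To make the chaining idea a little more concrete, here is a minimal sketch in which each stage is a small function that passes a (key, value) tuple to the next. The stage names download_text, split_lines, and count_lines are hypothetical placeholders and are not part of the original program; download_text returns placeholder text instead of reading from the bucket.

```python
def download_text(key):
    """Hypothetical stage 1: would fetch the object; here it returns placeholder text."""
    return key, f"header for {key}\nrow"


def split_lines(item):
    """Hypothetical stage 2: split the text into lines."""
    key, text = item
    return key, text.split("\n")


def count_lines(item):
    """Hypothetical stage 3: reduce each file to a small summary."""
    key, lines = item
    return key, len(lines)


def run_chain(keys):
    """Chain the three stages; only collect() triggers execution."""
    # Imported lazily so the stage functions can be tested without Spark installed.
    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()
    return (
        sc.parallelize(keys)
        .map(download_text)
        .map(split_lines)
        .map(count_lines)
        .collect()
    )
```

Keeping each stage a plain function makes it possible to test the stages on a single key locally before submitting the job.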

6 Comments

Thanks so much! I will try this now.
Thanks so much for the advice and your recommendations. I've been trying your approach. I did use collect(), but the program failed with errors, notably at the line where I call collect(): PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects. I'm not sure what causes this error. If I remove collect(), the program runs, but function1 is never executed when I do dag1 = my_rdd.map(function1). Is this because transformations are lazy, so map only tells Spark what to do once an action such as collect() is performed?
Also, does it matter whether "myfunction" returns anything to the driver program? In your example, my "myfunction" performs a series of operations on the file data passed in, and I don't see why those operations need to return anything to the driver node. Does this defeat the point of using collect()?
Troubleshooting thread locks is complicated. I would recommend breaking your problem down and isolating the portion of the code you want to parallelize. I would test your parallelization code on a small subset of paths to ensure your code is functioning properly. What you're specifically trying to do I've done before without having any issues. You will have to call an action such as collect, otherwise the DAG will not be evaluated. I recommend looking at a few Spark tutorials.
Thanks so much @VanBantam. I boiled my problem down to small pieces, and I've now run into a major issue. I tried: dag1 = my_rdd.map(lambda file: function1(file))
```
def function1(file):
    my_publisher = MyPublisher(attrs)
    result = function2(my_publisher)
```
Here my_publisher is created by calling the MyPublisher constructor, but the program reports the error Could not serialize object: TypeError: can't pickle _thread.lock objects. The problem seems to be that my_publisher isn't serializable and so can't be pickled. But my program has to use my_publisher. Is there any way to fix this?
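
One common cause of this error is shipping an object that holds a lock (a client, a connection, a publisher) from the driver to the workers, either as an RDD element or captured in a function's closure. Whether that is what happens in the commenter's program is an assumption. A sketch of one way around it: parallelize only plain strings, and construct the lock-holding object inside the function that runs on the worker, once per partition. The MyPublisher class below is a hypothetical stand-in, and process_partition and run_on_spark are illustrative names.

```python
import threading


class MyPublisher:
    """Hypothetical stand-in: it holds a lock, so an instance cannot be pickled."""

    def __init__(self, attrs):
        self.attrs = attrs
        self._lock = threading.Lock()

    def publish(self, key):
        with self._lock:
            return f"{self.attrs}:{key}"


def process_partition(keys):
    """Runs on a worker; receives an iterator of plain string keys."""
    # Built here, on the worker, so the instance never has to be pickled.
    publisher = MyPublisher("attrs")
    for key in keys:
        yield publisher.publish(key)


def run_on_spark(keys):
    """Distribute string keys and process each partition with its own publisher."""
    # Imported lazily so process_partition can be tested without Spark installed.
    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()
    return sc.parallelize(keys).mapPartitions(process_partition).collect()
```

mapPartitions is used instead of map so that the publisher is created once per partition and not once per file; the values returned to the driver are plain strings, which pickle without trouble.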