
I am trying to run two functions that perform completely independent transformations on a single RDD in parallel using PySpark. What are some ways to do this?

from multiprocessing import Process

from pyspark import SparkContext
from pyspark.sql import HiveContext, SQLContext

def doXTransforms(sampleRDD):
    # (X transforms)
    ...

def doYTransforms(sampleRDD):
    # (Y transforms)
    ...

if __name__ == "__main__":
    sc = SparkContext(appName="parallelTransforms")
    sqlContext = SQLContext(sc)
    hive_context = HiveContext(sc)

    rows_rdd = hive_context.sql("select * from tables.X_table")

    p1 = Process(target=doXTransforms, args=(rows_rdd,))
    p1.start()
    p2 = Process(target=doYTransforms, args=(rows_rdd,))
    p2.start()
    p1.join()
    p2.join()
    sc.stop()

This does not work, and I now understand why it will not work. But is there an alternative way to make it work? Specifically, are there any Python/Spark-specific solutions?

  • If each of your transformations could use (almost) 100% of the cluster resources, which is usually the case, running them in parallel actually makes things slower.

1 Answer


Just use threads and make sure that the cluster has enough resources to process both tasks at the same time.

from threading import Thread
import time

def process(rdd, f):
    def delay(x):
        # Artificial delay so the two jobs overlap long enough to observe.
        time.sleep(1)
        return f(x)
    return rdd.map(delay).sum()


# Half of the default parallelism per RDD, so both jobs can run at once.
rdd = sc.parallelize(range(100), int(sc.defaultParallelism / 2))

t1 = Thread(target=process, args=(rdd, lambda x: x * 2))
t2 = Thread(target=process, args=(rdd, lambda x: x + 1))
t1.start(); t2.start()
t1.join(); t2.join()

Arguably this is not often useful in practice, but otherwise it should work just fine.

You can further use in-application scheduling with the FAIR scheduler and scheduler pools for better control over the execution strategy.
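
As a rough sketch of how that could look (this example is mine, not part of the original answer; the pool names "x_pool" and "y_pool" are made up for illustration), FAIR mode is enabled through the spark.scheduler.mode configuration, and each thread can select its own pool with sc.setLocalProperty:

from threading import Thread

from pyspark import SparkConf, SparkContext

# Enable in-application FAIR scheduling (the default is FIFO).
conf = SparkConf().setAppName("parallelTransforms").set("spark.scheduler.mode", "FAIR")
sc = SparkContext(conf=conf)

def run_in_pool(pool_name, rdd, f):
    # The scheduler pool is a thread-local property, so each thread
    # can submit its job to a different pool.
    sc.setLocalProperty("spark.scheduler.pool", pool_name)
    return rdd.map(f).sum()

rdd = sc.parallelize(range(100))

t1 = Thread(target=run_in_pool, args=("x_pool", rdd, lambda x: x * 2))
t2 = Thread(target=run_in_pool, args=("y_pool", rdd, lambda x: x + 1))
t1.start(); t2.start()
t1.join(); t2.join()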

You can also try pyspark-asyncactions (disclaimer: the author of this answer is also the author of the package), which provides a set of wrappers around the Spark API and concurrent.futures:

import asyncactions  # importing the package adds the *Async methods to RDD
import concurrent.futures

f1 = rdd.filter(lambda x: x % 3 == 0).countAsync()
f2 = rdd.filter(lambda x: x % 11 == 0).countAsync()

[x.result() for x in concurrent.futures.as_completed([f1, f2])]
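
If you would rather not add a dependency, a similar effect can be sketched with a plain concurrent.futures.ThreadPoolExecutor (my own illustration, not from the original answer): submit the blocking actions to the pool and collect the resulting futures yourself.

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=2) as pool:
    # Each submitted call is an ordinary blocking Spark action; the pool
    # simply runs the two driver-side calls in separate threads.
    futures = [
        pool.submit(rdd.filter(lambda x: x % 3 == 0).count),
        pool.submit(rdd.filter(lambda x: x % 11 == 0).count),
    ]
    results = [f.result() for f in as_completed(futures)]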

3 Comments

Threads are not truly parallel in Python because of the GIL. So if I use the above method, I will not be able to utilize multiple cores, right?
It doesn't matter. The only thing happening in this code is RPC calls; it doesn't touch the actual computations. You could handle this in a single thread with async calls as well. See also this answer and my comments beneath.
This worked, thanks! The only change I had to make was to raise yarn.scheduler.capacity.maximum-am-resource-percent from 0.1 to 0.5 in /etc/hadoop/conf/capacity-scheduler.xml.
