
I want to do parallel processing in a for loop using PySpark.

from pyspark.sql import SparkSession
spark = SparkSession.builder.master('yarn').appName('myAppName').getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

data = ["a", "b", "c"]


for i in data:
    try:
        df = spark.read.parquet('gs://' + i + '-data')
        df.createOrReplaceTempView("people")
        df2 = spark.sql("""select * from people""")
        df2.show()
    except Exception as e:
        print(e)
        continue

The above script works fine, but I want to do the processing in parallel in PySpark, which is possible in Scala.


2 Answers


Spark itself runs jobs in parallel, but if you still want parallel execution in your code, you can use simple Python code for parallel processing to do it (this was tested on Databricks only; link).

data = ["a","b","c"]

from multiprocessing.pool import ThreadPool
pool = ThreadPool(10)


def fun(x):
    try:
        df = sqlContext.createDataFrame([(1,2, x), (2,5, "b"), (5,6, "c"), (8,19, "d")], ("st","end", "ani"))
        df.show()
    except Exception as e:
        print(e)

pool.map( fun,data)

I have changed your code a bit, but this is basically how you can run parallel tasks. If you have some flat files that you want to process in parallel, just make a list of their names and pass it into pool.map(fun, data).

Change the function fun as needed; a sketch adapting this to your original parquet-reading loop follows below.
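For example, a minimal sketch, assuming the spark session and the gs://<name>-data bucket naming from your question:

from multiprocessing.pool import ThreadPool

data = ["a", "b", "c"]

def read_and_show(name):
    try:
        # Each thread submits its own Spark job; Spark schedules their tasks
        # across the executors concurrently.
        df = spark.read.parquet('gs://' + name + '-data')
        # Note: a shared temp-view name like "people" would be overwritten
        # across threads, so operate on the DataFrame directly here.
        df.show()
    except Exception as e:
        print(e)

pool = ThreadPool(10)
pool.map(read_and_show, data)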

For more details on the multiprocessing module, check the documentation.

Similarly, if you want to do it in Scala, you will need the following imports:

import scala.concurrent.{Future, Await}

For a more detailed understanding, check this out. The code there is for Databricks, but with a few changes it will work in your environment.


Comments

Will this bring it to the driver node, or will it execute the parallel processing on the multiple worker nodes?
This is parallel execution in the code, not actual parallel execution across the cluster. It is simple Python parallel processing; it does not interfere with Spark's parallelism.
I also think this simply adds threads to the driver node. It doesn't send stuff to the worker nodes. I think Andy_101 is right.
I actually tried this out, and surprisingly it does run the jobs in parallel on the worker nodes, not just the driver! My experiment setup used 200 executors; running 2 jobs in series took 20 mins, and running them in a ThreadPool took 10 mins in total.
I think this does not work. Here's my sketch of proof:

import socket
from multiprocessing.pool import ThreadPool

pool = ThreadPool(10)

def getsock(i):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(("8.8.8.8", 80))
    return s.getsockname()[0]

list(pool.map(getsock, range(10)))

This always gives the same IP address, namely that of the driver. Hence we are not executing on the workers.

Here's a parallel loop in PySpark, using Azure Databricks.

import socket

def getsock(i):
    # Runs on the executors: returns the IP address of the host executing the task.
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(("8.8.8.8", 80))
    return s.getsockname()[0]

rdd1 = sc.parallelize(list(range(10)))
parallel = rdd1.map(getsock).collect()

On platforms other than Azure Databricks you may need to create the SparkContext sc yourself; on Azure the variable exists by default.

Coding it up like this only makes sense if the code that is executed in parallel (getsock here) contains no code that is already parallel. For instance, had getsock contained code that goes through a PySpark DataFrame, then that code is already parallel, so it would probably not make sense to also "parallelize" that loop. A sketch of a case where the pattern does fit follows below.
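As an illustration, a minimal sketch (the fingerprint function and its inputs are hypothetical) of distributing a plain-Python task over the workers with the same rdd.map pattern; note the mapped function must not touch spark or sc, since those exist only on the driver:

import hashlib

def fingerprint(name):
    # Plain Python only: this function runs on the executors, where no
    # SparkSession/SparkContext is available.
    return (name, hashlib.md5(name.encode()).hexdigest())

names = ["a", "b", "c"]  # hypothetical work items
results = sc.parallelize(names, numSlices=len(names)).map(fingerprint).collect()
print(results)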

