I have a case where I am using PySpark (or Spark, if I can't do it with Python and instead need to use Scala or Java) to pull data from several hundred database tables that lack primary keys. (Why Oracle would ever create an ERP product that contains tables without primary keys is a different subject, but regardless, we need to be able to pull the data from each database table and save it into a Parquet file.) I originally tried using Sqoop instead of PySpark, but due to a number of issues we ran into, it made more sense to try PySpark/Spark instead.

Ideally, I'd like to have each task node in my compute cluster: take the name of a table, query that table from the database, and save that table as a Parquet file (or set of Parquet files) in S3. My first step is to get it working locally in standalone mode. (If I had a primary key for each given table, then I could partition the query and file saving process across different sets of rows for the given table and distribute the row partitions across the task nodes in the compute cluster to perform the file saving operation in parallel, but because Oracle's ERP product lacks primary keys for the tables of concern, that's not an option.)
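
As an aside on the parenthetical above, and only as a sketch: PySpark's DataFrameReader.jdbc accepts a predicates list (one WHERE clause per partition), so a unique key is not strictly the only way to split a read, provided the table has some numeric column with known bounds. The column name TRANSACTION_ID and the bounds below are hypothetical, and whether the NetSuite driver handles such predicates efficiently is an assumption I have not verified.

```python
def range_predicates(column, lower, upper, parts):
    """Split [lower, upper) into `parts` contiguous WHERE-clause strings."""
    step, extra = divmod(upper - lower, parts)
    predicates, start = [], lower
    for i in range(parts):
        # Spread the remainder over the first `extra` ranges.
        end = start + step + (1 if i < extra else 0)
        predicates.append(f"{column} >= {start} AND {column} < {end}")
        start = end
    return predicates


# Hypothetical column name and bounds, for illustration only.
preds = range_predicates("TRANSACTION_ID", 0, 10, 3)

# Each string would become one partition of the read, e.g.:
# spark.read.jdbc(url, table_name, predicates=preds, properties=props)
```

Rows whose value is NULL or falls outside the chosen bounds would match none of these predicates, so the bounds would need to cover the real range of the column.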

I'm able to successfully query the target database with PySpark, and I'm able to successfully save the data into a Parquet file using a multiprocessing pool, but for some reason, only a single worker does anything. So, what happens is that only a single worker takes a tableName, queries the database, and saves the file to the desired directory as a Parquet file. Then the job ends as if no other workers were executed. I'm guessing that there may be some type of locking issue taking place. If I correctly understood the comments here: How to run multiple jobs in one Sparkcontext from separate threads in PySpark? then what I'm trying to do should be possible unless there are specific issues related to executing parallel JDBC SQL queries.

Edit: I'm specifically looking for a way that allows me to use a thread pool of some type so that I don't need to manually create a thread for each one of the tables that I need to process and manually load-balance them across the task nodes in my cluster.

Even when I tried setting:

--master local[*]

and

--conf 'spark.scheduler.mode=FAIR'

the problem remained.

Also, to briefly explain my code, I needed to use a custom JDBC driver, and I'm running the code in a Jupyter notebook on Windows, so I'm using a workaround to ensure that PySpark starts with the correct parameters. (For the record, I have nothing against other operating systems, but my Windows machine is my fastest workstation, so that's why I'm using it.)

Here's my setup:

import os

driverPath = r'C:\src\NetSuiteJDBC\NQjc.jar'
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--driver-class-path '{0}' --jars '{0}' --master local[*] --conf 'spark.scheduler.mode=FAIR' --conf 'spark.scheduler.allocation.file=C:\\src\\PySparkConfigs\\fairscheduler.xml' pyspark-shell".format(driverPath)
)

import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Column, Row, SQLContext
from pyspark.sql.functions import col, split, regexp_replace, when
from pyspark.sql.types import ArrayType, IntegerType, StringType

spark = SparkSession.builder.appName("sparkNetsuite").getOrCreate()
spark.sparkContext.setLogLevel("INFO")
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")
sc = SparkContext.getOrCreate()
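
The fairscheduler.xml file referenced in the submit arguments isn't shown here. As a minimal sketch of what it might contain, the snippet below generates an allocation file with a single "production" pool, following the pool format described in Spark's job scheduling documentation. The weight and minShare values are assumptions for illustration, not the values from my actual file.

```python
import xml.etree.ElementTree as ET


def build_fair_scheduler_xml(pool_name="production", weight=1, min_share=2):
    """Return fair-scheduler allocation XML declaring one FAIR pool."""
    root = ET.Element("allocations")
    pool = ET.SubElement(root, "pool", name=pool_name)
    ET.SubElement(pool, "schedulingMode").text = "FAIR"
    ET.SubElement(pool, "weight").text = str(weight)       # assumed value
    ET.SubElement(pool, "minShare").text = str(min_share)  # assumed value
    return ET.tostring(root, encoding="unicode")


xml_text = build_fair_scheduler_xml()
# To use it, write xml_text to the path given in
# spark.scheduler.allocation.file before the session starts.
```

The pool name has to match the value passed to setLocalProperty("spark.scheduler.pool", ...), which is "production" in the setup above.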

Then, to test the multiprocessing, I created the file sparkMethods.py in the directory where I'm running my Jupyter notebook and put this method in it:

def testMe(x):
    return x*x

When I run:

from multiprocessing import Pool
import sparkMethods

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    # print "[0, 1, 4,..., 81]"
    print(pool.map(sparkMethods.testMe, range(10)))

in my Jupyter notebook, I get the expected output:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Now, before anyone rails against the way I wrote the next method, please know that I initially tried passing the Spark context via a closure and then ran into a pickling error, as documented here: I can "pickle local objects" if I use a derived class? So, I put all of the Spark context setup inside this next method, which lives in the sparkMethods.py file (at least until I can find a better way). The reason I put the methods into an external file (instead of including them just in the Jupyter notebook) was to deal with this problem: https://bugs.python.org/issue25053 as discussed here: Multiprocessing example giving AttributeError and here: python multiprocessing: AttributeError: Can't get attribute "abc"

This is that method that contains the logic for making the JDBC connection:

# In sparkMethods.py file:
def getAndSaveTableInPySpark(tableName):
    import os
    import os.path
    from pyspark.sql import SparkSession, SQLContext
    spark = SparkSession.builder.appName("sparkNetsuite").getOrCreate()
    spark.sparkContext.setLogLevel("INFO")
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")

    jdbcDF = spark.read \
        .format("jdbc") \
        .option("url", "OURCONNECTIONURL;") \
        .option("driver", "com.netsuite.jdbc.openaccess.OpenAccessDriver") \
        .option("dbtable", tableName) \
        .option("user", "USERNAME") \
        .option("password", "PASSWORD") \
        .load()

    filePath = "C:\\src\\NetsuiteSparkProject\\" + tableName + "\\" + tableName + ".parquet"
    jdbcDF.write.parquet(filePath)
    fileExists = os.path.exists(filePath)
    if(fileExists):
        return (filePath + " exists!")
    else:
        return (filePath + " could not be written!")

Then, back in my Jupyter notebook, I run:

import sparkMethods
from multiprocessing import Pool

if __name__ == '__main__':
    with Pool(5) as p:
        p.map(sparkMethods.getAndSaveTableInPySpark, top5Tables)

The problem is that only one worker seems to execute.

When I execute it, in the console output, I see that it includes this initially:

The process cannot access the file because it is being used by another process. The system cannot find the file C:\Users\DEVIN~1.BOS\AppData\Local\Temp\spark-class-launcher-output-3662.txt. . . .

which leads me to suspect that perhaps there is some type of locking taking place.

Regardless, one of the workers always runs to completion, successfully querying its corresponding table and saving it to a Parquet file as desired. There is some non-determinism in the process, because different executions result in a different worker winning the race and consequently processing a different table. Interestingly, only a single job is getting executed, as shown in the Spark UI (screenshot showing that only one Spark job was executed). However, the article here: https://medium.com/@rbahaguejr/threaded-tasks-in-pyspark-jobs-d5279844dac0 implies that I should expect to see multiple jobs in the Spark UI if they were successfully started.

Now, if the problem is that PySpark is not actually capable of running multiple JDBC queries in parallel across different task nodes, then perhaps my solution would be to use a JDBC connection pool or even just open a connection for each table (as long as I close the connection at the end of the thread). When getting the list of tables to process, I had success with connecting to the database through the jaydebeapi library like this:

import jaydebeapi
import pandas as pd

conn = jaydebeapi.connect("com.netsuite.jdbc.openaccess.OpenAccessDriver",
                          "OURCONNECTIONURL;",
                          ["USERNAME", "PASSWORD"],
                          r"C:\src\NetSuiteJDBC\NQjc.jar")

top5Tables = list(pd.read_sql("SELECT TOP 5 TABLE_NAME FROM OA_TABLES WHERE TABLE_OWNER != 'SYSTEM';", conn)["TABLE_NAME"].values)
conn.close()
top5Tables

Output is:

['SALES_TERRITORY_PLAN_PARTNER',
 'WORK_ORDER_SCHOOLS_TO_INSTALL_MAP',
 'ITEM_ACCOUNT_MAP',
 'PRODUCT_TRIAL_STATUS',
 'ACCOUNT_PERIOD_ACTIVITY']

So, conceivably, if the problem is that PySpark cannot be used to distribute multiple queries across task nodes like this, then perhaps I can use the jaydebeapi library to make the connection. However, in that case, I'd still need a way to be able to write the output of the JDBC SQL query to a Parquet file (which ideally would leverage Spark's schema inference capability), but I'm open to taking that approach if it's feasible.

So, how do I successfully query the database and save the output to Parquet files in parallel (i.e. distributed across the task nodes) without the master node performing all of the querying sequentially?

  • I think the problem is that you should use multithreading, not multiprocessing, because you don't want (and shouldn't create) multiple Spark contexts. What you need is multiple workers, not multiple drivers. Commented Nov 21, 2018 at 2:12
  • Possible duplicate of How to run independent transformations in parallel using PySpark? Commented Nov 21, 2018 at 2:18
  • What does this have to do with Netsuite? FWIW All tables in Netsuite have a primary key ( generally a sequence) Commented Nov 21, 2018 at 7:14
  • @bknights When was the last time you joined OA_TABLES to OA_FKEYS? Commented Nov 21, 2018 at 17:02
  • recently. What are you trying to find? Commented Nov 21, 2018 at 20:17

2 Answers

1

With some hints provided by the comments in response to my question, as well as the answer here: How to run independent transformations in parallel using PySpark? I investigated the use of threading instead of multiprocessing. I took a more careful look at one of the answers here: How to run multiple jobs in one Sparkcontext from separate threads in PySpark? and noticed the use of:

from multiprocessing.pool import ThreadPool

I was able to make it work, like this:

from multiprocessing.pool import ThreadPool
pool = ThreadPool(5)
results = pool.map(sparkMethods.getAndSaveTableInPySpark, top5Tables)
pool.close() 
pool.join() 
print(*results, sep='\n')

which prints:

C:\src\NetsuiteSparkProject\SALES_TERRITORY_PLAN_PARTNER\SALES_TERRITORY_PLAN_PARTNER.parquet exists!
C:\src\NetsuiteSparkProject\WORK_ORDER_SCHOOLS_TO_INSTALL_MAP\WORK_ORDER_SCHOOLS_TO_INSTALL_MAP.parquet exists!
C:\src\NetsuiteSparkProject\ITEM_ACCOUNT_MAP\ITEM_ACCOUNT_MAP.parquet exists!
C:\src\NetsuiteSparkProject\PRODUCT_TRIAL_STATUS\PRODUCT_TRIAL_STATUS.parquet exists!
C:\src\NetsuiteSparkProject\ACCOUNT_PERIOD_ACTIVITY\ACCOUNT_PERIOD_ACTIVITY.parquet exists!
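
One possible refinement, offered as a sketch rather than what I actually ran: pool.map re-raises the first exception it hits, so a single failing table can hide the results of the others. Wrapping each call and collecting a per-table status avoids that. Here run_tables and fake_save are hypothetical names; fake_save stands in for sparkMethods.getAndSaveTableInPySpark so that the block runs without Spark installed.

```python
from multiprocessing.pool import ThreadPool


def run_tables(save_table, table_names, workers=5):
    """Run save_table(name) for every table on a pool of threads.

    Returns a dict mapping table name -> ("ok", result) or ("error", message),
    so a failure on one table does not hide the results of the others.
    """
    def guarded(name):
        try:
            return name, ("ok", save_table(name))
        except Exception as exc:  # report the failure instead of raising
            return name, ("error", repr(exc))

    with ThreadPool(workers) as pool:
        # imap_unordered yields each result as soon as its table finishes.
        return dict(pool.imap_unordered(guarded, table_names))


def fake_save(name):
    # Hypothetical stand-in for the real Spark read/write call.
    if name == "BAD_TABLE":
        raise ValueError("simulated JDBC failure")
    return name + ".parquet"


outcome = run_tables(fake_save, ["ITEM_ACCOUNT_MAP", "BAD_TABLE"], workers=2)
```

With the real function, the call would look like run_tables(sparkMethods.getAndSaveTableInPySpark, top5Tables), and any table whose status is "error" could be retried afterwards.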

0

Spark handles parallelization under the hood, so the multiprocessing package isn't required; it is unnecessary and probably interferes with Spark. A few things must be done to take advantage of this, though. The key is to build the queries and transformations first without performing any actions. Also make sure your Spark cluster is set up with multiple worker nodes for the work to be distributed to. An easy way to do this is to use Databricks notebooks or similar services from the large cloud vendors, which set all of this up for you.

Spark has two kinds of operations: transformations, which don't execute anything but simply set up the queries and transformations (somewhat like writing SQL), and actions, which actually execute the query and act on the results. count() is an action. show() is an action. A query is a transformation, and a table addition is a transformation.

To use the parallelism built into Spark, write several queries and transformations against different tables, but don't collect(), count(), or show() the results (that is, don't perform any actions at this point, only transformations). This arranges the queries internally but does not execute them (this is Spark's lazy evaluation).

Later in the code, when you run an action (such as count, show, or collect), Spark automatically distributes the work across all the available nodes in parallel. That's the whole beauty of Spark: no special multiprocessing is required on your local device; it is all handled by Spark.

Here is a PySpark example:

        # First build the queries but don't collect any data.
        part1_sdf = spark.sql(
          "SELECT UtcTime, uSecDelay, sender, Recipient, date , ID "
          "FROM Delay_table "
          "WHERE date between DATE_ADD(now(), - 60) AND DATE_ADD(now(), -59) "
          "AND ID = 'my_id' "
          "ORDER BY UtcTime DESC "
        )
        part2_sdf = spark.sql(
          "SELECT UtcTime, uSecDelay, sender, Recipient, date, ID "
          "FROM Delay_table "
          "WHERE date between DATE_ADD(now(), -58) AND DATE_ADD(now(), -57) "
          "AND ID = 'my_id' "
          "ORDER BY UtcTime DESC "
        )
        # Perform a transformation on the 2 queries. No data is pulled up to this point.
        transformed_df = part1_sdf.union(part2_sdf)
        # Finally when an action is called, the data is pulled in parallel:
        transformed_df.show(10)
        ### Output
        +--------------------------+--------------------------------------+-----------------+--------------------+----------+--------+
        |                   UtcTime|                             uSecDelay|           sender|           Recipient|      date|     ID|
        +--------------------------+--------------------------------------+-----------------+--------------------+----------+--------+
        |      2020-01-05 01:39:...|                                    69|                4|                  28|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    65|                4|                  26|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    62|                4|                   0|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                   108|                4|                  16|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    68|                4|                  27|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    71|                4|                  53|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    68|                4|                   7|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    65|                4|                  57|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    64|                4|                  56|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    66|                4|                  44|2020-01-05|  my_id|
        +--------------------------+--------------------------------------+-----------------+--------------------+----------+--------+
        only showing top 10 rows
