
I've written a program in Python and pandas that takes a very large dataset (~4 million rows per month for 6 months), groups it by two of the columns (date and a label), and then applies a function to each group of rows. The number of rows per group varies from a handful to thousands, and there are thousands of groups per month (label-date combos).

My current program uses multiprocessing, so it's fairly efficient, and I thought it would map well to Spark. I've worked with map-reduce before, but I'm having trouble implementing this in Spark. I'm sure I'm missing some concept in the pipelining, but everything I've read seems to focus on key-value processing or on splitting a distributed dataset into arbitrary partitions, rather than on what I'm trying to do. Is there a simple example or paradigm for this? Any help would be greatly appreciated.

EDIT: Here's some pseudo-code for what I'm currently doing:

import pandas as pd
import multiprocessing as mp

reader = pd.read_csv(<monthly csv path>)
pool = mp.Pool(processes=4)
labels = reader.label.unique()
for label in labels:
    dates = reader[reader.label == label].date.unique()
    for date in dates:
        # select the rows for this (label, date) group
        df = reader[(reader.label == label) & (reader.date == date)]
        # hand the group to a worker; args must be passed as a tuple
        pool.apply_async(process, (df,), callback=callbackFunc)
pool.close()
pool.join()

When I say asynchronous, I mean something analogous to pool.apply_async().
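For comparison, the same per-group computation without multiprocessing is just a pandas groupby-apply over the two key columns (a one-line sketch, assuming process takes a per-group data frame):

# single-process equivalent of the loop above
results = reader.groupby(["label", "date"]).apply(process)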

2 Comments
  • process is an arbitrary function or do you have anything particular in mind? Is it commutative and associative? Commented Sep 25, 2015 at 18:15
  • @zero323 For this program, process is a function that reads external data for that date, and then iterates over all of the rows for the label on that date and calculates several different values to associate with each row. Commented Sep 25, 2015 at 18:17

1 Answer


As of now (PySpark 1.5.0) I see only three options:

  1. You can try to express your logic using SQL operations and UDFs. Unfortunately the Python API doesn't support UDAFs (User Defined Aggregate Functions), but it is still expressive enough, especially with window functions, to cover a wide range of scenarios (see the first sketch after this list).

    Access to external data sources can be handled in a couple of ways, including:

    • access inside a UDF with optional memoization
    • loading them into a data frame and using a join operation
    • using a broadcast variable
  2. Converting the data frame to a PairRDD and using one of the following:

    • partitionBy + mapPartitions
    • reduceByKey / aggregateByKey
  3. Partitioning data by key and using local Pandas data frames per partition (see the second sketch below)
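A minimal PySpark sketch of option 1, assuming the per-row values can be derived from per-group aggregates via window functions plus an ordinary UDF (df is the Spark data frame holding the data; the columns label, date, value and the ratio calculation are placeholders, not the asker's actual logic):

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window

# one window per (label, date) group; in Spark 1.5 window functions need a HiveContext
w = Window.partitionBy("label", "date")

# plain (non-aggregating) UDF combining a row value with its group-level aggregate
ratio = F.udf(lambda v, s: v / s if s else None, DoubleType())

result = (df
          .withColumn("group_sum", F.sum("value").over(w))
          .withColumn("share", ratio(F.col("value"), F.col("group_sum"))))

And a sketch of options 2 and 3 combined: key the rows by (label, date), hash-partition on that key, and rebuild a local Pandas data frame inside each partition so the existing process function can run unchanged (process is the asker's per-group function; the partition count of 200 is an arbitrary assumption):

import pandas as pd

def process_partition(rows):
    # rebuild a local pandas data frame for this partition; a partition may hold
    # several (label, date) groups, so group again locally and apply process()
    pdf = pd.DataFrame([r.asDict() for r in rows])
    if pdf.empty:
        return
    for _, group in pdf.groupby(["label", "date"]):
        yield process(group)

results = (df.rdd
           .keyBy(lambda r: (r.label, r.date))   # PairRDD keyed on the group columns
           .partitionBy(200)                     # co-locate each key's rows in one partition
           .values()
           .mapPartitions(process_partition)
           .collect())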

If Python is not a strong requirement, the Scala API (> 1.5.0) supports UDAFs, which enable something like this:

df.groupBy(some_columns: _*).agg(some_udaf)

2 Comments

That's something I didn't think about - Python is not a strong requirement, I could certainly switch over to Scala to try this out. For your Scala example, can you provide a little more detail (or a link to somewhere I can get started)? Thanks!
Two simple examples: stackoverflow.com/a/32750733/1560062, stackoverflow.com/a/32101530/1560062. You'll find some others in the Spark source.
