7

I'm retrofitting some existing code to use Spark. I've multiple data frames that hold different data sets. While transforming my main dataframe (or my main data set), I need to use data from the other data frames to complete the transformation. I also have a situation (atleast in the current structure) where I need to create new data frames in the transformation function of another data frames.

I'm trying to determine the following:

  1. Can I access a data frame inside the transformation function of another data frame?
  2. Can a data frame be created on an executor inside the transformation function of a dataframe?

Pointers on how to deal with such a situation would be very helpful.

1
  • What kind of data are you looking to get from your secondary/auxiliary dataframes; is it key-value, aggregate, etc? What kind of transformation are you talking about doing to your main DF; grouping + aggregation, using the map function over RDD, etc? Commented Sep 1, 2017 at 22:00

1 Answer 1

10

The answer to both questions is NO:

DataFrames are driver-side abstractions of distributed collections. They cannot be used, created, or referenced in any executor-side transformation.

Why? DataFrames (like RDDs and Datasets) can only be used within the context of an active SparkSession - without it, the DataFrame cannot "point" to its partitions on the active executors; The SparkSession should be thought of as a live "connection" to the cluster of executors.

Now, if you try using a DataFrame inside another transformation, that DataFrame would have to be serialized on the driver side, sent to the executor(s), and then deserialized there. But this deserialized instance (in a separate JVM) would necessarily lose it's SparkSession - that "connection" was from the driver to the executor, not from this new executor we're now operating in.

So what should you do? You have a few options for referencing one DataFrame's data in another, and choosing the right one is mostly dependent on the amounts of data that would have to be shuffled (or - transferred between executors):

  1. Collect one of the DataFrames (if you can guarantee it's small!), and then use the resulting local collection (either directly or using spark.broadcast) in any transformation.

  2. Join the two DataFrames on some common fields. This is a very common solution, as the logic of using one DataFrame's data when transforming another usually has to do with some kind of "lookup" for the right value based on some subset of the columns. This usecase translates into a JOIN operation rather naturally

  3. Use set operators like except, intersect and union, if they provide the logical operation you're after.

Sign up to request clarification or add additional context in comments.

6 Comments

Ok, I had an inclination that the answer is No, but your explanation was very helpful. A join would work for me for some of the datasets. I'm not sure I understand how I could do a collect and use it locally in an executor. Wouldn't the collect result in the dataset residing on the driver locally?
Right - collect would result in a driver-side, local result; If you then try using that result in a transformation, Spark will serialize that collection and send it to each of the executors along with the function that uses it; This will be done per task, so if this collection isn't very small, you can use broadcast to make sure it's sent only once per executor. Both of these options are relevant only if that collection is small enough to reside in driver's memory.
collect + broadcast can be helpful in cases where two datasets don't have a direct relation and you need to find the relation in the other dataset i.e a straightforward join or range join may not work. But what I'm still trying to figure out is how to deal with situations where the second dataset will not fit in memory. How to work with datasets where there is no direct relationship between the two but you want to distribute the processing on spark?
Even when there's no "direct" relationship - there's some relationship: there's some logic you'd use to match one or more records from one DataFrame to each record in the other, right? Usually this logic can be represented as a series of transformations on the individual DataFrames and joins between them. If you have a specific requirement where you can't find that translation - feel free to post a separate question.
@BdEngineer sorry that's not how this platform works... If you can describe your problem as a new question you'll have a better chance at getting helpful answers.
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.