
I have to perform a left join between a principal DataFrame and several reference DataFrames, i.e. a chained join computation, and I wonder how to make this efficient and scalable.

Method 1 is easy to understand, and it is also the current method, but I'm not satisfied, because all the transformations are chained and wait for the final action to trigger the computation. If I keep adding transformations and the volume of data grows, Spark will eventually fail at the end, so this method is not scalable.

Method 1:

  // Note: the return type must be DataFrame => DataFrame (not DataFrame),
  // since the composed function is applied to principleDF afterwards.
  def pipeline(refDF1: DataFrame, refDF2: DataFrame, refDF3: DataFrame,
               refDF4: DataFrame, refDF5: DataFrame): DataFrame => DataFrame = {

    val transformations: List[DataFrame => DataFrame] = List(
      castColumnsFromStringToLong(ColumnsToCastToLong),
      castColumnsFromStringToFloat(ColumnsToCastToFloat),
      renameColumns(RenameMapping),
      filterAndDropColumns,
      joinRefDF1(refDF1),
      joinRefDF2(refDF2),
      joinRefDF3(refDF3),
      joinRefDF4(refDF4),
      joinRefDF5(refDF5),
      calculate()
    )

    transformations.reduce(_ andThen _)
  }

  pipeline(refDF1, refDF2, refDF3, refDF4, refDF5)(principleDF)

Method 2: I haven't found a real way to achieve my idea, but I would like to trigger the computation of each join immediately.

According to my tests, count() is too heavy for Spark and useless for my application, and I don't know how to trigger the join computation with a more efficient action. That action is, in fact, the answer to this question.

  val joinedDF_1 = castColumnsFromStringToLong(principleDF, ColumnsToCastToLong)
  joinedDF_1.cache() // a joined DF is not always used multiple times, but some of them are, so I add cache() to indicate the usage
  joinedDF_1.count()  

  val joinedDF_2 = castColumnsFromStringToFloat(joinedDF_1, ColumnsToCastToFloat)
  joinedDF_2.cache()
  joinedDF_2.count()

  val joinedDF_3 = renameColumns(joinedDF_2, RenameMapping)
  joinedDF_3.cache()
  joinedDF_3.count()

  val joinedDF_4 = filterAndDropColumns(joinedDF_3)
  joinedDF_4.cache()
  joinedDF_4.count()

  ...
4 Comments
  • Instead of cache, can you try using the persist method, specifying the storage level as memory and disk? Commented Apr 12, 2019 at 12:40
  • I'm curious to know why you need exactly 6 DataFrames (and not an undetermined number of DataFrames), or why you would need exactly N operations (and not an undetermined number of transformations). Commented Apr 12, 2019 at 12:56
  • @belka In fact it is an undetermined number of DataFrames and transformation actions; I only gave an example to show the demands of the application. Commented Apr 12, 2019 at 13:06
  • @mingzhao.pro OK, I see. Try forcing the caching between each transformation, then unpersisting each previously cached DataFrame once it is no longer needed. Try it first with only 2 operations and 3 DataFrames to check on the simplest example, and iterate if that's not what you want. Commented Apr 12, 2019 at 13:10

3 Answers


When you want to force the computation of a given join (or of any transformation that is not final) in Spark, you can use a simple show or count on your DataFrame. These terminal actions force the computation of the result, because otherwise the action simply could not be executed.

Only after this will your DataFrame be effectively stored in your cache.

Once you're finished with a given DataFrame, don't hesitate to unpersist it. This will free your cached data if your cluster needs more room for further computations.
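
A minimal sketch of this cache / act / unpersist pattern, reusing the question's hypothetical curried helpers (joinRefDF1, joinRefDF2) and the MEMORY_AND_DISK level suggested in the comments:

  import org.apache.spark.storage.StorageLevel

  val step1 = joinRefDF1(refDF1)(principleDF)
  step1.persist(StorageLevel.MEMORY_AND_DISK) // or simply step1.cache()
  step1.count() // action: forces step1 to be computed and stored in the cache

  val step2 = joinRefDF2(refDF2)(step1)
  step2.persist(StorageLevel.MEMORY_AND_DISK)
  step2.count() // computed from the cached step1, not from the full lineage

  step1.unpersist() // step1 is no longer needed: free the room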


5 Comments

count is heavy and show will only compute a part of the DataFrame; are there really no other options?
I'm afraid the usage of count will also become a weak point as the volume of data continues to grow.
For me that's the only way to trigger an operation explicitly. Another option would be to persist instead of cache, with a level defined as StorageLevel.MEMORY_AND_DISK or DISK_ONLY (at your convenience), if you don't have such a big cache on your cluster.
Hey, I was away on vacation for two weeks; I will continue my research on this topic and keep it updated. If you have any ideas, I will be glad to test them.
@mingzhao.pro if this solved your issue don't hesitate to accept the answer ;)

You need to repartition your datasets by the join columns before calling the join transformation.

Example:

  import org.apache.spark.sql.functions.col

  val df1Part = df1.repartition(col("col1"), col("col2"))
  val df2Part = df2.repartition(col("col1"), col("col2"))
  val joinDF  = df1Part.join(df2Part, df1Part("col1") === df2Part("col1") && ...)
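
A note on why this might help (my reading of the suggestion, not stated in the answer): repartitioning both sides by the same join keys co-partitions the data, so the shuffle happens once up front and the subsequent join does not need to redistribute the rows again. This targets join performance rather than the eager-evaluation part of the question; repartition is itself a lazy transformation and does not trigger any computation.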

1 Comment

Why would we need that?

Try creating a new DataFrame based on it. Example:

  val dfTest = session.createDataFrame(df.rdd, df.schema).cache()
  dfTest.storageLevel.useMemory // result should be true
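
For what it's worth (this reasoning is mine, not the answer's): createDataFrame(df.rdd, df.schema) builds a new DataFrame whose logical plan is rooted at the underlying RDD, so the long lineage accumulated by the chained joins is truncated. Also note that cache() is lazy; the storage level is registered immediately (which is why useMemory reads true), but an action is still needed to actually materialize the data:

  dfTest.count() // any action works; this materializes the cached data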

