I have to left join a principal DataFrame with several reference DataFrames, i.e. a chain of joins, and I wonder how to make this efficient and scalable.
Method 1 is easy to understand and is what I currently use, but I'm not satisfied with it. All the transformations are chained lazily and only run when the final action triggers the computation. As I add more transformations and the data volume grows, Spark eventually fails, so this method does not scale.
Method 1:
    def pipeline(refDF1: DataFrame, refDF2: DataFrame, refDF3: DataFrame,
                 refDF4: DataFrame, refDF5: DataFrame): DataFrame => DataFrame = {
      val transformations: List[DataFrame => DataFrame] = List(
        castColumnsFromStringToLong(ColumnsToCastToLong),
        castColumnsFromStringToFloat(ColumnsToCastToFloat),
        renameColumns(RenameMapping),
        filterAndDropColumns,
        joinRefDF1(refDF1),
        joinRefDF2(refDF2),
        joinRefDF3(refDF3),
        joinRefDF4(refDF4),
        joinRefDF5(refDF5),
        calculate()
      )
      // Compose all steps, left to right, into a single DataFrame => DataFrame function
      transformations.reduce(_ andThen _)
    }

    pipeline(refDF1, refDF2, refDF3, refDF4, refDF5)(principleDF)
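To see why Method 1 only does work at the very end, here is a minimal, self-contained sketch of the same composition pattern on toy data. It assumes a local Spark 2.x or 3.x session; `joinRef`, the column names and the data are hypothetical stand-ins for `joinRefDF1` ... `joinRefDF5` and the real DataFrames. Building the result triggers no job: `reduce(_ andThen _)` only produces one function, and applying it only grows the logical plan that the first action then executes in one go.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object LazyChainSketch {
  // Hypothetical stand-in for joinRefDF1 ... joinRefDF5: it returns a function
  // so that it can be composed with andThen, as in Method 1.
  def joinRef(ref: DataFrame, key: String): DataFrame => DataFrame =
    df => df.join(ref, Seq(key), "left")

  def buildResult(spark: SparkSession): DataFrame = {
    import spark.implicits._
    // Toy data; the column names are made up for this sketch.
    val principal = Seq((1, 10L), (2, 20L), (3, 30L)).toDF("id", "code")
    val ref1 = Seq((1, "a"), (2, "b")).toDF("id", "label1")
    val ref2 = Seq((10L, "x"), (30L, "z")).toDF("code", "label2")

    val steps: List[DataFrame => DataFrame] = List(joinRef(ref1, "id"), joinRef(ref2, "code"))
    // reduce(_ andThen _) yields one function that applies the steps left to right.
    val pipeline: DataFrame => DataFrame = steps.reduce(_ andThen _)
    // No Spark job runs here; the call only builds a bigger logical plan.
    pipeline(principal)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("lazy-chain").getOrCreate()
    val result = buildResult(spark)
    result.explain(true) // both joins appear in a single plan
    result.show()        // first action: only now is the whole chain executed
    spark.stop()
  }
}
```

With more steps, that single plan keeps growing, which is the scalability concern described above.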
Method 2: I haven't found a proper way to implement this yet, but the idea is to trigger the computation of each join immediately.
According to my tests, count() is too expensive, and its result is useless for my application, but I don't know how else to trigger the join computation with an efficient action. Finding such an action is, in fact, what this question is about.
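One option sometimes used for this, which the question does not mention, is an eager checkpoint. Here is a minimal sketch, assuming Spark 2.3 or later, where `Dataset.localCheckpoint(eager = true)` runs a job immediately and returns a DataFrame whose plan starts from the stored rows, so later joins no longer carry the earlier transformations along. The helper `materialize`, the toy data and the column names are hypothetical. It still computes each step in full, so it is not necessarily cheaper than `cache()` plus `count()`; what changes is that the lineage is cut. Local checkpoints are kept on the executors and are lost if an executor goes away; calling `spark.sparkContext.setCheckpointDir(...)` and then `checkpoint(eager = true)` writes to reliable storage instead, at the cost of extra I/O.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object EagerCheckpointSketch {
  // Hypothetical helper: runs one Spark job for `df` now, stores the rows on the
  // executors, and returns a DataFrame whose plan starts from that stored data
  // rather than from the whole chain of earlier transformations.
  def materialize(df: DataFrame): DataFrame = df.localCheckpoint(eager = true)

  def buildResult(spark: SparkSession): DataFrame = {
    import spark.implicits._
    // Toy stand-ins for principleDF, refDF1 and refDF2 (column names are made up).
    val principal = Seq((1, 10L), (2, 20L), (3, 30L)).toDF("id", "code")
    val ref1 = Seq((1, "a"), (2, "b")).toDF("id", "label1")
    val ref2 = Seq((10L, "x"), (30L, "z")).toDF("code", "label2")

    // Each join is computed as soon as it is defined, which is what Method 2 aims for.
    val afterRef1 = materialize(principal.join(ref1, Seq("id"), "left"))
    val afterRef2 = materialize(afterRef1.join(ref2, Seq("code"), "left"))
    afterRef2
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("eager-checkpoint").getOrCreate()
    val result = buildResult(spark)
    // The printed plan should start from the checkpointed data, not from the joins.
    result.explain(true)
    result.show()
    spark.stop()
  }
}
```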
    val joinedDF_1 = castColumnsFromStringToLong(ColumnsToCastToLong)(principleDF)
    joinedDF_1.cache() // not every intermediate DataFrame is reused, but some are, so cache() marks that usage
    joinedDF_1.count() // action used only to force the computation of this step
    val joinedDF_2 = castColumnsFromStringToFloat(ColumnsToCastToFloat)(joinedDF_1)
    joinedDF_2.cache()
    joinedDF_2.count()
    val joinedDF_3 = renameColumns(RenameMapping)(joinedDF_2)
    joinedDF_3.cache()
    joinedDF_3.count()
    val joinedDF_4 = filterAndDropColumns(joinedDF_3)
    joinedDF_4.cache()
    joinedDF_4.count()
    ...
persist() method, specifying the storage level as memory and disk (StorageLevel.MEMORY_AND_DISK)
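Method 2 can be generalized so that it does not need one hand-written val per step. Here is a minimal sketch, assuming the same `DataFrame => DataFrame` steps as Method 1; `runStepByStep` and the toy steps are hypothetical. It uses `persist(StorageLevel.MEMORY_AND_DISK)`; for DataFrames, `cache()` already defaults to that level, so the two are interchangeable here. Persisting does not shorten the logical plan each new step is built on: the cached data replaces matching parts of the plan when a query is planned, so this approach mainly saves recomputation.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

object PersistEachStepSketch {
  // Hypothetical helper generalizing Method 2: apply each step in order, persist its
  // output in memory with spill to disk, and run count() so the step is computed now.
  // Earlier intermediate results stay cached; unpersisting them is left to the caller.
  def runStepByStep(input: DataFrame, steps: List[DataFrame => DataFrame]): DataFrame =
    steps.foldLeft(input) { (previous, step) =>
      val current = step(previous).persist(StorageLevel.MEMORY_AND_DISK)
      current.count() // the action only forces materialization; its value is ignored
      current
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("persist-each-step").getOrCreate()
    import spark.implicits._
    // Toy input and steps standing in for principleDF and the real transformations.
    val input = Seq(1, 2, 3).toDF("n")
    val steps = List[DataFrame => DataFrame](
      df => df.filter($"n" > 1),
      df => df.withColumn("doubled", $"n" * 2)
    )
    val result = runStepByStep(input, steps)
    result.show()
    spark.stop()
  }
}
```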