
This question is about how to chain custom transformations in PySpark 2.

The DataFrame#transform method was added to the DataFrame API in PySpark 3.

The snippet below shows a custom transformation that takes no arguments and works as expected, and another custom transformation that takes arguments and does not work.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(1, 1.0), (2, 2.0)], ["int", "float"])

def with_funny(word):
    def inner(df):
        return df.withColumn("funny", lit(word))
    return inner

def cast_all_to_int(input_df):
    return input_df.select([col(col_name).cast("int") for col_name in input_df.columns])

df.transform(with_funny("bumfuzzle")).transform(cast_all_to_int).show()

Here's the output:

+---+-----+-----+
|int|float|funny|
+---+-----+-----+
|  1|    1| null|
|  2|    2| null|
+---+-----+-----+

How should the with_funny() method be defined so that it outputs a value when chained with the PySpark 3 transform API?

  • Can you point me to some resource to understand the inner function definition and why it works? Was trying to solve the multi-parameter transform for hours and am still not sure how it works with the inner function call. Commented Feb 23, 2022 at 6:08

2 Answers


If I understood correctly, your first transform adds a new column containing the literal passed as an argument, and the last transform casts all columns to int type, correct?

Casting a string like "bumfuzzle" to int returns null, so your final output is correct:

from pyspark.sql.functions import col, lit

df = spark.createDataFrame([(1, 1.0), (2, 2.0)], ["int", "float"])

def with_funny(word):
    def inner(df):
        return df.withColumn("funny", lit(word))
    return inner

def cast_all_to_int(input_df):
    return input_df.select([col(col_name).cast("int") for col_name in input_df.columns])

# first transform
df1 = df.transform(with_funny("bumfuzzle"))
df1.show()

# second transform
df2 = df1.transform(cast_all_to_int)
df2.show()

# all together
df_final = df.transform(with_funny("bumfuzzle")).transform(cast_all_to_int)
df_final.show()

Output:

+---+-----+---------+
|int|float|    funny|
+---+-----+---------+
|  1|  1.0|bumfuzzle|
|  2|  2.0|bumfuzzle|
+---+-----+---------+

+---+-----+-----+
|int|float|funny|
+---+-----+-----+
|  1|    1| null|
|  2|    2| null|
+---+-----+-----+

+---+-----+-----+
|int|float|funny|
+---+-----+-----+
|  1|    1| null|
|  2|    2| null|
+---+-----+-----+

Maybe what you want is to switch the order of your transformations, like this:

df_final = df.transform(cast_all_to_int).transform(with_funny("bumfuzzle"))
df_final.show()

Output:

+---+-----+---------+
|int|float|    funny|
+---+-----+---------+
|  1|    1|bumfuzzle|
|  2|    2|bumfuzzle|
+---+-----+---------+

2 Comments

Does the parameter of inner, "df", have to match the DataFrame you create on line 3? Or will it work for any DataFrame, including those named something other than "df"?
No, it seems it doesn't have to match. I tried with a differently named DataFrame and it still works. I did not know this inner-function trick; I spent hours trying to solve the multi-parameter transform.
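The "inner function" pattern asked about in the comments is a closure: the outer function captures the extra argument, and the returned inner function takes only the DataFrame, which is exactly the shape transform expects. The parameter name df inside inner is purely local, so it never has to match the caller's variable name. A minimal pure-Python sketch of the same idea (no Spark needed; with_suffix stands in for with_funny):

```python
def with_suffix(suffix):
    # The outer function captures `suffix` in a closure and returns
    # the actual transformation, which takes only the data argument --
    # the same shape df.transform(...) expects.
    def inner(rows):
        return [row + suffix for row in rows]
    return inner

add_bang = with_suffix("!")        # `inner` with suffix="!" baked in
print(add_bang(["a", "b"]))        # ['a!', 'b!']
```

Calling with_suffix("!") does not run the transformation; it only builds and returns inner, which can then be passed anywhere a one-argument function is expected.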

This has been solved in PySpark 3.3.0: transform now accepts *args and **kwargs and forwards them to your function. From the PySpark source:

def transform(self, func: Callable[..., "DataFrame"], *args: Any, **kwargs: Any) -> "DataFrame":
    """Returns a new :class:`DataFrame`. Concise syntax for chaining custom transformations.

    .. versionadded:: 3.0.0

    Parameters
    ----------
    func : function
        a function that takes and returns a :class:`DataFrame`.
    *args
        Positional arguments to pass to func.

        .. versionadded:: 3.3.0
    **kwargs
        Keyword arguments to pass to func.

        .. versionadded:: 3.3.0

    Examples
    --------
    >>> from pyspark.sql.functions import col
    >>> df = spark.createDataFrame([(1, 1.0), (2, 2.0)], ["int", "float"])
    >>> def cast_all_to_int(input_df):
    ...     return input_df.select([col(col_name).cast("int") for col_name in input_df.columns])
    >>> def sort_columns_asc(input_df):
    ...     return input_df.select(*sorted(input_df.columns))
    >>> df.transform(cast_all_to_int).transform(sort_columns_asc).show()
    +-----+---+
    |float|int|
    +-----+---+
    |    1|  1|
    |    2|  2|
    +-----+---+
    >>> def add_n(input_df, n):
    ...     return input_df.select([(col(col_name) + n).alias(col_name)
    ...                             for col_name in input_df.columns])
    >>> df.transform(add_n, 1).transform(add_n, n=10).show()
    +---+-----+
    |int|float|
    +---+-----+
    | 12| 12.0|
    | 13| 13.0|
    +---+-----+
    """
    result = func(self, *args, **kwargs)
    assert isinstance(
        result, DataFrame
    ), "Func returned an instance of type [%s], should have been DataFrame." % type(result)
    return result
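With this signature, the forwarding is just result = func(self, *args, **kwargs): the DataFrame passes itself as the first argument and appends whatever extras you supplied, so with_funny from the question could be rewritten as def with_funny(df, word) and called as df.transform(with_funny, "bumfuzzle"), with no closure needed. A toy pure-Python sketch of that dispatch logic (MiniFrame and its data list are illustrative stand-ins, not Spark APIs):

```python
class MiniFrame:
    """Toy stand-in for a DataFrame, just to show how the 3.3.0
    transform signature forwards extra arguments."""
    def __init__(self, data):
        self.data = data

    def transform(self, func, *args, **kwargs):
        # Same pattern as the PySpark 3.3.0 source above:
        # pass self first, then forward positional/keyword args.
        return func(self, *args, **kwargs)

def add_n(frame, n):
    # Takes the frame *and* an extra argument directly -- no closure.
    return MiniFrame([x + n for x in frame.data])

out = MiniFrame([1, 2]).transform(add_n, 1).transform(add_n, n=10)
print(out.data)  # [12, 13]
```

On PySpark versions before 3.3.0, the closure-based with_funny(word) pattern from the accepted answer remains the way to parameterize a transformation.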
