
I would like to know if it is possible to package a series of PySpark commands into a function, so that the function takes a dataframe and applies them to it, the way we would with an ordinary Python function.

For example, I have the following dataframe:

sevents_df.show(5)

+-------+--------+-------------+----------------+------------+-----+
|Counter|Duration|StartTime    |TypeEnumeration |Floor_Number|Value|
+-------+--------+-------------+----------------+------------+-----+
|    1.0|    5460|1503067077370|UC_001          |         NaN|  NaN|
|    1.0|     322|1503067090480|UC_008          |         NaN|  NaN|
|    1.0|     990|1503067099300|UC_001          |         NaN|  NaN|
|    1.0|    5040|1503067396060|UC_001          |         NaN|  NaN|
|    1.0|    6090|1503067402150|UC_001          |         NaN|  NaN|
+-------+--------+-------------+----------------+------------+-----+

Step 1. The first thing I do is filter on the type. I simply keep UC_001.

sevents_filter = sevents_df.filter(sevents_df['TypeEnumeration'].isin(['UC_001']) == True)

Step 2. Drop some columns:

columns_to_drop = ['Comments', 'Floor_Number', 'Value']
sevents_clean = sevents_filter.drop(*columns_to_drop)

Step 3. Convert StartTime to Date

from pyspark.sql import functions as f
from pyspark.sql import types as t

def convert_to_seconds(x):
    return x // 1000  # integer division, so the result matches IntegerType

udf_myFunction = f.udf(convert_to_seconds, t.IntegerType())
sevents2 = sevents_clean.withColumn("StartTime", udf_myFunction("StartTime"))
sevents4 = sevents2.withColumn('epoch',
                               f.date_format(sevents2.StartTime.cast(t.TimestampType()),
                                             "yyyy-MM-dd"))

I would like to put these three steps in a function like:

def pyspark_function(dataframe):  # perhaps as some kind of UDF?
    # step 1
    # step 2
    # step 3
    return dataframe

The reason I want to do this is that if I have N dataframes, I cannot imagine writing these steps N times.

One solution is to concatenate these N frames into one frame and pass this one giant frame through these steps once. Are there any alternatives to passing one frame at a time?
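To sketch the concatenation idea (assuming all the frames share the same schema; df1, df2, df3 are placeholders, not my real variable names):

from functools import reduce

frames = [df1, df2, df3]  # hypothetical input frames with identical schemas
giant_df = reduce(lambda a, b: a.union(b), frames)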

  • You can also look at building a custom pipeline transformer - see this post.
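For illustration, a minimal sketch of such a custom transformer. The class name and the hard-coded filter are invented for this example, not taken from the linked post:

from pyspark.ml import Transformer

class KeepUC001(Transformer):
    """Illustrative transformer that keeps only the UC_001 rows."""
    def _transform(self, dataset):
        return dataset.filter(dataset['TypeEnumeration'] == 'UC_001')

# Usable on its own, or as a stage in a pyspark.ml Pipeline:
cleaned = KeepUC001().transform(sevents_df)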

1 Answer


A UDF is used to process values in dataframe columns; it can't be used to process a whole dataframe. Instead, create a normal function that takes a dataframe and returns the processed dataframe:

from pyspark.sql import functions as f
from pyspark.sql import types as t

def process_df(df):
    # Step 1: keep only the UC_001 rows
    df = df.filter(df['TypeEnumeration'] == 'UC_001')

    # Step 2: drop the unneeded columns
    columns_to_drop = ['Comments', 'Floor_Number', 'Value']
    df = df.drop(*columns_to_drop)

    # Step 3: milliseconds since epoch -> date string
    df = df.withColumn('epoch', f.date_format((df.StartTime / 1000).cast(t.TimestampType()), "yyyy-MM-dd"))

    return df

Then simply loop over all the dataframes and use the above method.
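For example (a quick sketch, where the list below stands in for however you collect your N dataframes):

raw_frames = [sevents_df, another_df]  # hypothetical list of input frames
processed_frames = [process_df(df) for df in raw_frames]

On Spark 3.0+ you can also write sevents_df.transform(process_df), which makes several such steps easy to chain.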

Note: I made some simplifications to the code. There is no need for isin since you are only filtering on a single value, and no UDF is necessary to divide by 1000. When possible, prefer the built-in Spark functions over a custom UDF; they are faster.
