
I am trying to apply pandas_udf in pyspark.

I have a pyspark dataframe like the following:

+-------------------+------------------+--------+-------+
|                lat|               lon|duration|stop_id|
+-------------------+------------------+--------+-------+
|  -6.23748779296875| 106.6937255859375|     247|      0|
|  -6.23748779296875| 106.6937255859375|    2206|      1|
|  -6.23748779296875| 106.6937255859375|     609|      2|
| 0.5733972787857056|101.45503234863281|   16879|      3|
| 0.5733972787857056|101.45503234863281|    4680|      4|
| -6.851855278015137|108.64261627197266|     164|      5|
| -6.851855278015137|108.64261627197266|     220|      6|
| -6.851855278015137|108.64261627197266|    1669|      7|
|-0.9033176600933075|100.41548919677734|   30811|      8|
|-0.9033176600933075|100.41548919677734|   23404|      9|
+-------------------+------------------+--------+-------+

I am trying to write a simple function that creates a column test, which is 1 if duration is greater than 1000 and 0 otherwise:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, IntegerType

schema = StructType([
    StructField('test', IntegerType(), True),
    StructField('stop_id', IntegerType(), True)
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def probTime(stop_df):
    stopid = stop_df['stop_id'].values[0]
    val = stop_df['duration'].values[0]
    test = 0
    if val > 1000:
        test = 1
    df = pd.DataFrame()
    df['prob_time'] = test
    df['stop_id'] = stopid
    return df

But when I apply the function, I get an empty table:

sp = stop_df.groupBy("stop_id").apply(probTime)
sp.show(5)

+----+-------+
|test|stop_id|
+----+-------+
+----+-------+

2 Answers


The issue arises from how the new DataFrame is populated inside the grouping function: assigning scalars to columns of an empty DataFrame leaves it empty, so you need to assign the values as lists. Compare the following two examples:

df = pd.DataFrame()
test = 1
stopid = 1
df['prob_time'] = test
df['stop_id'] = stopid
print(df)

This yields:

Empty DataFrame
Columns: [prob_time, stop_id]
Index: []

compared to

df = pd.DataFrame()
test = 1
stopid = 1
df['prob_time'] = [test]
df['stop_id'] = [stopid]
print(df)

which yields

   prob_time  stop_id
0          1        1

Therefore you should change your code to the latter form.
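
Applying this to the question, a corrected sketch of the UDF might look like the following, assuming the Spark 2.x grouped-map API used in the question (in Spark 3, applyInPandas is the preferred route). Note also that the schema declares a field named test while the original function builds a column named prob_time; depending on the Spark version, grouped-map output columns may be matched to the schema by name, so this sketch keeps the two names consistent.

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, IntegerType

schema = StructType([
    StructField('test', IntegerType(), True),
    StructField('stop_id', IntegerType(), True)
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def probTime(stop_df):
    # each group holds the rows for one stop_id
    stopid = stop_df['stop_id'].values[0]
    val = stop_df['duration'].values[0]
    test = 1 if val > 1000 else 0
    df = pd.DataFrame()
    df['test'] = [test]        # assign as lists so the DataFrame gets a row
    df['stop_id'] = [stopid]
    return df

sp = stop_df.groupBy("stop_id").apply(probTime)
sp.show(5)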


Instead of writing a pandas UDF, this can be done directly in Spark using the when function.

1) Import the when function:

from pyspark.sql.functions import when

2) Use it to create a new column in the existing dataframe:

stop_df = stop_df.withColumn('test', when(stop_df['duration']>1000, 1).otherwise(0))

The stop_df dataframe will then have the test column with the required values.
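
If the condition later becomes more involved, when calls can be chained before the final otherwise. A minimal sketch, with made-up thresholds purely for illustration:

from pyspark.sql.functions import when

stop_df = stop_df.withColumn(
    'test',
    when(stop_df['duration'] > 10000, 2)   # hypothetical extra threshold
    .when(stop_df['duration'] > 1000, 1)
    .otherwise(0)
)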

1 Comment

I would still need to understand how the pandas_udf approach works, because I need to write a more complex function.
