4

I'm trying to create a user-defined function that takes a cumulative sum of an array and compares the value to another column. Here is a reproducible example:

from pyspark.sql.session import SparkSession

# instantiate Spark
spark = SparkSession.builder.getOrCreate()

# make some test data
columns = ['loc', 'id', 'date', 'x', 'y']
vals = [
    ('a', 'b', '2016-07-01', 1, 5),
    ('a', 'b', '2016-07-02', 0, 5),
    ('a', 'b', '2016-07-03', 5, 15),
    ('a', 'b', '2016-07-04', 7, 5),
    ('a', 'b', '2016-07-05', 8, 20),
    ('a', 'b', '2016-07-06', 1, 5)
]

# create DataFrame
temp_sdf = (spark
      .createDataFrame(vals, columns)
      .withColumn('x_ary', collect_list('x').over(Window.partitionBy(['loc','id']).orderBy(desc('date')))))

temp_df = temp_sdf.toPandas()

def test_function(x_ary, y):
  cumsum_array = np.cumsum(x_ary) 
  result = len([x for x in cumsum_array if x <= y])
  return result

test_function_udf = udf(test_function, ArrayType(LongType()))

temp_df['len'] = temp_df.apply(lambda x: test_function(x['x_ary'], x['y']), axis = 1)
display(temp_df)

In Pandas, this is the output:

loc id  date        x   y   x_ary           len
a   b   2016-07-06  1   5   [1]             1
a   b   2016-07-05  8   20  [1,8]           2
a   b   2016-07-04  7   5   [1,8,7]         1
a   b   2016-07-03  5   15  [1,8,7,5]       2
a   b   2016-07-02  0   5   [1,8,7,5,0]     1
a   b   2016-07-01  1   5   [1,8,7,5,0,1]   1

In Spark using temp_sdf.withColumn('len', test_function_udf('x_ary', 'y')), all of len ends up being null.

Would anyone know why this is the case?

Also, replacing cumsum_array = np.cumsum(np.flip(x_ary)) fails in pySpark with error AttributeError: module 'numpy' has no attribute 'flip', but I know it exists as I can run it fine with Pandas dataframe.
Can this issue be resolved, or is there a better way to flip arrays with pySpark?

Thanks in advance for your help.

3
  • flip takes 2 arguments, seems you have not provided axis for it Commented Oct 18, 2019 at 7:48
  • @Sri_Karthik Second argument is optional; numpy flip Commented Oct 18, 2019 at 13:45
  • @Sri_Karthik Turns out I needed to upgrade pyspark to 2.4.4 from 2.4.3 - seems to work now. Commented Oct 18, 2019 at 15:17

1 Answer 1

3

Since test_function returns integer not List/Array. You will get null values as have you mentioned wrong return type. So please remove "ArrayType from udf" or replace return type as LongType() then it will work as given below. :

Note: You can optionally set the return type of your UDF else the default return type is StringType.

Option1:

test_function_udf = udf(test_function) # Returns String type

Option2:

test_function_udf = udf(test_function, LongType())  #Returns Long/integer type

temp_sdf = temp_sdf.withColumn('len', 
           test_function_udf('x_ary', 'y'))
temp_sdf.show()
Sign up to request clarification or add additional context in comments.

4 Comments

that worked, but I sure don't understand why it worked. Can you explain?
Hi Kai, please refer the test_function . The test_function returns integer type not List. Hence Mentioning Array Type will return null values. Please refer my edited answer.
This doesn't work for me. I get back Ljava.lang.Object;@612e71a4 or null. I'm using spark 3.2.1
can you give me sample function how you are writing

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.