>> df = hc.createDataFrame([('a', [1.0, 1.0]), ('a', [1.0, 0.2, 0.3, 0.7]), ('b', [1.0]), ('c', [1.0, 0.5]), ('d', [0.55, 1.0, 1.4]), ('e', [1.05, 1.0])])


>> df.show()
+---+--------------------+
| _1|                  _2|
+---+--------------------+
|  a|          [1.0, 1.0]|
|  a|[1.0, 0.2, 0.3, 0.7]|
|  b|               [1.0]|
|  c|          [1.0, 0.5]|
|  d|    [0.55, 1.0, 1.4]|
|  e|         [1.05, 1.0]|
+---+--------------------+

Now, I want to apply a function like sum or mean to the column "_2" to create a new column "_3". For example, using the sum function, the result should look like below:

+---+--------------------+----+
| _1|                  _2|  _3|
+---+--------------------+----+
|  a|          [1.0, 1.0]| 2.0|
|  a|[1.0, 0.2, 0.3, 0.7]| 2.2|
|  b|               [1.0]| 1.0|
|  c|          [1.0, 0.5]| 1.5|
|  d|    [0.55, 1.0, 1.4]|2.95|
|  e|         [1.05, 1.0]|2.05|
+---+--------------------+----+

Thanks in advance

1 Answer

TL;DR Unless you use proprietary extensions, you have to define a UserDefinedFunction for each operation:

from pyspark.sql.functions import udf
import numpy as np

@udf("double")
def array_sum(xs):
    # .tolist() converts the NumPy scalar to a plain Python float;
    # None inputs are passed through as null
    return np.sum(xs).tolist() if xs is not None else None

@udf("double")
def array_mean(xs):
    return np.mean(xs).tolist() if xs is not None else None

(df
    .withColumn("mean", array_mean("_2"))
    .withColumn("sum", array_sum("_2")))
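The UDF bodies are plain Python, so they can be sanity-checked without a Spark session. As a quick sketch (assuming only NumPy), applying them to sample rows from the table above reproduces the expected `_3` values:

```python
import numpy as np

# Same bodies as the UDFs above, minus the @udf decorator
def array_sum(xs):
    return np.sum(xs).tolist() if xs is not None else None

def array_mean(xs):
    return np.mean(xs).tolist() if xs is not None else None

print(array_sum([1.0, 1.0]))   # 2.0
print(array_mean([1.0, 0.5]))  # 0.75
print(array_sum(None))         # None, i.e. null arrays pass through as null
```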

In some cases you might prefer to explode and aggregate, but this has limited applications and is typically much more expensive, unless the data is already partitioned by a unique identifier.

from pyspark.sql.functions import monotonically_increasing_id, first, mean, sum, explode

(df
    .withColumn("_id", monotonically_increasing_id())
    .withColumn("x", explode("_2"))
    .groupBy("_id")
    .agg(first("_1"), first("_2"), mean("x"), sum("x")))

2 Comments

Thanks for your response.
But rather than using the decorator, I tried to register the function with udf and perform the same operation, and I received this error: TypeError: cannot perform reduce with flexible type. I couldn't reproduce the magic done by the decorator here; can you please explain?
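The decorator isn't doing anything magical: `@udf("double")` is simply shorthand for `array_sum = udf(array_sum, "double")`, so a registered version should behave the same way. The sketch below illustrates the decorator pattern with a hypothetical `udf_like` stand-in (the real `udf` returns a UserDefinedFunction and needs a Spark session to run). As for the error itself, "cannot perform reduce with flexible type" is the TypeError NumPy typically raises when asked to reduce non-numeric (string) data, e.g. if the function ends up applied to the string column `_1` instead of the array column `_2`; it is not caused by registration per se.

```python
import numpy as np

def udf_like(return_type):
    # Hypothetical toy stand-in for pyspark.sql.functions.udf:
    # it wraps a plain function and tags it with a return type
    # (the real udf returns a UserDefinedFunction instead).
    def wrap(f):
        f.returnType = return_type
        return f
    return wrap

# Decorator form...
@udf_like("double")
def array_sum(xs):
    return np.sum(xs).tolist() if xs is not None else None

# ...is identical to calling the wrapper explicitly:
def array_sum_registered(xs):
    return np.sum(xs).tolist() if xs is not None else None

array_sum_registered = udf_like("double")(array_sum_registered)

print(array_sum.returnType, array_sum_registered.returnType)  # double double
print(array_sum([1.0, 1.0]))  # 2.0
```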
