
I have a PySpark DataFrame like this:

+---+------+------+
|key|value1|value2|
+---+------+------+
|  a|     1|     0|
|  a|     1|    42|
|  b|     3|    -1|
|  b|    10|    -2|
+---+------+------+
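
For reference, such a DataFrame (called df3 in the code below) can be built like this; the SparkSession name spark is an assumption, since it isn't shown in the post:

# assuming an active SparkSession named `spark`
df3 = spark.createDataFrame(
    [("a", 1, 0), ("a", 1, 42), ("b", 3, -1), ("b", 10, -2)],
    ["key", "value1", "value2"],
)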

I have defined a pandas_udf like this:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("key", StringType())
])

arr = []

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def g(df):
    k = df.key.iloc[0]
    series = [d for d in df.value2]
    arr.append(len(series))  # attempt to record each group's size in the driver-side list
    print(series)
    return pd.DataFrame([k])

df3.groupby("key").apply(g).collect()
print(arr)

As is evident, arr should end up as [2, 2], but it remains empty. The output of print(series) looks correct when I check the driver logs, yet arr is never updated.

The return type doesn't matter to me since I'm not changing or processing the data; I just want to push it into a custom class object.

  • Could you try making arr global, e.g. with a global arr statement inside the function? If that doesn't work, try broadcasting the variable with sc.broadcast(arr). Commented Jun 26, 2020 at 6:32

1 Answer


The pandas_udf runs in executor processes, so appending to a plain Python list defined on the driver never makes it back to the driver. I had to define a custom accumulator for a list and use it instead.

from pyspark.accumulators import AccumulatorParam

class ListParam(AccumulatorParam):
    def zero(self, value):
        # each task (and the driver) starts from an empty list
        return []

    def addInPlace(self, acc, other):
        # `other` is either the list passed to acc.add([...]) inside the UDF
        # or a whole per-task partial list being merged back on the driver
        acc.extend(other)
        return acc
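
Here is a minimal sketch of how this accumulator can be wired into the pandas_udf from the question (assuming the same spark session, schema, and df3 as above):

# create the accumulator on the driver
list_acc = spark.sparkContext.accumulator([], ListParam())

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def g(df):
    k = df.key.iloc[0]
    # record the group size; the value is wrapped in a list because
    # addInPlace merges with extend
    list_acc.add([len(df.value2)])
    return pd.DataFrame([k])

df3.groupby("key").apply(g).collect()  # the action triggers the UDF
print(list_acc.value)                  # e.g. [2, 2] (order not guaranteed)

addInPlace is also called when Spark folds each task's partial list into the driver's value, which is why values are extended rather than appended; this keeps the final list flat.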