
I have a dataframe where I group by the key and use collect_list to build an array of structs from col1 and col2. I want to sort the structs inside the collect_list result by the second element (col2) after the list is formed. I am not sure whether sorting the dataframe by col2 first and then calling collect_list preserves the sort order (I have found both yes and no answers for Spark), so I prefer to sort after collect_list is created, since my next step depends on the sort order. I tried a UDF, which sometimes works but sometimes throws an error.

import pyspark.sql.functions as F
from pyspark.sql.functions import collect_list, collect_set, expr, struct
import operator
from operator import itemgetter

def sorter(key_value_list):
    # sort descending by the second struct field (col2)
    res = sorted(key_value_list, key=lambda x: x[1], reverse=True)
    return [[item[0], item[1]] for item in res]

Besides that return statement (return [[item[0], item[1]] for item in res]), I also tried the variants below, but nothing worked. Only the statement above works, and only sometimes; on bulk data it throws an error.

return [concat_ws('|', [item[0], item[1]]) for item in res]
return [array([item[0], item[1]]) for item in res]

sort_udf = F.udf(sorter)

df1 = df.groupBy("group_key").agg(F.collect_list(F.struct("col1", "col2")).alias("key_value"))
df1.withColumn("sorted_key_value", sort_udf("key_value")).show(truncate=False)
df1.printSchema()
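One detail I noticed (my assumption): F.udf(sorter) with no return type defaults to StringType, which would explain getting the result back as a string rather than an array. A minimal sketch with an explicit schema, using field types guessed from the sample data:

from pyspark.sql.types import ArrayType, StructType, StructField, StringType, LongType

# assumed element shape: (col1: string, col2: long)
kv_schema = ArrayType(StructType([
    StructField("col1", StringType()),
    StructField("col2", LongType()),
]))

# same sorter as above, registered with an explicit return schema so Spark
# keeps the result as an array of structs instead of its string representation
sort_udf_typed = F.udf(sorter, kv_schema)
df1.withColumn("sorted_key_value", sort_udf_typed("key_value")).printSchema()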

Input:

group_key col1 col2
123       a    5
123       a    6
123       b    6
123       cd   3 
123       d    2
123       ab   9
456       d    4  
456       ad   6 
456       ce   7 
456       a    4 
456       s    3 

Normal output without sorting:

group_key key_value_arr
123 [[a, 5], [a, 6], [b, 6], [cd, 3], [d, 2], [ab, 9]]
456 [[d, 4], [ad, 6], [ce, 7], [a, 4], [s, 3]]

Intended output. Currently I get this output back as a single string; I want an array of strings.

group_key key_value_arr
123 [[ab, 9], [a, 6], [b, 6], [a, 5], [cd, 3], [d, 2]]
456 [[ce, 7], [ad, 6], [d, 4], [a, 4], [s, 3]]

Error on bulk data:

  File "/hadoop/6/yarn/local/usercache/b_incdata_rw/appcache/application_1660704390900_2796904/container_e3797_1660704390900_2796904_01_000002/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o184.showString.

Another way I tried, to return an array of strings (col1 and col2 delimited by |):

def sorter(key_value_list):
    l = []
    res = sorted(key_value_list, key=lambda x: x[1], reverse=True)
    for item in res:
        # F.concat_ws is a Column expression, not a plain Python string function
        s = F.concat_ws('|', item[0], item[1])
        l.append(s)
    return l

from pyspark.sql.types import ArrayType, StringType

sort_udf = F.udf(sorter, ArrayType(StringType()))

df6 = df4.withColumn("sorted_key_value", sort_udf("key_value"))
df6.show(truncate=False)
df6.printSchema()

Returning res directly as a list also produces the same error.

 File "/hadoop/10/yarn/local/usercache/b_incdata_rw/appcache/application_1660704390900_2799810/container_e3797_1660704390900_2799810_01_000453/pyspark.zip/pyspark/sql/functions.py", line 1398, in concat_ws
    return Column(sc._jvm.functions.concat_ws(sep, _to_seq(sc, cols, _to_java_column)))
AttributeError: 'NoneType' object has no attribute '_jvm'
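That error is consistent with the line above: F.concat_ws builds a JVM Column expression and needs the SparkContext, which is not available inside a UDF running on the executors. Here is a sketch of the same idea using plain Python string formatting instead (my substitution, not tested on the original data):

from pyspark.sql.types import ArrayType, StringType

def sorter_str(key_value_list):
    # sort descending by col2, then join the fields with plain Python
    res = sorted(key_value_list, key=lambda x: x[1], reverse=True)
    return ['{}|{}'.format(item[0], item[1]) for item in res]

sort_udf = F.udf(sorter_str, ArrayType(StringType()))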

Third approach I tried:

def def_sort(x):
    # note: this compares col2 as a string, not a number
    return sorted(x, key=lambda s: s.split('|')[1], reverse=True)

udf_sort = F.udf(def_sort, ArrayType(StringType()))

df_full_multi2.withColumn("sorted_list", F.array_distinct(udf_sort("key_value"))).show(100, truncate=False)
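For def_sort to apply, key_value here presumably already holds strings like 'a|5'. A sketch of how that input could be built natively (concat_ws is fine in this position because it runs as a Column expression, outside any UDF):

df_full_multi2 = (df
    .groupBy("group_key")
    .agg(F.collect_list(F.concat_ws('|', 'col1', 'col2')).alias("key_value"))
)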

With this third approach I get the intended result, as below:

group_key sorted_list
123 [ab|9, a|6, b|6, a|5, cd|3, d|2]
456 [ce|7, ad|6, d|4, a|4, s|3]

However, when I write the result to parquet I get an error:

An error occurred while calling o178.parquet.
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 807 in stage 2.0 failed 4 times, most recent failure: Lost task 807.3 in stage 2.0 (TID 3495, hdc42-mcc10-01-1110-4105-004-tess0029.stratus.rno.ebay.com, executor 1006): ExecutorLostFailure (executor 1006 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 21.1 GB of 21 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Driver stacktrace:
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:196)
  File "/hadoop/10/yarn/local/usercache/b_incdata_rw/appcache/application_1663728370843_1731/container_e3798_1663728370843_1731_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o178.parquet.

1 Answer

  • create struct the opposite way - first "col2", then "col1"
  • sort the array descending using sort_array(... , False)
  • flip the fields inside struct using transform

Input:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [(123, 'a', 5),
     (123, 'a', 6),
     (123, 'b', 6),
     (123, 'cd', 3),
     (123, 'd', 2),
     (123, 'ab', 9),
     (456, 'd', 4),
     (456, 'ad', 6),
     (456, 'ce', 7),
     (456, 'a', 4),
     (456, 's', 3)],
    ['group_key', 'col1', 'col2'])

Script:

df1 = (df
    .groupBy("group_key")
    .agg(F.sort_array(F.collect_list(F.struct("col2", "col1")), False).alias("key_value"))
)
df2 = df1.withColumn("key_value", F.expr("transform(key_value, x -> struct(x.col1, x.col2))"))

df2.show(truncate=0)
# +---------+--------------------------------------------------+
# |group_key|key_value                                         |
# +---------+--------------------------------------------------+
# |123      |[{ab, 9}, {b, 6}, {a, 6}, {a, 5}, {cd, 3}, {d, 2}]|
# |456      |[{ce, 7}, {ad, 6}, {d, 4}, {a, 4}, {s, 3}]        |
# +---------+--------------------------------------------------+
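Because sort_array and transform are built-in functions, the whole aggregation runs natively in the JVM with no Python UDF serialization, which should also help avoid the executor memory pressure you hit when writing to parquet.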

If you need more advanced sorting, you can create a comparator function. Refer to this question for examples of sorting arrays of structs.
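For instance, here is a sketch of a comparator-based sort (assuming Spark 3.x, where SQL array_sort accepts a two-argument lambda) that sorts by col2 descending and breaks ties by col1 ascending, which a plain sort_array cannot express:

df1c = (df
    .groupBy("group_key")
    .agg(F.collect_list(F.struct("col1", "col2")).alias("key_value"))
)
# comparator returns -1/1/0: col2 descending, ties broken by col1 ascending
df2c = df1c.withColumn("key_value", F.expr("""
    array_sort(key_value, (l, r) -> CASE
        WHEN l.col2 > r.col2 THEN -1
        WHEN l.col2 < r.col2 THEN 1
        WHEN l.col1 < r.col1 THEN -1
        WHEN l.col1 > r.col1 THEN 1
        ELSE 0 END)
"""))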


In case you want an array of strings, use this:

df1 = (df
    .groupBy("group_key")
    .agg(F.sort_array(F.collect_list(F.struct("col2", "col1")), False).alias("key_value"))
)
df2 = df1.withColumn("key_value", F.expr("transform(key_value, x -> concat(x.col1, '|', x.col2))"))

df2.show(truncate=0)
# +---------+--------------------------------+
# |group_key|key_value                       |
# +---------+--------------------------------+
# |123      |[ab|9, b|6, a|6, a|5, cd|3, d|2]|
# |456      |[ce|7, ad|6, d|4, a|4, s|3]     |
# +---------+--------------------------------+

3 Comments

"create struct the opposite way - first "col2", then "col1" - this did the trick. Thanks. I could use "transform" from spark 3.1. df2 = df1.withColumn("key_value_sorted", F.array_distinct(F.expr("transform(key_value, x -> concat(x.col1, '|', x.col2))") ) )
After a closer look at the sort_array function in the Apache docs and at the resulting data, it looks like, when the array of structs has multiple fields, the sorting goes first by the first field, then by the second, then the third, and so on, all in the same order (asc or desc). So after the structs get sorted by col2 in descending order, ties are sorted by col1 in descending order as well. Is this a correct understanding?
Yes, the sort order is the same for all the fields inside the struct. It first compares the first field, then moves on to the other fields. If you need more complex sorting, please refer to the linked page regarding sorting; there you will find an example of sorting with a comparator function in Spark.
