
I have a DataFrame which contains the following data:

df.show()

    +-----+------+--------+
    | id_A| idx_B| B_value|
    +-----+------+--------+
    |    a|     0|       7|
    |    b|     0|       5|
    |    b|     2|       2|
    +-----+------+--------+

Assuming B has a total of 3 possible indices, I want to create a table that merges all indices and values into a list (or numpy array) that looks like this:

final_df.show()

    +-----+----------+
    | id_A|  B_values|
    +-----+----------+
    |    a| [7, 0, 0]|
    |    b| [5, 0, 2]|
    +-----+----------+

I've managed to go up to this point:

from pyspark.sql import functions as f

temp_df = df.withColumn('B_tuple', f.struct(df['idx_B'], df['B_value']))\
            .groupBy('id_A').agg(f.collect_list('B_tuple').alias('B_tuples'))
temp_df.show()

    +-----+-----------------+
    | id_A|         B_tuples|
    +-----+-----------------+
    |    a|         [[0, 7]]|
    |    b| [[0, 5], [2, 2]]|
    +-----+-----------------+

But now I can't write a proper UDF to turn temp_df into final_df.

Is there a simpler way to do so?

If not, what is the proper function I should use to finish the transformation?

2 Answers

So I've found a solution:

from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, IntegerType

def create_vector(tuples_list, size):
    # Start from a zero-filled list and place each value at its index
    my_list = [0] * size
    for x in tuples_list:
        my_list[x["idx_B"]] = x["B_value"]
    return my_list

size = 3
create_vector_udf = f.udf(lambda tuples: create_vector(tuples, size),
                          ArrayType(IntegerType()))

final_df = temp_df.withColumn('B_values', create_vector_udf(temp_df['B_tuples']))\
                  .select(['id_A', 'B_values'])

final_df.show()

    +-----+----------+
    | id_A|  B_values|
    +-----+----------+
    |    a| [7, 0, 0]|
    |    b| [5, 0, 2]|
    +-----+----------+
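The UDF's fill-by-index logic can be sanity-checked outside Spark with plain Python dicts standing in for the struct rows (a sketch only; the dict keys mirror the struct field names, and the sample rows are taken from the question's data):

```python
# Plain-Python check of the UDF body: each dict stands in for one
# (idx_B, B_value) struct row collected by collect_list.
def create_vector(tuples_list, size):
    my_list = [0] * size
    for x in tuples_list:
        my_list[x["idx_B"]] = x["B_value"]
    return my_list

rows_a = [{"idx_B": 0, "B_value": 7}]
rows_b = [{"idx_B": 0, "B_value": 5}, {"idx_B": 2, "B_value": 2}]

print(create_vector(rows_a, 3))  # [7, 0, 0]
print(create_vector(rows_b, 3))  # [5, 0, 2]
```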

1 Comment

Nice solution. I don't have much experience with UDFs yet, so this is a good example for me of when they can be useful :)

If you already know the size of the array, you can do this without a UDF.

Take advantage of the optional second argument to pivot(): values. This takes in a

List of values that will be translated to columns in the output DataFrame

So groupBy the id_A column, and pivot the DataFrame on the idx_B column. Since not all indices may be present, you can pass in range(size) as the values argument.

import pyspark.sql.functions as f
size = 3
df = df.groupBy("id_A").pivot("idx_B", values=range(size)).agg(f.first("B_value"))
df = df.na.fill(0)
df.show()
#+----+---+---+---+
#|id_A|  0|  1|  2|
#+----+---+---+---+
#|   b|  5|  0|  2|
#|   a|  7|  0|  0|
#+----+---+---+---+

The indices that are not present in the data will default to null, so we call na.fill(0) to replace them with the default value of 0.

Once you have your data in this format, you just need to create an array from the columns:

df.select("id_A", f.array([f.col(str(i)) for i in range(size)]).alias("B_values")).show()
#+----+---------+
#|id_A| B_values|
#+----+---------+
#|   b|[5, 0, 2]|
#|   a|[7, 0, 0]|
#+----+---------+
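To see what the pivot-then-array sequence does to each group, the same transformation can be mimicked in plain Python (a sketch with hypothetical variable names, not Spark code):

```python
# Mimic groupBy/pivot/na.fill/array for a single id_A group:
# pivot turns each idx_B into a column, indices absent from the data
# become None, na.fill(0) replaces them with 0, and f.array collects
# the columns in index order.
size = 3
group_b = [(0, 5), (2, 2)]  # (idx_B, B_value) pairs for id_A == 'b'

pivoted = {idx: val for idx, val in group_b}       # pivot on idx_B
b_values = [pivoted.get(i, 0) for i in range(size)]  # na.fill(0) + array
print(b_values)  # [5, 0, 2]
```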

1 Comment

Maybe, but you do get a big performance boost by specifying the values for the pivot. I'm not sure how that compares with serialization to python for the udf. This may still turn out to be better.
