I have the following PySpark DataFrame:

+------+----------------+
|    id|            data|
+------+----------------+
|     1|    [10, 11, 12]|
|     2|    [20, 21, 22]|
|     3|    [30, 31, 32]|
+------+----------------+
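
For reference, a DataFrame like this can be created as follows (a minimal sketch; the name df_test matches my snippet below, and the exact schema is an assumption based on the output above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sketch of the input DataFrame: an id column plus an array column
# named "data", as shown in the output above.
df_test = spark.createDataFrame(
    [(1, [10, 11, 12]), (2, [20, 21, 22]), (3, [30, 31, 32])],
    ["id", "data"],
)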

In the end, I want to have the following DataFrame:

+--------+----------------------------------+
|      id|                              data|
+--------+----------------------------------+
| [1,2,3]|[[10,20,30],[11,21,31],[12,22,32]]|
+--------+----------------------------------+

In order to do this, I first extract the data arrays as follows:

# Collect every "data" array to the driver
tmp_array = df_test.select("data").rdd.flatMap(lambda x: x).collect()
# Hardcoded for the three rows shown above
a0 = tmp_array[0]
a1 = tmp_array[1]
a2 = tmp_array[2]
# Transpose: element i of each array ends up in the same tuple
samples = zip(a0, a1, a2)
samples1 = sc.parallelize(samples)

This way, samples1 is an RDD with the content:

[[10,20,30],[11,21,31],[12,22,32]]
  • Question 1: Is that a good way to do it?

  • Question 2: How to include that RDD back into the dataframe?
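
For reference, one way to get such an RDD back into a DataFrame is spark.createDataFrame (a minimal sketch, assuming the samples1 RDD from above; note this yields one row per tuple, while the answers below show how to pack everything into a single row):

# Wrap each zipped tuple in a one-element tuple so it becomes a row
# with a single array column (assumes samples1 from the snippet above).
df_samples = spark.createDataFrame(
    samples1.map(lambda t: (list(t),)),
    ["data"],
)
df_samples.show()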

  • Does your dataframe only have 2 rows? Commented Apr 12, 2018 at 15:47
  • Usually I have more than 2 Commented Apr 12, 2018 at 16:02
  • Ok and so you want to get back just one row? The length of data is a constant right (I don't see how it works otherwise)? Commented Apr 12, 2018 at 16:03
  • The length of data could be different, but for now I will consider that it is a constant Commented Apr 12, 2018 at 16:06

2 Answers


Here is a way to get your desired output without serializing to an RDD or using a udf. You will need two constants:

  • The number of rows in your DataFrame (df.count())
  • The length of data (given)

Use pyspark.sql.functions.collect_list() and pyspark.sql.functions.array() in a double list comprehension to pick out the elements of "data" in the order you want using pyspark.sql.Column.getItem():

import pyspark.sql.functions as f

dataLength = 3
numRows = df.count()

df.select(
    f.collect_list("id").alias("id"),
    f.array(
        [
            # Inner array i collects element i of every row's "data"
            f.array(
                [f.collect_list("data").getItem(j).getItem(i)
                 for j in range(numRows)]
            )
            for i in range(dataLength)
        ]
    ).alias("data")
).show(truncate=False)
#+---------+------------------------------------------------------------------------------+
#|id       |data                                                                          |
#+---------+------------------------------------------------------------------------------+
#|[1, 2, 3]|[WrappedArray(10, 20, 30), WrappedArray(11, 21, 31), WrappedArray(12, 22, 32)]|
#+---------+------------------------------------------------------------------------------+
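
The comments under the question mention that the length of data may vary. Here is a sketch of an alternative without the two hardcoded constants, assuming Spark 2.1+ (for posexplode); the sort_array calls are there because collect_list alone does not guarantee ordering:

from pyspark.sql import functions as f

# Pair every array element with its position, then regroup by position.
exploded = df.select("id", f.posexplode("data").alias("pos", "val"))

data = (
    exploded.groupBy("pos")
    .agg(f.sort_array(f.collect_list(f.struct("id", "val"))).alias("pairs"))
    # pairs.val pulls the val field out of each struct in the array
    .select("pos", f.col("pairs.val").alias("vals"))
    .agg(f.sort_array(f.collect_list(f.struct("pos", "vals"))).alias("rows"))
    .select(f.col("rows.vals").alias("data"))
)

ids = df.agg(f.sort_array(f.collect_list("id")).alias("id"))
ids.crossJoin(data).show(truncate=False)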

1 Comment

This is awesome!

You can simply use a udf for the zip, but before that you will have to use the collect_list function:

from pyspark.sql import functions as f
from pyspark.sql import types as t

def zipUdf(array):
    # list() is needed on Python 3, where zip returns an iterator
    # that Spark cannot serialize
    return list(zip(*array))

zipping = f.udf(zipUdf, t.ArrayType(t.ArrayType(t.IntegerType())))

df.select(
    f.collect_list(df.id).alias('id'),
    zipping(f.collect_list(df.data)).alias('data')
).show(truncate=False)

which would give you

+---------+------------------------------------------------------------------------------+
|id       |data                                                                          |
+---------+------------------------------------------------------------------------------+
|[1, 2, 3]|[WrappedArray(10, 20, 30), WrappedArray(11, 21, 31), WrappedArray(12, 22, 32)]|
+---------+------------------------------------------------------------------------------+
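
Note: the list() around zip in the udf matters on Python 3, where zip returns an iterator; without it Spark fails with net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for builtins.iter), the error mentioned in the comments below.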

7 Comments

Usually I have more than 2. Thanks
You can generalize this to more than 2 rows by changing your udf to return zip(*array)
@RameshMaharjan I hope you don't mind, but I edited the answer to just have the final answer, which works for all cases.
I wanted to do it this way as well but I wanted to put your name so I just added it in @pault thanks a lot
@RameshMaharjan I have the following error: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for builtins.iter). Do you know what it could be?