50

I have a dataframe whose columns contain lists, similar to the following. The lists are not the same length across all columns.

Name  Age  Subjects                  Grades
[Bob] [16] [Maths,Physics,Chemistry] [A,B,C]

I want to explode the dataframe so that I get the following output:

Name Age Subjects Grades
Bob  16   Maths     A
Bob  16  Physics    B
Bob  16  Chemistry  C

How can I achieve this?

2
  • 2
    You want to match the index in a given array with other arrays in the row? Like how Maths -> A, Physics -> B, and Chemistry -> C. So something like Maths -> B would be wrong. Commented Jun 28, 2018 at 13:12
  • Yes @Tanjin that would be wrong Commented Jun 28, 2018 at 13:30

8 Answers

77

PySpark has added an arrays_zip function in 2.4, which eliminates the need for a Python UDF to zip the arrays.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(['Bob'], [16], ['Maths','Physics','Chemistry'], ['A','B','C'])],
    ['Name','Age','Subjects', 'Grades'])
df = df.withColumn("new", F.arrays_zip("Subjects", "Grades"))\
       .withColumn("new", F.explode("new"))\
       .select("Name", "Age", F.col("new.Subjects").alias("Subjects"), F.col("new.Grades").alias("Grades"))
df.show()

+-----+----+---------+------+
| Name| Age| Subjects|Grades|
+-----+----+---------+------+
|[Bob]|[16]|    Maths|     A|
|[Bob]|[16]|  Physics|     B|
|[Bob]|[16]|Chemistry|     C|
+-----+----+---------+------+
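arrays_zip pairs elements by position, much like Python's zip, except that shorter arrays are padded with null instead of being truncated. A plain-Python model of what the zipped column holds (zip_arrays is a name of my own, not a Spark API):

```python
def zip_arrays(*arrays):
    """Model of Spark's arrays_zip: pair elements by position,
    padding shorter arrays with None (Spark uses null)."""
    longest = max(len(a) for a in arrays)
    return [tuple(a[i] if i < len(a) else None for a in arrays)
            for i in range(longest)]

zip_arrays(['Maths', 'Physics', 'Chemistry'], ['A', 'B', 'C'])
# → [('Maths', 'A'), ('Physics', 'B'), ('Chemistry', 'C')]
```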

19

This works,

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(['Bob'], [16], ['Maths','Physics','Chemistry'], ['A','B','C'])],
    ['Name','Age','Subjects', 'Grades'])
df.show()

+-----+----+--------------------+---------+
| Name| Age|            Subjects|   Grades|
+-----+----+--------------------+---------+
|[Bob]|[16]|[Maths, Physics, ...|[A, B, C]|
+-----+----+--------------------+---------+

Use a udf with zip. The columns to be exploded have to be merged into one before exploding.

combine = F.udf(lambda x, y: list(zip(x, y)),
              ArrayType(StructType([StructField("subs", StringType()),
                                    StructField("grades", StringType())])))

df = df.withColumn("new", combine("Subjects", "Grades"))\
       .withColumn("new", F.explode("new"))\
       .select("Name", "Age", F.col("new.subs").alias("Subjects"), F.col("new.grades").alias("Grades"))
df.show()


+-----+----+---------+------+
| Name| Age| Subjects|Grades|
+-----+----+---------+------+
|[Bob]|[16]|    Maths|     A|
|[Bob]|[16]|  Physics|     B|
|[Bob]|[16]|Chemistry|     C|
+-----+----+---------+------+

2 Comments

What should I do if I need to put A, B and C in different columns rather than rows?
UDFs are not efficient or performant. They should be avoided when a PySpark API solution exists.
15

Arriving late to the party :-)

The simplest way is to use inline, which has no Python API but is supported via selectExpr.

df.selectExpr('Name[0] as Name','Age[0] as Age','inline(arrays_zip(Subjects,Grades))').show()

+----+---+---------+------+
|Name|Age| Subjects|Grades|
+----+---+---------+------+
| Bob| 16|    Maths|     A|
| Bob| 16|  Physics|     B|
| Bob| 16|Chemistry|     C|
+----+---+---------+------+
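When there are many scalar and array columns, the selectExpr strings can be generated programmatically; a small sketch (inline_exprs is a helper name of my own):

```python
def inline_exprs(scalar_cols, array_cols):
    """Build selectExpr strings: take element 0 of the single-element
    scalar columns and inline-explode the zipped array columns."""
    exprs = [f"{c}[0] as {c}" for c in scalar_cols]
    exprs.append(f"inline(arrays_zip({','.join(array_cols)}))")
    return exprs

# df.selectExpr(*inline_exprs(['Name', 'Age'], ['Subjects', 'Grades'])).show()
```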


1

Have you tried this?

from pyspark.sql.functions import col, explode

# Subjects is already an array, so it can be exploded directly
df.select(explode(col("Subjects")).alias("Subjects")).show()

Alternatively, you can convert the dataframe to an RDD.

For an RDD you can use a flatMap function to separate the Subjects.
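A sketch of that flatMap route under the question's schema (explode_row is a helper name of my own); each input row yields one output tuple per subject/grade pair:

```python
def explode_row(row):
    """Expand one row with parallel Subjects/Grades arrays into
    one tuple per (subject, grade) pair."""
    return [(row['Name'][0], row['Age'][0], s, g)
            for s, g in zip(row['Subjects'], row['Grades'])]

# df.rdd.flatMap(explode_row).toDF(['Name', 'Age', 'Subjects', 'Grades']).show()
```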

1 Comment

I've tried using a flat map as df.rdd.flatMap(lambda x: zip(*[x[c] for c in dcols])).toDF(dcols) but it is only giving me the first row and ignoring the remaining rows- |16 |A |Bob |Maths |
1

I've used the very elegant solution from @Nasty, but if you have a lot of columns to explode, the scheduler on the server side might run into issues if you generate lots of new dataframes with withColumn(). So I slightly adapted the code to run more efficiently and be more convenient to use:

import pyspark.sql.functions as F
from pyspark.sql import DataFrame

def explode_all(df: DataFrame, index=True, cols: list = []):
    """Explode multiple array-type columns.

    Loop through the explodable signals (array-type columns) and explode them together.
    First, the columns need to be zipped into the df:
    [a, [1, 2, 3]], [b, [10, 20, 30]] -> [[[a, 1], [b, 10]], [[a, 2], [b, 20]], [[a, 3], [b, 30]]]
    Then the zipped struct is unpacked into separate columns.

    df -- Required: input dataframe

    index -- Optional: whether a position index column is added (default: True)

    cols -- Optional: list of column names to explode.
    """
    if not isinstance(df, DataFrame):           # Error handling
        raise TypeError("Input data must be a PySpark DataFrame")

    if cols:
        expl_cols = [col for col, col_type in df.dtypes if (col_type[:5] == 'array' and col in cols)]
    else:
        expl_cols = [col for col, col_type in df.dtypes if col_type[:5] == 'array']

    if len(expl_cols) == 0:                     # Error handling
        raise ValueError("Input data does not contain any array columns")

    if index:
        df = df.select("*", F.posexplode(F.arrays_zip(*expl_cols)).alias("signal_index", "exp_combo")).drop(*expl_cols)
    else:
        df = df.select("*", F.explode(F.arrays_zip(*expl_cols)).alias("exp_combo")).drop(*expl_cols)

    df = df.select("*", *[F.col('exp_combo.' + name).alias(name) for name in expl_cols]).drop('exp_combo')

    return df


0

A copy/paste function if you need to repeat this quickly and easily across a large number of columns in a dataset:

import pyspark.sql.functions as f

cols = ["word", "stem", "pos", "ner"]

def explode_cols(data, cols):
    data = data.withColumn('exp_combo', f.arrays_zip(*cols))
    data = data.withColumn('exp_combo', f.explode('exp_combo'))
    for col in cols:
        data = data.withColumn(col, f.col('exp_combo.' + col))

    return data.drop(f.col('exp_combo'))

result = explode_cols(data, cols)

You're welcome :)

2 Comments

arrays_zip does not take a list as input.
@Quynh-MaiChu you need to use a starred expression, as written in the function above (f.arrays_zip(*cols)), so that Python will process your list as arguments instead of a list object. You can also do ..._zip(f.col('col1'), f.col('col2')) if you find that clearer.
0

When exploding multiple columns, the above solutions come in handy only when the arrays all have the same length. When they do not, it is better to explode the columns separately and take distinct values each time. Note, however, that exploding two columns independently produces every combination of their values rather than matched pairs; the distinct() calls only remove exact duplicate rows.

df = spark.createDataFrame(
    [(['Bob'], [16], ['Maths','Physics','Chemistry'], ['A','B','C'])],
    ['Name','Age','Subjects', 'Grades'])

df = df.withColumn('Subjects', F.explode('Subjects')).select('Name', 'Age', 'Subjects', 'Grades').distinct()

df = df.withColumn('Grades', F.explode('Grades')).select('Name', 'Age', 'Subjects', 'Grades').distinct()

df.show()

With this sample input the result is nine rows, one per Subjects/Grades combination, rather than the three matched pairs produced by the arrays_zip answers.
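A plain-Python model of why two independent explodes multiply rows instead of pairing them (only the question's sample data is assumed):

```python
from itertools import product

subjects = ['Maths', 'Physics', 'Chemistry']
grades = ['A', 'B', 'C']

# Exploding Subjects and then Grades behaves like a Cartesian product:
rows = list(product(subjects, grades))
# 9 rows: mismatched pairs such as ('Maths', 'B') appear alongside ('Maths', 'A')
```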


0

Thanks @nasty for saving the day. Just small tweaks to get the code working.

from pyspark.sql.functions import arrays_zip, explode, col

def explode_cols(df, cl):
    df = df.withColumn('exp_combo', arrays_zip(*cl))
    df = df.withColumn('exp_combo', explode('exp_combo'))
    for colm in cl:
        # unpack each zipped field back into its own column
        df = df.withColumn(colm, col('exp_combo.' + colm))
    return df.drop(col('exp_combo'))

