
I have a Pandas dataframe. I tried to combine two string columns by first splitting each one into a list and then, using zip, joining the corresponding elements with '_'. My data set looks like this:

df['column_1']: 'abc, def, ghi'
df['column_2']: '1.0, 2.0, 3.0'

I want to join these two columns into a third column, like below, for each row of my dataframe.

df['column_3']: [abc_1.0, def_2.0, ghi_3.0]

I have successfully done this in Python using the code below, but the dataframe is quite large and it takes a very long time to run over the whole dataframe. I want to do the same thing in PySpark for efficiency. I have read the data into a Spark dataframe successfully, but I'm having a hard time figuring out how to replicate the Pandas functions with their PySpark equivalents. How can I get my desired result in PySpark?

df['column_3'] = df['column_2']
for index, row in df.iterrows():
    if isinstance(row['column_1'], str):
        col1 = row['column_1'].split(', ')
        col2 = row['column_2'].split(', ')
        df.at[index, 'column_3'] = ['_'.join(map(str, pair)) for pair in zip(col1, col2)]

I have converted the two columns to arrays in PySpark using the code below:

from pyspark.sql.functions import col, split

# split() already returns array<string>, so no extra cast is needed;
# withColumn() returns a new DataFrame, so assign the result back
crash = crash.withColumn("column_1", split(col("column_1"), r",\s*"))
crash = crash.withColumn("column_2", split(col("column_2"), r",\s*"))
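A quick way to verify the conversion (a sketch, using the crash DataFrame name from the snippet above):

# Both columns should now show up as array<string>
crash.select("column_1", "column_2").printSchema()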

Now all I need is to zip the corresponding elements of the two array columns together with '_'. How can I use a zip-like operation here? Any help is appreciated.

  • Why are df['column_1'] and df['column_2'] a single string instead of a list of items? What were they originally? Commented Jan 21, 2019 at 2:22
  • That's how the data is, which I am reading in the dataframe Commented Jan 21, 2019 at 2:25
  • @Falconic so abc , def etc in single row or different rows? similarly column 2 single row? Commented Jan 21, 2019 at 2:28
  • @anky_91 this is one row of dataframe for column_1 and column_2. Each row has multiple items in one column. This is the reason I split the string and then casted to a list. Commented Jan 21, 2019 at 2:34

4 Answers


A Spark SQL equivalent of Python's zip would be pyspark.sql.functions.arrays_zip:

pyspark.sql.functions.arrays_zip(*cols)

Collection function: Returns a merged array of structs in which the N-th struct contains all N-th values of input arrays.

So if you already have two arrays:

from pyspark.sql.functions import split

df = (spark
    .createDataFrame([('abc, def, ghi', '1.0, 2.0, 3.0')])
    .toDF("column_1", "column_2")
    .withColumn("column_1", split("column_1", r"\s*,\s*"))
    .withColumn("column_2", split("column_2", r"\s*,\s*")))

you can just apply it to the result:

from pyspark.sql.functions import arrays_zip, expr

df_zipped = df.withColumn(
  "zipped", arrays_zip("column_1", "column_2")
)

df_zipped.select("zipped").show(truncate=False)
+------------------------------------+
|zipped                              |
+------------------------------------+
|[[abc, 1.0], [def, 2.0], [ghi, 3.0]]|
+------------------------------------+

Now, to combine the results, you can use transform (see How to use transform higher-order function? and TypeError: Column is not iterable - How to iterate over ArrayType()?):

df_zipped_concat = df_zipped.withColumn(
    "zipped_concat",
     expr("transform(zipped, x -> concat_ws('_', x.column_1, x.column_2))")
) 

df_zipped_concat.select("zipped_concat").show(truncate=False)
+---------------------------+
|zipped_concat              |
+---------------------------+
|[abc_1.0, def_2.0, ghi_3.0]|
+---------------------------+

Note:

The higher-order function transform and arrays_zip were introduced in Apache Spark 2.4.


1 Comment

Thanks user10465355. This solution worked for me, but just a word of caution: it does not handle null values inside the lists very well. I removed null values manually from both columns before joining them together. Secondly, I had to do every step on my original dataframe; having multiple dataframes with the same column names did not work very well with PySpark, and I had to debug to see what was wrong in my code. Turns out I needed to use the same dataframe throughout the different operations.

For Spark 2.4+, this can be done using only the zip_with function, which zips and concatenates at the same time:

df.withColumn("column_3", expr("zip_with(column_1, column_2, (x, y) -> concat(x, '_', y))")) 

The higher-order function takes two arrays and merges them element-wise, using the lambda function (x, y) -> concat(x, '_', y).
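A minimal end-to-end sketch of this approach, assuming an active spark session and starting from the comma-separated strings in the question (the split step is an assumption about the input format):

from pyspark.sql.functions import expr, split

df = spark.createDataFrame([('abc, def, ghi', '1.0, 2.0, 3.0')], ['column_1', 'column_2'])

df = (df
    .withColumn("column_1", split("column_1", r",\s*"))
    .withColumn("column_2", split("column_2", r",\s*"))
    # zip_with pairs the i-th elements of both arrays and concatenates them with '_'
    .withColumn("column_3",
                expr("zip_with(column_1, column_2, (x, y) -> concat(x, '_', y))")))

df.select("column_3").show(truncate=False)
# +---------------------------+
# |column_3                   |
# +---------------------------+
# |[abc_1.0, def_2.0, ghi_3.0]|
# +---------------------------+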



You can also use a UDF to zip the split array columns:

df = spark.createDataFrame([('abc,def,ghi','1.0,2.0,3.0')], ['col1','col2']) 
+-----------+-----------+
|col1       |col2       |
+-----------+-----------+
|abc,def,ghi|1.0,2.0,3.0|
+-----------+-----------+ ## Hope this is how your dataframe is

from pyspark.sql import functions as F
from pyspark.sql.types import *

def concat_udf(*args):
    return ['_'.join(x) for x in zip(*args)]

udf1 = F.udf(concat_udf,ArrayType(StringType()))
df = df.withColumn('col3',udf1(F.split(df.col1,','),F.split(df.col2,',')))
df.show(1,False)
+-----------+-----------+---------------------------+
|col1       |col2       |col3                       |
+-----------+-----------+---------------------------+
|abc,def,ghi|1.0,2.0,3.0|[abc_1.0, def_2.0, ghi_3.0]|
+-----------+-----------+---------------------------+

2 Comments

Thanks @suresh. This is definitely a cleaner solution. When I apply it to my own dataframe and run the collect function, I get the error TypeError: zip argument #1 must support iteration. Any ideas why?
The error occurs because zip() isn't getting an iterable as input. Can you please post your sample input dataframe and its schema?
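If nulls are the culprit (split of a null column reaches the UDF as None, an assumption about the asker's data that echoes the null caveat mentioned in the comment on the accepted answer), a null-tolerant variant of the UDF above could look like this sketch:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

def concat_udf(*args):
    # Guard against null columns: split(NULL) arrives here as None,
    # which zip() cannot iterate over
    if any(a is None for a in args):
        return []
    return ['_'.join(x) for x in zip(*args)]

udf1 = F.udf(concat_udf, ArrayType(StringType()))
df = df.withColumn('col3', udf1(F.split(df.col1, ','), F.split(df.col2, ',')))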

For Spark 3.1+, pyspark.sql.functions.zip_with() accepts a Python lambda function, so it can be done like this:

import pyspark.sql.functions as F

df = df.withColumn("column_3", F.zip_with("column_1", "column_2", lambda x,y: F.concat_ws("_", x, y)))
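Applied to the sample row from the question (a sketch assuming column_1 and column_2 have already been split into arrays), this produces the desired output:

df.select("column_3").show(truncate=False)
# +---------------------------+
# |column_3                   |
# +---------------------------+
# |[abc_1.0, def_2.0, ghi_3.0]|
# +---------------------------+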

