
I have a PySpark DataFrame (Spark version < 2.4).

Example dataframe:

column_1 <Array>            | column_2 <Array>                   | column_3 <Array> | join_columns
----------------------------|------------------------------------|------------------|-----------------------------------------------------
["2345", "98576", "09857"]  | null                               | ["9857"]         | ["2345", "98576", "09857", "9857"]
null                        | ["87569", "9876"]                  | ["76586"]        | ["87569", "9876", "76586"]
["08798", "07564"]          | ["12345", "5768", "89687", "7564"] | ["7564"]         | ["08798", "07564", "12345", "5768", "89687", "7564"]
["03456", "09867"]          | ["87586"]                          | []               | ["03456", "09867", "87586"]

I would like to combine the three columns column_1, column_2 and column_3 into one column join_columns and drop the duplicate values. I tried concat; it combined the three columns, but only when each column holds a single value, perhaps because concat only works on strings:

df.withColumn("join_columns", concat(df.s, df.d)).drop_duplicates()

How can I combine the values of array columns? Thank you.

3 Answers


Before Spark 2.4, you can use a udf:

from pyspark.sql.functions import udf

@udf('array<string>')
def array_union(*arr):
    return list(set([e.lstrip('0').zfill(5) for a in arr if isinstance(a, list) for e in a]))

df.withColumn('join_columns', array_union('column_1','column_2','column_3')).show(truncate=False)

Note: we use e.lstrip('0').zfill(5) so that for each array item we first strip the leading zeros and then left-pad with zeros if the string is shorter than 5 characters.
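For reference, a minimal usage sketch of this UDF on toy data mirroring the question's first two rows (the array<string> schema and local spark session are assumptions for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy data assumed from the question's table
df = spark.createDataFrame(
    [(["2345", "98576", "09857"], None, ["9857"]),
     (None, ["87569", "9876"], ["76586"])],
    "column_1 array<string>, column_2 array<string>, column_3 array<string>",
)

df.withColumn("join_columns",
              array_union("column_1", "column_2", "column_3")).show(truncate=False)
# null cells are skipped by the isinstance(a, list) check; set() also merges
# values such as "9857" and "09857", which normalize to the same zfill(5) form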


5 Comments

Do you have a suggestion about this issue? stackoverflow.com/questions/59055915/…
@verojoucla, looks fine to me. You can also use a list comprehension: df.selectExpr(['if({0} = array(""), null, {0}) AS {0}'.format(c) for c in df.columns]) (sketched below, after these comments). If the array is actually empty, just change array("") to array().
But the proposed solution in the linked question is not working.
very good :) ok I have a new challenge here ;) stackoverflow.com/questions/59104192/…
@verojoucla, added an answer, let me know if it works.
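To make the selectExpr one-liner from the comments concrete, a hedged sketch (assuming the goal is to turn arrays holding a single empty string into null):

# Rewrite every column: arrays equal to array("") become null
df = df.selectExpr(
    ['if({0} = array(""), null, {0}) AS {0}'.format(c) for c in df.columns]
)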

In Spark 2.4 you can combine these three columns and then use the flatten function:

from pyspark.sql.functions import array, flatten

df.withColumn("join_columns", flatten(array("column_1", "column_2", "column_3")))
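Two caveats with this one-liner, hedged: flatten does not drop duplicates, and a null column inside array(...) makes the whole result null. A sketch working around both in Spark 2.4+ (coalescing nulls to empty arrays is an assumption about the desired semantics):

from pyspark.sql.functions import array, array_distinct, coalesce, col, flatten

df.withColumn(
    "join_columns",
    array_distinct(                              # drop duplicate values
        flatten(array(
            coalesce(col("column_1"), array()),  # null -> empty array
            coalesce(col("column_2"), array()),
            coalesce(col("column_3"), array()),
        ))
    ),
).show(truncate=False)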

In earlier Spark versions you can write a UDF to do the flattening:

from pyspark.sql.functions import udf, array
from pyspark.sql.types import ArrayType, StringType

# ArrayType requires an element type; flatten the nested arrays, skipping nulls
flatten = udf(lambda arr: [e for a in arr if a is not None for e in a],
              ArrayType(StringType()))
df.withColumn("join_columns", flatten(array("column_1", "column_2", "column_3")))
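Note that the question also asks to drop duplicates, which this UDF alone does not do; a hedged variant that de-duplicates inside the UDF (via set(), so element order is not preserved):

from pyspark.sql.functions import udf, array
from pyspark.sql.types import ArrayType, StringType

# Same flattening, but through a set comprehension to drop duplicate values
flatten_distinct = udf(lambda arr: list({e for a in arr if a is not None for e in a}),
                       ArrayType(StringType()))
df.withColumn("join_columns", flatten_distinct(array("column_1", "column_2", "column_3")))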

2 Comments

I got this error in the udf function: TypeError: __init__() takes at least 2 arguments (1 given)
I just copy/pasted your code and replaced the column and DataFrame names.

Can you try the solution below? With Spark 2.4:

import pyspark.sql.functions as F

df = df.withColumn('col12', F.array_union(df.column_1, df.column_2))
df = df.withColumn('join_columns_dup', F.array_union(df.col12, df.column_3))
df = df.withColumn('join_columns', F.array_distinct(df.join_columns_dup))
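One caveat, hedged: array_union is already a set union, so the final array_distinct step is redundant, and array_union returns null when either input is null, which matters for the question's data. A variant that guards the null columns first (the coalesce-to-empty-array convention is an assumption):

import pyspark.sql.functions as F

def non_null(name):
    # assumed helper: substitute an empty array for a null column
    return F.coalesce(F.col(name), F.array())

df = df.withColumn(
    'join_columns',
    F.array_union(F.array_union(non_null('column_1'), non_null('column_2')),
                  non_null('column_3')),
)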

With Spark < 2.4, you can use:

from pyspark.sql import types as Types

def array_concat(c1, c2, c3):
    # Substitute an empty list for each null column, then drop duplicates via set
    return list(set((list() if c1 is None else c1)
                    + (list() if c2 is None else c2)
                    + (list() if c3 is None else c3)))

arrayConcatUdf = F.udf(array_concat, Types.ArrayType(Types.StringType()))
df = df.withColumn('join_columns', arrayConcatUdf(df.column_1, df.column_2, df.column_3))

Crude, but it works fine with null values as well.
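For what it's worth, a quick check of the null handling on one of the question's rows (the toy data and an existing spark session are assumed):

test_df = spark.createDataFrame(
    [(None, ["87569", "9876"], ["76586"])],
    "column_1 array<string>, column_2 array<string>, column_3 array<string>",
)
test_df.withColumn(
    'join_columns',
    arrayConcatUdf(test_df.column_1, test_df.column_2, test_df.column_3),
).show(truncate=False)
# join_columns -> [87569, 9876, 76586] (element order may vary because of set())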

3 Comments

Thanks for your answer, but I'm not using Spark 2.4, as I already mentioned in my question above.
I noticed that when one of the columns is null it does not do the join, but when the column is empty ([]) it does. How can I test whether the column value is null?
F.col('colName').isNull()
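Building on that, a minimal sketch (the question's column names are assumed) that tests for null and substitutes an empty array before joining:

import pyspark.sql.functions as F

# Replace a null array column with an empty array, so the join logic sees []
df = df.withColumn(
    'column_1',
    F.when(F.col('column_1').isNull(), F.array()).otherwise(F.col('column_1')),
)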
