
I have two files, A and B, which are exactly the same. I am trying to perform inner and outer joins on these two dataframes. Since all the columns are duplicates, the existing answers were of no help. The other questions that I have gone through contain a column or two as duplicates; my issue is that the whole files are duplicates of each other, both in data and in column names.

My code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrameReader, DataFrameWriter
from datetime import datetime

import time

# @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print("All imports were successful.")

df = spark.read.orc(
    's3://****'
)
print("First dataframe read with headers set to True")
df2 = spark.read.orc(
    's3://****'
)
print("Second dataframe read with headers set to True")

# df3 = df.join(df2, ['c_0'], "outer")

# df3 = df.join(
#     df2,
#     df["column_test_1"] == df2["column_1"],
#     "outer"
# )

df3 = df.alias('l').join(df2.alias('r'), on='c_0') #.collect()

print("Dataframes have been joined successfully.")
output_file_path = 's3://****'

df3.write.orc(
    output_file_path
)
print("Dataframe has been written to csv.")
job.commit()

The error that I am facing is:

pyspark.sql.utils.AnalysisException: u'Duplicate column(s): "c_4", "c_38", "c_13", "c_27", "c_50", "c_16", "c_23", "c_24", "c_1", "c_35", "c_30", "c_56", "c_34", "c_7", "c_46", "c_49", "c_57", "c_45", "c_31", "c_53", "c_19", "c_25", "c_10", "c_8", "c_14", "c_42", "c_20", "c_47", "c_36", "c_29", "c_15", "c_43", "c_32", "c_5", "c_37", "c_18", "c_54", "c_3", "__created_at__", "c_51", "c_48", "c_9", "c_21", "c_26", "c_44", "c_55", "c_2", "c_17", "c_40", "c_28", "c_33", "c_41", "c_22", "c_11", "c_12", "c_52", "c_6", "c_39" found, cannot save to file.;'
  • And how can I explicitly select the columns? Do you mean to say df.select('c_0' as 'df_c_0', 'c_1' as 'df_c_1', .... 'c_49' as 'df_c_49').join(df2.select('c_0' as 'df2_c_0', 'c_1' as 'df2_c_1', .... 'c_49' as 'df2_c_49')) ? Commented Mar 11, 2019 at 14:35
  • Possible duplicate of Spark Dataframe distinguish columns with duplicated name Commented Mar 11, 2019 at 14:44
  • The answer is the same: you need to alias the column names. Commented Mar 11, 2019 at 14:49
  • No, none of the answers could solve my problem. Yes, it is because of my weakness that I could not extrapolate the aliasing further, but asking this question helped me get to know about the selectExpr function. Hence, I request you to take back the duplicate remark; it would be helpful for a rookie like me. Commented Mar 11, 2019 at 14:51
  • My vote to close as a duplicate is just a vote. I still need 4 others (or one gold badge holder) to agree with me, and regardless of the outcome this doesn't count against you. The solution you are looking for is contained in this answer. At the bottom, they show how to dynamically rename all the columns (a sketch of that approach follows below). selectExpr is not needed (though it's one alternative). If you still feel that this is different, edit your question and explain exactly how it's different. Commented Mar 11, 2019 at 14:55

3 Answers


There is no shortcut here. PySpark expects the left and right dataframes to have distinct sets of field names (with the exception of the join key).

One solution would be to prefix each field name with either "left_" or "right_", as follows:

# Obtain column lists
left_cols = df.columns
right_cols = df2.columns

# Prefix every field with "left_" or "right_", but leave the join key
# 'c_0' unrenamed so that on='c_0' can still resolve it
df = df.selectExpr([col if col == 'c_0' else col + ' as left_' + col for col in left_cols])
df2 = df2.selectExpr([col if col == 'c_0' else col + ' as right_' + col for col in right_cols])

# Perform join on the shared key
df3 = df.join(df2, on='c_0')
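
The join key c_0 is deliberately left unprefixed above: if it were renamed on both sides, the on='c_0' join would no longer resolve. With every other field carrying a left_ or right_ prefix, the joined dataframe can then be written out without the "Duplicate column(s)" error from the question.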

1 Comment

Recommend that you don't use col as the loop variable, as it overrides the native PySpark function col and then it won't be recognised.

Here is a helper function to join two dataframes adding aliases:

def join_with_aliases(left, right, on, how, right_prefix):
    # Rename every non-key column on the right with the given suffix;
    # the join keys keep their original names
    renamed_right = right.selectExpr(
        [
            f"{col} as {col}{right_prefix}"
            for col in right.columns
            if col not in on
        ]
        + on
    )
    return left.join(renamed_right, on=on, how=how)

and here is an example of how to use it:

df1 = spark.createDataFrame([[1, "a"], [2, "b"], [3, "c"]], ("id", "value"))
df2 = spark.createDataFrame([[1, "a"], [2, "b"], [3, "c"]], ("id", "value"))

join_with_aliases(
   left=df1,
   right=df2,
   on=["id"],
   how="inner",
   right_prefix="_right"
).show()

+---+-----+-----------+
| id|value|value_right|
+---+-----+-----------+
|  1|    a|          a|
|  3|    c|          c|
|  2|    b|          b|
+---+-----+-----------+
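
The same helper covers the outer join from the question as well; only the how argument changes, since it is passed straight through to DataFrame.join:

join_with_aliases(left=df1, right=df2, on=["id"], how="outer", right_prefix="_right").show()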

2 Comments

Thanks for the function. Note: df2.columns should be right.columns in the definition of the function.
This is a great approach. It makes joins in Spark behave like joins in Python pandas and R. It's crazy that Spark does not give unique column names by default, I think.

I did something like this, but in Scala; you can convert the same into PySpark as well (a PySpark sketch follows the steps below)...

  • Rename the column names in each dataframe

    dataFrame1.columns.foreach(columnName => {
      dataFrame1 = dataFrame1.withColumnRenamed(columnName, s"left_$columnName")
    })

    dataFrame2.columns.foreach(columnName => {
      dataFrame2 = dataFrame2.withColumnRenamed(columnName, s"right_$columnName")
    })
    
  • Now join by mentioning the column names

    val resultDF = dataFrame1.join(dataFrame2, dataFrame1("left_c_0") === dataFrame2("right_c_0"))
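
For reference, a rough PySpark equivalent of the loops above; an untested sketch reusing the df/df2 names from the question:

# Rename every column with a side-specific prefix, mirroring the Scala loops
for column_name in df.columns:
    df = df.withColumnRenamed(column_name, "left_" + column_name)
for column_name in df2.columns:
    df2 = df2.withColumnRenamed(column_name, "right_" + column_name)

# Join by mentioning the renamed key columns
result_df = df.join(df2, df["left_c_0"] == df2["right_c_0"])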
    
