
I'm trying to dedupe a Spark DataFrame, keeping only the latest appearance. The duplicates are identified by three columns:

NAME
ID
DOB

I succeeded in Pandas with the following:

df_dedupe = df.drop_duplicates(subset=['NAME','ID','DOB'], keep='last', inplace=False)

But in Spark I tried the following:

df_dedupe = df.dropDuplicates(['NAME', 'ID', 'DOB'], keep='last')

I get this error:

TypeError: dropDuplicates() got an unexpected keyword argument 'keep'

Any ideas?

  • quynhcodes.wordpress.com/2016/07/29/… Commented Nov 13, 2018 at 16:07
  • Based on the documentation, dropDuplicates does not have a keep parameter in PySpark. Commented Nov 13, 2018 at 16:09

2 Answers


Thanks for your help. I followed your suggestion, but the outcome was not as expected:

d1 = [('Bob', '10', '1542189668', '0', '0'), ('Alice', '10', '1425298030', '154', '39'), ('Bob', '10', '1542189668', '178', '42')]
df1 = spark.createDataFrame(d1, ['NAME', 'ID', 'DOB', 'Height', 'ShoeSize'])
# dropDuplicates keeps an arbitrary row per (NAME, ID, DOB) group
df_dedupe = df1.dropDuplicates(['NAME', 'ID', 'DOB'])
df_reverse = df1.sort(["NAME", "ID", "DOB"], ascending=False)
# note: the join result is not assigned, so df_dedupe is unchanged here
df_dedupe.join(df_reverse, ['NAME', 'ID', 'DOB'], 'inner')
df_dedupe.show(100, False)

The outcome was:

+-----+---+----------+------+--------+    
|NAME |ID |DOB       |Height|ShoeSize|
+-----+---+----------+------+--------+
|Bob  |10 |1542189668|0     |0       |
|Alice|10 |1425298030|154   |39      |
+-----+---+----------+------+--------+

It kept the "Bob" row with the stale data (Height 0, ShoeSize 0) instead of the latest one.

Finally, I changed my approach: I converted the DataFrame to Pandas, deduplicated there, and converted back to Spark:

from pyspark.sql.types import StructType, StructField, StringType

p_schema = StructType([StructField('NAME', StringType(), True), StructField('ID', StringType(), True),
                       StructField('DOB', StringType(), True), StructField('Height', StringType(), True),
                       StructField('ShoeSize', StringType(), True)])
d1 = [('Bob', '10', '1542189668', '0', '0'), ('Alice', '10', '1425298030', '154', '39'), ('Bob', '10', '1542189668', '178', '42')]
df = spark.createDataFrame(d1, p_schema)
pdf = df.toPandas()
# Pandas preserves row order, so keep='last' retains the latest appearance
df_dedupe = pdf.drop_duplicates(subset=['NAME', 'ID', 'DOB'], keep='last', inplace=False)

df_spark = spark.createDataFrame(df_dedupe, p_schema)
df_spark.show(100, False)

This finally brought the correct "Bob":

+-----+---+----------+------+--------+
|NAME |ID |DOB       |Height|ShoeSize|
+-----+---+----------+------+--------+
|Alice|10 |1425298030|154   |39      |
|Bob  |10 |1542189668|178   |42      |
+-----+---+----------+------+--------+

Of course, I'd still like to have a purely Spark solution, but the lack of row indexing in Spark seems to be the problem.
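
For completeness, a purely Spark version should be possible with a window function, but it needs some column that defines recency. My sample data has no such column, so the sketch below fakes one with monotonically_increasing_id(), which only roughly follows input order and is not guaranteed; a real timestamp column would be safer:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# '_order' is a stand-in for a real recency column (e.g. a load timestamp)
w = Window.partitionBy('NAME', 'ID', 'DOB').orderBy(F.col('_order').desc())
df_dedupe = (df1.withColumn('_order', F.monotonically_increasing_id())
                .withColumn('_rn', F.row_number().over(w))
                .filter('_rn = 1')
                .drop('_order', '_rn'))
df_dedupe.show(100, False)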

Thanks!


1 Comment

I have to thank you a lot for your code, it helped me so much! I wrote a function that does the AddOrUpdate exactly the way you're doing it, and it works! If anybody wants me to share it, please don't hesitate to ask.

As you can see in the documentation of dropDuplicates(subset=None) at http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html, it only accepts a subset parameter. Why would you want to keep the last one if the duplicate rows are all identical?
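
For reference, the only call shape PySpark supports here is a plain column subset (no keep argument):

df_dedupe = df.dropDuplicates(['NAME', 'ID', 'DOB'])  # keeps an unspecified row per group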

EDIT

As @W-B pointed out, you need the other columns. My solution is to sort the original dataframe in reverse order, then use df_dedupe on the three repeated columns for an inner join, preserving only the last values.

df_dedupe.join(original_df, ['NAME', 'ID', 'DOB'], 'inner')

1 Comment

Because there are still other columns; he needs the last value for them.
