
I have a large dataframe which I created with 800 partitions.

df.rdd.getNumPartitions()
800

When I use dropDuplicates on the dataframe, it changes the number of partitions to the default of 200:

df = df.dropDuplicates()
df.rdd.getNumPartitions()
200

This behaviour causes problems for me, as it leads to out-of-memory errors.

Do you have any suggestions for fixing this problem? I tried setting spark.sql.shuffle.partition to 800, but it doesn't work. Thanks


2 Answers


This happens because dropDuplicates requires a shuffle. If you want a specific number of partitions, you should set spark.sql.shuffle.partitions (its default value is 200):

df = sc.parallelize([("a", 1)]).toDF()
df.rdd.getNumPartitions()
## 8

df.dropDuplicates().rdd.getNumPartitions()
## 200

sqlContext.setConf("spark.sql.shuffle.partitions", "800")

df.dropDuplicates().rdd.getNumPartitions()
## 800
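
If you are on Spark 2.x and have a SparkSession (assumed to be named spark below), the same setting can also be changed through spark.conf instead of the SQLContext:

spark.conf.set("spark.sql.shuffle.partitions", "800")

df.dropDuplicates().rdd.getNumPartitions()
## 800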

An alternative approach (Spark 1.6+) is to repartition first:

df.repartition(801, *df.columns).dropDuplicates().rdd.getNumPartitions()
## 801

It is slightly more flexible but less efficient, because it doesn't perform map-side (local) aggregation. Since the data is already hash-partitioned by all columns, the subsequent dropDuplicates doesn't need another shuffle, so the 801 partitions are preserved.


1 Comment

Thank you. I realised my mistake was missing the last character 's' in spark.sql.shuffle.partition.

I found the solution at Removing duplicates from rows based on specific columns in an RDD/Spark DataFrame

Use reduceByKey instead of dropDuplicates. reduceByKey also has an option to specify the number of partitions for the final RDD.

The downside of using reduceByKey in this case is that it is slow.
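
A rough sketch of that approach (the column names below are made up, and spark is assumed to be a SparkSession; sqlContext.createDataFrame works the same way in older versions): each row is keyed by its values, reduceByKey keeps one row per key, and the number of output partitions is passed directly.

df = spark.createDataFrame([("a", 1), ("a", 1), ("b", 2)], ["k", "v"])

deduped_rdd = (df.rdd
    .map(lambda row: (tuple(row), row))              # key each row by all of its columns
    .reduceByKey(lambda a, b: a, numPartitions=800)  # keep one row per key, 800 partitions
    .values())

deduped = spark.createDataFrame(deduped_rdd, df.schema)

deduped.rdd.getNumPartitions()
## 800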

