106

Let's say I have a rather large dataset in the following form:

data = sc.parallelize([('Foo', 41, 'US', 3),
                       ('Foo', 39, 'UK', 1),
                       ('Bar', 57, 'CA', 2),
                       ('Bar', 72, 'CA', 2),
                       ('Baz', 22, 'US', 6),
                       ('Baz', 36, 'US', 6)])

I would like to remove duplicate rows based on the values of the first, third and fourth columns only.

Removing entirely duplicate rows is straightforward:

data = data.distinct()

but on this data it removes nothing, because rows 5 and 6 differ in the second column.

But how do I remove duplicate rows based on columns 1, 3 and 4 only? I.e. remove either one of these:

('Baz', 22, 'US', 6)
('Baz', 36, 'US', 6)

In pandas, this could be done by specifying columns with .drop_duplicates(). How can I achieve the same in Spark/PySpark?

1
  • What is the sample code in? Scala? Python? Commented Nov 9, 2023 at 20:34

8 Answers

149

PySpark does include a dropDuplicates() method, which was introduced in 1.4.

>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+

>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+
4 Comments

Is there a way to capture the records that it did drop?
x = usersDf.drop_duplicates(subset=['DETUserId']) - X dataframe will be all the dropped records
@Rodney That is not what the documentation says: "Return a new DataFrame with duplicate rows removed, optionally only considering certain columns." spark.apache.org/docs/2.1.0/api/python/…
The result is non-deterministic, you most probably don't want to use that in production...
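
On capturing the dropped rows and making the choice of survivor deterministic: the sketch below is a plain-Python model of the logic, not Spark code. The function name `split_duplicates` and its parameters are hypothetical, introduced only for illustration, and it assumes the rows fit in memory and that "keep the row with the smallest second column" is the rule you want.

```python
def split_duplicates(rows, key_indices, order_index):
    """Return (kept, dropped).

    For each key (a tuple of the columns in key_indices), the row with the
    smallest value in column order_index is kept; all others are dropped.
    """
    kept = {}
    dropped = []
    # Sorting first makes the choice of survivor deterministic.
    for row in sorted(rows, key=lambda r: r[order_index]):
        key = tuple(row[i] for i in key_indices)
        if key in kept:
            dropped.append(row)
        else:
            kept[key] = row
    return list(kept.values()), dropped


rows = [
    ('Foo', 41, 'US', 3),
    ('Foo', 39, 'UK', 1),
    ('Bar', 57, 'CA', 2),
    ('Bar', 72, 'CA', 2),
    ('Baz', 22, 'US', 6),
    ('Baz', 36, 'US', 6),
]

# Deduplicate on columns 1, 3 and 4; break ties by column 2.
kept, dropped = split_duplicates(rows, key_indices=(0, 2, 3), order_index=1)
print(dropped)  # [('Baz', 36, 'US', 6), ('Bar', 72, 'CA', 2)]
```

In Spark the same idea is usually expressed by ranking rows within each key with a window function and keeping only the first-ranked row; the remaining rows are the dropped ones.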
28

From your question, it is unclear which columns you want to use to determine duplicates. The general idea behind the solution is to create a key based on the values of the columns that identify duplicates. Then, you can use the reduceByKey or reduce operations to eliminate duplicates.

Here is some code to get you started:

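def get_key(x):
    # Use a tuple as the key: concatenating the values into one string
    # can make different rows collide (e.g. 'ab' + 'c' vs 'a' + 'bc').
    return (x[0], x[2], x[3])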

m = data.map(lambda x: (get_key(x),x))

Now, you have a key-value RDD that is keyed by columns 1,3 and 4. The next step would be either a reduceByKey or groupByKey and filter. This would eliminate duplicates.

r = m.reduceByKey(lambda x,y: (x))
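
One detail worth checking when building such a key: values concatenated into a single string without a separator can collide, whereas a tuple of the same values cannot. The following is a plain-Python stand-in for the map + reduceByKey steps (it does not use Spark); `string_key`, `tuple_key`, `reduce_by_key` and the two sample rows are hypothetical and chosen only to make the collision visible.

```python
def string_key(row):
    # Key built by concatenating columns 1, 3 and 4 with no separator.
    return "{0}{1}{2}".format(row[0], row[2], row[3])


def tuple_key(row):
    # Key built as a tuple of columns 1, 3 and 4.
    return (row[0], row[2], row[3])


def reduce_by_key(rows, key_func, combine):
    """Plain-Python model of map(key, row) followed by reduceByKey."""
    result = {}
    for row in rows:
        key = key_func(row)
        result[key] = combine(result[key], row) if key in result else row
    return result


def keep_first(x, y):
    # Same role as `lambda x, y: x` in the answer: keep one row per key.
    return x


# Hypothetical rows that differ in columns 1 and 3 but concatenate
# to the same string 'abc1'.
rows = [('ab', 1, 'c', 1), ('a', 2, 'bc', 1)]

by_string = reduce_by_key(rows, string_key, keep_first)
by_tuple = reduce_by_key(rows, tuple_key, keep_first)

print(len(by_string))  # 1 -> the two distinct rows were merged by mistake
print(len(by_tuple))   # 2 -> both rows survive
```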

19

I know you already accepted the other answer, but if you want to do this as a DataFrame, just use groupBy and agg. Assuming you had a DF already created (with columns named "col1", "col2", etc) you could do:

myDF.groupBy($"col1", $"col3", $"col4").agg($"col1", max($"col2"), $"col3", $"col4")

Note that in this case, I chose the Max of col2, but you could do avg, min, etc.
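
A point to keep in mind with the groupBy/agg approach: each aggregate is computed per column, so when more than one non-key column is aggregated, the resulting row can combine values from different input rows. The sketch below models this in plain Python (no Spark); `group_and_aggregate` and the two sample rows are hypothetical, chosen so the effect is visible.

```python
from collections import defaultdict


def group_and_aggregate(rows, key_indices, agg_indices, agg=max):
    """Group rows by the key columns, then aggregate each remaining
    column independently, as a groupBy followed by agg would."""
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[i] for i in key_indices)].append(row)
    return {
        key: tuple(agg(member[i] for member in members) for i in agg_indices)
        for key, members in groups.items()
    }


# Hypothetical rows sharing columns 1 and 4 but differing in 2 and 3.
rows = [('Foo', 41, 'UK', 3), ('Foo', 39, 'US', 3)]

result = group_and_aggregate(rows, key_indices=(0, 3), agg_indices=(1, 2))
print(result)  # {('Foo', 3): (41, 'US')}
```

Here the output pairs 41 with 'US', a combination that appears in neither input row. With a single aggregated column, as in the answer above, max or min always yields a value from an existing row.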

2 Comments

So far, my experience with DataFrames is that they make everything more elegant and a lot faster.
It should be noted that this answer is written in Scala - for pyspark replace $"col1" with col("col1") etc.
14

I agree with David. To add to that: we may not want to group by every column other than the one(s) in the aggregate function, for example when we want to remove duplicates based purely on a subset of columns while retaining all columns of the original DataFrame. In that case, a better approach is the dropDuplicates DataFrame API, available since Spark 1.4.0.

For reference, see: https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame

1 Comment

Do we have a corresponding function in SparkR?
10

I used the built-in function dropDuplicates(). The Scala code is given below:

val data = sc.parallelize(List(("Foo",41,"US",3),
("Foo",39,"UK",1),
("Bar",57,"CA",2),
("Bar",72,"CA",2),
("Baz",22,"US",6),
("Baz",36,"US",6))).toDF("x","y","z","count")

data.dropDuplicates(Array("x","count")).show()

Output:

+---+---+---+-----+
|  x|  y|  z|count|
+---+---+---+-----+
|Baz| 22| US|    6|
|Foo| 39| UK|    1|
|Foo| 41| US|    3|
|Bar| 57| CA|    2|
+---+---+---+-----+

1 Comment

The question specifically asks for pyspark implementation, not scala
5

The program below drops fully duplicate rows, and also shows how to drop duplicates based on selected columns only:

import org.apache.spark.sql.SparkSession

object DropDuplicates {

    def main(args: Array[String]): Unit = {
        val spark =
            SparkSession.builder()
                .appName("DataFrame-DropDuplicates")
                .master("local[4]")
                .getOrCreate()

        import spark.implicits._

        // Create an RDD of tuples with some data
        val custs = Seq(
            (1, "Widget Co", 120000.00, 0.00, "AZ"),
            (2, "Acme Widgets", 410500.00, 500.00, "CA"),
            (3, "Widgetry", 410500.00, 200.00, "CA"),
            (4, "Widgets R Us", 410500.00, 0.0, "CA"),
            (3, "Widgetry", 410500.00, 200.00, "CA"),
            (5, "Ye Olde Widgete", 500.00, 0.0, "MA"),
            (6, "Widget Co", 12000.00, 10.00, "AZ")
        )
        val customerRows = spark.sparkContext.parallelize(custs, 4)

        // Convert RDD of tuples to DataFrame by supplying column names
        val customerDF = customerRows.toDF("id", "name", "sales", "discount", "state")

        println("*** Here's the whole DataFrame with duplicates")

        customerDF.printSchema()

        customerDF.show()

        // Drop fully identical rows
        val withoutDuplicates = customerDF.dropDuplicates()

        println("*** Now without duplicates")

        withoutDuplicates.show()

        // Drop rows that match an earlier row on name and state only
        val withoutPartials = customerDF.dropDuplicates(Seq("name", "state"))

        println("*** Now without partial duplicates too")

        withoutPartials.show()
    }

}

2 Comments

The comment "// drop fully identical rows" is correct the first time, and incorrect the second time. Perhaps a copy/paste error?
Thanks @JoshuaStafford , removed the bad comment.
0

All the approaches in the previous answers are good, and I feel dropDuplicates is the best approach.

Below is another way (groupBy and agg) to drop duplicates without using dropDuplicates. In the run shown here, dropDuplicates by columns was the faster of the two (1563 ms versus 7086 ms), although the first timed block also builds its own DataFrame and runs first, so the comparison is only indicative.

Below is the full listing and the timings:

import org.apache.spark.sql.SparkSession

object DropDups {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ReadFromUrl")
      .master("local[*]")
      .getOrCreate()

    val sc = spark.sparkContext
    import spark.implicits._
    spark.sparkContext.setLogLevel("Error")
    val data = sc.parallelize(List(
      ("Foo", 41, "US", 3),
      ("Foo", 39, "UK", 1),
      ("Bar", 57, "CA", 2),
      ("Bar", 72, "CA", 2),
      ("Baz", 22, "US", 6),
      ("Baz", 36, "US", 6)
    )).toDF("x", "y", "z", "count")

    spark.time {
      import org.apache.spark.sql.functions.first
      val data = sc.parallelize(List(
        ("Foo", 41, "US", 3),
        ("Foo", 39, "UK", 1),
        ("Bar", 57, "CA", 2),
        ("Bar", 72, "CA", 2),
        ("Baz", 22, "US", 6),
        ("Baz", 36, "US", 6)
      )).toDF("x", "y", "z", "count")

      val deduped = data
        .groupBy("x", "count")
        .agg(
          first("y").as("y"),
          first("z").as("z")
        )
      deduped.show()
    }
    spark.time {
      data.dropDuplicates(Array("x","count")).show()
    }
    spark.stop()
  }
}

Result:

+---+-----+---+---+
|  x|count|  y|  z|
+---+-----+---+---+
|Baz|    6| 22| US|
|Foo|    1| 39| UK|
|Bar|    2| 57| CA|
|Foo|    3| 41| US|
+---+-----+---+---+

Time taken: 7086 ms
+---+---+---+-----+
|  x|  y|  z|count|
+---+---+---+-----+
|Baz| 22| US|    6|
|Foo| 39| UK|    1|
|Bar| 57| CA|    2|
|Foo| 41| US|    3|
+---+---+---+-----+

Time taken: 1563 ms

-4

My DataFrame contains the value 4 twice, so here I remove the repeated values.

scala> df.show
+-----+
|value|
+-----+
|    1|
|    4|
|    3|
|    5|
|    4|
|   18|
+-----+

scala> val newdf=df.dropDuplicates

scala> newdf.show
+-----+
|value|
+-----+
|    1|
|    3|
|    5|
|    4|
|   18|
+-----+

5 Comments

You can check this in spark-shell; I have shared the correct output. This answer is about how to remove repeated values in a column or DataFrame.
Can you provide an example based on OPs question?
I have given an example in the answer itself; you can refer to that.
Your post adds no value to this discussion. @vaerek has already posted a PySpark df.dropDuplicates() example including how it can be applied to more than one column (my initial question).
The sentence is incomprehensible. Please add the missing punctuation, missing words, etc. Thanks in advance.
