
I am trying to remove duplicates from my Dataset in Spark SQL in Java. My dataset has three columns, named name, timestamp, and score. name is the String representation of the employee's name, timestamp is a long (epoch milliseconds) marking when the employee performed an activity, and score is an integer field representing the employee's score.

Now, let's say I have the following dataset:

Name --> timestamp       --> score
John --> 1595239200000   --> 10
John --> 1595242800000   --> 10
Bob  --> 1595246400000   --> 20
John --> 1595239200000   --> 10

Note that in the above dataset the first and fourth rows are identical.

When I call the distinct() function on the above dataset, like this:

myDataset.distinct()

I get this result:

Name --> timestamp       --> score
John --> 1595239200000   --> 10
John --> 1595242800000   --> 10
Bob  --> 1595246400000   --> 20

The fourth row is eliminated in this case, which is the expected behaviour.

What I want is to convert the timestamp field into yyyy-MM-dd format and then remove duplicates on the combination of that date and the Name field. In the original dataset, the first, second, and fourth rows all have the same date, 2020-07-20, for Name = John, so I would want to keep only one row for name = 'John'.

After removing duplicates as explained above, the resulting dataset would be:

Name --> timestamp       --> score
John --> 1595239200000   --> 10
Bob  --> 1595246400000   --> 20

Note that I have no constraint to keep only the first timestamp for a given name. Any of the timestamps would work for me, as long as they all belong to the same date.

What I have tried so far is:

Dataset<Row> duplicateRemovedDataset = myDataset
                .dropDuplicates("Name", String.valueOf(functions.from_unixtime
                        (functions.col("timestamp").divide(1000), "yyyy-MM-dd")));

But this produces the following error:

User class threw exception: org.apache.spark.sql.AnalysisException: 
Cannot resolve column name "from_unixtime((timestamp / 1000), yyyy-MM-dd)" among list of my column name

How should I go about doing this?

Or, in more general terms, how can I apply a custom function when calling dropDuplicates on a dataset?

2 Answers


You can create a new column with the date format you need, drop duplicates on the columns you want, and then drop the helper column, as below.

For Java

import static org.apache.spark.sql.functions.*;

// Convert epoch millis to seconds, cast to a timestamp, then truncate to a date
Dataset<Row> resultDF = df.withColumn("date",
        to_date(to_timestamp(df.col("Timestamp").divide(1000)), "yyyy-MM-dd"));

resultDF.dropDuplicates("Name", "date")
        .drop("date")
        .show(false);

For Scala

import org.apache.spark.sql.functions._

// Same approach in Scala: millis -> seconds -> timestamp -> date
val resultDF = df.withColumn("date", to_date(to_timestamp(col("Timestamp") / 1000), "yyyy-MM-dd"))

resultDF.dropDuplicates("Name", "date")
  .drop("date")
  .show(false)

Output:

+----+-------------+-----+
|Name|Timestamp    |score|
+----+-------------+-----+
|Bob |1595246400000|20   |
|John|1595239200000|10   |
+----+-------------+-----+
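Note that dropDuplicates keeps an arbitrary one of the duplicate rows for each (Name, date) pair, which is fine here since the question states that any timestamp belonging to the same date is acceptable.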

4 Comments

I am not able to find this function to_timestamp. Did you mean from_unixtime here? Because I already have a timestamp value.
What version of Spark are you using? to_timestamp is only available since 2.2.0.
I was using 2.1.0. I have upgraded to 2.2.0 and got the to_timestamp function. But I do not really need to call to_timestamp, because the timestamp column in my example is already in epoch format, so I can call from_unixtime on it instead, and that does the trick. Thanks for the idea of adding a column to the dataset first, then doing dropDuplicates, and then dropping the added column.
Ok perfect, from_unixtime should work for your case.
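
For reference, a minimal Java sketch of the from_unixtime variant described in the comments above (a sketch assuming the Dataset<Row> from the question is named myDataset; from_unixtime is available well before Spark 2.2.0):

import static org.apache.spark.sql.functions.*;

// Convert epoch millis to seconds, format as a yyyy-MM-dd string,
// deduplicate on Name plus that date string, then drop the helper column
Dataset<Row> deduped = myDataset
        .withColumn("date",
                from_unixtime(col("timestamp").divide(1000), "yyyy-MM-dd"))
        .dropDuplicates("Name", "date")
        .drop("date");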

Try this:


import spark.implicits._  // for toDF; assumes an active SparkSession named spark

val myDataset = Seq(
  ("John", "1595239200000", 10),
  ("John", "1595242800000", 10),
  ("Bob",  "1595246400000", 20),
  ("John", "1595239200000", 10)
).toDF("Name", "timestamp", "score")
myDataset.show()

+----+-------------+-----+
|Name|    timestamp|score|
+----+-------------+-----+
|John|1595239200000|   10|
|John|1595242800000|   10|
| Bob|1595246400000|   20|
|John|1595239200000|   10|
+----+-------------+-----+

import org.apache.spark.sql.functions.{from_unixtime, to_date}

// Add a date column derived from the epoch-millis timestamp, then deduplicate on it
myDataset
  .withColumn("datestamp", to_date(from_unixtime($"timestamp" / 1000)))
  .dropDuplicates("Name", "datestamp")
  .show()

+----+-------------+-----+----------+
|name|    timestamp|score| datestamp|
+----+-------------+-----+----------+
| Bob|1595246400000|   20|2020-07-20|
|John|1595239200000|   10|2020-07-20|
+----+-------------+-----+----------+
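
If you do not want the helper datestamp column in the final result, drop it afterwards with .drop("datestamp") before show(), as the first answer does with its date column.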

1 Comment

So the quick answer to your problem is that dropDuplicates operates on existing columns, referenced by name, whereas you were passing the string form of an expression rather than the name of an actual column.
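
More generally, answering the "custom function" part of the question: since dropDuplicates only accepts names of existing columns, the pattern for deduplicating on a computed expression is always to materialize the expression as a temporary column, deduplicate, and drop it. Below is a minimal Java sketch of that pattern; the helper name dropDuplicatesByExpr and the column name _dedup_key are made up for illustration, not part of any Spark API:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical helper: deduplicate on a key column plus any computed expression
static Dataset<Row> dropDuplicatesByExpr(Dataset<Row> ds, String keyCol, Column expr) {
    return ds.withColumn("_dedup_key", expr)       // materialize the expression
             .dropDuplicates(keyCol, "_dedup_key") // dedupe on real column names
             .drop("_dedup_key");                  // remove the helper column
}

For the question's case this would be called as dropDuplicatesByExpr(myDataset, "Name", from_unixtime(col("timestamp").divide(1000), "yyyy-MM-dd")).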
