
I am new to Spark/Scala. I have a master DataFrame which consists of over 100 million records:

+--------+
|  ttm_id|
+--------+
|39622109|
|39622178|
|39578322|
+--------+

And a changelist DataFrame which has around 40 million records:

+----------+--------+
|__change__|  ttm_id|
+----------+--------+
|    DELETE|18001570|
|    DELETE|   50520|
|    DELETE|  144440|
|    DELETE|   93130|
|    DELETE|   93140|
+----------+--------+

How would I go about comparing these two DataFrames so that:

If __change__ = DELETE and masterlist.ttm_id = changeset.ttm_id, then remove the matching ttm_id record from the master list.

Thanks!

3 Answers


I like @MaxU's solution using except. Here's another approach using left_anti join:

master.join(
  changelist.where($"__change__" === "DELETE"),
  Seq("ttm_id"),
  "left_anti"
)

Note that for large DataFrames, this approach can be expensive.
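For intuition, a left_anti join keeps only the rows of the left side that have no match on the right. A minimal plain-Scala sketch of the same semantics, using hypothetical sample ids and no Spark at all:

```scala
// Hypothetical sample data mirroring the question's DataFrames.
val master = List(39622109L, 39622178L, 39578322L)
val changelist = List(("DELETE", 39622109L), ("INSERT", 39578322L))

// Keep only the DELETE rows and collect their ids into a set.
val deleteIds = changelist.collect { case ("DELETE", id) => id }.toSet

// left_anti semantics: retain master rows with no matching id on the right.
val result = master.filterNot(deleteIds.contains)
// result: List(39622178, 39578322)
```

Note how the INSERT row is ignored by the `where` filter, so only ids flagged DELETE are removed from the master side.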


2 Comments

I liked this answer the best. Now I have a new dataframe which has the records that need to be added from the ChangeSet to the MasterSet, where ChangeSet.auditid != MasterSet.auditid. How would I best create a new dataframe with joining the DF created from your code to the InsertDF?
@Phil Baines, sorry I don't think I understand your new question. Given that the requirement seems rather different from the original question's, I suggest that you post it as a separate question with sample input and expected output.

IIUC you can do it using the following query:

select * from masterlist
where not exists (select 1 from changeset
                  where masterlist.ttm_id = changeset.ttm_id
                    and changeset.__change__='DELETE');

Demo (assuming m and c have been registered as the temp views masterlist and changeset via createOrReplaceTempView):

scala> m.show
+--------+
|  ttm_id|
+--------+
|39622109|
|39622178|
|39578322|
+--------+


scala> c.show
+----------+--------+
|__change__|  ttm_id|
+----------+--------+
|    DELETE|39622109|
|    DELETE|   50520|
+----------+--------+


scala> val q="""
     | select * from masterlist
     | where not exists (select ttm_id from changeset
     |                   where masterlist.ttm_id = changeset.ttm_id
     |                     and changeset.__change__='DELETE')
     | """
q: String =
"
select * from masterlist
where not exists (select ttm_id from changeset
                  where masterlist.ttm_id = changeset.ttm_id
                    and changeset.__change__='DELETE')
"

scala> val res = spark.sql(q)
res: org.apache.spark.sql.DataFrame = [ttm_id: int]

scala> res.show
+--------+
|  ttm_id|
+--------+
|39622178|
|39578322|
+--------+

Another solution:

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> m.withColumn("__change__", lit("DELETE")).except(c.select("ttm_id","__change__")).select("ttm_id").show
+--------+
|  ttm_id|
+--------+
|39578322|
|39622178|
+--------+
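For intuition, except returns the distinct rows of the left DataFrame that do not appear in the right one, which is why the snippet above first tags the master rows with a matching __change__ column so both sides have the same shape. A rough plain-Scala analogue of that trick, with hypothetical sample data:

```scala
val master = List(39622109L, 39622178L, 39578322L)
val changeset = List(("DELETE", 39622109L), ("DELETE", 50520L))

// Tag each master row with "DELETE" so both sides share the same schema,
// then take the distinct set difference, mirroring except's semantics.
val tagged = master.map(id => ("DELETE", id))
val kept = tagged.distinct.filterNot(changeset.toSet).map(_._2)
// kept: List(39622178, 39578322)
```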

2 Comments

Yes, that is the correct format for SQL; however, I'm looking to replicate that using Scala :)
except will not work as the two tables have different column counts (the changeset has an extra column indicating whether to insert or delete from the master). But I am happy for the exposure to a different way to do it in the future.

Broadcasting a smaller dataframe should help reduce the shuffle needed for joining the dataframes.

You can use join, filter and drop after broadcasting the changeset DataFrame to get your desired result. Note that sc.broadcast on a DataFrame does not trigger a broadcast join; use the broadcast hint from org.apache.spark.sql.functions instead:

import org.apache.spark.sql.functions.broadcast

masterlist.join(broadcast(changeset), Seq("ttm_id"), "left")
  .filter($"__change__".isNull || $"__change__" =!= "DELETE")
  .drop("__change__")
  .show(false)
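Conceptually, broadcasting ships the small side to every executor so each partition of the large side can be filtered locally, with no shuffle of the large table. A plain-Scala sketch of that idea, with hypothetical data and no Spark:

```scala
// The "broadcast" side: small enough to copy to every worker as a set.
val deleteIds: Set[Long] = Set(18001570L, 50520L, 144440L)

// Each "partition" of the large master list filters locally against the
// broadcast set -- no shuffle of the large side is needed.
val partitions = List(List(39622109L, 18001570L), List(50520L, 39578322L))
val result = partitions.flatMap(_.filterNot(deleteIds.contains))
// result: List(39622109, 39578322)
```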

I hope the answer is helpful.

2 Comments

Doesn't Spark perform a BroadcastHashJoin by itself if the table is small enough?
@philantrovert, default is shuffleHashJoin for spark :)
