
I am importing 10 million records daily from MySQL to Hive using a Spark Scala program, and comparing yesterday's dataset with today's.

val yesterdayDf = sqlContext.sql("select * from t_yesterdayProducts")
val todayDf = sqlContext.sql("select * from t_todayProducts")
val diffDf = todayDf.except(yesterdayDf) // rows present today but not yesterday

I am using a 3-node cluster, and the program works fine for up to 4 million records. Beyond 4 million records we hit an out-of-memory error because the RAM is not sufficient.

I would like to know the best way to compare two large datasets.

3 Comments
  • Does the DataFrame have any kind of unique key? Commented Aug 9, 2016 at 19:42
  • Yes Thiago, the table has one unique key. Commented Aug 10, 2016 at 2:14
  • Actually, the Spark SQL except call is implicitly a call to subtract in the Spark API. If you have the key, can you try todayDf.subtractByKey(yesterdayDf)? Commented Aug 10, 2016 at 15:38

4 Answers

2

Have you tried finding out how many partitions you have? yesterdayDf.rdd.partitions.size will give you that information for the yesterdayDf DataFrame, and you can do the same for the other DataFrames too.

You can also use yesterdayDf.repartition(1000) (a large number) to see if the OOM problem goes away.
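A minimal sketch of how those two suggestions fit together (the partition count of 1000 is only an illustrative starting point; the table names are taken from the question):

// Load both snapshots (table names from the question)
val yesterdayDf = sqlContext.sql("select * from t_yesterdayProducts")
val todayDf = sqlContext.sql("select * from t_todayProducts")

// Inspect how many partitions each DataFrame currently has
println(s"yesterday partitions: ${yesterdayDf.rdd.partitions.size}")
println(s"today partitions: ${todayDf.rdd.partitions.size}")

// Repartition to spread the work over more, smaller tasks before the except
val diffDf = todayDf.repartition(1000).except(yesterdayDf.repartition(1000))

Note that the shuffle performed by except itself is governed by spark.sql.shuffle.partitions, which may also be worth raising.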


6 Comments

Thanks for your quick response. I tried it this way: val yesterdayDf = sqlContext.sql("select * from t_yesterdayProducts"); val yesterdayPartDf = yesterdayDf.repartition(1000); val todayDf = sqlContext.sql("select * from t_todayProducts"); val todayPartDf = todayDf.repartition(1000); val diffDf = todayPartDf.except(yesterdayPartDf); but after 993 tasks of the first DataFrame completed, it throws the OOM error.
@ArvindKumarAnugula try 1200 or 1500 instead of 1000. Also specify --executor-memory 32G or higher
The exact memory error is "GC overhead limit exceeded".
Each node has only 16 GB of RAM. Can I use --executor-memory 16G?
You can see the config I am passing here: --master yarn-client --executor-memory 16G --num-executors 12 --executor-cores 4 --driver-memory 8G
0

The reason for this issue is hard to say, but it could be that the workers are pulling too much data. Try slimming down the DataFrames before doing the except. In answer to my question in the comments, you said you have a key column, so select only that column, like this:

val yesterdayDfKey = yesterdayDf.select("key-column")
val todayDfKey = todayDf.select("key-column")
val diffDf = todayDfKey.except(yesterdayDfKey)

That will give you a DataFrame containing only the differing keys. Then you can use a join to filter the full rows for those keys, as in this post.
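A sketch of that approach end to end, joining the differing keys back to today's rows ("key-column" remains a placeholder for the real unique key column):

// Keep only the key column from each snapshot ("key-column" is a placeholder)
val yesterdayDfKey = yesterdayDf.select("key-column")
val todayDfKey = todayDf.select("key-column")

// Keys present today but not yesterday; a much smaller DataFrame to shuffle
val newKeys = todayDfKey.except(yesterdayDfKey)

// Join back to today's data to recover the full rows for those keys
val diffDf = todayDf.join(newKeys, "key-column")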


0

You also need to make sure your yarn.nodemanager.resource.memory-mb is larger than your --executor-memory.
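For reference, that property is set in yarn-site.xml on each node; the value below is only illustrative for the 16 GB nodes mentioned in the comments, and the executor memory plus the YARN memory overhead must fit under it:

<!-- yarn-site.xml (illustrative value for a 16 GB node) -->
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>14336</value>
</property>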

1 Comment

This kind of suggestion should be kept in the comments
0

You can also try joining the two DataFrames on the keys with a left_anti join, and then check the count of records, as in the sketch below.
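A minimal sketch, assuming Spark 2.x (where "left_anti" is a supported join type) and using "key-column" as a placeholder for the real unique key:

// Rows in today's snapshot whose key has no match in yesterday's snapshot
val diffDf = todayDf.join(yesterdayDf, Seq("key-column"), "left_anti")

// Number of new records
println(diffDf.count())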

