
I have a job that runs on Spark and is written in Scala, using Spark RDDs. Because of the expensive groupBy operations I get this error: Container killed by YARN for exceeding memory limits. 22.4 GB of 22 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. I increased the memory overhead but I get the same error. I use 10 machines of type r4.xlarge. I also tried r4.2xlarge and even r4.4xlarge, but got the same error. The data I am testing on is 5 GB gzipped (almost 50 GB unzipped and almost 6 million records).

some configurations:

spark.executor.memory: 20480M
spark.driver.memory: 21295M
spark.yarn.executor.memoryOverhead: 2g
spark.executor.instances: 10

And the code looks like this:

val groupedEntitiesRDD = datasetRDD 
  .groupBy(_.entityId) 
  .map({ case (key, valueIterator) => key -> valueIterator.toList }) 
  .persist(StorageLevel.MEMORY_AND_DISK) 

val deduplicatedRDD = groupedEntitiesRDD 
  .flatMap({ case (_, entities) => deduplication(entities) }) 

def deduplication(entities: List[StreamObject[JsValue]]): List[StreamObject[JsValue]] = { 
  entities 
    .groupBy(_.deduplicationKey) 
    .values 
    .map(duplicates => duplicates.maxBy(_.processingTimestamp.toEpochSecond)) 
    .toList 
}
  • What version of Spark are you using? Commented Aug 3, 2017 at 9:18
  • Spark version: 2.1.1 Commented Aug 3, 2017 at 9:20
  • And how have you configured your memory settings? How much memory on each executor, how much overhead and how many executors? Commented Aug 3, 2017 at 9:24
  • @GlennieHellesSindholt I added the info to the question. Commented Aug 3, 2017 at 9:28

1 Answer


From my experience and from what I have read in the release notes of Spark 2.x, one needs to allocate a lot more off-heap memory (spark.yarn.executor.memoryOverhead) than in Spark 1.x.

You have only assigned 2 GB to memoryOverhead and 20 GB to executor memory. I believe you would get better results if you changed that to, say, 8 GB of memoryOverhead and 14 GB of executor memory.
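These settings are normally passed via spark-submit --conf or spark-defaults.conf; purely as an illustration of the suggested split (the values come from the paragraph above, the application name is made up), setting them programmatically in Scala could look like this:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Shift memory from the JVM heap to YARN overhead while keeping the total
// per-executor footprint around 22 GB, as suggested above.
val conf = new SparkConf()
  .setAppName("deduplication-job")                  // made-up name
  .set("spark.executor.memory", "14g")              // was 20480M
  .set("spark.yarn.executor.memoryOverhead", "8g")  // was 2g
  .set("spark.executor.instances", "10")

val sc = new SparkContext(conf)
```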

Should you still run into memory issues (like actual OOMs being thrown), you will need to look into data skew. groupBy operations in particular frequently cause serious data skew.

One final thing: you write that you use RDDs - I hope you mean DataFrames or Datasets? RDDs have very poor performance with groupBy (see for instance this blog post for the reason why), so if you are on RDDs you should use reduceByKey instead. But really you should use DataFrames (or Datasets) instead, where groupBy is indeed the right way to go.
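To make the DataFrame suggestion concrete (this is not from the original answer; the column names are taken from the RDD code, and the input source is just a placeholder), a deduplication sketch could look like this:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Assumed schema: at least entityId, deduplicationKey and processingTimestamp columns.
val datasetDF = spark.read.json("s3://your-bucket/input/")  // placeholder input path

// Per (entityId, deduplicationKey), keep the row with the latest processingTimestamp.
val latestFirst = Window
  .partitionBy($"entityId", $"deduplicationKey")
  .orderBy($"processingTimestamp".desc)

val deduplicatedDF = datasetDF
  .withColumn("rn", row_number().over(latestFirst))
  .where($"rn" === 1)
  .drop("rn")
```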

EDIT!

You asked in a comment how to convert groupBy to reduceByKey. You can do that like this:

datasetRDD
  .map(streamObject => (streamObject.entityId, List(streamObject)))
  .reduceByKey(_ ++ _)
  .flatMap { case (_, entities) => deduplication(entities) }

You haven't specified the data structure of these entities, but it looks like you are looking for some max value and in effect throwing away unwanted data. That logic should be built into the reduceByKey operation, so that you filter away unnecessary data while reducing.
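Not part of the original answer, but as an illustration of folding the max selection into the reduce itself (assuming datasetRDD is an RDD[StreamObject[JsValue]] with the entityId, deduplicationKey and processingTimestamp fields used in the question), a sketch could look like this:

```scala
// Sketch only: key each record by (entityId, deduplicationKey) and keep,
// per key, the record with the newest processingTimestamp. No per-key List
// is ever materialised, and map-side combining means each partition ships
// at most one record per key through the shuffle.
val deduplicatedRDD = datasetRDD
  .map(obj => ((obj.entityId, obj.deduplicationKey), obj))
  .reduceByKey { (a, b) =>
    if (a.processingTimestamp.toEpochSecond >= b.processingTimestamp.toEpochSecond) a else b
  }
  .values
```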


9 Comments

Thanks. I will try changing the executor memory and memory overhead configuration and test again. Regarding choosing RDDs, this article: community.hortonworks.com/articles/42027/… says that they are the fastest for many aggregation operations (including group by).
Yes, but if you read the code for the "groupBy" operation on the RDD, you will notice that they use reduceByKey ;-)
And I mean read the code from the article you refer to - they use reduceByKey and not groupBy.
OK, thanks. Do you have an example of using group by with DataFrames, or of how to use reduce instead? I have a deduplication application, and in this case reduce cannot be used instead of group by.
reduceByKey is a function on RDDs and it can ALWAYS be used instead of the groupBy function. If you post your code, I can help you transform it to use reduceByKey instead of groupBy.
