I have a Spark job written in Scala that uses the RDD API. Because of expensive groupBy operations I get this error:
Container killed by YARN for exceeding memory limits. 22.4 GB of 22 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
I increased the memory overhead, but I still get the same error. I'm running on 10 r4.xlarge machines; I also tried r4.2xlarge and even r4.4xlarge instances, with the same result. The data I'm testing on is 5 GB gzipped (almost 50 GB unzipped, about 6 million records).
Some of my configuration:
spark.executor.memory: 20480M
spark.driver.memory: 21295M
spark.yarn.executor.memoryOverhead: 2g
spark.executor.instances: 10
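These end up on the SparkConf roughly like this (just a sketch of how I wire them up; the app name is a placeholder, and spark.driver.memory in practice comes from spark-submit / the cluster config rather than from code):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch of how the settings above map onto a SparkConf.
// Note: spark.driver.memory only takes effect when passed via
// spark-submit or the cluster config, not when set from code.
val conf = new SparkConf()
  .setAppName("dedup-job") // placeholder name
  .set("spark.executor.memory", "20480M")
  .set("spark.driver.memory", "21295M")
  .set("spark.yarn.executor.memoryOverhead", "2g")
  .set("spark.executor.instances", "10")

val sc = new SparkContext(conf)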
And the code looks like this:
val groupedEntitiesRDD = datasetRDD
.groupBy(_.entityId)
.map({ case (key, valueIterator) => key -> valueIterator.toList })
.persist(StorageLevel.MEMORY_AND_DISK)
val deduplicatedRDD = groupedEntitiesRDD
.flatMap({ case (_, entities) => deduplication(entities) })
def deduplication(entities: List[StreamObject[JsValue]]): List[StreamObject[JsValue]] = {
entities
.groupBy(_.deduplicationKey)
.values
.map(duplicates => duplicates.maxBy(_.processingTimestamp.toEpochSecond))
.toList
}
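For context, I was also wondering whether the same per-(entityId, deduplicationKey) result could be computed with reduceByKey instead of the full groupBy, so that no complete group ever has to be held in memory. Roughly like the sketch below (same datasetRDD and StreamObject as above; I haven't verified it is fully equivalent):

// Sketch only: keep the record with the newest processingTimestamp
// per (entityId, deduplicationKey), merging pairwise with reduceByKey
// instead of materializing each group as a List first.
val latestPerKeyRDD = datasetRDD
  .map(e => ((e.entityId, e.deduplicationKey), e))
  .reduceByKey { (a, b) =>
    if (a.processingTimestamp.toEpochSecond >= b.processingTimestamp.toEpochSecond) a else b
  }
  .values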