10

I have a dataframe like this:

df = spark.createDataFrame([(0, ["B","C","D","E"]),(1,["E","A","C"]),(2, ["F","A","E","B"]),(3,["E","G","A"]),(4,["A","C","E","B","D"])], ["id","items"])

which creates a data frame df like this:

+---+-----------------+
| id|            items|
+---+-----------------+
|  0|     [B, C, D, E]|
|  1|        [E, A, C]|
|  2|     [F, A, E, B]|
|  3|        [E, G, A]|
|  4|  [A, C, E, B, D]|
+---+-----------------+

I would like to get a result like this:

+---+-----+
|all|count|
+---+-----+
|  F|    1|
|  E|    5|
|  B|    3|
|  D|    2|
|  C|    3|
|  A|    4|
|  G|    1|
+---+-----+

Which essentially just finds all distinct elements in df["items"] and counts their frequency. If my data was of a more manageable size, I would just do this:

from pyspark.sql.functions import explode

all_items = df.select(explode("items").alias("all"))
result = all_items.groupby(all_items.all).count().distinct()
result.show()

But because my data has millions of rows and thousands of elements in each list, this is not an option. I was thinking of doing this row by row, so that I only work with 2 lists at a time. Because most elements are frequently repeated in many rows (but the list in each row is a set), this approach should solve my problem. But the problem is, I don't really know how to do this in Spark, as I've only just started learning it. Could anyone help, please?

4
  • explode or flatMap won't increase the amount of data, so there should be no problem with this approach (if the example pipeline is correct, distinct is redundant). Performance of individual functions might be an issue, but otherwise it is the way to go. Commented Oct 12, 2018 at 10:33
  • @user6910411 Thank you for your comment, but when I use explode(), the length of my dataframe all_items becomes approximately (length_of_array_in_row) x (number_of_rows). Since my arrays hold about 10,000 items and there are 20 million rows, it creates a dataframe with 10,000 x 20 million rows, and that's too big to work with. Commented Oct 12, 2018 at 10:40
  • Can you give more details on why you think "But because my data has millions of rows and thousands of elements in each list, this is not an option"? Are you getting a specific error when you try to use explode? Explode is the appropriate option here, so if you're getting an error we probably need to look at tuning other areas of your config. Commented Oct 13, 2018 at 12:25
  • @Silvio Thank you for your comment, Silvio. I think you are correct. There is no specific error about explode, just an "Out of memory: Java heap space" error. I've started checking other things that might have gone wrong. I now suspect that the cluster, for some reason, doesn't assign enough executors to my job (despite the configuration), so execute() has nowhere to distribute the work. Commented Oct 14, 2018 at 8:07

2 Answers

14

What you need to do is reduce the size of your partitions going into the explode. There are 2 options to do this. First, if your input data is splittable you can decrease the size of spark.sql.files.maxPartitionBytes so Spark reads smaller splits. The other option would be to repartition before the explode.

The default value of maxPartitionBytes is 128MB, so Spark will attempt to read your data in 128MB chunks. If the data is not splittable then it'll read the full file into a single partition in which case you'll need to do a repartition instead.

In your case, since you're doing an explode, say it's a 100x increase: with 128MB per partition going in, you end up with 12GB+ per partition coming out!
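To make that arithmetic concrete, here is a rough back-of-the-envelope sketch in plain Python. The 100x factor is just the illustrative figure used above, and the helper names and the choice of 128MB as the target size for exploded partitions are assumptions for the example, not Spark settings.

```python
MB = 1024 * 1024

def exploded_partition_bytes(input_partition_bytes, blowup_factor):
    """Rough size of one partition after explode, ignoring per-row overhead."""
    return input_partition_bytes * blowup_factor

def input_split_bytes_for_target(target_output_bytes, blowup_factor):
    """Input split size that keeps each exploded partition near the target."""
    return target_output_bytes // blowup_factor

# 128MB going in with a 100x blow-up.
out_bytes = exploded_partition_bytes(128 * MB, 100)
print(out_bytes / (1024 * MB))  # 12.5 (GB per partition coming out)

# Split size needed so the exploded partition stays around 128MB.
split_bytes = input_split_bytes_for_target(128 * MB, 100)
print(split_bytes)  # 1342177 (roughly 1.3MB)
```

The real blow-up depends on your array lengths and row overhead, so treat the result as a starting point and check the actual task sizes in the Spark UI.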

The other thing you may need to consider is your shuffle partitions since you're doing an aggregation. So again, you may need to increase the partitioning for the aggregation after the explode by setting spark.sql.shuffle.partitions to a higher value than the default 200. You can use the Spark UI to look at your shuffle stage and see how much data each task is reading in and adjust accordingly.
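Putting both suggestions together, a minimal PySpark sketch might look like the following. The function names, the 8MB split size, and the partition counts are placeholder assumptions to be tuned from what the Spark UI shows; only the two configuration keys come from the text above.

```python
def tuning_conf(max_partition_bytes, shuffle_partitions):
    """Build the two SQL settings discussed above as string key/value pairs."""
    return {
        "spark.sql.files.maxPartitionBytes": str(max_partition_bytes),
        "spark.sql.shuffle.partitions": str(shuffle_partitions),
    }

def count_items(spark, df, conf, explode_partitions=None):
    """Apply the settings, optionally repartition, then explode and count."""
    from pyspark.sql.functions import explode  # lazy import; requires PySpark

    for key, value in conf.items():
        spark.conf.set(key, value)
    if explode_partitions is not None:
        # Useful when the input is not splittable or is already loaded.
        df = df.repartition(explode_partitions)
    return df.select(explode("items").alias("item")).groupBy("item").count()

# Hypothetical values: 8MB input splits and 2000 shuffle partitions.
conf = tuning_conf(8 * 1024 * 1024, 2000)
```

With an active session you would call something like count_items(spark, df, conf, explode_partitions=2000).show(). Note that maxPartitionBytes only affects file-based sources, so it should be set before the data is read; for a DataFrame built some other way, only the repartition path applies.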

I discuss this and other tuning suggestions in the talk I just gave at Spark Summit Europe.


4 Comments

Thank you very much for taking time and explaining this to me. It's much clearer now. I've also enjoyed your talk. Very helpful. Thank you for your help.
Thanks! Please give it a try and if it helps, please do accept it as the answer.
Please refer to this link; I think it will be helpful: hadoopist.wordpress.com/2016/05/16/…
@Silvio your link redirects to youtube.com/c/Databricks now
2

Observation:

explode won't change the overall amount of data in your pipeline. The total amount of required space is the same in both wide (array) and long (exploded) format. Moreover, the latter distributes better in Spark, which is better suited to long and narrow data than to short and wide data. So

df.select(explode("items").alias("item")).groupBy("item").count()

is the way to go.

However if you really want to avoid that (for whatever reason) you can use RDD and aggregate.

from collections import Counter

df.rdd.aggregate(
  Counter(), 
  lambda acc, row: acc + Counter(row.items),
  lambda acc1, acc2: acc1 + acc2
)
# Counter({'B': 3, 'C': 3, 'D': 2, 'E': 5, 'A': 4, 'F': 1, 'G': 1}) 

Note that, unlike the DataFrame explode, it stores all data in memory and is eager.
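For intuition about what aggregate does here, this is a small local simulation in plain Python with no Spark involved. The two hard-coded lists standing in for partitions, and the names seq_op and comb_op, are assumptions for illustration; the way rows are split across partitions is arbitrary.

```python
from collections import Counter
from functools import reduce

# Two pretend partitions holding the "items" arrays from the question.
partitions = [
    [["B", "C", "D", "E"], ["E", "A", "C"], ["F", "A", "E", "B"]],
    [["E", "G", "A"], ["A", "C", "E", "B", "D"]],
]

def seq_op(acc, items):
    """Fold one row's items into the running per-partition Counter."""
    return acc + Counter(items)

def comb_op(acc1, acc2):
    """Merge the Counters produced by two partitions."""
    return acc1 + acc2

# Each partition is reduced independently, then the partial results are merged.
per_partition = [reduce(seq_op, rows, Counter()) for rows in partitions]
totals = reduce(comb_op, per_partition, Counter())
print(dict(totals))
```

The final Counter holds one entry per distinct item, so this approach only works when the set of distinct items fits comfortably in driver memory.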

2 Comments

Thank you for your detailed explanation. I appreciate it. The reason why I suspected explode() is the problem, is that if I only read in let's say 5000 rows, it all works like a charm and the code runs just fine. For larger data, I'm getting an error: Out of memory Java heap space. If I only run code before the line where the explode() is, I can read in the whole data set without any problems.
Can this be done in Spark SQL?
