
I'm new to Spark (and to Stack Overflow). For the following RDD and DataFrame (same data), I want to get the most viewed tags of playlists with more than N videos. My issue is that the tags are stored in an array, and I don't know where to start since this seems advanced.

RDD

(id,playlist,tags,videos,views)
(1,playlist_1,[t1, t2, t3],9,200)
(2,playlist_2,[t4, t5, t7],64,793)
(3,playlist_3,[t4, t6, t3],51,114)
(4,playlist_4,[t1, t6, t2],8,115)
(5,playlist_5,[t1, t6, t2],51,256)
(2,playlist_6,[t4, t5, t2],66,553)
(3,playlist_7,[t4, t6, t2],77,462)

DataFrame

+---+------------+--------------+--------+-------+
| id| playlist   | tags         | videos | views |
+---+------------+--------------+--------+-------+
| 1 | playlist_1 | [t1, t2, t3] | 9      |  200  |
| 2 | playlist_2 | [t4, t5, t7] | 64     |  793  |
| 3 | playlist_3 | [t4, t6, t3] | 51     |  114  |
| 4 | playlist_4 | [t1, t6, t2] | 8      |  115  |
| 5 | playlist_5 | [t1, t6, t2] | 51     |  256  |
| 2 | playlist_6 | [t4, t5, t2] | 66     |  553  |
| 3 | playlist_7 | [t4, t6, t2] | 77     |  462  |
+---+------------+--------------+--------+-------+

Expected Result

Tags for playlists with more than (N = 65) videos

+-----+-------+
| tag | views |
+-----+-------+
| t2  | 1015  |
| t4  | 1015  |
| t5  | 553   |
| t6  | 462   |
+-----+-------+
  • Welcome to Stack Overflow! For the given example, can you edit the post to include the expected result? Your description (most viewed tags of playlists with over N videos) is somewhat vague; an example would help resolve any ambiguity. Commented Oct 19, 2017 at 15:29
  • Added table based on OP's description of desired results, "most viewed tags [for] (playlists with over N videos)" Commented Oct 19, 2017 at 18:15
  • Exactly! Can you please help with that, @TzachZohar? Commented Oct 19, 2017 at 18:47

1 Answer


Here's a solution using DataFrames:

import org.apache.spark.sql.functions._
import spark.implicits._

val N = 65

val result = df.where($"videos" > N)           // filter playlists with enough videos
  .select(explode($"tags") as "tag", $"views") // explode tags into separate records
  .groupBy("tag")                              // group by tag
  .sum("views")                                // sum views per tag

result.show(false)
// +---+----------+
// |tag|sum(views)|
// +---+----------+
// |t5 |553       |
// |t4 |1015      |
// |t2 |1015      |
// |t6 |462       |
// +---+----------+
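
If you want to reproduce this locally, here is a minimal sketch of building `df` from the sample data in the question (the column names come from the question and are an assumption about your actual schema):

// Build the sample DataFrame; relies on the spark.implicits._ import from above
val df = Seq(
  (1, "playlist_1", Seq("t1", "t2", "t3"), 9, 200),
  (2, "playlist_2", Seq("t4", "t5", "t7"), 64, 793),
  (3, "playlist_3", Seq("t4", "t6", "t3"), 51, 114),
  (4, "playlist_4", Seq("t1", "t6", "t2"), 8, 115),
  (5, "playlist_5", Seq("t1", "t6", "t2"), 51, 256),
  (2, "playlist_6", Seq("t4", "t5", "t2"), 66, 553),
  (3, "playlist_7", Seq("t4", "t6", "t2"), 77, 462)
).toDF("id", "playlist", "tags", "videos", "views")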

And with RDDs:

import org.apache.spark.rdd.RDD

// given
val rdd: RDD[(Int, String, Array[String], Int, Int)] = ???

val N = 65

val result: RDD[(String, Int)] = rdd
  .filter(_._4 > N)
  .flatMap { case (_, _, tags, _, views) => tags.map(tag => (tag, views)) }
  .reduceByKey(_ + _)
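
For completeness, the `???` placeholder above could be filled with the sample data; a hedged sketch, assuming a SparkContext named `sc` is in scope:

// Parallelize the sample rows into an RDD (tags kept as arrays)
val rdd: RDD[(Int, String, Array[String], Int, Int)] = sc.parallelize(Seq(
  (1, "playlist_1", Array("t1", "t2", "t3"), 9, 200),
  (2, "playlist_2", Array("t4", "t5", "t7"), 64, 793),
  (3, "playlist_3", Array("t4", "t6", "t3"), 51, 114),
  (4, "playlist_4", Array("t1", "t6", "t2"), 8, 115),
  (5, "playlist_5", Array("t1", "t6", "t2"), 51, 256),
  (2, "playlist_6", Array("t4", "t5", "t2"), 66, 553),
  (3, "playlist_7", Array("t4", "t6", "t2"), 77, 462)
))

result.collect().foreach(println)
// (t4,1015)
// (t5,553)
// (t2,1015)
// (t6,462)   (order may vary)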

4 Comments

Thanks a lot @Tzach, is there a way to do it with RDDs as well? I want to see the difference ...
Added a solution with RDDs, but I recommend reading through the documentation for the RDD API, which should give you everything you need to figure out this type of transformation.
OK, I will give it a shot. BTW, for DataFrames I'm getting duplicated tags (e.g. t2 with X sum of views and t2 with Y sum of views; instead I want t2 | X+Y).
Oh, I solved it: I just had to remove spaces after the split ;)
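
For anyone hitting the same duplicate-tag issue: if the tags column is built by splitting a comma-separated string, leading spaces make " t2" and "t2" land in different groups. A hedged sketch (assuming a raw string column named `tags` rather than an array) that trims each tag after exploding:

// Split the raw string, explode into rows, and trim stray spaces before grouping
val cleaned = df
  .where($"videos" > N)
  .select(explode(split($"tags", ",")) as "tag", $"views")
  .select(trim($"tag") as "tag", $"views")
  .groupBy("tag")
  .sum("views")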
