
I have these two case classes:

case class Doc(posts: Seq[Post], test: String)
case class Post(postId: Int, createdTime: Long)

I create a sample DataFrame:

import spark.implicits._ // needed for toDF(); already in scope in spark-shell

val df = spark.sparkContext.parallelize(Seq(
  Doc(Seq(
    Post(1, 1),
    Post(2, 3),
    Post(3, 8),
    Post(4, 15)
  ), null),
  Doc(Seq(
    Post(5, 6),
    Post(6, 9),
    Post(7, 12),
    Post(8, 20)
  ), "hello")
)).toDF()

So what I want is to return the Docs keeping only the posts whose createdTime is between x and y. For example, for x = 2 and y = 9, I want this result, with the same schema as the original df:

+--------------+
|         posts|
+--------------+
|[[2,3], [3,8]]|
|[[5,6], [6,9]]|
+--------------+

So I tried a lot of combinations of where, but it doesn't work. I tried to use map(_.filter(...)), but the problem is I don't want to do toDF().as[Doc].
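
Roughly, the Dataset version I am trying to avoid looks like this:

df.as[Doc]
  .map(d => d.copy(posts = d.posts.filter(p => p.createdTime >= 2 && p.createdTime <= 9)))
  .toDF()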

Any help? Thank you.

  • Can you add one of the things that you have tried? Commented Jul 11, 2017 at 15:31

1 Answer


There are a couple of ways to do this:

  • By using a UDF
  • By using explode and collect_list
  • By using Databricks tools

UDF

A UDF is the catch-all approach: you write a custom function to do the work. Unlike converting to a Dataset, it does not reconstruct the entire Doc class; it handles only the relevant data:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Rebuild each struct as a Post and keep only the posts in range
// (bounds hardcoded to match the x = 2, y = 9 example).
def f(posts: Seq[Row]): Seq[Post] =
  posts.map(r => Post(r.getAs[Int](0), r.getAs[Long](1)))
    .filter(p => p.createdTime >= 2 && p.createdTime <= 9)
val u = udf(f _)
val filtered = df.withColumn("posts", u($"posts"))
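
A quick sanity check against the expected output from the question (array rendering differs slightly across Spark versions):

filtered.select("posts").show(false)
// should keep only [[2,3], [3,8]] and [[5,6], [6,9]]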

Using explode and collect_list

df.withColumn("posts", explode($"posts")).filter($"posts.createdTime" > 3 && $"posts.createdTime" < 9).groupBy("test").agg(collect_list("posts").as("posts"))

This is probably less efficient than the previous option, but it is a one-liner (and at some point in the future it might get optimized). Note one behavioral difference: a Doc whose posts all fall outside the range disappears from the result entirely, since explode leaves it no rows to regroup, whereas the UDF keeps it with an empty posts array.

Using Databricks tools

If you are working on Databricks Cloud, you can use higher-order functions; see here for more information. Since this is not a general Spark option, I will not go over it.
Hopefully they will integrate it into standard Spark in the future (I found this JIRA on the subject, but it is not currently supported).
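
As a rough sketch of that style (assuming a Spark version where SQL higher-order functions are available; the filter(array, lambda) function eventually landed in open-source Spark 2.4):

import org.apache.spark.sql.functions.expr

// Filter the posts array in place with a SQL lambda; no explode/regroup needed.
val filtered = df.withColumn("posts",
  expr("filter(posts, p -> p.createdTime >= 2 AND p.createdTime <= 9)"))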
