
Spark 2.3.0 with Scala 2.11. I'm implementing a custom Aggregator as described in the docs. An Aggregator requires three type parameters: input, buffer, and output.

My aggregator has to act upon all previous rows in the window so I declared it like this:

import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Aggregator

case class Foo(...)

object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
    // other override methods
    override def bufferEncoder: Encoder[ListBuffer[Foo]] = ???
}

One of the methods to override is supposed to return the encoder for the buffer type, which in this case is ListBuffer[Foo]. I can't find any suitable encoder in org.apache.spark.sql.Encoders, nor any other way to encode this, so I don't know what to return here.

I thought of creating a new case class with a single property of type ListBuffer[Foo], using that as my buffer class, and calling Encoders.product on it, but I'm not sure whether that is necessary or whether there is something else I'm missing. Thanks for any tips.
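Something like this sketch is what I had in mind (untested; FooBuffer is just an illustrative name):

import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{Encoder, Encoders}

// Hypothetical wrapper: a single-field case class so that Encoders.product applies
case class FooBuffer(items: ListBuffer[Foo])

val bufferEnc: Encoder[FooBuffer] = Encoders.product[FooBuffer]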

  • Just analyzing what you said to understand things better. Regarding "My aggregator has to act upon all previous rows in the window so I declared it like this": does it make any difference whether your aggregator acts upon previous rows or any rows? Just curious how important this is. Commented May 4, 2018 at 20:34
  • In my use case, say there are 5 rows: the result for row 1 depends on row 1, the result for row 2 depends on rows 1 and 2, the result for row 3 depends on rows 1-3, etc. This depends on a specific row sort (roughly the window specification sketched after these comments). If every row depended on all other rows, I guess I'd have to do this in two passes: first collect all of the values for the window with collect_list or collect_set, then calculate the aggregated values in another pass. Commented May 8, 2018 at 18:25
  • Isn't that window aggregation and the requirement of "previous rows" a window specification? Commented May 8, 2018 at 20:11
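The running aggregation described in the second comment corresponds to a window specification along these lines (a sketch; the column names groupId and sortKey are made up):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

// Each row aggregates over itself and all earlier rows in its partition,
// in sort order: a "running" window.
val runningWindow = Window
  .partitionBy(col("groupId"))
  .orderBy(col("sortKey"))
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)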

2 Answers


You should just let Spark SQL do its work and find the proper encoder using ExpressionEncoder as follows:

scala> spark.version
res0: String = 2.3.0

scala> import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ListBuffer

scala> import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoder

scala> import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

scala> case class Mod(id: Long)
defined class Mod

scala> val enc: Encoder[ListBuffer[Mod]] = ExpressionEncoder()
enc: org.apache.spark.sql.Encoder[scala.collection.mutable.ListBuffer[Mod]] = class[value[0]: array<struct<id:bigint>>]
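Wired into the aggregator from the question, that would look roughly like this; the zero/reduce/merge/finish bodies below are placeholders, not taken from the original code:

import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

case class Foo(id: Long)

object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
  override def zero: ListBuffer[Foo] = ListBuffer.empty[Foo]
  override def reduce(buf: ListBuffer[Foo], a: Foo): ListBuffer[Foo] = buf += a
  override def merge(b1: ListBuffer[Foo], b2: ListBuffer[Foo]): ListBuffer[Foo] = b1 ++= b2
  override def finish(buf: ListBuffer[Foo]): Boolean = buf.nonEmpty // placeholder logic
  // ExpressionEncoder() derives both encoders from the declared return types
  override def bufferEncoder: Encoder[ListBuffer[Foo]] = ExpressionEncoder()
  override def outputEncoder: Encoder[Boolean] = ExpressionEncoder()
}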

1 Comment

Thank you, that is remarkably simple and appears to work. The encoder that is implicitly created is not one that I could have created explicitly.

I cannot see anything in org.apache.spark.sql.Encoders that could be used to directly encode a ListBuffer, or, for that matter, even a List.

One option is to wrap it in a case class, as you suggested:

import org.apache.spark.sql.Encoders

case class Foo(field: String)
case class Wrapper(lb: scala.collection.mutable.ListBuffer[Foo])
Encoders.product[Wrapper]
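If you go that route, note that the aggregator's buffer type becomes Wrapper rather than the ListBuffer itself, so reduce and merge have to go through its lb field, roughly (untested):

// Sketch: the buffer is now the wrapper, so unwrap on every update
def reduce(b: Wrapper, a: Foo): Wrapper = { b.lb += a; b }
def merge(b1: Wrapper, b2: Wrapper): Wrapper = { b1.lb ++= b2.lb; b1 }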

Another option could be to use kryo:

Encoders.kryo[scala.collection.mutable.ListBuffer[Foo]]
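Be aware that kryo serializes the whole buffer into a single opaque binary column, so Catalyst cannot see inside it. You can check this via the encoder's schema, which I would expect to print as a single binary field:

import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.Encoders

val kryoEnc = Encoders.kryo[ListBuffer[Foo]]
// Expect something like: struct<value:binary>
println(kryoEnc.schema.simpleString)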

Finally, you could look at ExpressionEncoder, which extends Encoder:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
ExpressionEncoder[scala.collection.mutable.ListBuffer[Foo]]()

This is the best solution, as it keeps everything transparent to Catalyst and therefore allows it to do all of its wonderful optimisations.

One thing I noticed whilst having a play:

ExpressionEncoder[scala.collection.mutable.ListBuffer[Foo]]().schema == ExpressionEncoder[List[Foo]]().schema

I haven't tested any of the above whilst executing aggregations, so there may be runtime issues. Hope this is helpful.

