1

How to implement ConsumerRebalanceListener using Scala?

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
}

And what would be example of that newly made Scala rebalance listener when subscribing to topics?

Trying to learn and wrap my mind around implementing Java methods/interfaces in Scala..

Thanks.

2 Answers 2

2

You can just extend the interface directly

class MyListener extends ConsumerRebalanceListener {
     ...
 }

And the example from API docs would look like:

  class SaveOffsetsOnRebalance(consumer: Consumer[_, _] ) extends ConsumerRebalanceListener {

   def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = {
       // save the offsets in an external store using some custom code not described
   partitions.toScala.forEach(
         saveOffsetInExternalStore(consumer.position(partition))
   )
   }

   def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit = {
       // read the offsets from an external store using some custom code not described here
       partitions.forEach(
          consumer.seek(partition, readOffsetFromExternalStore(partition)))
   }
 }

Just add proper imports

Sign up to request clarification or add additional context in comments.

3 Comments

Shiash What is (consumer Consumer[_, _] ) taking as parameters?
Thanks. So the type that is passed is of type Consumer? Is a Consumer a class? This part looks like a map [_, _]. I am just not familiar with this notation (does _ means any?) and where to look for Consumer to see the that type? Thanks..
The Consumer type is taken from the api documentation example. All the code snippet I took from there just translated into Scala syntax. [_, _] in Scala is equivalent to java <?, ?>, basically a generic with 2 type parameters which you don’t care what they are
0

There are traits in Scala corresponding to interfaces in Java. Scala trait gets converted to Java Interfaces internally. And just like we implement interfaces in Java, the same way we extend traits in Scala. So you just need to extend that Java interfaces as if it were a Scala trait because under the hood both are same.

class SaveOffsetsOnRebalance extends ConsumerRebalanceListener {}

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.