
I am currently learning Scala and am trying to create a SimpleConsumer for retrieving messages from a Kafka partition.

The consumer should be able to handle the following tasks:

  1. Keep track of offsets.
  2. Figure out which broker is the lead broker for a topic and partition.
  3. Handle broker leader changes.

I was able to find very good documentation for creating this consumer in Java (https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example).

Does anyone have sample Scala code for creating this SimpleConsumer? If you could refer me to documentation that points me in the right direction, it would be greatly appreciated.

  • What happens if your consumer goes down? Are you persisting the offset somewhere? Commented Jan 13, 2016 at 16:12
  • @sparkr It might be a little late, but you have several options in case your consumer goes down: use Kafka's own offset storage, or use external storage like ZooKeeper or HBase. A very good article about it is here: blog.cloudera.com/blog/2017/06/… Commented Nov 15, 2017 at 16:57
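
(For context on the comment above: with the newer consumer API, offsets can be persisted in Kafka's own storage with an explicit commit. A minimal sketch, assuming a local broker and a topic named "test", both hypothetical:)

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

// Disable auto-commit and commit offsets to Kafka's own storage after
// processing, so a restarted consumer resumes where it left off.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // assumption: local broker
props.put("group.id", "my-group")                // assumption: arbitrary group id
props.put("enable.auto.commit", "false")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("test"))
val records = consumer.poll(1000)
// ... process records ...
consumer.commitSync() // persist the consumed offsets in Kafka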

2 Answers


Here is sample code for a SimpleConsumer written in Scala. I got it working after some trial and error.

package com.Kafka.Consumer

import kafka.api.FetchRequestBuilder
import kafka.api.PartitionOffsetRequestInfo
import kafka.common.ErrorMapping
import kafka.common.TopicAndPartition
import kafka.javaapi._
import kafka.javaapi.consumer.SimpleConsumer
import java.util.ArrayList
import java.util.Collections
import java.util.HashMap
import java.util.List
import SimpleExample._

// needed to iterate over the Java collections returned by the javaapi classes
import scala.collection.JavaConversions._

object SimpleExample {

  def main(args: Array[String]) {
    val example = new SimpleExample()
    val maxReads = java.lang.Integer.parseInt(args(0))
    val topic = args(1)
    val partition = java.lang.Integer.parseInt(args(2))
    val seeds = new ArrayList[String]()
    seeds.add(args(3))
    val port = java.lang.Integer.parseInt(args(4))
    try {
      example.run(maxReads, topic, partition, seeds, port)
    } catch {
      case e: Exception => {
        println("Oops:" + e)
        e.printStackTrace()
      }
    }
  }

  def getLastOffset(consumer: SimpleConsumer, 
      topic: String, 
      partition: Int, 
      whichTime: Long, 
      clientName: String): Long = {
    val topicAndPartition = new TopicAndPartition(topic, partition)
    val requestInfo = new HashMap[TopicAndPartition, PartitionOffsetRequestInfo]()
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1))
    val request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion, clientName)
    val response = consumer.getOffsetsBefore(request)
    if (response.hasError) {
      println("Error fetching data Offset Data the Broker. Reason: " + 
        response.errorCode(topic, partition))
      return 0
    }
    val offsets = response.offsets(topic, partition)
    offsets(0)
  }
}

class SimpleExample {

  private var m_replicaBrokers: List[String] = new ArrayList[String]()

  def run(a_maxReads: Int, 
      a_topic: String, 
      a_partition: Int, 
      a_seedBrokers: List[String], 
      a_port: Int) {
    val metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition)
    if (metadata == null) {
      println("Can't find metadata for Topic and Partition. Exiting")
      return
    }
    if (metadata.leader == null) {
      println("Can't find Leader for Topic and Partition. Exiting")
      return
    }
    var leadBroker = metadata.leader.host
    val clientName = "Client_" + a_topic + "_" + a_partition
    var consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName)
    var readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime, clientName)
    var numErrors = 0
    var remainingReads = a_maxReads
    while (remainingReads > 0 && numErrors <= 5) {
      if (consumer == null) {
        consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName)
      }
      val req = new FetchRequestBuilder().clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, 100000)
        .build()
      val fetchResponse = consumer.fetch(req)
      if (fetchResponse.hasError) {
        numErrors += 1
        val code = fetchResponse.errorCode(a_topic, a_partition)
        println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code)
        if (code == ErrorMapping.OffsetOutOfRangeCode) {
          // We asked for an invalid offset; reset to the latest offset and retry
          readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime, clientName)
        } else {
          // Any other error: assume the leader moved and reconnect to the new one
          consumer.close()
          consumer = null
          leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port)
        }
      } else {
        numErrors = 0
        var numRead = 0
        for (messageAndOffset <- fetchResponse.messageSet(a_topic, a_partition)) {
          val currentOffset = messageAndOffset.offset
          if (currentOffset < readOffset) {
            // Compressed message sets can return offsets we already consumed; skip them
            println("Found an old offset: " + currentOffset + " Expecting: " + readOffset)
          } else {
            readOffset = messageAndOffset.nextOffset
            val payload = messageAndOffset.message.payload
            val bytes = Array.ofDim[Byte](payload.limit())
            payload.get(bytes)
            println(String.valueOf(messageAndOffset.offset) + ": " + new String(bytes, "UTF-8"))
            numRead += 1
            remainingReads -= 1
          }
        }
        if (numRead == 0) {
          // Nothing new to read; back off for a second before polling again
          try {
            Thread.sleep(1000)
          } catch {
            case ie: InterruptedException => 
          }
        }
      }
    }
    if (numErrors > 5) println("Too many consecutive fetch errors. Exiting")
    if (consumer != null) consumer.close()
  }

  private def findNewLeader(a_oldLeader: String, 
      a_topic: String, 
      a_partition: Int, 
      a_port: Int): String = {
    for (i <- 0 until 3) {
      var goToSleep = false
      val metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition)
      if (metadata == null) {
        goToSleep = true
      } else if (metadata.leader == null) {
        goToSleep = true
      } else if (a_oldLeader.equalsIgnoreCase(metadata.leader.host) && i == 0) {
        goToSleep = true
      } else {
        return metadata.leader.host
      }
      if (goToSleep) {
        try {
          Thread.sleep(1000)
        } catch {
          case ie: InterruptedException => 
        }
      }
    }
    println("Unable to find new leader after Broker failure. Exiting")
    throw new Exception("Unable to find new leader after Broker failure. Exiting")
  }

  private def findLeader(a_seedBrokers: List[String], 
      a_port: Int, 
      a_topic: String, 
      a_partition: Int): PartitionMetadata = {
    var returnMetaData: PartitionMetadata = null

    for (seed <- a_seedBrokers) {
      var consumer: SimpleConsumer = null
      try {
        consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup")
        val topics = Collections.singletonList(a_topic)
        val req = new TopicMetadataRequest(topics)
        val resp = consumer.send(req)
        val metaData = resp.topicsMetadata
        for (item <- metaData; part <- item.partitionsMetadata) {
          if (part.partitionId == a_partition) {
            returnMetaData = part
          }
        }
      } catch {
        case e: Exception => println("Error communicating with Broker [" + seed + "] to find Leader for [" + 
          a_topic + 
          ", " + 
          a_partition + 
          "] Reason: " + 
          e)
      } finally {
        if (consumer != null) consumer.close()
      }
    }
    if (returnMetaData != null) {
      m_replicaBrokers.clear()
      for (replica <- returnMetaData.replicas) {
        m_replicaBrokers.add(replica.host)
      }
    }
    returnMetaData
  }
}
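
For reference, a hypothetical invocation, assuming a local broker listening on port 9092 and a topic named "test" with partition 0 (all values are examples, matching the five arguments parsed in main):

// args: maxReads, topic, partition, seed broker host, port
SimpleExample.main(Array("100", "test", "0", "localhost", "9092"))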

4 Comments

Can you provide sample runtime arguments for the main method?
The example expects the following parameters: 1. Maximum number of messages to read (so we don’t loop forever) 2. Topic to read from 3. Partition to read from 4. One broker to use for Metadata lookup 5. Port the brokers listen on
This is not Scala, but rather Java in disguise!
What Kafka version are you using? (sbt/maven)

I built a simple Kafka consumer and producer using Scala.

consumer:

package com.kafka

import java.util.concurrent._
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

import scala.collection.JavaConversions._

class Consumer(val brokers: String,
               val groupId: String,
               val topic: String) {

  val props = createConsumerConfig(brokers, groupId)
  val consumer = new KafkaConsumer[String, String](props)
  var executor: ExecutorService = null

  def shutdown() = {
    if (consumer != null)
      consumer.close()
    if (executor != null)
      executor.shutdown()
  }

  def createConsumerConfig(brokers: String, groupId: String): Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  def run() = {
    consumer.subscribe(Collections.singletonList(this.topic))

    // keep a handle on the executor so shutdown() can actually stop it
    executor = Executors.newSingleThreadExecutor
    executor.execute(new Runnable {
      override def run(): Unit = {
        while (true) {
          val records = consumer.poll(1000)

          for (record <- records) {
            System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset())
          }
        }
      }
    })
  }
}

object Consumer extends App {
  val newArgs = Array("localhost:9092", "2","test")
  val example = new Consumer(newArgs(0), newArgs(1), newArgs(2))
  example.run()
}
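
One caveat: KafkaConsumer is not thread-safe, and wakeup() is the only method that may be called from another thread, so invoking shutdown() (and with it consumer.close()) from outside the polling thread is risky. A safer variant of the poll loop, sketched against the class above:

import org.apache.kafka.common.errors.WakeupException

// Let poll() be interrupted by wakeup() and close the consumer
// on the polling thread itself.
override def run(): Unit = {
  try {
    while (true) {
      val records = consumer.poll(1000)
      for (record <- records) {
        System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset())
      }
    }
  } catch {
    case _: WakeupException => // raised by consumer.wakeup(); time to exit
  } finally {
    consumer.close()
  }
}

// From any other thread, e.g. a JVM shutdown hook:
// consumer.wakeup()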

producer:

package com.kafka

import java.util.{Date, Properties}

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

object Producer extends App {
  val newArgs = Array("20","test","localhost:9092")
  val events = newArgs(0).toInt
  val topic = newArgs(1)
  val brokers = newArgs(2)
  val props = new Properties()
  props.put("bootstrap.servers", brokers)
  props.put("client.id", "producer")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")


  val producer = new KafkaProducer[String, String](props)
  val t = System.currentTimeMillis()
  for (nEvents <- Range(0, events)) {
    val key = "messageKey " + nEvents.toString
    val msg = "test message"
    val data = new ProducerRecord[String, String](topic, key, msg)

    //async
    //producer.send(data, (m,e) => {})
    //sync
    producer.send(data)
  }

  System.out.println("sent per second: " + events * 1000 / (System.currentTimeMillis() - t))
  producer.close()
}
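
A note on the commented-out async send: KafkaProducer.send takes a Callback, and the (m,e) => {} lambda form only compiles from Scala 2.12 onward (SAM conversion). A sketch that also works on older Scala versions:

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

// Asynchronous send with an explicit Callback instead of a lambda
producer.send(data, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) exception.printStackTrace()
    else println("acked: partition " + metadata.partition() + ", offset " + metadata.offset())
  }
})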

1 Comment

What Kafka version are you using? (sbt/maven)
