
I'm back again. I'm trying to read data from a Kafka 0.9.0.0 topic using a Spark Streaming 1.6.1 class written in Scala 2.10.5. It's a simple program that I built with sbt 0.13.12. When I run the program, I get this exception:

(run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: [B cannot be cast to java.lang.String
[error] at org.kafka.receiver.AvroCons$$anonfun$1.apply(AvroConsumer.scala:54)
[error] at org.kafka.receiver.AvroCons$$anonfun$1.apply(AvroConsumer.scala:54)
[error] at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
[error] at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
[error] at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
[error] at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
[error] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
[error] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
[error] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
[error] at org.apache.spark.scheduler.Task.run(Task.scala:89)
[error] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
[error] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[error] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[error] at java.lang.Thread.run(Thread.java:745)
[error]
[error] Driver stacktrace: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: [B cannot be cast to java.lang.String

Here is the Scala program:

package org.kafka.receiver

import java.util.HashMap
import org.apache.avro.SchemaBuilder
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.DefaultDecoder
import org.apache.spark.sql.SQLContext
import com.sun.istack.internal.logging.Logger

case class mobileData(action: String, tenantid: Int, lat: Float, lon: Float, memberid: Int,
  event_name: String, productUpccd: Int, device_type: String, device_os_ver: Float, item_name: String)

object AvroCons {
  val eventSchema = SchemaBuilder.record("eventRecord").fields
    .name("action").`type`().stringType().noDefault()
    .name("tenantid").`type`().intType().noDefault()
    .name("lat").`type`().doubleType().noDefault()
    .name("lon").`type`().doubleType().noDefault()
    .name("memberid").`type`().intType().noDefault()
    .name("event_name").`type`().stringType().noDefault()
    .name("productUpccd").`type`().intType().noDefault()
    .name("device_type").`type`().stringType().noDefault()
    .name("device_os_ver").`type`().stringType().noDefault()
    .name("item_name").`type`().stringType().noDefault().endRecord

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Avro Consumer")
      .set("spark.driver.allowMultipleContexts", "true").setMaster("local[2]")
    sparkConf.set("spark.cores.max", "2")
    sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
    sparkConf.set("spark.sql.tungsten.enabled", "true")
    sparkConf.set("spark.eventLog.enabled", "true")
    sparkConf.set("spark.app.id", "KafkaConsumer")
    sparkConf.set("spark.io.compression.codec", "snappy")
    sparkConf.set("spark.rdd.compress", "true")
    sparkConf.set("spark.streaming.backpressure.enabled", "true")
    sparkConf.set("spark.sql.avro.compression.codec", "snappy")
    sparkConf.set("spark.sql.avro.mergeSchema", "true")
    sparkConf.set("spark.sql.avro.binaryAsString", "true")
    val sc = new SparkContext(sparkConf)
    sc.hadoopConfiguration.set("avro.enable.summary-metadata", "false")
    val ssc = new StreamingContext(sc, Seconds(2))
    val kafkaConf = Map[String, String](
      "metadata.broker.list" -> "############:9092",
      "zookeeper.connect" -> "#############",
      "group.id" -> "KafkaConsumer",
      "zookeeper.connection.timeout.ms" -> "1000000")
    val topicMaps = Map("fishbowl" -> 1)
    val messages = KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](
      ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

    messages.print()
    val lines = messages.map(x => x._2)
    lines.foreachRDD((rdd, time) => {
      val count = rdd.count()
      if (count > 0)
        rdd.foreach(record => println(record))
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
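A minimal, stdlib-only sketch (not Spark code) of why the failure surfaces inside messages.map(x => x._2) at AvroConsumer.scala:54 rather than when the stream is created: the [String, String] type parameters are only a compile-time promise, and because of JVM type erasure nothing checks them until a value is actually used as a String. The names ErasureDemo and mislabeled are hypothetical, and the tuple of byte arrays merely imitates what the receiver hands over when DefaultDecoder is used.

```scala
object ErasureDemo {
  // Imitates the receiver's output with DefaultDecoder: the payloads are really
  // Array[Byte], but the stream is declared as carrying (String, String).
  def mislabeled(): (String, String) = {
    val raw: (Any, Any) = ("key".getBytes("UTF-8"), "value".getBytes("UTF-8"))
    // Erasure: at runtime this cast only checks that raw is a Tuple2, so it succeeds.
    raw.asInstanceOf[(String, String)]
  }

  def main(args: Array[String]): Unit = {
    val pair = mislabeled() // no exception yet
    try {
      // Using _2 as a String inserts a real checkcast, which fails here,
      // just like x._2 inside messages.map(...).
      println(pair._2.length)
    } catch {
      case e: ClassCastException => println("ClassCastException: " + e.getMessage)
    }
  }
}
```

The exception message contains [B, the JVM's internal name for byte[], which matches the stack trace above.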

And here is my build.sbt:

name := "AvroConsumer" 
version := "1.0" 
scalaVersion := "2.10.6"
jarName in assembly := "AvroConsumer.jar" 

libraryDependencies  += "org.apache.spark" % "spark-core_2.10" % "1.6.1" % "provided"    

libraryDependencies  += "org.apache.spark" % "spark-sql_2.10" % "1.6.1" % "provided" 

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.1"

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-assembly_2.10" % "1.6.1"

libraryDependencies  += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1" 

libraryDependencies  += "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"

libraryDependencies += "org.openrdf.sesame" % "sesame-rio-api" % "2.7.2" 

libraryDependencies += "com.databricks" % "spark-csv_2.10" %  "0.1"

libraryDependencies += "org.apache.avro" % "avro" % "1.8.1"

libraryDependencies += "log4j" % "log4j" % "1.2.17"

libraryDependencies += "org.apache.avro" % "avro-tools" % "1.7.4"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

I'm preparing this code to create a DataFrame from the Kafka topic, which is why I set all those properties on the SparkConf. Here is a sample message showing the schema of my incoming data:

{
  "action": "AppEvent",
  "tenantid": 299,
  "lat": 0.0,
  "lon": 0.0,
  "memberid": 16445,
  "event_name": "CATEGORY_CLICK",
  "productUpccd": 0,
  "device_type": "iPhone",
  "device_os_ver": "10.1",
  "item_name": "CHICKEN"
}
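A side observation, offered tentatively: the mobileData case class declares lat and lon as Float and device_os_ver as Float, while both Avro schemas declare lat/lon as double and device_os_ver as string (the sample carries "10.1" as a string). If the goal is a DataFrame built from these records, a case class whose field types follow the schema would presumably avoid conversion trouble later. A sketch, using the hypothetical names MobileEvent and MobileEventSample, filled with the sample values above:

```scala
// Hypothetical case class whose field types follow the Avro schema
// (double -> Double, int -> Int, string -> String).
case class MobileEvent(
    action: String,
    tenantid: Int,
    lat: Double,
    lon: Double,
    memberid: Int,
    event_name: String,
    productUpccd: Int,
    device_type: String,
    device_os_ver: String, // "10.1" is a string in the schema, not a number
    item_name: String)

object MobileEventSample {
  // The sample record shown above, as a MobileEvent.
  val sample: MobileEvent = MobileEvent(
    action = "AppEvent",
    tenantid = 299,
    lat = 0.0,
    lon = 0.0,
    memberid = 16445,
    event_name = "CATEGORY_CLICK",
    productUpccd = 0,
    device_type = "iPhone",
    device_os_ver = "10.1",
    item_name = "CHICKEN")

  def main(args: Array[String]): Unit = println(sample)
}
```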

And here is my Kafka producer class:

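import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

public class KafkaAvroProducer {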

    /* case class
     TopicData("action":"AppEvent","tenantid":1173,"lat":0.0,"lon":0.0,"memberid":55,
     "event_name":"CATEGORY_CLICK",
     "productUpccd":0,"device_type":"iPhone","device_os_ver":"10.1","item_name":"CHICKEN",*/

    public static final String EVENT_SCHEMA = "{" + "\"type\":\"record\","
            + "\"name\":\"eventrecord\"," + "\"fields\":["
            + "  { \"name\":\"action\", \"type\":\"string\" },"
            + "  { \"name\":\"tenantid\", \"type\":\"int\" },"
            + "  { \"name\":\"lat\", \"type\":\"double\" },"
            + "  { \"name\":\"lon\", \"type\":\"double\" },"
            + "  { \"name\":\"memberid\", \"type\":\"int\" },"
            + "  { \"name\":\"event_name\", \"type\":\"string\" },"
            + "  { \"name\":\"productUpccd\", \"type\":\"int\" },"
            + "  { \"name\":\"device_type\", \"type\":\"string\" },"
            + "  { \"name\":\"device_os_ver\", \"type\":\"string\" },"
            + "{ \"name\":\"item_name\", \"type\":\"string\" }" + "]}";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "##########:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("producer.type", "async");
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(EVENT_SCHEMA);
        Injection<GenericRecord, String> avroRecords = GenericAvroCodecs.toJson(schema);
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i<300;i++){
            GenericData.Record avroRecord = new GenericData.Record(schema);
            setEventValues(i, avroRecord);
            String messages = avroRecords.apply(avroRecord);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("fishbowl",String.valueOf(i),messages);
            System.out.println(producerRecord);
            producer.send(producerRecord);

        }
        producer.close();
    }

    private static void setEventValues(int i, Record avroRecord) {

        avroRecord.put("action", "AppEvent");
        avroRecord.put("tenantid", i);
        avroRecord.put("lat", i*0.0);
        avroRecord.put("lon", 0.0);
        avroRecord.put("memberid", i*55);
        avroRecord.put("event_name", "CATEGORY_CLICK");
        avroRecord.put("productUpccd", 0);
        avroRecord.put("device_type", "iPhone");
        avroRecord.put("device_os_ver", "10.1");
        avroRecord.put("item_name", "CHICKEN");
    }

}
  • What's the type of the data inserted in Kafka? Commented Dec 15, 2016 at 7:41
  • I've added the producer class as well; please take a look at the edit. Commented Dec 15, 2016 at 8:27
  • Are you sure that Kafka is empty of other messages when this program starts? The exception says that we're trying to cast a byte[] to a String, but the Kafka deserializers look well configured, as does the producer. Try purging all messages from Kafka before testing. Commented Dec 16, 2016 at 12:34
  • Maybe you need to check consistency across the system. From your code, I read: producer = KafkaProducer<String, String> and consumer = KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder], which match each other correctly. Commented Dec 17, 2016 at 12:07
  • ([B@2c15dae5,[B@4ed606ff) ([B@456577c8,[B@89b0f10) ([B@7bf9e7d9,[B@53e49725) ([B@7867a787,[B@5d27cdd2) ([B@6039c5c5,[B@70e739dd) ([B@5c881613,[B@6a6168da) ([B@baf522d,[B@233c0e59) ([B@3bd20721,[B@59d22635) ([B@73ca8fa4,[B@779c935f) ([B@488f7f52,[B@44f005b5) Commented Dec 17, 2016 at 12:12
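The ([B@2c15dae5,[B@4ed606ff) output above is what print() shows when both key and value are byte arrays: a JVM array has no readable toString, so it prints its type code ([B, meaning byte[]) followed by @ and a hex hash code. A small stdlib-only sketch of the difference between printing the raw bytes and decoding them, assuming the producer's StringSerializer wrote UTF-8 (its default encoding); the object name ByteArrayPrinting and the short payload are illustrative only:

```scala
import java.nio.charset.StandardCharsets

object ByteArrayPrinting {
  // Roughly what the producer's StringSerializer puts on the wire (UTF-8 by default).
  val payload: Array[Byte] = """{"action":"AppEvent"}""".getBytes(StandardCharsets.UTF_8)

  // Default toString of an array: "[B@" followed by a hex hash code.
  def raw: String = payload.toString

  // Decoding the bytes with the same charset recovers the original text.
  def decoded: String = new String(payload, StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    println(raw)     // something like [B@2c15dae5; the hash differs per run
    println(decoded) // {"action":"AppEvent"}
  }
}
```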

1 Answer

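The Kafka consumer should be configured to use the right decoder. DefaultDecoder returns each key and value as a raw Array[Byte], so declaring the key and value types as String while using DefaultDecoder compiles, but it fails at runtime with the ClassCastException as soon as a value is used as a String.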

Instead of:

KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder]

For String keys and values it should be:

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]

with import kafka.serializer.StringDecoder added to the imports.
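To see why the decoder pair matters: in the Kafka API that KafkaUtils.createStream uses, a decoder is a kafka.serializer.Decoder[T] with a single method fromBytes(bytes: Array[Byte]): T. DefaultDecoder returns the bytes unchanged (a Decoder[Array[Byte]]), and StringDecoder returns a String (UTF-8 by default). The K and V type parameters have to match what the decoders return, but createStream only requires each decoder to be some Decoder[_], so the mismatch compiles. The sketch below uses hypothetical local stand-ins (Decoder, BytesDecoder, Utf8StringDecoder, decodeValue) so it runs without Kafka on the classpath; they mimic the shape of the Kafka classes but are not them.

```scala
import java.nio.charset.StandardCharsets

object DecoderSketch {
  // Local stand-in mirroring the shape of kafka.serializer.Decoder[T].
  trait Decoder[T] {
    def fromBytes(bytes: Array[Byte]): T
  }

  // Behaves like DefaultDecoder: hands back the raw bytes.
  object BytesDecoder extends Decoder[Array[Byte]] {
    def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes
  }

  // Behaves like StringDecoder with its default UTF-8 encoding.
  object Utf8StringDecoder extends Decoder[String] {
    def fromBytes(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)
  }

  // Ties the value type V to the decoder's output type, which is the
  // constraint that createStream's Decoder[_] bound does not enforce.
  def decodeValue[V](bytes: Array[Byte], decoder: Decoder[V]): V = decoder.fromBytes(bytes)

  def main(args: Array[String]): Unit = {
    val wire = "CATEGORY_CLICK".getBytes(StandardCharsets.UTF_8)
    val s: String = decodeValue(wire, Utf8StringDecoder) // types agree
    val b: Array[Byte] = decodeValue(wire, BytesDecoder) // honest type: bytes
    // val bad: String = decodeValue(wire, BytesDecoder) // rejected at compile time
    println(s)
    println(b.length)
  }
}
```

With the real classes, the two consistent choices are therefore [String, String, StringDecoder, StringDecoder], or keeping DefaultDecoder but declaring the types as Array[Byte] and decoding each value yourself, for example with new String(bytes, "UTF-8") inside a map.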

4 Comments

Thanks, Maasg, for your help.
@jackAKAkarthik For a new issue, ask a new question. Don't ask follow-up questions on comments.
(0,[B@370ed56a) (1,[B@2edd3e63) (2,[B@3ba2944d) (3,[B@2eb669d1) (4,[B@49dd304c) (5,[B@4f6af565) (6,[B@7714e29e) (7,[B@7c2a3025) (8,[B@35bf0cd2) (9,[B@11e2ca8f)
