
When streaming from Kafka using Spark 2.0, I am getting the following error:

org.apache.spark.SparkException: 
Job aborted due to stage failure: 
Task 0.0 in stage 1.0 (TID 1) had a not serializable result: 
org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: 
org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(
topic = mytopic, partition = 0, offset = 422337, 
CreateTime = 1472871209063, checksum = 2826679694, 
serialized key size = -1, serialized value size = 95874, 
key = null, value = <JSON GOES HERE...>

Here is the relevant portion of the code:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val ssc = new StreamingContext(sc, Seconds(2))

val topics = Array("ecfs")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream
  .map(_.value())
  .flatMap(message =>  {
    // parsing here...
  })
  .foreachRDD(rdd => {
    // processing here...
  })

ssc.start()

From what I can tell, it is this line that's causing the problem: .map(_.value()). How can this be fixed?

1 Answer


You cannot use .map on a DStream[String, String] the way you used it there. I think you can use transform and then apply map inside it, as follows:

// check_time_to_send and check_time_to_send_utc come from my own job; the
// pattern to note is transform { rdd => ... } with the map applied inside it.
val streamed_rdd_final = streamed_rdd.transform { rdd =>
  rdd.map(x => x.split("\t"))
    .map(x => Array(check_time_to_send.toString, check_time_to_send_utc.toString,
                    x(1), x(2), x(3), x(4), x(5)))
    // rebuild each record as a tab-separated line of all seven fields
    .map(x => x(0) + "\t" + x(1) + "\t" + x(2) + "\t" + x(3) + "\t" +
              x(4) + "\t" + x(5) + "\t" + x(6) + "\t")
}

Or you can keep using .map, but instead of _.value() try passing a function into the map, like I did below:

stream.map { case (x, y) => y.toString }
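
For the stream in the question specifically, here is a minimal sketch of how that might look, assuming the Kafka 0.10 direct stream shown above, where each element is a ConsumerRecord[String, String] rather than a (key, value) pair (the values name is just illustrative):

// Extract the String value inside transform so that downstream operators
// only ever see plain (serializable) Strings, not ConsumerRecord objects.
val values = stream.transform { rdd =>
  rdd.map(record => record.value())
}

The flatMap/foreachRDD chain from the question can then be applied to values instead of mapping over the ConsumerRecords directly.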