
I am connecting Spark Streaming to an external source (MS SQL Server) and publishing the table data to Kafka.

I am getting a `java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer` error.

Please find the details below.

**CustomReceiver.scala**

    package com.sparkdemo.app
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver
    import java.util.List
    import java.util.Map
    import com.sparkdemo.entity.Inventory
    import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
    import java.net.ConnectException
    import scala.util.{Try, Success, Failure}
    import collection.JavaConversions._


    class CustomReceiver(topic: String, kafkaParams: Map[String, Object]) extends Receiver[Inventory](StorageLevel.MEMORY_AND_DISK_2) {

      override def onStart(): Unit = {

        val dataService = new DataService()
        var records: Inventory = dataService.selectAll()

        new Thread("Socket Receiver") {

          override def run() {
            Try {
              val consumer = new KafkaConsumer[String, String](kafkaParams)
              consumer.subscribe(java.util.Collections.singletonList(topic))
              while (!isStopped && records != null) {
                records = new DataService().selectAll()
                store(records)
              }
            } match {
              case Success(_) => // loop ended because the receiver was stopped
              case Failure(e: ConnectException) =>
                restart("Error connecting to...", e)
              case Failure(t) =>
                restart("Error receiving data", t)
            }

          }
        }.start()

      }

      override def onStop(): Unit = {
        println("Nothing")
      }
    }

**DataService.scala**

    package com.sparkdemo.app;
    import java.sql.Connection
    import java.sql.DriverManager
    import java.sql.ResultSet
    import java.sql.Statement
    import java.util._
    import scala.collection.JavaConversions._
    import com.sparkdemo.entity.Inventory

    class DataService {

      var connect: Connection = DriverManager.getConnection(
        "jdbc:sqlserver://localhost;databaseName=TestDB;user=SA;password=Sqlserver@007")

      var statement: Statement = connect.createStatement()

      var resultSet: ResultSet = null

      var inv: Inventory = new Inventory()

      Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver")

      def selectAll(): Inventory = {
        resultSet = statement.executeQuery("select * from Inventory")
        while (resultSet.next()) {
          // populate the class-level inv so that the filled object is returned
          inv = new Inventory()
          inv.setId(resultSet.getInt("id"))
          inv.setName(resultSet.getString("name"))
          inv.setQuantity(resultSet.getInt("quantity"))
        }
        inv
      }

    }

Scala main class: **Stream.scala**

    package com.sparkdemo.app
    import org.apache.spark.streaming.dstream.DStream
    import com.sparkdemo.config.Config
    import org.apache.kafka.common.serialization.{ StringDeserializer, StringSerializer }
    import org.apache.kafka.clients.producer.{ ProducerRecord, KafkaProducer }
    import java.util.Properties
    import collection.JavaConversions._
    import com.sparkdemo.entity.Inventory


    object Stream extends Serializable{

      def main(args: Array[String]) {
        import org.apache.spark.streaming._

        def getKafkaParams: Map[String, Object] = {
          Map[String, Object](
            "auto.offset.reset" -> "earliest",
            "bootstrap.servers" -> "localhost:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "group3")
        }

        val properties = new Properties()
        properties.put("bootstrap.servers", "localhost:9092")
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val topic1 = "topic1"
        val topic2 = "topic2"
        val producer: KafkaProducer[String, Object] = new KafkaProducer(properties)

        val ssc = Config.configReceiver()
        val stream = ssc.receiverStream(new CustomReceiver(topic1, getKafkaParams))

        stream.map(Inventory=>producer.send(new ProducerRecord[String,Object](topic2,Inventory)))

        stream.print()


        ssc.start()
        ssc.awaitTermination()

      }

    }

Entity class: **Inventory.scala**

    package com.sparkdemo.entity

    import scala.beans.{BeanProperty}


    class Inventory extends Serializable{

      @BeanProperty
      var id: Int = _

      @BeanProperty
      var name: String = _

      @BeanProperty
      var quantity: Int = _

    }

Error:

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:547)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:547)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:701)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:265)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:546)
    at com.sparkdemo.app.Stream$.main(Stream.scala:36)
    at com.sparkdemo.app.Stream.main(Stream.scala)
Caused by: java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.producer.KafkaProducer, value: org.apache.kafka.clients.producer.KafkaProducer@557286ad)
    - field (class: com.sparkdemo.app.Stream$$anonfun$main$1, name: producer$1, type: class org.apache.kafka.clients.producer.KafkaProducer)
    - object (class com.sparkdemo.app.Stream$$anonfun$main$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 12 more

2 Answers


You have run into an issue where the KafkaProducer is unintentionally shipped to the executors because of this code: `stream.map(Inventory => producer.send(new ProducerRecord[String, Object](topic2, Inventory)))`

You can use mapPartitions and create the producer inside mapPartitions so that it is not shipped to the executors.
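
For illustration, a minimal sketch of that per-partition pattern, reusing `stream`, `topic2`, and the broker address from the question. It uses `foreachRDD` with `foreachPartition` (the output-action form of the same idea), and it sends `inv.toString` only so that the existing `StringSerializer` stays valid; see the other answer for serializing `Inventory` itself:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // The producer is created here, on the executor, once per partition,
        // so nothing has to be serialized and shipped from the driver.
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)

        partition.foreach { inv =>
          producer.send(new ProducerRecord[String, String](topic2, inv.toString))
        }
        producer.close()
      }
    }

Creating a producer per partition per batch is the simplest correct version; a common refinement is a lazily initialized producer per executor (for example, held in a singleton object) so it is reused across batches.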




The problem is the type of serializer you are using for the Object-typed value:

properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

Please write a serializer that can handle the Object-type values. You can refer to the link below: Send Custom Java Objects to Kafka Topic
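
For illustration, a minimal sketch of such a serializer, assuming plain Java serialization of the `Inventory` bean (it already extends `Serializable`); the class name `InventorySerializer` is just an example, and a JSON or Avro serializer would work equally well:

    package com.sparkdemo.app

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import org.apache.kafka.common.serialization.Serializer
    import com.sparkdemo.entity.Inventory

    class InventorySerializer extends Serializer[Inventory] {

      override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()

      override def serialize(topic: String, data: Inventory): Array[Byte] = {
        if (data == null) return null
        val buffer = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buffer)
        out.writeObject(data) // Inventory extends Serializable, so plain Java serialization works
        out.close()
        buffer.toByteArray
      }

      override def close(): Unit = ()
    }

It would then be registered in place of the StringSerializer, with the producer typed to match:

    properties.put("value.serializer", "com.sparkdemo.app.InventorySerializer")
    val producer: KafkaProducer[String, Inventory] = new KafkaProducer[String, Inventory](properties)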

