
I'm using the below custom receiver to consume data from RabbitMQ in Spark (Scala). Below is my code.

def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do, as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  /** Create a socket connection and receive data until the receiver is stopped */
  def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
      val batchInterval = Seconds(20)
      val ssc = new StreamingContext(sc, batchInterval)

      val host = ""
      val port = ""
      val queueName = ""
      val vHost = ""
      val userName = ""
      val password = ""
      val maxMessagesPerPartition = "1000"
      val maxReceiveTime = "0.9"

      val receiverStream = RabbitMQUtils.createStream(ssc, Map(
        "host" -> host,
        "port" -> port,
        "queueName" -> queueName,
        "vHost" -> vHost,
        "userName" -> userName,
        "password" -> password,
        "maxMessagesPerPartition" -> maxMessagesPerPartition,
        "maxReceiveTime" -> maxReceiveTime
      ))

      val lines = ssc.receiverStream(new CustomReceiver(host, port.toInt))

      lines.foreachRDD { rdd =>
        import sqlContext.implicits._
        val df = rdd.toDF
        df.write.format("parquet").mode("append").save("path")
      }

      lines.print()
      ssc.start()
      ssc.awaitTermination()
    } catch {
      case t: Throwable =>
        // Restart the receiver on any error
        restart("Error receiving data", t)
    }
  }

I'm getting the below timeout error.

java.util.concurrent.TimeoutException
at org.apache.spark.util.ThreadUtils$.runInNewThreadWithTimeout(ThreadUtils.scala:351)
    at org.apache.spark.util.ThreadUtils$.runInNewThread(ThreadUtils.scala:283)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:585)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:577)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1212830188116081:87)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-1212830188116081:188)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw$$iw.<init>(command-1212830188116081:190)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw.<init>(command-1212830188116081:192)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw.<init>(command-1212830188116081:194)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw.<init>(command-1212830188116081:196)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read.<init>(command-1212830188116081:198)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$.<init>(command-1212830188116081:202)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$.<clinit>(command-1212830188116081)
    at lined28d5369d60244b0a66d1d87a30c93a027.$eval$.$print$lzycompute(<notebook>:7)
    at lined28d5369d60244b0a66d1d87a30c93a027.$eval$.$print(<notebook>:6)
    at lined28d5369d60244b0a66d1d87a30c93a027.$eval.$print(<notebook>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:745)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1021)
    at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:574)
    at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:41)
    at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:37)
     
    

Does this error mean that the current Spark cluster configuration is not able to handle the incoming message load? Is it related to a memory issue? Could someone please assist?

1 Answer


I would suggest increasing spark.driver.memory to a higher value.

Also try increasing the broadcastTimeout.
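
For what it's worth, here is a rough sketch of how these settings could be applied. The values are placeholders, not recommendations, and I'm assuming the timeout in question is spark.sql.broadcastTimeout:

// spark.driver.memory must be set before the driver JVM starts,
// e.g. via spark-submit (or the cluster configuration on Databricks):
//   spark-submit --driver-memory 8g \
//     --conf spark.sql.broadcastTimeout=1200 \
//     your-app.jar
//
// The broadcast timeout can also be changed at runtime on an existing session
// (here `spark` is the SparkSession, predefined in a notebook):
spark.conf.set("spark.sql.broadcastTimeout", "1200")  // seconds; default is 300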

Refer to this answer by T. Gawęda.
