0

I have a pairRDD that looks like

(1, {"id":1, "picture": "url1"})
(2, {"id":2, "picture": "url2"})
(3, {"id":3, "picture": "url3"})
...

where the second element is a string that I got from the get() function described at http://alvinalexander.com/scala/how-to-write-scala-http-get-request-client-source-fromurl. Here is that function:

@throws(classOf[java.io.IOException])
@throws(classOf[java.net.SocketTimeoutException])
def get(url: String,
        connectTimeout: Int = 5000,
        readTimeout: Int = 5000,
        requestMethod: String = "GET") =
{
    import java.net.{URL, HttpURLConnection}
    val connection = (new URL(url)).openConnection.asInstanceOf[HttpURLConnection]
    connection.setConnectTimeout(connectTimeout)
    connection.setReadTimeout(readTimeout)
    connection.setRequestMethod(requestMethod)
    val inputStream = connection.getInputStream
    // read the whole response body; close the stream even if reading fails
    try io.Source.fromInputStream(inputStream).mkString
    finally inputStream.close()
}

Now I want to parse that string as JSON to get the picture URL from it (based on https://stackoverflow.com/a/38271732/1456026):

val step2 = pairRDD_1.map({case(x,y)=>{
val jsonStr = y
val rdd = sc.parallelize(Seq(jsonStr))
val df = sqlContext.read.json(rdd)
(x,y("picture"))
}})

But I keep getting

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

When I printed the first 20 elements and converted the strings to JSON manually, one by one, outside .map, it worked:

val rdd = sc.parallelize(Seq("""{"id":1, "picture": "url1"}"""))
val df = sqlContext.read.json(rdd)
println(df)
>>>[id: string, picture: string]

How can I convert a string to JSON in Spark/Scala inside .map?

3 Answers

1

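You cannot use SparkContext inside a distributed operation: it exists only on the driver and cannot be serialized and shipped to the workers. In the code above, the map operation on pairRDD_1 calls sc.parallelize and sqlContext.read.json, both of which depend on it.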
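
If Spark's own JSON reader is what you want, one option is to call it once on the driver, over all the strings, instead of once per record. The following is a minimal sketch, assuming Spark 2.2 or later (where read.json accepts a Dataset[String]) and a local master; the object name WholeRddJson and the sample data are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

object WholeRddJson {
  // Parses every JSON string with one driver-side read.json call.
  def run(): Array[(Long, String)] = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run, for illustration only
      .appName("whole-rdd-json")
      .getOrCreate()
    import spark.implicits._
    try {
      // Sample data shaped like the pair RDD in the question.
      val pairRDD_1 = spark.sparkContext.parallelize(Seq(
        (1, """{"id":1, "picture": "url1"}"""),
        (2, """{"id":2, "picture": "url2"}""")))
      // The reader is invoked on the driver; parsing itself still runs on the executors.
      val df = spark.read.json(pairRDD_1.values.toDS())
      df.select("id", "picture").as[(Long, String)].collect()
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

This only fits when the id inside the JSON is enough to identify the record, since the original key of the pair is dropped before parsing.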

Consider using a JSON library to perform the conversion.
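
As a rough sketch of what that could look like, the snippet below uses json4s, which Spark already pulls in as a dependency. The object name PictureJson and the helper pictureOf are hypothetical; only the "picture" field name comes from the question.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse
import scala.util.Try

object PictureJson {
  // Kept in an object so the closure passed to map captures nothing from the driver.
  implicit val formats: Formats = DefaultFormats

  // Returns the "picture" field, or None if the string is not valid JSON
  // or the field is missing or not a string.
  def pictureOf(json: String): Option[String] =
    Try(parse(json)).toOption.flatMap(j => (j \ "picture").extractOpt[String])
}

// Usage inside the transformation (pairRDD_1 is the RDD from the question):
// val step2 = pairRDD_1.map { case (id, json) => (id, PictureJson.pictureOf(json)) }
```

Returning an Option keeps one malformed record from failing the whole job; whether that is the right behaviour depends on the data.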

0

Typically, when you see this message, it's because your map function (that is, the anonymous function) uses a resource that was defined outside of it and cannot be serialized.

When running in cluster mode, the anonymous function runs on a different machine altogether. On that separate machine, a new instance of your app is instantiated and its state (variables, values, etc.) is set from data that has been serialized by the driver and sent to the new instance. If your anonymous function is a closure (i.e. it uses variables from outside its own scope), then those resources must be serializable in order to be sent to the worker nodes.
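
The effect can be reproduced without a cluster. Spark serializes closures with Java serialization, so the sketch below simply tries to write two closures to an ObjectOutputStream. The class DriverOnlyResource is a made-up stand-in for something like SparkContext, and ClosureCheck and isSerializable are hypothetical names, not Spark APIs.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCheck {
  // Hypothetical stand-in for a driver-only resource such as SparkContext;
  // it deliberately does not extend Serializable.
  class DriverOnlyResource { def lookup(x: Int): Int = x + 1 }

  // True if Java serialization accepts the object.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Returns (result for a closure over the resource, result for a closure over an Int).
  def demo(): (Boolean, Boolean) = {
    val resource = new DriverOnlyResource
    val offset = 10
    val usesResource: Int => Int = x => resource.lookup(x) // captures resource
    val usesInt: Int => Int = x => x + offset              // captures only an Int
    (isSerializable(usesResource), isSerializable(usesInt))
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The closure over the plain Int should serialize, while the one that captures the resource should not; the latter is the situation Spark reports as "Task not serializable".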

For example, a map function may attempt to use a database connection to grab some information for each record in the RDD. That database connection is only valid on the host that created it (from a networking perspective, of course), which is typically the driver program, so it cannot be serialized, sent, and used from a different host. In this particular example, you would use mapPartitions() to instantiate a database connection on the worker itself, then map() over the records within that partition to query the database.
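
A minimal sketch of that pattern follows. FakeConnection is a hypothetical placeholder for a real database client, and PartitionLookup and lookupPartition are illustrative names; the part that matters is that the connection is created inside the function handed to mapPartitions(), so it is built on the worker and never serialized.

```scala
object PartitionLookup {
  // Hypothetical stand-in for a real database client; not a real library API.
  class FakeConnection {
    def query(id: Int): String = s"row-$id"
    def close(): Unit = ()
  }

  // Intended as the argument to mapPartitions: one connection per partition.
  def lookupPartition(records: Iterator[Int]): Iterator[(Int, String)] = {
    val conn = new FakeConnection
    try {
      // Materialize the results before closing, because Iterator.map is lazy.
      records.map(id => (id, conn.query(id))).toList.iterator
    } finally conn.close()
  }
}

// Usage, assuming idsRdd is an RDD[Int]:
// val looked = idsRdd.mapPartitions(PartitionLookup.lookupPartition _)
```

Collecting the partition into a list keeps the example short but holds a whole partition's results in memory; for large partitions you would close the connection when the iterator is exhausted instead.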

I can't provide much further help without your full code example, to see what potential value or variable is unable to be serialized.

0

One solution is to use the muster library from the json4s project. Source: http://muster.json4s.org/docs/jawn_codec.html

//case class defined outside main()
case class Pictures(id: String, picture: String)

// import library
import muster._
import muster.codec.jawn._

// parse each JSON string on the worker and keep only the picture URL
val json_read_RDD = pairRDD_1.map { case (x, y) =>
  val json_read_to_case_class = JawnCodec.as[Pictures](y)
  (x, json_read_to_case_class.picture)
}

// add to build.sbt
libraryDependencies ++= Seq(
"org.json4s" %% "muster-codec-json" % "0.3.0",
"org.json4s" %% "muster-codec-jawn" % "0.3.0")

Credit goes to Travis Hegner, who explained why the original code didn't work, and to Anton Okolnychyi for the advice to use a JSON library.
