I have a pairRDD that looks like this:
(1, {"id":1, "picture": "url1"})
(2, {"id":2, "picture": "url2"})
(3, {"id":3, "picture": "url3"})
...
where the second element is a string that I got from the get() function described at http://alvinalexander.com/scala/how-to-write-scala-http-get-request-client-source-fromurl. Here is that function:
@throws(classOf[java.io.IOException])
@throws(classOf[java.net.SocketTimeoutException])
def get(url: String,
        connectTimeout: Int = 5000,
        readTimeout: Int = 5000,
        requestMethod: String = "GET") =
{
    import java.net.{URL, HttpURLConnection}
    val connection = (new URL(url)).openConnection.asInstanceOf[HttpURLConnection]
    connection.setConnectTimeout(connectTimeout)
    connection.setReadTimeout(readTimeout)
    connection.setRequestMethod(requestMethod)
    val inputStream = connection.getInputStream
    val content = io.Source.fromInputStream(inputStream).mkString
    if (inputStream != null) inputStream.close
    content
}
Now I want to parse that string as JSON to get the picture URL from it (based on https://stackoverflow.com/a/38271732/1456026):
val step2 = pairRDD_1.map({case(x,y)=>{
    val jsonStr = y
    val rdd = sc.parallelize(Seq(jsonStr))
    val df = sqlContext.read.json(rdd)
    (x,y("picture"))
}})
but I keep getting
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
When I printed out the first 20 elements and converted the strings to JSON manually, one by one, outside .map, it worked:
val rdd = sc.parallelize(Seq("""{"id":1, "picture": "url1"}"""))
val df = sqlContext.read.json(rdd)
println(df)
>>>[id: string, picture: string]
How do I convert a string to JSON in Spark/Scala inside .map?
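
For reference, here is a minimal sketch of the kind of thing I am after, not a confirmed fix. sc and sqlContext live only on the driver and cannot be used inside the closure passed to .map, so the sketch parses each string with a plain JSON library instead. It assumes Jackson is on the classpath (Spark distributions bundle it); the names PictureJson and pictureUrl are made up for illustration.

```scala
import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.ObjectMapper

// Hypothetical helper; assumes Jackson is available on the classpath.
object PictureJson {
  // Kept in a singleton object so each executor JVM builds its own mapper
  // instead of Spark having to serialize one from the driver.
  private val mapper = new ObjectMapper()

  // Returns the "picture" field of a JSON object string, or None when the
  // string is not valid JSON or has no textual "picture" field.
  def pictureUrl(json: String): Option[String] =
    try {
      Option(mapper.readTree(json))
        .flatMap(node => Option(node.get("picture")))
        .filter(_.isTextual)
        .map(_.asText)
    } catch {
      case _: JsonProcessingException => None
    }
}

// Hypothetical usage with the RDD from above (no sc or sqlContext in the closure):
// val step2 = pairRDD_1.mapValues(PictureJson.pictureUrl)
```

With this, step2 would hold (id, Option[String]) pairs, so records with malformed JSON show up as None instead of failing the job.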