
I don't know if this question is a duplicate, but somehow none of the answers I came across seem to work for me (maybe I'm doing something wrong).

I have a class defined thus:

case class myRec(
                 time: String,
                 client_title: String,
                 made_on_behalf: Double,
                 country: String,
                 email_address: String,
                 phone: String)

and a sample JSON file that contains records (objects) in the form

[{...}{...}{...}...] 

i.e.

[{"time": "2015-05-01 02:25:47",
"client_title": "Mr.",
"made_on_behalf": 0,
"country": "Brussel",
"email_address": "[email protected]"},
{"time": "2015-05-01 04:15:03",
"client_title": "Mr.",
"made_on_behalf": 0,
"country": "Bundesliga",
"email_address": "[email protected]"},
{"time": "2015-05-01 06:29:18",
"client_title": "Mr.",
"made_on_behalf": 0,
"country": "Japan",
"email_address": "[email protected]"}...]

My build.sbt has libraryDependencies += "com.owlike" % "genson-scala_2.11" % "1.3" for scalaVersion := "2.11.7".
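
For reference, a minimal build.sbt matching that description might look like this (a sketch only, using the versions stated above):

scalaVersion := "2.11.7"

libraryDependencies += "com.owlike" % "genson-scala_2.11" % "1.3"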

I have a Scala function defined thus:

//PS: Other imports already made
import com.owlike.genson.defaultGenson._

//PS: Spark context already defined
def prepData(infile: String): RDD[myRec] = {
  val input = sc.textFile(infile)
  // Read JSON data into my record case class
  input.mapPartitions( records =>
    records.map( record => fromJson[myRec](record))
  )
}

And I'm calling the function

prepData("file://path/to/abc.json")

Is there any way of doing this, or is there another JSON library I can use to convert this to an RDD?

I also tried the following, and neither approach seems to work:

Using ScalaObjectMapper

PS: I don't want to go through Spark SQL to process the JSON file

Thanks!

  • Is it your exact input?

2 Answers


Jyd, not using Spark SQL for JSON is an interesting choice, but it's very much doable. There is an example of how to do this in the Learning Spark book's examples (disclaimer: I am one of the co-authors, so a little biased). The examples are on GitHub at https://github.com/databricks/learning-spark, but here is the relevant code snippet:

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import org.apache.spark.SparkContext

case class Person(name: String, lovesPandas: Boolean) // Note: must be a top-level class

object BasicParseJsonWithJackson {

  def main(args: Array[String]) {
    if (args.length < 3) {
      println("Usage: [sparkmaster] [inputfile] [outputfile]")
      sys.exit(1)
    }
    val master = args(0)
    val inputFile = args(1)
    val outputFile = args(2)
    val sc = new SparkContext(master, "BasicParseJsonWithJackson", System.getenv("SPARK_HOME"))
    val input = sc.textFile(inputFile)

    // Parse it into a specific case class. We use mapPartitions because:
    // (a) ObjectMapper is not serializable so we either create a singleton object encapsulating ObjectMapper
    //     on the driver and have to send data back to the driver to go through the singleton object.
    //     Alternatively we can let each node create its own ObjectMapper but that's expensive in a map
    // (b) To solve for creating an ObjectMapper on each node without being too expensive we create one per
    //     partition with mapPartitions. Solves serialization and object creation performance hit.
    val result = input.mapPartitions(records => {
        // mapper object created on each executor node
        val mapper = new ObjectMapper with ScalaObjectMapper
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        mapper.registerModule(DefaultScalaModule)
        // We use flatMap to handle errors
        // by returning an empty list (None) if we encounter an issue and a
        // list with one element if everything is ok (Some(_)).
        records.flatMap(record => {
          try {
            Some(mapper.readValue(record, classOf[Person]))
          } catch {
            case e: Exception => None
          }
        })
    }, true)
    result.filter(_.lovesPandas).mapPartitions(records => {
      val mapper = new ObjectMapper with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)
      records.map(mapper.writeValueAsString(_))
    })
      .saveAsTextFile(outputFile)
    }
}
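
One way to run it (jar name, master, and paths hypothetical), matching the Usage string in main:

spark-submit --class BasicParseJsonWithJackson parse-json-example.jar local[2] people.json output/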

Note this uses Jackson (specifically the "com.fasterxml.jackson.core" % "jackson-databind" % "2.3.3" and "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.3.3" dependencies).
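
In build.sbt form those dependencies would be roughly (a sketch, versions as stated above):

libraryDependencies ++= Seq(
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.3.3",
  "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.3.3"
)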

I just noticed that your question had some sample input and, as @zero323 pointed out, line-by-line parsing isn't going to work. Instead you would do:

    val input = sc.wholeTextFiles(inputFile).map(_._2)

    // Parse it into a specific case class. We use mapPartitions because:
    // (a) ObjectMapper is not serializable so we either create a singleton object encapsulating ObjectMapper
    //     on the driver and have to send data back to the driver to go through the singleton object.
    //     Alternatively we can let each node create its own ObjectMapper but that's expensive in a map
    // (b) To solve for creating an ObjectMapper on each node without being too expensive we create one per
    //     partition with mapPartitions. Solves serialization and object creation performance hit.
    val result = input.mapPartitions(records => {
        // mapper object created on each executor node
        val mapper = new ObjectMapper with ScalaObjectMapper
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        mapper.registerModule(DefaultScalaModule)
        // We use flatMap to handle errors: an empty result (None) if we
        // encounter an issue, or the whole parsed List[Person] if everything is ok.
        records.flatMap(record => {
          try {
            // readValue[T] from ScalaObjectMapper keeps the element type; with
            // classOf[List[Person]] alone, erasure would turn the elements into maps.
            mapper.readValue[List[Person]](record)
          } catch {
            case e: Exception => None
          }
        })
    })
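
Adapted to the myRec case class and the prepData signature from the question, a minimal sketch could look like this (assuming the same Jackson dependencies, an existing SparkContext named sc, and one JSON array per file as in the sample input):

import org.apache.spark.rdd.RDD
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

def prepData(infile: String): RDD[myRec] = {
  // Each element is the full text of one file, since the JSON array spans many lines
  val input = sc.wholeTextFiles(infile).map(_._2)
  input.mapPartitions { records =>
    // One ObjectMapper per partition: it is not serializable, and per-record creation is costly
    val mapper = new ObjectMapper with ScalaObjectMapper
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)
    records.flatMap { record =>
      try mapper.readValue[List[myRec]](record)
      catch { case e: Exception => Nil } // skip files that fail to parse
    }
  }
}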

Comments

Hi Holden, if I'm not mistaken I saw one of your presentations on YouTube (nice piece), and I'm actually following you on LinkedIn :). The thing is, for some obvious reasons I don't want to use the filter in the function. I actually have the book, but page 75 doesn't say much about how the function should look, and I didn't quite understand it. I've tried the code snippet in your answer, and whenever I run it (without the filter) it returns a dataset count of 0. Can you advise on what to do?
@Jyd I am pretty sure this fails for the same reason as your other attempts: it tries to parse the input line by line, and no single line in your input is valid JSON by itself.
Awesome, Jyd :). As @zero323 mentioned (and I didn't notice when looking at your input), you are going to need to parse a whole file at a time, so I've updated the answer to include how to do that as well.
@zero323 hmm, during the map phase, the record is something similar to a valid JSON object..I presume?
Yeah, each record here represents an entire file's worth of data.

Just for fun you can try to split individual documents using a specific delimiter. While it won't work on complex nested documents, it should handle the example input without using wholeTextFiles:

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.conf.Configuration
import net.liftweb.json.{parse, JObject, JField, JString, JInt}

case class MyRec(
  time: String,
  client_title: String,
  made_on_behalf: Double,
  country: String,
  email_address: String)

@transient val conf = new Configuration
conf.set("textinputformat.record.delimiter", "},\n{")

def clean(s: String) = {
   val p = "(?s)\\[?\\{?(.*?)\\}?\\]?".r
   s match {
     case p(x) => Some(s"{$x}")
     case _ => None
   }
}

def toRec(os: Option[String]) = {
  os match {
    case Some(s) => 
      for {
        JObject(o) <- parse(s)
        JField("time", JString(time)) <- o
        JField("client_title", JString(client_title)) <- o
        JField("made_on_behalf", JInt(made_on_behalf)) <- o
        JField("country", JString(country)) <- o
        JField("email_address", JString(email)) <- o
      } yield MyRec(time, client_title, made_on_behalf.toDouble, country, email)
    case _ => Nil
  }
}

val records = sc.newAPIHadoopFile("some.json",
    classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
      .map{case (_, txt) => clean(txt.toString)}
      .flatMap(toRec)
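
As a quick sanity check of the clean helper on its own, hypothetical fragments shaped like what the "},\n{" record delimiter would produce can be passed through it directly:

// Hypothetical first and last fragments, shortened from the sample input
clean("[{\"time\": \"2015-05-01 02:25:47\", \"made_on_behalf\": 0")
// => Some({"time": "2015-05-01 02:25:47", "made_on_behalf": 0})

clean("\"time\": \"2015-05-01 06:29:18\", \"country\": \"Japan\"}]")
// => Some({"time": "2015-05-01 06:29:18", "country": "Japan"})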

