
I am trying to create a strongly typed Dataset for the case class Person. This is my code right now:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.types._

case class Person(name: String,phone: String,address :Map[String, String])

val schema = ArrayBuffer[StructField]()
schema.appendAll(List(StructField("name", StringType), StructField("phone", StringType)))
schema.append(StructField("address", MapType(StringType, StringType)))

implicit val personEncoder = org.apache.spark.sql.Encoders.kryo[Person]

val sparkConf = new SparkConf().setAppName("dynamic-json-schema").setMaster("local")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._

val jsonDF = spark.read
.schema(StructType(schema.toList))
.json("""apath\data.json""")
.toDF()

jsonDF.as[Person].select("name", "phone")

And this is the input json data:

{"name":"Michael","phone":"2342233","address":{"street":"Lincoln", "number":"344", "postcode":"3245NM"}}
{"name":"Tony","phone":"4342223","address":{"street":"Pizla", "number":"12", "postcode":"9088AL"}}
{"name":"Maria","phone":"32233454","address":{"street":"Coco", "number":"32", "postcode":"8900PO"}}

However, I am getting the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<address:struct<number:string,postcode:string,street:string>,name:string,phone:string> to Tuple1, but failed as the number of fields does not line up.;

I am using Spark 2.2.0. I understand that this is somehow related to the nested JSON and the mapping to the Person class, but what is the exact reason that Spark can't convert Dataset[Row] to Dataset[Person]?

  • I would try nested classes and a StructType rather than a Map Commented Mar 11, 2018 at 22:16
  • Hi @cricket_007, thank you for the response. You mean to change the Map[String, String] -> StructType for the Person class? Commented Mar 11, 2018 at 22:34
  • I have already tried setting address to a StructType, and I got exactly the same error. Commented Mar 11, 2018 at 22:36
  • I meant more that you could create an Address class to be placed inside Person Commented Mar 11, 2018 at 22:47
  • I just tried it; here is the error again: 'Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<name:string,phone:string,address:struct<street:string,number:string,postcode:string>> to Tuple1, but failed as the number of fields does not line up.;' Commented Mar 11, 2018 at 22:54

1 Answer


If I remove the Kryo encoder, this works fine.

The nesting of your data isn't the problem, as it also works on non-nested JSON.

import org.apache.spark.sql.SparkSession

case class Address(street: String, number: String, postcode: String)
case class Person(name: String, phone: String, address: Address)

object JsonReader extends App {
    val sparkSession = SparkSession.builder()
      .master("local")
      .getOrCreate()

    import sparkSession.implicits._

    val p = JsonReader.getClass.getClassLoader.getResource("input.json").toURI.getPath
    val df = sparkSession.read.json(p).as[Person]
    df.printSchema()
    df.show()

    df.select($"address.*").show
}
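As a rough illustration of why the Kryo encoder might trigger this error, the sketch below compares the schemas of the two encoders. `Encoders.kryo` stores the whole object in a single binary column, while `Encoders.product` (which `spark.implicits._` uses for case classes) exposes one column per field. A plausible reading of the `Tuple1` message is that Spark tried to fit the three JSON columns into that single-column shape. The object name `EncoderSchemas` is hypothetical, and `Person` here is the `Map` variant from the question.

```scala
import org.apache.spark.sql.Encoders

// Person as declared in the question (address kept as a Map).
case class Person(name: String, phone: String, address: Map[String, String])

// Hypothetical helper object, used only for this illustration.
object EncoderSchemas {
  // Kryo serializes the entire object into one binary column.
  val kryoSchema = Encoders.kryo[Person].schema

  // The product encoder maps every case class field to its own column.
  val productSchema = Encoders.product[Person].schema

  def main(args: Array[String]): Unit = {
    println(s"kryo:    ${kryoSchema.simpleString}")
    println(s"product: ${productSchema.simpleString}")
  }
}
```

No SparkSession is needed to inspect these schemas, so this can be tried in a plain Scala console with Spark on the classpath.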

1 Comment

Yes, cricket, that was indeed it. I tried it with Map[String, String] as well, and it also works fine. Thank you for your help.
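The `Map[String, String]` variant mentioned in this comment could look roughly like the following. This is only a sketch, assuming Spark 2.2.0 or later (for `json(Dataset[String])`) and no explicit Kryo encoder in scope. `MapAddressReader` and `readPeople` are hypothetical names, and the JSON line is the first record from the question, inlined so that no file path is needed.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.types._

case class Person(name: String, phone: String, address: Map[String, String])

// Hypothetical object and method names, for illustration only.
object MapAddressReader {
  // Same schema as in the question: address is read as a string-to-string map.
  val schema: StructType = StructType(Seq(
    StructField("name", StringType),
    StructField("phone", StringType),
    StructField("address", MapType(StringType, StringType))
  ))

  def readPeople(spark: SparkSession): Dataset[Person] = {
    // Supplies the case class encoder; no Kryo encoder is defined.
    import spark.implicits._

    // First record from the question, inlined instead of read from a file.
    val lines = Seq(
      """{"name":"Michael","phone":"2342233","address":{"street":"Lincoln", "number":"344", "postcode":"3245NM"}}"""
    ).toDS()

    spark.read.schema(schema).json(lines).as[Person]
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("dynamic-json-schema")
      .master("local")
      .getOrCreate()

    readPeople(spark).select("name", "phone").show()
    spark.stop()
  }
}
```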
