3

I want to read a JSON file in the following format:

 {
  "titlename": "periodic",
    "atom": [
         {
          "usage": "neutron",
          "dailydata": [
    {
      "utcacquisitiontime": "2017-03-27T22:00:00Z",
      "datatimezone": "+02:00",
      "intervalvalue": 28128,
      "intervaltime": 15          
    },
    {
      "utcacquisitiontime": "2017-03-27T22:15:00Z",
      "datatimezone": "+02:00",
      "intervalvalue": 25687,
      "intervaltime": 15          
    }
   ]
  }
 ]
}

I am writing my read line as:

sqlContext.read.json("user/files_fold/testing-data.json").printSchema

But I am not getting the desired result:

root                                                                            
  |-- _corrupt_record: string (nullable = true)

Please help me with this.

2

6 Answers

5

I suggest using wholeTextFiles to read the file as a single string and stripping the newlines so that it becomes single-line JSON.

val json = sc.wholeTextFiles("/user/files_fold/testing-data.json").
  map(tuple => tuple._2.replace("\n", "").trim)

val df = sqlContext.read.json(json)

You should have the final valid dataframe as

+--------------------------------------------------------------------------------------------------------+---------+
|atom                                                                                                    |titlename|
+--------------------------------------------------------------------------------------------------------+---------+
|[[WrappedArray([+02:00,15,28128,2017-03-27T22:00:00Z], [+02:00,15,25687,2017-03-27T22:15:00Z]),neutron]]|periodic |
+--------------------------------------------------------------------------------------------------------+---------+

And valid schema as

root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)
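
The newline stripping done in the map step above can be tried on its own in plain Scala, without Spark. This is only a sketch: the helper name collapse and the small sample document are made up for illustration. Valid JSON cannot contain raw line breaks inside string values (they must be escaped), so removing them should not change the data.

```scala
// Plain Scala, no Spark needed: the same transformation as in the map step,
// applied to a small pretty-printed document (hypothetical sample).
val pretty =
  """{
    |  "titlename": "periodic",
    |  "atom": []
    |}""".stripMargin

// Drop carriage returns and newlines, then trim both ends.
// `collapse` is an illustrative helper name, not a Spark API.
def collapse(text: String): String =
  text.replace("\r", "").replace("\n", "").trim

val oneLine = collapse(pretty)
println(oneLine)
```

Keep in mind that wholeTextFiles loads each file as one record, so this approach suits files that fit comfortably in memory.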

1 Comment

If I need to flatten this dataframe, how can it be achieved? A simple explode is not working.
3

Spark 2.2 introduced the multiLine option, which can be used to load regular multi-line JSON (not JSONL) files:

spark.read
  .option("multiLine", true)
  .option("mode", "PERMISSIVE")
  .json("/path/to/user.json")
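
To see the difference the option makes, here is a minimal sketch that writes a small pretty-printed document to a temporary file and reads it both ways. It assumes Spark 2.2 or later running in local mode; the file contents, the app name and the variable names are made up for illustration.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("multiline-json-sketch") // hypothetical app name
  .getOrCreate()

// Write a small pretty-printed JSON document to a temp file (made-up sample data).
val sampleJson =
  """{
    |  "titlename": "periodic",
    |  "count": 2
    |}
    |""".stripMargin
val path = Files.createTempFile("multiline-sample", ".json")
Files.write(path, sampleJson.getBytes(StandardCharsets.UTF_8))
val uri = path.toUri.toString // file:// URI so the local file system is used

// Default reader: every line is parsed as a separate JSON document,
// so none of the lines of a pretty-printed file parse on their own.
val lineByLine = spark.read.json(uri)

// multiLine reader: the whole file is parsed as one JSON document.
val wholeFile = spark.read.option("multiLine", true).json(uri)

lineByLine.printSchema() // expected to show only _corrupt_record, as in the question
wholeFile.printSchema()  // expected to show count and titlename
```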


1

This has already been answered nicely by other contributors, but I had one more question: how do I access each nested value of the dataframe?

For array columns we can use explode, and for struct columns we can access a field directly with dot (.) notation.

scala> val a = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json("file:///home/hdfs/spark_2.json")
a: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string]

scala> a.printSchema
root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)


scala> val b = a.withColumn("exploded_atom", explode(col("atom")))
b: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string ... 1 more field]

scala> b.printSchema
root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)
 |-- exploded_atom: struct (nullable = true)
 |    |-- dailydata: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |-- usage: string (nullable = true)


scala> val c = b.withColumn("exploded_atom_struct", explode(col("`exploded_atom`.dailydata")))
c: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string ... 2 more fields]

scala> c.printSchema
root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)
 |-- exploded_atom: struct (nullable = true)
 |    |-- dailydata: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |-- usage: string (nullable = true)
 |-- exploded_atom_struct: struct (nullable = true)
 |    |-- datatimezone: string (nullable = true)
 |    |-- intervaltime: long (nullable = true)
 |    |-- intervalvalue: long (nullable = true)
 |    |-- utcacquisitiontime: string (nullable = true)


scala> val d = c.withColumn("exploded_atom_struct_last", col("`exploded_atom_struct`.utcacquisitiontime"))
d: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string ... 3 more fields]


scala> d.printSchema
root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)
 |-- exploded_atom: struct (nullable = true)
 |    |-- dailydata: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |-- usage: string (nullable = true)
 |-- exploded_atom_struct: struct (nullable = true)
 |    |-- datatimezone: string (nullable = true)
 |    |-- intervaltime: long (nullable = true)
 |    |-- intervalvalue: long (nullable = true)
 |    |-- utcacquisitiontime: string (nullable = true)
 |-- exploded_atom_struct_last: string (nullable = true)


scala> val d = c.select(col("titlename"), col("exploded_atom_struct.*"))
d: org.apache.spark.sql.DataFrame = [titlename: string, datatimezone: string ... 3 more fields]

scala> d.show
+---------+------------+------------+-------------+--------------------+
|titlename|datatimezone|intervaltime|intervalvalue|  utcacquisitiontime|
+---------+------------+------------+-------------+--------------------+
| periodic|      +02:00|          15|        28128|2017-03-27T22:00:00Z|
| periodic|      +02:00|          15|        25687|2017-03-27T22:15:00Z|
+---------+------------+------------+-------------+--------------------+

I thought I would post it here in case anyone with a similar question comes across this one.
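
The same flattening can also be written as one chain of select calls instead of several withColumn steps, which avoids carrying the intermediate nested columns along. This is a sketch, not the only way to do it: it assumes Spark 2.2 or later (for reading from a Dataset[String]), and the aliases atom_item and reading are names chosen here for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("flatten-sketch") // hypothetical app name
  .getOrCreate()
import spark.implicits._

// The document from the question, collapsed onto a single line.
val sample = """{"titlename":"periodic","atom":[{"usage":"neutron","dailydata":[{"utcacquisitiontime":"2017-03-27T22:00:00Z","datatimezone":"+02:00","intervalvalue":28128,"intervaltime":15},{"utcacquisitiontime":"2017-03-27T22:15:00Z","datatimezone":"+02:00","intervalvalue":25687,"intervaltime":15}]}]}"""

val nested = spark.read.json(Seq(sample).toDS())

val flat = nested
  // One row per element of the outer array.
  .select(col("titlename"), explode(col("atom")).as("atom_item"))
  // Keep the struct field `usage`, then one row per element of the inner array.
  .select(col("titlename"), col("atom_item.usage"), explode(col("atom_item.dailydata")).as("reading"))
  // Expand the remaining struct into top-level columns.
  .select(col("titlename"), col("usage"), col("reading.*"))

flat.show(false)
```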


0

It probably has something to do with the JSON object stored inside your file. Could you print it, or make sure it is the one you provided in the question? I'm asking because I took that one as a single string and it runs just fine:

val json =
  """
    |{
    |  "titlename": "periodic",
    |  "atom": [
    |    {
    |      "usage": "neutron",
    |      "dailydata": [
    |        {
    |          "utcacquisitiontime": "2017-03-27T22:00:00Z",
    |          "datatimezone": "+02:00",
    |          "intervalvalue": 28128,
    |          "intervaltime": 15
    |        },
    |        {
    |          "utcacquisitiontime": "2017-03-27T22:15:00Z",
    |          "datatimezone": "+02:00",
    |          "intervalvalue": 25687,
    |          "intervaltime": 15
    |        }
    |      ]
    |    }
    |  ]
    |}
  """.stripMargin

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.read
  .json(spark.sparkContext.parallelize(Seq(json)))
  .printSchema()


0

From the Apache Spark SQL docs:

Note that the file that is offered as a json file is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object.

Thus, the file should contain the whole object on a single line:

{ "titlename": "periodic","atom": [{ "usage": "neutron", "dailydata": [ {"utcacquisitiontime": "2017-03-27T22:00:00Z","datatimezone": "+02:00","intervalvalue": 28128,"intervaltime":15},{"utcacquisitiontime": "2017-03-27T22:15:00Z","datatimezone": "+02:00", "intervalvalue": 25687,"intervaltime": 15 }]}]}

And then:

val jsonDF = sqlContext.read.json("file")
jsonDF: org.apache.spark.sql.DataFrame = 
[atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, 
titlename: string]
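
Put differently, in the line-delimited format each line becomes one row. A minimal sketch, assuming Spark 2.2 or later and using two made-up records held in a Dataset[String] in place of a file:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("json-lines-sketch") // hypothetical app name
  .getOrCreate()
import spark.implicits._

// Two self-contained JSON objects, one per line (made-up sample records).
val lines = Seq(
  """{"titlename":"periodic","usage":"neutron"}""",
  """{"titlename":"aperiodic","usage":"proton"}"""
)

// Each element is parsed as its own document, just like each line of a file.
val records = spark.read.json(lines.toDS())

records.printSchema()
records.show()
```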


0

This happens because your JSON is multi-line. You just need to add option("multiLine", true) to your read statement:

spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json("/path/to/user.json")
