
I am using HiveContext to read a JSON file with the following code:

df = hive_context.read.json("/Users/duttaam/Downloads/test.json")
df.registerTempTable("df")

By default, Spark inferred the following schema:

root
 |-- id: string (nullable = true)
 |-- profiles: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- app_id: string (nullable = true)
 |    |    |-- localytics: struct (nullable = true)
 |    |    |    |-- attributes: struct (nullable = true)
 |    |    |    |    |-- ap: long (nullable = true)
 |    |    |    |    |-- app_version: string (nullable = true)
 |    |    |    |    |-- birthdate: string (nullable = true)
 |    |    |    |    |-- country: string (nullable = true)
 |    |    |    |    |-- device_timezone: string (nullable = true)
 |    |    |    |    |-- language: string (nullable = true)
 |    |    |    |    |-- last_session_date: string (nullable = true)
 |    |    |    |    |-- library_version: string (nullable = true)
 |    |    |    |    |-- os_version: string (nullable = true)
 |    |    |    |    |-- push_enabled: long (nullable = true)
 |    |    |    |    |-- total_sessions: long (nullable = true)
 |    |    |    |    |-- user_type: string (nullable = true) 

My JSON looks like this:

{
  "id": "dsdasdasdsd",
  "profiles": [
    {
      "attributes": {
        "MDN": "eoe/W/5Ru1KAPDMQQ/wq\n/pu/tGRWpA=="
      },
      "localytics": {
        "attributes": {
          "last_session_date": "2016-07-17",
          "device_timezone": "-04:00",
          "country": "us",
          "language": "en",
          "user_type": "known",
          "city_name": "Indianapolis"
        }
      }
    },
    {
      "app_id": "sdas-c824fcf6-bbae-11e5-adasda-asasqwvz",
      "attributes": {
        "Automatic Backup User": "No"
      },
      "localytics": {
        "attributes": {
          "last_session_date": "2016-07-17",
          "os_version": "6.2.1",
          "app_version": "16.2.19.1",
          "library_version": "androida_3.7.0",
          "ap": 1,
          "custom_1": "Unknown (Not Logged In)",
          "total_sessions": 4,
          "birthdate": "2016-07-09",
          "push_enabled": 1,
          "user_type": "known",
          "custom_0": "Unknown (Not Logged In)",
          "seconds_since_last_session": 1457
        }
      }
    }
  ]
}

So by default Spark is not capturing the attributes field of either profile. Is there a way to write custom code and change the schema structure?

Thanks in advance.

Regards, Amit

  • Why the negative vote? Is this not a valid question? Commented Jul 21, 2016 at 21:21

2 Answers


You can try using hive_context.jsonFile(infile) (deprecated since Spark 1.4 in favor of hive_context.read.json):

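from pyspark import SparkContext
from pyspark.sql import HiveContext
import json  # used by the jsonRDD example below

sc = SparkContext()
hive_context = HiveContext(sc)

infile = "/Users/duttaam/Downloads/test.json"  # path from the question
your_df = hive_context.jsonFile(infile)  # DataFrame with an inferred schema
your_df.registerTempTable("your_table")  # a plain identifier is easiest to use in SQL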

You can then query the table with hive_context.sql(your_query).collect().
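To reach the nested fields directly in SQL, HiveQL's LATERAL VIEW with explode() produces one row per element of profiles, and dotted paths then reach into the structs. This is a minimal sketch, assuming the table was registered as "df" as in the question; PROFILE_QUERY and query_profiles are hypothetical names, and the selected columns are taken from the inferred schema shown above.

```python
# Hypothetical query against the temp table "df" registered in the question.
# LATERAL VIEW explode(profiles) emits one row per profile, aliased as p,
# and dotted paths then select fields inside the nested structs.
PROFILE_QUERY = """
SELECT id,
       p.app_id,
       p.localytics.attributes.country,
       p.localytics.attributes.total_sessions
FROM df
LATERAL VIEW explode(profiles) exploded AS p
"""


def query_profiles(sql_context):
    """Run PROFILE_QUERY with any context that exposes .sql(),
    e.g. a HiveContext or a SparkSession, and collect the rows."""
    return sql_context.sql(PROFILE_QUERY).collect()
```

Usage: rows = query_profiles(hive_context). Each returned row holds one profile, so the sample document yields two rows sharing the same id.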

You can also build the JSON strings yourself (here from ';'-delimited text lines) and pass the resulting RDD to hive_context.jsonRDD(json_rdd):

def make_json_single_row(row, field_names):
    # Split one ';'-delimited line and pair each value with its column name
    row_lst = row.split(';')
    return json.dumps(dict(zip(field_names, row_lst)))

def make_json(rdd, field_names):
    # Convert every line of the RDD into a JSON string
    return rdd.map(lambda row: make_json_single_row(row, field_names))

field_names = ['column1', 'column2', 'column3']
rdd = sc.textFile(infile)
json_rdd = make_json(rdd, field_names)
your_new_df = hive_context.jsonRDD(json_rdd)
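Both jsonFile and jsonRDD still infer the schema from the data. To dictate the structure yourself, a reader can be given an explicit schema instead. This is a minimal sketch, assuming Spark 2.3 or later (a SparkSession, where DataFrameReader.schema() accepts a DDL string and the multiLine option exists); on the Spark 1.x HiveContext above you would build an equivalent pyspark.sql.types.StructType and pass it to hive_context.read.schema(...). PROFILE_DDL and load_profiles are hypothetical names, and the DDL is derived from the sample document only.

```python
# Hypothetical schema derived from the sample document. The profile-level
# "attributes" object has different keys in each profile ("MDN" vs
# "Automatic Backup User"), so it is declared as a map rather than a struct.
PROFILE_DDL = (
    "id STRING, "
    "profiles ARRAY<STRUCT<"
    "app_id: STRING, "
    "attributes: MAP<STRING, STRING>, "
    "localytics: STRUCT<attributes: STRUCT<"
    "ap: BIGINT, app_version: STRING, birthdate: STRING, city_name: STRING, "
    "country: STRING, custom_0: STRING, custom_1: STRING, "
    "device_timezone: STRING, language: STRING, last_session_date: STRING, "
    "library_version: STRING, os_version: STRING, push_enabled: BIGINT, "
    "seconds_since_last_session: BIGINT, total_sessions: BIGINT, "
    "user_type: STRING>>>>"
)


def load_profiles(spark, path, multi_line=False):
    """Read `path` with PROFILE_DDL instead of an inferred schema.

    Assumes Spark 2.3+. Set multi_line=True when a file holds one
    pretty-printed document like the sample; the default expects
    JSON Lines (one object per line).
    """
    return (spark.read
            .schema(PROFILE_DDL)
            .option("multiLine", "true" if multi_line else "false")
            .json(path))
```

Declaring attributes as MAP<STRING, STRING> keeps every key, whatever it is named, so df.select("profiles.attributes") no longer drops fields that vary between profiles.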

If you only want the profiles column, you could do this in your case (though I'm sure it's not the best way):

Java:

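import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.functions;

// One row per element of the profiles array, in a struct column named "prof"
DataFrame prof = df.select(functions.explode(df.col("profiles")).as("prof"));
// Pull the nested fields up to top-level columns
DataFrame flat = prof.select("prof.app_id", "prof.attributes.*", "prof.localytics.attributes.*");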

This requires that you know your JSON schema well in advance.
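If you would rather not hard-code the field list, the nested column paths can be generated from the DataFrame's own schema. This is a minimal sketch, assuming PySpark: StructType.jsonValue() returns the schema as plain dicts, which the hypothetical helper struct_leaf_paths walks to build backtick-quoted paths (so names like "Automatic Backup User" parse). The hypothetical flatten_profiles is a PySpark counterpart of the Java snippet above. Arrays and maps nested inside a profile are kept as single columns, not exploded.

```python
from typing import Any, Dict, List, Tuple


def struct_leaf_paths(struct_json: Dict[str, Any],
                      parents: Tuple[str, ...] = ()) -> List[Tuple[str, str]]:
    """Walk a StructType.jsonValue() dict and return (path, alias) pairs.

    path  -- dotted column reference, each part wrapped in backticks
             (literal backticks doubled) so spaces survive Spark's parser.
    alias -- the raw parts joined with "_", keeping output names unique.
    Nested structs are descended into; any other type is a leaf.
    """
    pairs: List[Tuple[str, str]] = []
    for field in struct_json["fields"]:
        names = parents + (field["name"],)
        ftype = field["type"]
        if isinstance(ftype, dict) and ftype.get("type") == "struct":
            pairs.extend(struct_leaf_paths(ftype, names))
        else:
            quoted = ".".join("`" + n.replace("`", "``") + "`" for n in names)
            pairs.append((quoted, "_".join(names)))
    return pairs


def flatten_profiles(df):
    """Explode `profiles` and select every nested leaf as its own column."""
    from pyspark.sql import functions as F  # requires PySpark at call time
    prof = df.select(F.explode("profiles").alias("prof"))
    return prof.select([F.col(path).alias(alias)
                        for path, alias in struct_leaf_paths(prof.schema.jsonValue())])
```

Because the paths come from the schema Spark actually inferred, any field missing from that schema (such as the profile-level attributes in the question) will still be missing; combine this with an explicit schema if inference drops fields.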
