3

I have a JSON string like this:

{"user_rating": {"rating_text": "Excellent", "rating_color": "3F7E00", "votes": "778", "aggregate_rating": "4.5"}}

I want to create a DataFrame from it with the following columns:

rating_text | rating_color | votes | aggregate_rating

When I write:

val pdf = json.select("user_rating")

I get only one column, user_rating.

I tried the most-voted solution, but pdf.show() still displays only the user_rating column.

I am not sure how Solution 1 works exactly.

Solution 1 Solution 2

I cannot access the columns by index as described in Solution 2; I get a NoSuchColumn error.


What is the best approach to extract the keys (rating_text, rating_color, ...) and use them as columns in a DataFrame?

Language I am using: Scala


I tried iterating over each Row in the DataFrame and reading the columns by index:

val pdf = json.select("restaurants.restaurant.user_rating")
pdf.map{row => (row.getStruct(0).getString(0))}.show()

The map function throws the following exception:

 java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to org.apache.spark.sql.Row
4
  • Hi @Harshvardhan, try again this time using json.select("user_rating.*") Commented Jan 19, 2020 at 17:20
  • Is a column containing json string or a file containing json rows? Commented Jan 19, 2020 at 17:28
  • @baitmbarek I got org.apache.spark.sql.AnalysisException: Can only star expand struct data types Exception trying with '.*' Commented Jan 20, 2020 at 8:19
  • @Salim Complete json is in a file, of which "user_rating" is a sub json String Commented Jan 20, 2020 at 8:20

3 Answers

2

You can parse a column containing a JSON string and build a DataFrame with one column per field in the JSON. Here is an example:

    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import spark.implicits._ // enables $"..." and createDataset; `spark` is the active SparkSession

    val jsonData = """{"rating_text": "Excellent", "rating_color": "3F7E00", "votes": "778", "aggregate_rating": "4.5"}"""

    // Schema describing the fields inside the JSON string
    val schema = StructType(
      List(
        StructField("rating_text", StringType, nullable = false),
        StructField("rating_color", StringType, nullable = false),
        StructField("votes", StringType, nullable = false),
        StructField("aggregate_rating", StringType, nullable = false)
      ))

    val df = spark.createDataset(Seq(jsonData)).toDF("user_rating")
    // Parse the JSON string column into a struct column
    val dfWithParsedJson = df.withColumn("json_data", from_json($"user_rating", schema))

    dfWithParsedJson.select($"user_rating", $"json_data.rating_text", $"json_data.rating_color", $"json_data.votes", $"json_data.aggregate_rating").show()

Result -

+--------------------+-----------+------------+-----+----------------+
|         user_rating|rating_text|rating_color|votes|aggregate_rating|
+--------------------+-----------+------------+-----+----------------+
|{"rating_text": "...|  Excellent|      3F7E00|  778|             4.5|
+--------------------+-----------+------------+-----+----------------+

If the JSON is in a file, you can read it directly:

    //file contains - {"user_rating": {"rating_text": "Excellent", "rating_color": "3F7E00", "votes": "778", "aggregate_rating": "4.5"}}
    val df = spark.read.json("path to test.txt")
    df.select("user_rating.rating_text").show()
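To get every field of user_rating as its own column without naming each one, star expansion on the struct may be enough. The following is a minimal sketch, not part of the original answer: it assumes user_rating is inferred as a struct, it builds its own local SparkSession, and it uses an in-memory Dataset[String] as a stand-in for the file.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; reuse your own `spark` if you have one.
val spark = SparkSession.builder().master("local[*]").appName("star-expand-sketch").getOrCreate()
import spark.implicits._

// Stand-in for the file contents: one JSON document per line.
val lines = Seq(
  """{"user_rating": {"rating_text": "Excellent", "rating_color": "3F7E00", "votes": "778", "aggregate_rating": "4.5"}}"""
).toDS()

val ratings = spark.read.json(lines)

// user_rating is a struct here, so ".*" expands each of its fields into a column.
val flat = ratings.select("user_rating.*")
flat.show()
```

Star expansion only works on struct columns. If the column is an array of structs, Spark raises "Can only star expand struct data types", as reported in the comments on the question.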

You can also read values from the Row object by index:

    df.map{ row =>
      (row.getStruct(0).getString(0))
    }.show()

    // getStruct(index) is used because the column is a struct; for primitive values use getString, getLong, etc.

I highly recommend using a schema to read and operate on JSON. It will save you many runtime errors, and it is faster because Spark does not have to infer the schema from the data.
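As a rough illustration of reading with a schema, the sketch below passes an explicit StructType to the reader instead of letting Spark infer one. The names ratingSchema and fileSchema are illustrative, the session is a local one created for the example, and an in-memory Dataset[String] stands in for the file.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Assumption: a local session for illustration; reuse your own `spark` if you have one.
val spark = SparkSession.builder().master("local[*]").appName("schema-read-sketch").getOrCreate()
import spark.implicits._

// Schema of the nested user_rating object (all values are strings in the sample).
val ratingSchema = StructType(Seq(
  StructField("rating_text", StringType),
  StructField("rating_color", StringType),
  StructField("votes", StringType),
  StructField("aggregate_rating", StringType)
))

// Schema of the whole document: a single struct field named user_rating.
val fileSchema = StructType(Seq(StructField("user_rating", ratingSchema)))

// Stand-in for the file contents: one JSON document per line.
val lines = Seq(
  """{"user_rating": {"rating_text": "Excellent", "rating_color": "3F7E00", "votes": "778", "aggregate_rating": "4.5"}}"""
).toDS()

// With an explicit schema, Spark skips the inference pass over the data.
val typed = spark.read.schema(fileSchema).json(lines)
val flat = typed.select("user_rating.*")
flat.show()
```

With an explicit schema the column order follows the schema definition rather than the alphabetical order produced by inference.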


5 Comments

Thanks a lot for the solution. But is there a dynamic way to get the columns when the column names are not known beforehand and a schema cannot be constructed? For example, a way to access those columns by index?
Yes dynamic is possible. Let me make an example
Sure..Thank You :)
I have updated answer as per your need. Please accept the answer if it works.
Getting ClassCastException. Please see the updated question. I have pasted log there
1

I found a workaround. What I was looking for was the explode function, which returns a row for each element of an array column.
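The workaround might look like the sketch below. The document shape is an assumption inferred from the path restaurants.restaurant.user_rating used in the question, where restaurants is taken to be an array; the second restaurant entry is made-up sample data, and the session is a local one created for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}

// Assumption: a local session for illustration; reuse your own `spark` if you have one.
val spark = SparkSession.builder().master("local[*]").appName("explode-sketch").getOrCreate()
import spark.implicits._

// Hypothetical document shape; the second entry is invented for illustration.
val lines = Seq(
  """{"restaurants": [{"restaurant": {"user_rating": {"rating_text": "Excellent", "rating_color": "3F7E00", "votes": "778", "aggregate_rating": "4.5"}}}, {"restaurant": {"user_rating": {"rating_text": "Good", "rating_color": "9ACD32", "votes": "12", "aggregate_rating": "3.5"}}}]}"""
).toDS()

val json = spark.read.json(lines)

// restaurants is an array, so restaurants.restaurant.user_rating is an array of structs.
// That is why row.getStruct(0) fails with the WrappedArray ClassCastException.
val ratings = json
  .select(explode(col("restaurants.restaurant.user_rating")).as("user_rating")) // one row per element
  .select("user_rating.*")                                                      // struct fields to columns

ratings.show()
```

After explode, each row holds a single struct, so star expansion and getStruct both apply again.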


0

Data nested to any depth can be flattened into a flat DataFrame using this kind of approach.


The logic is as follows:

  1. Identify the nested columns by checking for array or struct types.
  2. Loop through the nesting levels and flatten each one as shown below.

import org.apache.log4j.Logger
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{ArrayType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

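  /**
   * Expands one level of nesting: explodes array columns and promotes
   * struct fields to top-level columns.
   *
   * @param json_data_df_temp DataFrame that may contain array or struct columns
   * @return [[DataFrame]] with one nesting level removed
   */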
  def expand_nested_column(json_data_df_temp: DataFrame): DataFrame = {
    var json_data_df: DataFrame = json_data_df_temp
    var select_clause_list = List.empty[String]

    // Iterate over each column to check whether it still holds nested data
    for (column_name <- json_data_df.schema.names) {
      println("Outside isinstance loop: " + column_name)

      // Checking column type is ArrayType
      if (json_data_df.schema(column_name).dataType.isInstanceOf[ArrayType]) {
        println("Inside isinstance loop: " + column_name)

        //Extracting nested json columns/data using explode function
        json_data_df = json_data_df.withColumn(column_name, explode(json_data_df(column_name)).alias(column_name))
        select_clause_list :+= column_name
      }
      else if (json_data_df.schema(column_name).dataType.isInstanceOf[StructType]) {
        println("Inside isinstance loop of StructType: " + column_name)
        for (field <- json_data_df.schema(column_name).dataType.asInstanceOf[StructType].fields) {
          select_clause_list :+= column_name + "." + field.name
        }
      }
      else {
        select_clause_list :+= column_name
      }
    }

    val columnNames = select_clause_list.map(name => col(name).alias(name.replace('.', '_')))

    // Selecting columns using select_clause_list from dataframe: json_data_df
    json_data_df.select(columnNames: _*)
  }



// toDS() needs the session implicits; `spark` is the active SparkSession
import spark.implicits._

var json_data_df = spark.read.json(
      Seq("""{"user_rating": {"rating_text": "Excellent", "rating_color": "3F7E00", "votes": "778", "aggregate_rating": "4.5"}}""").toDS()
    )
    json_data_df.show(10, false)
    json_data_df.printSchema()

    // Process the Nested Structure
    var nested_column_count = 1
    // Run the while loop until the nested_column_count is zero(0)
    while (nested_column_count != 0) {
      println("Printing nested_column_count: " + nested_column_count)

      var nested_column_count_temp = 0
      // Iterate over each column to count those that are still nested

      for (column_name <- json_data_df.schema.names) {
        print("Iterating DataFrame Columns: " + column_name)
        // Check whether the column type is ArrayType or StructType
        if (json_data_df.schema(column_name).dataType.isInstanceOf[ArrayType]
          || json_data_df.schema(column_name).dataType.isInstanceOf[StructType]) {
          nested_column_count_temp += 1
        }
      }
      if (nested_column_count_temp != 0) {
        json_data_df = expand_nested_column(json_data_df)
        json_data_df.show(100, false)
      }
      print("Printing nested_column_count_temp: " + nested_column_count_temp)
      nested_column_count = nested_column_count_temp
    }

    json_data_df.show(100, false)
    json_data_df.printSchema()

Result :

+-----------------------------+
|user_rating                  |
+-----------------------------+
|[4.5, 3F7E00, Excellent, 778]|
+-----------------------------+

root
 |-- user_rating: struct (nullable = true)
 |    |-- aggregate_rating: string (nullable = true)
 |    |-- rating_color: string (nullable = true)
 |    |-- rating_text: string (nullable = true)
 |    |-- votes: string (nullable = true)

Printing nested_column_count: 1
Iterating DataFrame Columns: user_ratingOutside isinstance loop: user_rating
Inside isinstance loop of StructType: user_rating
+----------------------------+------------------------+-----------------------+-----------------+
|user_rating_aggregate_rating|user_rating_rating_color|user_rating_rating_text|user_rating_votes|
+----------------------------+------------------------+-----------------------+-----------------+
|4.5                         |3F7E00                  |Excellent              |778              |
+----------------------------+------------------------+-----------------------+-----------------+
