Using Scala
1. Reading the House JSON data. Note that each JSON object sits on a single line, because spark.read.json expects newline-delimited (JSON Lines) input by default; for multi-line objects you would set the multiLine option.
{ "House1": { "House_Id": "1", "Cover": "1.000", "HouseType": "bungalow", "Facing": "South", "Region": "YVR", "Ru": "1", "HVAC": [ "FAGF", "FPG", "HP" ] } }
{ "House2": { "House_Id": "2", "Cover": "1.000", "HouseType": "bungalow", "Facing": "North", "Region": "YVR", "Ru": "1", "HVAC": [ "FAGF", "FPG", "HP" ] } }
Code
import org.apache.spark.sql.functions.expr

val houseDS = spark.read.json("<JSON_FILE_PATH>")
houseDS.printSchema
root
|-- House1: struct (nullable = true)
| |-- Cover: string (nullable = true)
| |-- Facing: string (nullable = true)
| |-- HVAC: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- HouseType: string (nullable = true)
| |-- House_Id: string (nullable = true)
| |-- Region: string (nullable = true)
| |-- Ru: string (nullable = true)
|-- House2: struct (nullable = true)
| |-- Cover: string (nullable = true)
| |-- Facing: string (nullable = true)
| |-- HVAC: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- HouseType: string (nullable = true)
| |-- House_Id: string (nullable = true)
| |-- Region: string (nullable = true)
| |-- Ru: string (nullable = true)
houseDS.show(false)
+----------------------------------------------------+----------------------------------------------------+
|House1 |House2 |
+----------------------------------------------------+----------------------------------------------------+
|[1.000, South, [FAGF, FPG, HP], bungalow, 1, YVR, 1]|null |
|null |[1.000, North, [FAGF, FPG, HP], bungalow, 2, YVR, 1]|
+----------------------------------------------------+----------------------------------------------------+
2. We use the stack() function to separate the struct columns into rows. The syntax is stack(n, expr1, ..., exprk) - separates expr1, ..., exprk into n rows.
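For instance, here is a minimal illustration of how stack() maps its arguments to rows (runnable in spark-shell; the literal values are placeholders of my own, not from the house data):

// stack(2, ...) with four expressions yields 2 rows of 2 columns each:
// row 1 = (1, a), row 2 = (2, b)
spark.sql("SELECT stack(2, 1, 'a', 2, 'b')").show()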
// Stack the two struct columns into rows, then drop the rows where the
// struct is null (each input line populates only one of the two columns)
val houseDS2 = houseDS
  .select(expr("stack(2, House1, 'House1', House2, 'House2') as (house, HouseName)"))
  .na.drop
houseDS2.printSchema
root
|-- house: struct (nullable = true)
| |-- Cover: string (nullable = true)
| |-- Facing: string (nullable = true)
| |-- HVAC: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- HouseType: string (nullable = true)
| |-- House_Id: string (nullable = true)
| |-- Region: string (nullable = true)
| |-- Ru: string (nullable = true)
|-- HouseName: string (nullable = true)
3. Then select all the required columns from the houseDS2 Dataset above.
val finalHouseDS = houseDS2.select("HouseName","house.House_Id","house.Cover","house.HouseType","house.Facing","house.Region","house.Ru","house.HVAC")
finalHouseDS.show(false)
This produces your expected output:
+---------+--------+-----+---------+------+------+---+---------------+
|HouseName|House_Id|Cover|HouseType|Facing|Region|Ru |HVAC |
+---------+--------+-----+---------+------+------+---+---------------+
|House1 |1 |1.000|bungalow |South |YVR |1 |[FAGF, FPG, HP]|
|House2 |2 |1.000|bungalow |North |YVR |1 |[FAGF, FPG, HP]|
+---------+--------+-----+---------+------+------+---+---------------+
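Optionally, if you prefer a strongly typed Dataset over a DataFrame at this point, you could map the result onto a case class (a sketch; the House case class below is my own and not part of the original code):

import spark.implicits._

// Hypothetical case class mirroring the flattened columns
case class House(HouseName: String, House_Id: String, Cover: String,
                 HouseType: String, Facing: String, Region: String,
                 Ru: String, HVAC: Seq[String])

val typedHouseDS = finalHouseDS.as[House] // Dataset[House]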
You can implement the same logic in Java (see below). Please let me know if you face any performance issues with a larger Dataset.
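Also, if the number of HouseN columns grows or is not known up front, one option is to build the stack() expression dynamically from the column names instead of hard-coding it (a sketch, assuming every top-level column is one of these house structs):

// Derive the stack() arguments from the top-level column names,
// e.g. "House1, 'House1', House2, 'House2', ..."
val cols = houseDS.columns
val stackArgs = cols.map(c => s"$c, '$c'").mkString(", ")

val houseDSAll = houseDS
  .select(expr(s"stack(${cols.length}, $stackArgs) as (house, HouseName)"))
  .na.drop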
Using Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ParseJson {
    public static void main(String[] args) {
        // Only needed on Windows: points at a directory containing winutils.exe
        System.setProperty("hadoop.home.dir", "D:\\Software\\Hadoop");

        SparkSession spark = SparkSession
                .builder()
                .appName("Testing")
                .master("local[*]")
                .getOrCreate();

        // Read the JSON data
        Dataset<Row> houseDS = spark.read().json("<JSON_FILE_PATH>");
        houseDS.printSchema();

        // Stack the two struct columns into rows and drop the null structs
        Dataset<Row> houseDS2 = houseDS
                .selectExpr("stack(2, House1, 'House1', House2, 'House2') as (house, HouseName)")
                .na().drop();
        houseDS2.printSchema();

        // Flatten the struct into the required columns
        Dataset<Row> finalHouseDS = houseDS2.select("HouseName", "house.House_Id",
                "house.Cover", "house.HouseType", "house.Facing",
                "house.Region", "house.Ru", "house.HVAC");
        finalHouseDS.show(false);

        spark.stop();
    }
}