
I am a newbie trying to solve the following problem. Any help is highly appreciated.

I have the following JSON:

{
  "index": "identity",
  "type": "identity",
  "id": "100000",
  "source": {
    "link_data": {
      "source_Id": "0011245"
    },
    "attribute_data": {
      "first": {
        "val": [
          true
        ],
        "updated_at": "2011"
      },
      "second": {
        "val": [
          true
        ],
        "updated_at": "2010"
      }
    }
  }
}

The attributes under "attribute_data" may vary; there could be another attribute, say "third".

I am expecting the result in the following format:

_index  _type  _id       source_Id  attribute_data  val   updated_at
ID      ID     randomid  00000      first           true  2000-08-08T07:51:14Z
ID      ID     randomid  00000      second          true  2010-08-08T07:51:14Z

I tried the following approach.

val df = spark.read.json("sample.json")

val res = df.select("index", "id", "type",
  "source.attribute_data.first.updated_at",
  "source.attribute_data.first.val",
  "source.link_data.source_id")

It just adds new columns, not new rows, as follows:

   index      id      type  updated_at     val  source_id
identity  100000  identity        2011  [true]    0011245

2 Answers


Try the following:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = spark.read.json("sample.json")

df.select($"id", $"index", $"source.link_data.source_Id".as("source_Id"),$"source.attribute_data.first.val".as("first"), explode($"source.attribute_data.second.val").as("second"), $"type")
.select($"id", $"index", $"source_Id", $"second", explode($"first"), $"type").show



Here is a solution. Feel free to ask if you need anything explained:

import spark.implicits._

val data = spark.read.json("sample.json")
val readJsonDf = data.select($"index", $"type", $"id",
  $"source.link_data.source_id".as("source_id"), $"source.attribute_data.*")
readJsonDf.show()

Initial Output:

+--------+--------+------+---------+--------------------+--------------------+
|   index|    type|    id|source_id|               first|              second|
+--------+--------+------+---------+--------------------+--------------------+
|identity|identity|100000|  0011245|[2011,WrappedArra...|[2010,WrappedArra...|
+--------+--------+------+---------+--------------------+--------------------+

Then I did the dynamic transformation using the following lines of code:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def transposeColumnstoRows(df: DataFrame, constantCols: Seq[String]): DataFrame = {
  // Split off the columns that should be transposed into rows
  val (cols, types) = df.dtypes.filter { case (c, _) => !constantCols.contains(c) }.unzip

  // Check that the columns to be transposed into rows all share the same struct type
  require(types.distinct.size == 1,
    s"All transposed columns must have the same type, found: ${types.distinct.mkString(", ")}")

  // Build an array of (columnKey, value) structs, one per transposed column,
  // and explode it so each struct becomes its own row
  val keyColsWithValues = explode(array(
    cols.map(c => struct(lit(c).alias("columnKey"), col(c).alias("value"))): _*
  ))

  df.select(constantCols.map(col(_)) :+ keyColsWithValues.alias("keyColsWithValues"): _*)
}
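
To see the explode(array(struct(...))) unpivot pattern in isolation, here is a toy sketch reusing the imports above; the column names x and y are made up for illustration:

val toy = Seq((1, "a", "b")).toDF("id", "x", "y")
// one input row becomes two output rows: one struct per unpivoted column
toy.select($"id", explode(array(
  struct(lit("x").alias("columnKey"), $"x".alias("value")),
  struct(lit("y").alias("columnKey"), $"y".alias("value"))
)).alias("kv")).show()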



val newDf = transposeColumnstoRows(readJsonDf, Seq("index", "type", "id", "source_id"))
val requiredDf = newDf.select($"index", $"type", $"id", $"source_id",
  $"keyColsWithValues.columnKey".as("attribute_data"),
  $"keyColsWithValues.value.updated_at".as("updated_at"),
  $"keyColsWithValues.value.val".as("val"))
requiredDf.show()

Final Output:

+--------+--------+------+---------+--------------+----------+------+
|   index|    type|    id|source_id|attribute_data|updated_at|   val|
+--------+--------+------+---------+--------------+----------+------+
|identity|identity|100000|  0011245|         first|      2011|[true]|
|identity|identity|100000|  0011245|        second|      2010|[true]|
+--------+--------+------+---------+--------------+----------+------+
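
Note that val still comes back as a single-element array ([true]) rather than the bare true shown in your expected output. If the arrays are guaranteed to always hold exactly one element (an assumption on my part, since your sample only shows one), you could additionally explode that column:

// explode drops rows with empty arrays, so this assumes val is never empty
val flattenedDf = requiredDf.withColumn("val", explode($"val"))
flattenedDf.show()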

Hope this solves your issue!

