
I have a dataframe in Spark. I want to get all the column names into one column (as key) and all the values into another column (as value), grouped by Id.

val df = spark.sqlContext.createDataFrame(Seq(("101"," FIXED"," 2000-01-01"," null"," null"," .0125484200"),("102"," VARRY"," 2018-09-14"," 4.3980"," 0.0"," .3518450000"), ("103"," FIXED"," 2001-02-01"," null"," null"," .0000023564"), ("103"," FIXED"," 2011-02-23"," 4.83"," 2414.6887"," .0020154800"), ("104"," FIXED"," 2000-01-01"," null"," null"," .0215487400"))).toDF("Id","type","datecol","value1","value2","finalvalue")

df.show

+---+------+-----------+-------+----------+------------+
| Id|  type|    datecol| value1|    value2|  finalvalue|
+---+------+-----------+-------+----------+------------+
|101| FIXED| 2000-01-01|   null|      null| .0125484200|
|102| VARRY| 2018-09-14| 4.3980|       0.0| .3518450000|
|103| FIXED| 2001-02-01|   null|      null| .0000023564|
|103| FIXED| 2011-02-23|   4.83| 2414.6887| .0020154800|
|104| FIXED| 2000-01-01|   null|      null| .0215487400|
+---+------+-----------+-------+----------+------------+

I need to convert the dataframe as below

+---+----------+------------+
| Id|       key|       value|
+---+----------+------------+
|101|      type|       FIXED|
|101|   datecol|  2000-01-01|
|101|    value1|        null|
|101|    value2|        null|
|101|finalvalue| .0125484200|
|102|      type|       VARRY|
|102|   datecol|  2018-09-14|
|102|    value1|      4.3980|
|102|    value2|         0.0|
|102|finalvalue| .3518450000|
|103|      type|       FIXED|
|103|   datecol|  2001-02-01|
|103|    value1|        null|
|103|    value2|        null|
|103|finalvalue| .0000023564|
|103|      type|       FIXED|
|103|   datecol|  2011-02-23|
|103|    value1|        4.83|
|103|    value2|   2414.6887|
|103|finalvalue| .0020154800|
|104|      type|       FIXED|
|104|   datecol|  2000-01-01|
|104|    value1|        null|
|104|    value2|        null|
|104|finalvalue| .0215487400|
+---+----------+------------+

Any suggestions would be helpful.

Thanks

3 Answers


You can achieve this using the stack function.

Please see the code snippet with its output:

import org.apache.spark.sql.functions.expr
import spark.implicits._

val df = spark.sqlContext.createDataFrame(Seq(("101"," FIXED"," 2000-01-01"," null"," null"," .0125484200"),("102"," VARRY"," 2018-09-14"," 4.3980"," 0.0"," .3518450000"), ("103"," FIXED"," 2001-02-01"," null"," null"," .0000023564"), ("103"," FIXED"," 2011-02-23"," 4.83"," 2414.6887"," .0020154800"), ("104"," FIXED"," 2000-01-01"," null"," null"," .0215487400"))).toDF("Id","type","datecol","value1","value2","finalvalue")
df.show()

// stack(5, ...) emits 5 (Key, Value) rows per input row, one per unpivoted column
val unPivotDF = df.select($"Id",
  expr("stack(5, 'type', type, 'datecol', datecol, 'value1', value1, 'value2', value2, 'finalvalue', finalvalue) as (Key, Value)"))
unPivotDF.show()

Output:

scala> df.show()

+---+------+-----------+-------+----------+------------+
| Id|  type|    datecol| value1|    value2|  finalvalue|
+---+------+-----------+-------+----------+------------+
|101| FIXED| 2000-01-01|   null|      null| .0125484200|
|102| VARRY| 2018-09-14| 4.3980|       0.0| .3518450000|
|103| FIXED| 2001-02-01|   null|      null| .0000023564|
|103| FIXED| 2011-02-23|   4.83| 2414.6887| .0020154800|
|104| FIXED| 2000-01-01|   null|      null| .0215487400|
+---+------+-----------+-------+----------+------------+

scala> unPivotDF.show()

+---+----------+------------+
| Id|       Key|       Value|
+---+----------+------------+
|101|      type|       FIXED|
|101|   datecol|  2000-01-01|
|101|    value1|        null|
|101|    value2|        null|
|101|finalvalue| .0125484200|
|102|      type|       VARRY|
|102|   datecol|  2018-09-14|
|102|    value1|      4.3980|
|102|    value2|         0.0|
|102|finalvalue| .3518450000|
|103|      type|       FIXED|
|103|   datecol|  2001-02-01|
|103|    value1|        null|
|103|    value2|        null|
|103|finalvalue| .0000023564|
|103|      type|       FIXED|
|103|   datecol|  2011-02-23|
|103|    value1|        4.83|
|103|    value2|   2414.6887|
|103|finalvalue| .0020154800|
+---+----------+------------+

only showing top 20 rows
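
For reference, the same stack expression also works directly in Spark SQL against a temporary view. A minimal sketch, assuming the df above; the view name "rates" is just an illustrative choice:

df.createOrReplaceTempView("rates")
spark.sql("""
  SELECT Id,
         stack(5, 'type', type, 'datecol', datecol, 'value1', value1,
                  'value2', value2, 'finalvalue', finalvalue) AS (Key, Value)
  FROM rates
""").show()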

Update: forming the stack columns dynamically

// reusing the same df as above
df.show()

val skipColumn = "Id"
val columnCount = df.schema.size - 1

// build the "'col', col, ..." argument list for stack, skipping the Id column
var columnsStr = ""
var counter = 0
for (col <- df.columns) {
  counter = counter + 1
  if (col != skipColumn) {
    if (counter == df.schema.size) {
      columnsStr = columnsStr + s"'$col', $col"
    } else {
      columnsStr = columnsStr + s"'$col', $col,"
    }
  }
}

val unPivotDF = df.select($"Id",
  expr(s"stack($columnCount, $columnsStr) as (Key,Value)"))
unPivotDF.show()

scala> unPivotDF.show()
+---+----------+------------+
| Id|       Key|       Value|
+---+----------+------------+
|101|      type|       FIXED|
|101|   datecol|  2000-01-01|
|101|    value1|        null|
|101|    value2|        null|
|101|finalvalue| .0125484200|
|102|      type|       VARRY|
|102|   datecol|  2018-09-14|
|102|    value1|      4.3980|
|102|    value2|         0.0|
|102|finalvalue| .3518450000|
|103|      type|       FIXED|
|103|   datecol|  2001-02-01|
|103|    value1|        null|
|103|    value2|        null|
|103|finalvalue| .0000023564|
|103|      type|       FIXED|
|103|   datecol|  2011-02-23|
|103|    value1|        4.83|
|103|    value2|   2414.6887|
|103|finalvalue| .0020154800|
+---+----------+------------+
only showing top 20 rows

2 Comments

Can we get the column names dynamically in expr("stack(5, 'type', type, 'datecol', datecol, 'value1', value1, 'value2', value2, 'finalvalue', finalvalue)")?
It is a string that you can form from the columns. I have updated the answer with an example, even though it is a very basic way of doing it.
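
A more concise way to form the same column string is to build it from df.columns. A minimal sketch, assuming the same df and Id key column as above; stackCols and unPivoted are just illustrative names:

import org.apache.spark.sql.functions.expr
import spark.implicits._

// "'type', type, 'datecol', datecol, ..." for every column except the key column
val stackCols = df.columns
  .filterNot(_ == "Id")
  .map(c => s"'$c', $c")
  .mkString(", ")

val unPivoted = df.select($"Id",
  expr(s"stack(${df.columns.length - 1}, $stackCols) as (Key, Value)"))
unPivoted.show()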

If you are certain that all of your columns are strings, you can use something like this. Otherwise, you will have to think of another solution, because your value column cannot contain mixed types. You can add a filter to exclude rows where key == "Id" if you do not want them in your result dataframe.

import org.apache.spark.sql.DataFrame

def toKeyValuePairs(df: DataFrame, keyColumn: String): DataFrame = {
  import spark.implicits._
  val n = df.schema.length
  val fieldNames = df.schema.fieldNames
  // emit one (key column value, field name, field value) triple per column of every row
  df.rdd.flatMap { row =>
    (0 until n).map { i => (row.getAs[String](keyColumn), fieldNames(i), row.getAs[String](i)) }
  }.toDF(keyColumn, "key", "value")
}

toKeyValuePairs(df, "Id").show(10)

+---+----------+------------+
| Id|       key|       value|
+---+----------+------------+
|101|        Id|         101|
|101|      type|       FIXED|
|101|   datecol|  2000-01-01|
|101|    value1|        null|
|101|    value2|        null|
|101|finalvalue| .0125484200|
|102|        Id|         102|
|102|      type|       VARRY|
|102|   datecol|  2018-09-14|
|102|    value1|      4.3980|
+---+----------+------------+
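
To drop the Id rows mentioned above, the filter can be applied to the result. A minimal sketch; withoutIdRows is just an illustrative name:

import spark.implicits._

// keep only the rows describing the other columns, not the Id itself
val withoutIdRows = toKeyValuePairs(df, "Id").filter($"key" =!= "Id")
withoutIdRows.show(10)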



You can try map_from_arrays and then explode:

import org.apache.spark.sql.functions.{array, explode, lit, map_from_arrays}
import spark.implicits._

val df2 = df.select(array('*).as("v"), lit(df.columns).as("k"))        // all values as an array, the column names as a literal array
    .select('v.getItem(0).as("id"), map_from_arrays('k, 'v).as("map")) // zip names with values into a map; element 0 is the Id
    .select('id, explode('map))                                        // explode the map into key/value rows

result:

df2.show(10)

+---+----------+------------+
| id|       key|       value|
+---+----------+------------+
|101|        Id|         101|
|101|      type|       FIXED|
|101|   datecol|  2000-01-01|
|101|    value1|        null|
|101|    value2|        null|
|101|finalvalue| .0125484200|
|102|        Id|         102|
|102|      type|       VARRY|
|102|   datecol|  2018-09-14|
|102|    value1|      4.3980|
+---+----------+------------+
only showing top 10 rows
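
Like the previous answer, this relies on every column being a string, since array('*) needs a single element type. If the real columns have mixed types, one option is to cast everything to string first; a minimal sketch, where dfStr is just an illustrative name:

import org.apache.spark.sql.functions.col

// cast every column to string so array('*) has a uniform element type
val dfStr = df.select(df.columns.map(c => col(c).cast("string").as(c)): _*)

The same array / map_from_arrays / explode chain can then be applied to dfStr.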

