Suppose I have the following DataFrame:

 id | col1 | col2
----+------+------
 x  |  p1  |  a1
 x  |  p2  |  b1
 y  |  p2  |  b2
 y  |  p2  |  b3
 y  |  p3  |  c1
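
For reference, the input DataFrame can be built like this (a minimal sketch, assuming a SparkSession named spark is in scope):

import spark.implicits._

val df = Seq(
  ("x", "p1", "a1"),
  ("x", "p2", "b1"),
  ("y", "p2", "b2"),
  ("y", "p2", "b3"),
  ("y", "p3", "c1")
).toDF("id", "col1", "col2")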

The distinct values from col1 (p1, p2, p3), along with id, will be used as the columns of the final DataFrame. Here, the id y has two col2 values (b2 and b3) for the same col1 value p2, so p2 will be treated as an array-type column. Therefore, the final DataFrame will be

 id |  p1  |    p2    |  p3
----+------+----------+------
 x  |  a1  |   [b1]   | null
 y  | null | [b2, b3] |  c1

How can I efficiently obtain the second DataFrame from the first?

1 Answer

You are basically looking for table pivoting; for your case, group by id, pivot col1 into headers, and aggregate col2 into lists with the collect_list function:

df.groupBy("id").pivot("col1").agg(collect_list("col2")).show
+---+----+--------+----+
| id|  p1|      p2|  p3|
+---+----+--------+----+
|  x|[a1]|    [b1]|  []|
|  y|  []|[b2, b3]|[c1]|
+---+----+--------+----+

If it's guaranteed that there is at most one value in p1 and p3 for each id, you can convert those columns to string type by taking the first element of each array:

df.groupBy("id").pivot("col1").agg(collect_list("col2"))
  .withColumn("p1", $"p1"(0)).withColumn("p3", $"p3"(0))
  .show
+---+----+--------+----+
| id|  p1|      p2|  p3|
+---+----+--------+----+
|  x|  a1|    [b1]|null|
|  y|null|[b2, b3]|  c1|
+---+----+--------+----+
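
On Spark 2.4+, element_at is an equivalent way to extract the single element; note that it uses a 1-based index and, with ANSI mode off (the default), returns null for an empty array:

import org.apache.spark.sql.functions.{collect_list, element_at}

df.groupBy("id").pivot("col1").agg(collect_list("col2"))
  .withColumn("p1", element_at($"p1", 1))
  .withColumn("p3", element_at($"p3", 1))
  .show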

If you need to decide the column types dynamically, i.e. use an array type only for the columns that actually need it:

import org.apache.spark.sql.functions.{col, count}

// get the array-type columns
val arrayColumns = df.groupBy("id", "col1").agg(count("*").as("N"))
    .where($"N" > 1).select("col1").distinct.collect.map(row => row.getString(0))
// arrayColumns: Array[String] = Array(p2)

// aggregate / pivot data frame
val aggDf = df.groupBy("id").pivot("col1").agg(collect_list("col2"))
// aggDf: org.apache.spark.sql.DataFrame = [id: string, p1: array<string> ... 2 more fields]

// get string columns
val stringColumns = aggDf.columns.filter(x => x != "id" && !arrayColumns.contains(x))

// use foldLeft on string columns to convert the columns to string type
stringColumns.foldLeft(aggDf)((df, x) => df.withColumn(x, col(x)(0))).show
+---+----+--------+----+
| id|  p1|      p2|  p3|
+---+----+--------+----+
|  x|  a1|    [b1]|null|
|  y|null|[b2, b3]|  c1|
+---+----+--------+----+
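
If you need this in more than one place, the steps above can be wrapped in a small helper. This is only a sketch under the same assumptions; the name pivotDynamic is illustrative:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_list, count}

// pivot keyCol into headers; keep a pivoted column as array<string> only
// when some id has more than one value for it, otherwise unwrap the
// single element into a plain string column
def pivotDynamic(df: DataFrame, idCol: String, keyCol: String, valCol: String): DataFrame = {
  val arrayColumns = df.groupBy(idCol, keyCol).agg(count("*").as("N"))
    .where(col("N") > 1).select(keyCol).distinct.collect.map(_.getString(0))
  val aggDf = df.groupBy(idCol).pivot(keyCol).agg(collect_list(valCol))
  val stringColumns = aggDf.columns.filter(x => x != idCol && !arrayColumns.contains(x))
  stringColumns.foldLeft(aggDf)((acc, x) => acc.withColumn(x, col(x)(0)))
}

pivotDynamic(df, "id", "col1", "col2").show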

Comments

I want p1 and p3 to be string type, not array type.
I'm not really sure that makes sense unless you are certain there is at most one value in p1 and p3 for each id. In that case, you can extract the first element from p1 and p3: df.groupBy("id").pivot("col1").agg(collect_list("col2")).withColumn("p1", $"p1"(0)).withColumn("p3", $"p3"(0)).show should give what you need.
The thing is, the distinct values from col1 are not fixed; there can be any number of them.
In that case I would keep all the pivoted columns as array type instead of string type. At the end of the day, if you are not even sure how many columns you have, how do you know what type each of them should be?
I can find the names of the array-type columns from the first DataFrame using SELECT DISTINCT col1 FROM (SELECT id, col1, COUNT(*) AS rowCount FROM df GROUP BY id, col1 HAVING rowCount > 1) t. The main problem is creating the final DataFrame dynamically from the names of those array-type columns.
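
For reference, that detection query runs as-is through Spark SQL; a sketch, assuming df has been registered as a temporary view (the view name is illustrative):

df.createOrReplaceTempView("df")

val arrayColumns = spark.sql(
  """SELECT DISTINCT col1
    |FROM (
    |  SELECT id, col1, COUNT(*) AS rowCount
    |  FROM df
    |  GROUP BY id, col1
    |  HAVING rowCount > 1
    |) t""".stripMargin
).collect.map(_.getString(0))
// arrayColumns: Array[String] = Array(p2)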
