3

I need to union two dataframes and combine the columns by keys. The two dataframes have the same schema, for example:

root
 |-- id: string (nullable = true)
 |-- cMap: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

I want to group by "id" and aggregate the "cMap" values together, deduplicating the keys. I tried this code:

val df = df_a.unionAll(df_b)
  .groupBy("id")
  .agg(collect_list("cMap") as "cMap")
  .rdd
  .map { x =>
    var map = Map[String, String]()
    x.getAs[Seq[Map[String, String]]]("cMap").foreach { y =>
      y.foreach { tuple =>
        val key = tuple._1
        val value = tuple._2
        if (!map.contains(key)) // deduplicate
          map += (key -> value)
      }
    }
    Row(x.getAs[String]("id"), map)
  }

But it seems collect_list cannot be used with a map-typed column:

org.apache.spark.sql.AnalysisException: No handler for Hive udf class org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList because: Only primitive type arguments are accepted but map<string,string> was passed as parameter 1..;

Is there another solution to this problem?

2 Comments
  • Can you upgrade to 2.x? Hive isn't required for aggregate functions in 2.x. Commented May 29, 2017 at 4:11
  • Your mistake is that you are using org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList, but you have to import org.apache.spark.sql.functions.collect_list instead, and it should work then. Commented May 29, 2017 at 5:48

3 Answers

8

Since Spark 3.0, you can:

  • transform your map to an array of map entries with map_entries
  • collect those arrays by your id using collect_set
  • flatten the collected array of arrays using flatten
  • then rebuild the map from the flattened array using map_from_entries

See the following code snippet, where input is your input dataframe:

import org.apache.spark.sql.functions.{col, collect_set, flatten, map_entries, map_from_entries}

input
  .withColumn("cMap", map_entries(col("cMap")))
  .groupBy("id")
  .agg(map_from_entries(flatten(collect_set("cMap"))).as("cMap"))

Example

Given the following dataframe input:

+---+--------------------+
|id |cMap                |
+---+--------------------+
|1  |[k1 -> v1]          |
|1  |[k2 -> v2, k3 -> v3]|
|2  |[k4 -> v4]          |
|2  |[]                  |
|3  |[k6 -> v6, k7 -> v7]|
+---+--------------------+

The code snippet above returns the following dataframe:

+---+------------------------------+
|id |cMap                          |
+---+------------------------------+
|1  |[k1 -> v1, k2 -> v2, k3 -> v3]|
|3  |[k6 -> v6, k7 -> v7]          |
|2  |[k4 -> v4]                    |
+---+------------------------------+
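
For completeness, an input dataframe like the one above could be built as follows (my own sketch, not part of the original answer; it assumes a SparkSession named spark with spark.implicits._ in scope):

import spark.implicits._

// Example data matching the table above
val input = Seq(
  ("1", Map("k1" -> "v1")),
  ("1", Map("k2" -> "v2", "k3" -> "v3")),
  ("2", Map("k4" -> "v4")),
  ("2", Map.empty[String, String]),
  ("3", Map("k6" -> "v6", "k7" -> "v7"))
).toDF("id", "cMap")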

7

You have to use the explode function on the map columns first to destructure the maps into key and value columns, then union the resulting datasets, apply distinct to de-duplicate, and only then groupBy with some custom Scala coding to aggregate the maps.

Stop talking and let's do some coding then...

Given the datasets:

scala> a.show(false)
+---+-----------------------+
|id |cMap                   |
+---+-----------------------+
|one|Map(1 -> one, 2 -> two)|
+---+-----------------------+

scala> a.printSchema
root
 |-- id: string (nullable = true)
 |-- cMap: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

scala> b.show(false)
+---+-------------+
|id |cMap         |
+---+-------------+
|one|Map(1 -> one)|
+---+-------------+

scala> b.printSchema
root
 |-- id: string (nullable = true)
 |-- cMap: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
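
For reference, datasets like a and b above could be created like this (my own sketch, assuming spark.implicits._ is in scope):

import spark.implicits._

val a = Seq(("one", Map("1" -> "one", "2" -> "two"))).toDF("id", "cMap")
val b = Seq(("one", Map("1" -> "one"))).toDF("id", "cMap")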

You should first use the explode function on the map columns.

explode(e: Column): Column Creates a new row for each element in the given array or map column.

val a_keyValues = a.select('*, explode($"cMap"))
scala> a_keyValues.show(false)
+---+-----------------------+---+-----+
|id |cMap                   |key|value|
+---+-----------------------+---+-----+
|one|Map(1 -> one, 2 -> two)|1  |one  |
|one|Map(1 -> one, 2 -> two)|2  |two  |
+---+-----------------------+---+-----+

val b_keyValues = b.select('*, explode($"cMap"))

With the following you get distinct key-value pairs, which is exactly the deduplication you asked for.

val distinctKeyValues = a_keyValues.
  union(b_keyValues).
  select("id", "key", "value").
  distinct // <-- deduplicate
scala> distinctKeyValues.show(false)
+---+---+-----+
|id |key|value|
+---+---+-----+
|one|1  |one  |
|one|2  |two  |
+---+---+-----+

Time to groupBy and create the final map column.

val result = distinctKeyValues.
  withColumn("map", map($"key", $"value")).
  groupBy("id").
  agg(collect_list("map")).
  as[(String, Seq[Map[String, String]])]. // <-- leave Rows for typed pairs
  map { case (id, list) => (id, list.reduce(_ ++ _)) }. // <-- collect all entries under one map
  toDF("id", "cMap") // <-- give the columns their names
scala> result.show(truncate = false)
+---+-----------------------+
|id |cMap                   |
+---+-----------------------+
|one|Map(1 -> one, 2 -> two)|
+---+-----------------------+

Please note that as of Spark 2.0.0 unionAll has been deprecated and union is the proper union operator:

(Since version 2.0.0) use union()

4 Comments

Thanks for the solution. I found that after applying select('*, explode($"xxx")), the number of records decreases a lot; I guess it is because of null or empty Map values, but I am not sure. As I have other columns in my real project, I separate the columns into string-type ones and map-type ones, aggregate them separately and join them back together. By the way, what is the syntax for '*? Thanks.
"By the way, what is the syntax for '*" <-- it's another version of $"*"
This is phenomenal. Spark devs should work to implement a collect_map() sql function with a couple of different signatures.
@JacekLaskowski Does this work only for a particular Spark version? I followed your approach but am getting an exception - stackoverflow.com/questions/60750717/…
0

I agree with @Shankar. Your code seems to be flawless.

The only mistake I assume you are making is that you are importing the wrong library.

You need to import

import org.apache.spark.sql.functions.collect_list

But I guess you are importing

import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList

I hope I am guessing it right.
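
A minimal sketch of that fix, assuming Spark 2.x (where the built-in collect_list accepts complex types such as map<string,string>) and your original df_a and df_b:

import org.apache.spark.sql.functions.collect_list // built-in function, not the Hive UDAF

// unionAll is deprecated since 2.0.0, so use union here
val grouped = df_a.union(df_b)
  .groupBy("id")
  .agg(collect_list("cMap") as "cMap")
// ...then continue with your .rdd.map de-duplication exactly as in the question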
