
Suppose we have a DataFrame with a column of map type.

val df = spark.sql("""select map("foo", 1, "bar", 2) AS mapColumn""")
df.show()
// +--------------------+
// |           mapColumn|
// +--------------------+
// |{foo -> 1, bar -> 2}|
// +--------------------+

What is the most straightforward way to convert it to a struct (or, equivalently, define a new column with the same keys and values but as a struct type)? See the following spark-shell (2.4.5) session for an insanely inefficient way of going about it:

val jsonStr = df.select(to_json($"mapColumn")).collect()(0)(0).asInstanceOf[String]

spark.read.json(Seq(jsonStr).toDS()).show()
// +---+---+
// |bar|foo|
// +---+---+
// |  2|  1|
// +---+---+

Now, obviously collect() is very inefficient, and this is generally an awful way to do things in Spark. But what is the preferred way to accomplish this conversion? named_struct and struct both take a sequence of parameter values to construct the results, but I can't find any way to "unwrap" the map key/values to pass them to these functions.
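
For context, the closest I can get with the built-in functions is the following sketch using map_keys/map_values (available since Spark 2.3), which produces a struct of two parallel arrays rather than a struct with foo/bar fields:

import org.apache.spark.sql.functions._

df.select(
  struct(
    map_keys($"mapColumn").as("keys"),    // array of the map's keys
    map_values($"mapColumn").as("values") // array of the map's values
  ).as("s")
).show()
// +--------------------+
// |                   s|
// +--------------------+
// |[[foo, bar], [1, 2]]|
// +--------------------+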

4 Answers


I would use the explode function. Given this input:

+--------------------+
|           mapColumn|
+--------------------+
|[foo -> 1, bar -> 2]|
+--------------------+

explode turns each map entry into a row with key and value columns, and struct('*) packs those columns back into a single struct column:

df.select(explode('mapColumn)).select(struct('*).as("struct"))

output:

+--------+
|  struct|
+--------+
|[foo, 1]|
|[bar, 2]|
+--------+

root
 |-- struct: struct (nullable = false)
 |    |-- key: string (nullable = false)
 |    |-- value: integer (nullable = false)

3 Comments

This does exactly what I want, but the weird, seemingly unbalanced single quotes are throwing me off...
In case it's not obvious: in a real-life scenario with multiple columns, you would use .select(col("*"), struct('mapColumn)) to keep the existing columns alongside the new struct.
I finally learned that my confusion was due to the Symbol syntax. I.e. this: stackoverflow.com/a/918613/375670
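
For anyone else tripped up by the quotes: 'mapColumn is a Scala Symbol literal that Spark converts to a Column through an implicit in spark.implicits._. A quick sketch of the equivalent forms:

import org.apache.spark.sql.functions.{col, explode}
import spark.implicits._ // provides both the 'mapColumn and $"mapColumn" conversions

df.select(explode('mapColumn))       // Symbol literal, implicitly converted to a Column
df.select(explode($"mapColumn"))     // string interpolator, same Column
df.select(explode(col("mapColumn"))) // explicit function call, same Column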

I see @chlebek's answer, but in case the result should be kept in one row, you can use a UDF:

scala> val df = spark.sql("""select map("foo", 1, "bar", 2) AS mapColumn""")
df: org.apache.spark.sql.DataFrame = [mapColumn: map<string,int>]

scala> df.show
+--------------------+
|           mapColumn|
+--------------------+
|[foo -> 1, bar -> 2]|
+--------------------+

scala> case class KeyValue(key: String, value: String)
defined class KeyValue

scala> val toArrayOfStructs = udf((value: Map[String, String]) => value.map {
     |   case (k, v) => KeyValue(k, v)
     | }.toArray )
toArrayOfStructs: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(StructType(StructField(key,StringType,true), StructField(value,StringType,true)),true),Some(List(MapType(StringType,StringType,true))))

scala> df.withColumn("alfa", toArrayOfStructs(col("mapColumn")))
res4: org.apache.spark.sql.DataFrame = [mapColumn: map<string,int>, alfa: array<struct<key:string,value:string>>]

scala> res4.show
+--------------------+--------------------+
|           mapColumn|                alfa|
+--------------------+--------------------+
|[foo -> 1, bar -> 2]|[[foo, 1], [bar, 2]]|
+--------------------+--------------------+


scala> res4.printSchema
root
 |-- mapColumn: map (nullable = false)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = false)
 |-- alfa: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: string (nullable = true)
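
As a side note: Spark 2.4 added a built-in map_entries function that produces the same array<struct<key,value>> column in one row without a UDF. A minimal sketch (if the Scala wrapper is missing in your version, expr("map_entries(mapColumn)") works too):

import org.apache.spark.sql.functions.{col, map_entries}

// One row in, one row out: the map becomes array<struct<key,value>>
df.withColumn("alfa", map_entries(col("mapColumn"))).show()
// +--------------------+--------------------+
// |           mapColumn|                alfa|
// +--------------------+--------------------+
// |[foo -> 1, bar -> 2]|[[foo, 1], [bar, 2]]|
// +--------------------+--------------------+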



Your method doesn't work with more than one row, especially if the rows contain different maps, like this one:

val df = Seq(
    Map("foo" -> 1, "bar" -> 2),
    Map("foo" -> 3, "baz" -> 4)
).toDF("mapColumn")

df.show()
// +--------------------+
// |           mapColumn|
// +--------------------+
// |{foo -> 1, bar -> 2}|
// |{foo -> 3, baz -> 4}|
// +--------------------+

Your script would return only the first row, because collect()(0)(0) takes just the first record:

val jsonStr = df.select(to_json($"mapColumn")).collect()(0)(0).asInstanceOf[String]
spark.read.json(Seq(jsonStr).toDS()).show()
// +---+---+
// |bar|foo|
// +---+---+
// |  2|  1|
// +---+---+

Solutions

  • map to columns (a known-keys variant is sketched after this list):

    val json_col = to_json($"mapColumn")
    val json_schema = spark.read.json(df.select(json_col).as[String]).schema
    val df2 = df.withColumn("_c", from_json(json_col, json_schema)).select("_c.*")
    
    df2.show()
    // +----+----+---+
    // | bar| baz|foo|
    // +----+----+---+
    // |   2|null|  1|
    // |null|   4|  3|
    // +----+----+---+
    
  • map to struct (field names: "key", "value"):

    val df2 = df.select(explode(map_entries($"mapColumn")).as("struct"))
    df2.show()
    // +--------+
    // |  struct|
    // +--------+
    // |{foo, 1}|
    // |{bar, 2}|
    // |{foo, 3}|
    // |{baz, 4}|
    // +--------+
    
  • map to struct (field names: "foo", "bar", "baz"):

    val json_col = to_json($"mapColumn")
    val json_schema = spark.read.json(df.select(json_col).as[String]).schema
    val df2 = df.select(from_json(json_col, json_schema).as("struct"))
    
    df2.show()
    // +------------+
    // |      struct|
    // +------------+
    // |{2, null, 1}|
    // |{null, 4, 3}|
    // +------------+
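
If the key set is known up front, a variant of the "map to columns" solution can skip the JSON round-trip used above to infer the schema. A sketch, where the keys list is an assumption:

val keys = Seq("foo", "bar", "baz") // assumed known ahead of time

// Pull each key out of the map directly; missing keys become null.
val df2 = df.select(keys.map(k => $"mapColumn".getItem(k).as(k)): _*)

df2.show()
// +---+----+----+
// |foo| bar| baz|
// +---+----+----+
// |  1|   2|null|
// |  3|null|   4|
// +---+----+----+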
    



Define a case class whose fields match the expected keys, then map each row into it:

import org.apache.spark.sql.Encoders

case class Bean56(foo: Int, bar: Int)

// Bean56 is a case class, not a JavaBean (it has no zero-arg constructor
// or setters), so use a product encoder rather than Encoders.bean.
val bean56Encoder = Encoders.product[Bean56]

val df = spark.sql("""select map("foo", 1, "bar", 2) AS mapColumn""")

// Map each row to the case class, reading the map column by position.
val bean56s = df.map { row =>
  val map = row.getMap[String, Int](0)
  Bean56(map.getOrElse("foo", -1), map.getOrElse("bar", -1))
}(bean56Encoder) // supply the encoder explicitly

bean56s.foreach(println(_)) // print each bean
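
With import spark.implicits._ in scope, the Encoder for a case class is derived implicitly, so the explicit encoder argument can be dropped. A sketch:

import spark.implicits._

// The implicit product encoder for Bean56 is picked up automatically.
val beans = df.map { row =>
  val m = row.getMap[String, Int](0)
  Bean56(m.getOrElse("foo", -1), m.getOrElse("bar", -1))
}
beans.show()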

