I have a table with a map column in it. I want to make two separate columns out of that map: 1. a keys column, 2. a values column.

input.show();

+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+
|addedSkuWithTimestamp|     fbaSKUAdditions|fbaSKURemovals|      merchantId|mfnSKUAdditions|mfnSKURemovals|removedSkuWithTimestamp|
+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+
| [Test1 -> 1234567...|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|ANOTHER_MERCHANT|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|ANOTHER_MERCHANT|             []|          null|                   null|
+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+

But I want output as

test1  123456789 

Test2  123456780 

How do I get two different columns (a key column and a value column) from the map?

Dataset<Row> removed_skus = input
                    .withColumn("sku", functions.explode(input.col("removedSkuWithTimestamp")))
                    .withColumn("skuType", functions.lit("MFN"))
                    .select(input.col("merchantId").alias("merchant_id"), new Column("sku"),
                            new Column("skuType"))
                    .distinct()
                    .groupBy("merchant_id")
                    .agg(functions.collect_list("sku").alias("removedSkus"));

2 Answers

Using the same input as the other answer:

val df = Seq(
    (Map("timestamp1"->1585008000, "timestamp3"-> 1584921600), "AFN"),
    (Map("timestamp2"-> 1584835200), "AFN"),
    (null, "AFN") 
).toDF("addedSkuWithTimestamp", "skuType")

Try using explode; I tested this in Spark 2.2.1 and 2.3.1:

df.select(explode($"addedSkuWithTimestamp")).show(false)
+----------+----------+
|key       |value     |
+----------+----------+
|timestamp1|1585008000|
|timestamp3|1584921600|
|timestamp2|1584835200|
+----------+----------+
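For intuition, explode on a map column does the same flattening you would get from flatMap over plain Scala maps. A minimal Spark-free sketch (the sample data mirrors the input above):

```scala
// Plain-Scala sketch of what explode does to a map column:
// each (key, value) entry of each map becomes its own row.
val maps = Seq(
  Map("timestamp1" -> 1585008000L, "timestamp3" -> 1584921600L),
  Map("timestamp2" -> 1584835200L)
)

// One (key, value) tuple per row, matching explode's key/value columns.
val rows: Seq[(String, Long)] = maps.flatMap(_.toSeq)

rows.foreach { case (k, v) => println(s"$k\t$v") }
```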

First let's create some data:

val df = Seq(
    (Map("sku1"->"timestamp1"), "AFN"),
    (Map("sku2"->"timestamp2"), "AFN"),
    (null, "AFN")
).toDF("addedSkuWithTimestamp", "skuType")

df.show(false)

+---------------------+-------+
|addedSkuWithTimestamp|skuType|
+---------------------+-------+
| [sku1 -> timestamp1]|    AFN|
| [sku2 -> timestamp2]|    AFN|
|                 null|    AFN|
+---------------------+-------+

This will have the following schema:

scala> df.printSchema()

root
 |-- addedSkuWithTimestamp: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- skuType: string (nullable = true)

Spark < 2.3

The following code extracts the Sku and Timestamp columns from the addedSkuWithTimestamp column using a mapToTupleUDF udf:

val mapToTupleUDF = udf((sku: Map[String, String]) => if(sku != null) sku.toSeq(0) else null)

df.withColumn("addedSkuWithTimestamp", mapToTupleUDF($"addedSkuWithTimestamp"))
  .withColumn("Sku", when($"addedSkuWithTimestamp".isNotNull, $"addedSkuWithTimestamp._1"))
  .withColumn("Timestamp", when($"addedSkuWithTimestamp".isNotNull, $"addedSkuWithTimestamp._2"))
  .show(false)

+---------------------+-------+----+----------+
|addedSkuWithTimestamp|skuType|Sku |Timestamp |
+---------------------+-------+----+----------+
|[sku1, timestamp1]   |AFN    |sku1|timestamp1|
|[sku2, timestamp2]   |AFN    |sku2|timestamp2|
|null                 |AFN    |null|null      |
+---------------------+-------+----+----------+

Note that we can access addedSkuWithTimestamp._1 only if addedSkuWithTimestamp is not null.
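The null guard inside mapToTupleUDF can be checked in plain Scala, without a SparkSession; this is just the UDF body lifted out (the name firstEntry is made up for illustration):

```scala
// The same logic the UDF wraps: take the first entry of the map,
// returning null when the map itself is null.
def firstEntry(sku: Map[String, String]): (String, String) =
  if (sku != null) sku.toSeq(0) else null

println(firstEntry(Map("sku1" -> "timestamp1")))
println(firstEntry(null))
```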

Spark >= 2.3

Since Spark 2.3.0 you can use the built-in map_keys and map_values functions:

df.withColumn("Sku", map_keys($"addedSkuWithTimestamp").getItem(0))
  .withColumn("Timestamp", map_values($"addedSkuWithTimestamp").getItem(0))
  .show(false)

Output:

+---------------------+-------+----+----------+
|addedSkuWithTimestamp|skuType|Sku |Timestamp |
+---------------------+-------+----+----------+
|[sku1 -> timestamp1] |AFN    |sku1|timestamp1|
|[sku2 -> timestamp2] |AFN    |sku2|timestamp2|
|null                 |AFN    |null|null      |
+---------------------+-------+----+----------+
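map_keys and map_values collect the map's keys and values into arrays; getItem(0) then takes the first element, and Spark propagates null when the map is null. The same behavior sketched in plain Scala (the helper names are hypothetical):

```scala
// Plain-Scala analogue of map_keys(col).getItem(0) and
// map_values(col).getItem(0); Option(...) absorbs a null map
// the way Spark propagates null.
def firstKey(m: Map[String, String]): Option[String] =
  Option(m).flatMap(_.keys.headOption)

def firstValue(m: Map[String, String]): Option[String] =
  Option(m).flatMap(_.values.headOption)

println(firstKey(Map("sku1" -> "timestamp1")))
println(firstKey(null))
```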

4 Comments

It's a bit different. My structure is like this:

scala> input.printSchema()
root
 |-- addedSkuWithTimestamp: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)
 |-- fbaSKUAdditions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- fbaSKURemovals: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- merchantId: string (nullable = true)
Got this output:

scala> val trial3 = input.withColumn("sku", $"addedSkuWithTimestamp.key").withColumn("timestamp", $"addedSkuWithTimestamp.value").select(col("sku"), col("timestamp"))
trial3: org.apache.spark.sql.DataFrame = [sku: bigint, timestamp: bigint]

scala> trial3.show()
+----+---------+
| sku|timestamp|
+----+---------+
|null|     null|
|null|     null|
|null|     null|
Hi @Kavita, can you please update your question with this information? It is very hard to read it from the comments :)
Updated the answer as well.
