
I need to create a new Spark DataFrame MapType column based on the existing columns, where the column name is the key and the column value is the value.

As an example, I have this DF:

from pyspark.sql.types import StructType, StructField, StringType, FloatType

rdd = sc.parallelize([('123k', 1.3, 6.3, 7.6),
                      ('d23d', 1.5, 2.0, 2.2),
                      ('as3d', 2.2, 4.3, 9.0)])
schema = StructType([StructField('key', StringType(), True),
                     StructField('metric1', FloatType(), True),
                     StructField('metric2', FloatType(), True),
                     StructField('metric3', FloatType(), True)])
df = sqlContext.createDataFrame(rdd, schema)

+----+-------+-------+-------+
| key|metric1|metric2|metric3|
+----+-------+-------+-------+
|123k|    1.3|    6.3|    7.6|
|d23d|    1.5|    2.0|    2.2|
|as3d|    2.2|    4.3|    9.0|
+----+-------+-------+-------+

I've already gotten as far as creating a struct column from this:

from pyspark.sql.functions import struct

nameCol = struct([name for name in df.columns if "metric" in name]).alias("metric")
df2 = df.select("key", nameCol)

+----+-------------+
| key|       metric|
+----+-------------+
|123k|[1.3,6.3,7.6]|
|d23d|[1.5,2.0,2.2]|
|as3d|[2.2,4.3,9.0]|
+----+-------------+

But what I need is a metric column of MapType, where the key is the column name:

+----+-------------------------+
| key|                   metric|
+----+-------------------------+
|123k|Map(metric1 -> 1.3, me...|
|d23d|Map(metric1 -> 1.5, me...|
|as3d|Map(metric1 -> 2.2, me...|
+----+-------------------------+

Any hints on how I can transform the data?

Thanks!



In Spark 2.0 or later you can use create_map. First some imports:

from pyspark.sql.functions import lit, col, create_map
from itertools import chain

create_map expects an interleaved sequence of keys and values, which can be built, for example, like this:

metric = create_map(list(chain(*(
    (lit(name), col(name)) for name in df.columns if "metric" in name
)))).alias("metric")
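For the example columns, the chain expression expands to the equivalent hand-written call (shown here only to illustrate the interleaving):

create_map(
    lit("metric1"), col("metric1"),
    lit("metric2"), col("metric2"),
    lit("metric3"), col("metric3")
).alias("metric")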

The resulting column can then be used with select:

df.select("key", metric)

With example data the result is:

+----+---------------------------------------------------------+
|key |metric                                                   |
+----+---------------------------------------------------------+
|123k|Map(metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6)      |
|d23d|Map(metric1 -> 1.5, metric2 -> 2.0, metric3 -> 2.2)      |
|as3d|Map(metric1 -> 2.2, metric2 -> 4.3, metric3 -> 9.0)      |
+----+---------------------------------------------------------+
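Once the map column is in place, individual entries can be pulled back out by key. A minimal sketch, assuming the result was saved as df2:

df2 = df.select("key", metric)
df2.select("key", df2["metric"]["metric1"].alias("metric1")).show()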

If you use an earlier version of Spark you'll have to use a UDF:

from pyspark.sql import Column
from pyspark.sql.functions import col, lit, struct, udf
from pyspark.sql.types import DataType, DoubleType, StringType, MapType

def as_map(*cols: str, value_type: DataType = DoubleType()) -> Column:
    # Pack each (name, value) pair into a struct so it survives the UDF call
    args = [struct(lit(name), col(name)) for name in cols]
    # Inside the UDF each struct arrives as a 2-element Row, so dict() works
    as_map_ = udf(
        lambda *args: dict(args),
        MapType(StringType(), value_type)
    )
    return as_map_(*args)

which could be used as follows:

df.select("key", 
    as_map(*[name for name in df.columns if "metric" in name]).alias("metric"))
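Since the example metric columns are FloatType, you may want to pass a matching value type. A small sketch using the value_type parameter of the helper above:

from pyspark.sql.types import FloatType

metric_cols = [name for name in df.columns if "metric" in name]
df.select("key", as_map(*metric_cols, value_type=FloatType()).alias("metric"))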

Just a slightly cleaner version, without chain:

from pyspark.sql import functions as f

cols = ['col1', 'col2', 'col3']
# Interleave lit(name), col(name) pairs in the order create_map expects
cols_for_map = [func(c) for c in cols for func in (f.lit, f.col)]
df = df.withColumn('mapped_cols', f.create_map(*cols_for_map))
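Applied to the question's DataFrame, this approach might look like the following sketch (reusing df and its metric columns from above):

metric_cols = [c for c in df.columns if "metric" in c]
pairs = [func(c) for c in metric_cols for func in (f.lit, f.col)]
df.select("key", f.create_map(*pairs).alias("metric")).show(truncate=False)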
