
I need to transform a DataFrame in which one of the columns consists of a list of tuples; each item in each tuple has to become a separate column.

Here is an example and a solution in Pandas:

import pandas as pd

df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    }, 

    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}

df = pd.DataFrame.from_dict(df_dict)
print(df)  # initial structure

           a    d
    1   stuff   [(1, 2), (3, 4)]
    2   stuff2  [(1, 2), (3, 4)]

# first transformation, let's separate each list item into a new row
row_breakdown = df.set_index(["a"])["d"].apply(pd.Series).stack()
print(row_breakdown)

            a        
    stuff   0    (1, 2)
            1    (3, 4)
    stuff2  0    (1, 2)
            1    (3, 4)
    dtype: object

row_breakdown = row_breakdown.reset_index().drop(columns=["level_1"])
print(row_breakdown)

    a   0
    0   stuff   (1, 2)
    1   stuff   (3, 4)
    2   stuff2  (1, 2)
    3   stuff2  (3, 4)

# second transformation, let's get each tuple item into a separate column
row_breakdown.columns = ["a", "d"]
row_breakdown = row_breakdown["d"].apply(pd.Series)
row_breakdown.columns = ["value_1", "value_2"]
print(row_breakdown)

        value_1 value_2
    0   1   2
    1   3   4
    2   1   2
    3   3   4
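
As an aside, newer pandas versions (0.25+) can do the row breakdown more directly with DataFrame.explode; a minimal sketch, assuming the same df as above:

# Sketch assuming pandas >= 0.25, which added DataFrame.explode.
exploded = df.explode("d")  # one row per tuple in the list
row_breakdown = pd.DataFrame(exploded["d"].tolist(), columns=["value_1", "value_2"])
print(row_breakdown)  # same value_1/value_2 frame as above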

This is the pandas solution. I need to be able to do the same but using PySpark (2.3). I have started working on it, but immediately got stuck:

from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
import pandas as pd

conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)

spark = SparkSession(sc)

df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    }, 

    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}

df = pd.DataFrame(df_dict)
ddf = spark.createDataFrame(df)

row_breakdown = ddf.set_index(["a"])["d"].apply(pd.Series).stack()

    AttributeError: 'DataFrame' object has no attribute 'set_index'

Apparently, Spark doesn't support indexing. Any pointers appreciated.

  • Spark DataFrames are unordered, so they don't have an index. What's the purpose of the key a here? All the information you need to construct the output is in d, unless I'm missing something. Commented Sep 9, 2018 at 14:43
  • I think it could be as simple as row_breakdown = spark.createDataFrame(chain.from_iterable(df_dict["d"].values()), ["value1", "value2"]) (where chain is from itertools import chain) Commented Sep 9, 2018 at 15:10

2 Answers


This might do:

from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql import functions as F
import pandas as pd

conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)

spark = SparkSession(sc)

df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    }, 

    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}

df = pd.DataFrame(df_dict)
ddf = spark.createDataFrame(df)


exploded = ddf.withColumn('d', F.explode("d"))
exploded.show()

Result:

+------+------+
|     a|     d|
+------+------+
| stuff|[1, 2]|
| stuff|[3, 4]|
|stuff2|[1, 2]|
|stuff2|[3, 4]|
+------+------+

I feel more comfortable using SQL for this:

exploded.createOrReplaceTempView("exploded")
spark.sql("SELECT a, d._1 as value_1, d._2 as value_2 FROM exploded").show()

Important note: this uses the _1 and _2 accessors because Spark parsed each tuple as a struct and gave its fields default names. If in your real implementation the DataFrame contains an array<int>, you should use the [0] syntax instead.
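
For example, if d were an array<int> after the explode (a hypothetical variant of the schema, not what the example above produces), the projection would use positional indexing instead:

# Hypothetical variant: `d` is array<int> rather than a struct,
# so element access replaces the _1/_2 field accessors.
spark.sql("SELECT a, d[0] AS value_1, d[1] AS value_2 FROM exploded").show()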

The final result is:

+------+-------+-------+
|     a|value_1|value_2|
+------+-------+-------+
| stuff|      1|      2|
| stuff|      3|      4|
|stuff2|      1|      2|
|stuff2|      3|      4|
+------+-------+-------+


Update

If you're starting from a DataFrame with the following schema:

ddf.printSchema()
#root
# |-- a: string (nullable = true)
# |-- d: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- _1: long (nullable = true)
# |    |    |-- _2: long (nullable = true)

You have to use pyspark.sql.functions.explode to explode the array into rows, but after that you can use the * selector to turn the struct into columns:

from pyspark.sql.functions import explode

row_breakdown = ddf.select("a", explode("d").alias("d")).select("a", "d.*")
row_breakdown.show()
#+------+---+---+
#|     a| _1| _2|
#+------+---+---+
#| stuff|  1|  2|
#| stuff|  3|  4|
#|stuff2|  1|  2|
#|stuff2|  3|  4|
#+------+---+---+

And to rename the columns, you can use a list comprehension with str.replace:

from pyspark.sql.functions import col

row_breakdown = row_breakdown.select(
    *[col(c).alias(c.replace("_", "value")) for c in row_breakdown.columns]
)
row_breakdown.show()
#+------+------+------+
#|     a|value1|value2|
#+------+------+------+
#| stuff|     1|     2|
#| stuff|     3|     4|
#|stuff2|     1|     2|
#|stuff2|     3|     4|
#+------+------+------+
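
If you prefer, the struct expansion and the renaming can be collapsed into a single projection with explicit aliases; a minimal sketch, equivalent to the two steps above:

from pyspark.sql.functions import col, explode

row_breakdown = ddf.select("a", explode("d").alias("d")).select(
    "a",
    col("d._1").alias("value1"),
    col("d._2").alias("value2"),
)
row_breakdown.show()  # same output as above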

Original Answer

If you're starting from the dictionary, you don't need to use pandas at all for this.

Instead, you can create your Spark DataFrame directly from the dictionary: the key is to transform it into the appropriate format and pass that to spark.createDataFrame.

In your example, it seems like you are not using the values under the a key at all.

As I mentioned in my comment, you can achieve the described output with the following code:

df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    }, 

    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}

from itertools import chain
row_breakdown = spark.createDataFrame(
    chain.from_iterable(df_dict["d"].values()), ["value1", "value2"]
)
row_breakdown.show()
#+------+------+
#|value1|value2|
#+------+------+
#|     1|     2|
#|     3|     4|
#|     1|     2|
#|     3|     4|
#+------+------+

If you want an index-like column, you can achieve that by simply using enumerate, as in the following example. Here I am also sorting the values by the key, as that seems to be your intention.

data = (
    (i,) + v for i, v in enumerate(
        chain.from_iterable(
            v for k, v in sorted(df_dict["d"].items(), key=lambda item: item[0])
        )
    )
)
columns = ["index", "value1", "value2"]
row_breakdown = spark.createDataFrame(data, columns)
row_breakdown.show()
#+-----+------+------+
#|index|value1|value2|
#+-----+------+------+
#|    0|     1|     2|
#|    1|     3|     4|
#|    2|     1|     2|
#|    3|     3|     4|
#+-----+------+------+

As you can see here, we can pass a generator expression to spark.createDataFrame, and this solution does not require us to know the length of the tuples ahead of time.
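
And if you did want to carry the values from a alongside, the same idea extends; a minimal sketch, assuming the keys of a and d line up:

# Sketch: pair each tuple with the corresponding value under "a",
# assuming the two sub-dictionaries share the same keys.
data = (
    (df_dict["a"][k],) + t
    for k in sorted(df_dict["d"])
    for t in df_dict["d"][k]
)
row_breakdown = spark.createDataFrame(data, ["a", "value1", "value2"])
row_breakdown.show()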

1 Comment

I don't start with a dictionary. I already use a DataFrame, as per ddf = spark.createDataFrame(df); the dictionary is just there as an example. Using chain.from_iterable(ddf["d"].values()), ["value1", "value2"] gives me an error in PySpark: the column is not callable.
