I wish to add columns to a variable number of PySpark dataframes recursively until they all share the same columns (the added columns will be populated with null values). The function below works for 2 dataframes, and my question is: how does this generalize to any number of dataframes (2, 3, etc.)?

from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

def add_missing_col(df_1, df_2):
    """Compare column names in df_1 and df_2
    and insert missing columns in df_2 with null values.
    """
    missing_cols = [i for i in df_1.schema.names if i not in df_2.schema.names]
    for i in missing_cols:
        df_2 = df_2.withColumn(i, lit(None).cast(StringType()))
    return df_2

I have tried functools.reduce with the function signature defined as *dfs, but am unsure how to go on from here:

def add_missing_col_r(*dfs):
    """Compare column names in dfs and insert missing columns with null values recursively."""
    return reduce(DataFrame.withColumn(lambda i : i for i in DataFrame.schema.names), dfs)

Is it a good idea to use a lambda function here, or is there a better approach?

The test dataframes I am using:

# Test dataframes

df1 = spark.createDataFrame([(1, "foo1", "qux1"),
                             (2, "bar1", "quux1"),
                             (3, "baz1", "quuz1")],
                             ("a", "b", "c"))

df2 = spark.createDataFrame([(4, "foo2"), (5, "baz2")], ("a", "c"))

df3 = spark.createDataFrame([("bar3", "bar3", "bar3", "bar3"),
                             ("qux3", "quux3", "quuz3", "corge3"),
                             ("grault3", "garply3", "waldo3", "fred3")
                            ],
                            ("b", "d", "e", "f")
                            )

1 Answer

I'm not sure reduce is appropriate here; plain Python will do fine. If you want the resulting columns in a consistent order, check out my previous answer to your other question.

from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

dfs = [df1, df2, df3]

def add_missing_col_r(dfs):
    # Union of all column names across the dataframes
    all_cols = set(sum([df.columns for df in dfs], []))
    return_dfs = []
    for df in dfs:
        # Columns this dataframe lacks, added as null strings
        missing_cols = all_cols - set(df.columns)
        for col in sorted(missing_cols):
            df = df.withColumn(col, lit(None).cast(StringType()))
        return_dfs.append(df)
    return return_dfs

new_dfs = add_missing_col_r(dfs)

for x in new_dfs:
    x.show()
+---+----+-----+----+----+----+
|  a|   b|    c|   d|   e|   f|
+---+----+-----+----+----+----+
|  1|foo1| qux1|null|null|null|
|  2|bar1|quux1|null|null|null|
|  3|baz1|quuz1|null|null|null|
+---+----+-----+----+----+----+

+---+----+----+----+----+----+
|  a|   c|   b|   d|   e|   f|
+---+----+----+----+----+----+
|  4|foo2|null|null|null|null|
|  5|baz2|null|null|null|null|
+---+----+----+----+----+----+

+-------+-------+------+------+----+----+
|      b|      d|     e|     f|   a|   c|
+-------+-------+------+------+----+----+
|   bar3|   bar3|  bar3|  bar3|null|null|
|   qux3|  quux3| quuz3|corge3|null|null|
|grault3|garply3|waldo3| fred3|null|null|
+-------+-------+------+------+----+----+
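The column-set arithmetic in the answer can be checked without a Spark session at all. The sketch below (plain Python, with the column-name lists copied from the test dataframes above) shows what `all_cols` and each dataframe's `missing_cols` evaluate to:

```python
# Plain-Python check of the column-set logic from the answer
# (column name lists copied from df1, df2, df3 above).
cols = {
    "df1": ["a", "b", "c"],
    "df2": ["a", "c"],
    "df3": ["b", "d", "e", "f"],
}

# sum(list_of_lists, []) flattens the lists; set() removes duplicates
all_cols = set(sum(cols.values(), []))
assert all_cols == {"a", "b", "c", "d", "e", "f"}

# Missing columns per dataframe, sorted as in the answer
missing = {name: sorted(all_cols - set(c)) for name, c in cols.items()}
assert missing["df1"] == ["d", "e", "f"]
assert missing["df2"] == ["b", "d", "e", "f"]
assert missing["df3"] == ["a", "c"]
```

This matches the null columns visible in the show() output above.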