I want to add columns to a variable number of PySpark dataframes, recursively, until they all share the same set of columns (the added columns will be populated with null values). The function below works for two dataframes; my question is: how does this generalize to any number of dataframes (2, 3, etc.)?
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

def add_missing_col(df_1, df_2):
    """Compare column names in df_1 and df_2
    and insert missing columns in df_2 with null values.
    """
    missing_cols = [i for i in df_1.schema.names if i not in df_2.schema.names]
    for i in missing_cols:
        df_2 = df_2.withColumn(i, lit(None).cast(StringType()))
    return df_2
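For illustration, applying this two-dataframe version in both directions (with the test dataframes defined further down) makes df1 and df2 share the same set of columns; the *_full names are only for the example:

# Sketch: run the function in both directions so that both dataframes
# end up with the same set of columns.
df2_full = add_missing_col(df1, df2)   # adds "b" to df2 as a null column
df1_full = add_missing_col(df2, df1)   # df1 already has a, b and c, so unchanged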
I have tried using functools.reduce and defining the function signature as *dfs, but I am unsure how to proceed from here:
def add_missing_col_r(*dfs):
    """Compare column names in dfs and insert missing columns with null values recursively."""
    return reduce(DataFrame.withColumn(lambda i : i for i in DataFrame.schema.names), dfs)
Is it a good idea to use a lambda function here, or is there a better approach?
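My understanding is that reduce folds a two-argument function cumulatively over the sequence, along these lines:

from functools import reduce

# reduce(f, [x1, x2, x3]) is equivalent to f(f(x1, x2), x3)
reduce(lambda acc, x: acc + x, [1, 2, 3])  # -> 6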
The test dataframes I am using:
# Test dataframes
df1 = spark.createDataFrame([(1, "foo1", "qux1"),
                             (2, "bar1", "quux1"),
                             (3, "baz1", "quuz1")],
                            ("a", "b", "c"))
df2 = spark.createDataFrame([(4, "foo2"), (5, "baz2")], ("a", "c"))
df3 = spark.createDataFrame([("bar3", "bar3", "bar3", "bar3"),
                             ("qux3", "quux3", "quuz3", "corge3"),
                             ("grault3", "garply3", "waldo3", "fred3")],
                            ("b", "d", "e", "f"))
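To make the expected outcome concrete: after the generalized function runs, each of df1, df2 and df3 should contain the union of all column names, with the added columns holding nulls (column order aside):

# Expected end state (column order aside): all three dataframes share the
# union of the column names, and the added columns are filled with nulls.
expected_cols = {"a", "b", "c", "d", "e", "f"}
# set(df1.columns) == set(df2.columns) == set(df3.columns) == expected_cols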