You can cross join df1 to a filtered df2 and use when to keep each value in df1 only where the corresponding flag is not equal to 0 (when without an otherwise returns null when the condition is false, which gives the nulls below).
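For context, inputs along the following lines would reproduce the outputs shown below. This is only a sketch: an active spark session is assumed, and the column2 values in df1 are placeholders because that column is masked in both results.

# Hypothetical inputs inferred from the outputs below (assumes an existing `spark` session).
# The column2 values in df1 are placeholders; they never appear in the results.
df1 = spark.createDataFrame(
    [('abc', 'placeholder1', 'abc456'),
     ('def', 'placeholder2', 'xyz098')],
    ['column1', 'column2', 'column3']
)
df2 = spark.createDataFrame(
    [('A', 1, 0, 1),   # ref A: non-zero flags for column1 and column3
     ('B', 0, 0, 1)],  # ref B: non-zero flag for column3 only
    ['ref', 'column1', 'column2', 'column3']
)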
import pyspark.sql.functions as F

# Filter df2 down to the ref = 'A' row, cross join it onto df1, and null out
# every df1 column whose flag in df2 is 0.
out_df_refA = (df1.alias('df1')
    .crossJoin(df2.filter("ref = 'A'").drop('ref').alias('df2'))
    .select(*[F.when(F.col('df2.' + c) != 0, F.col('df1.' + c)).alias(c) for c in df1.columns])
)
out_df_refA.show()
+-------+-------+-------+
|column1|column2|column3|
+-------+-------+-------+
| abc| null| abc456|
| def| null| xyz098|
+-------+-------+-------+
import pyspark.sql.functions as F

# Same pattern for ref = 'B': only column3 has a non-zero flag, so column1
# and column2 come out null.
out_df_refB = (df1.alias('df1')
    .crossJoin(df2.filter("ref = 'B'").drop('ref').alias('df2'))
    .select(*[F.when(F.col('df2.' + c) != 0, F.col('df1.' + c)).alias(c) for c in df1.columns])
)
out_df_refB.show()
+-------+-------+-------+
|column1|column2|column3|
+-------+-------+-------+
| null| null| abc456|
| null| null| xyz098|
+-------+-------+-------+
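If you have more ref values than just 'A' and 'B', the same pattern can be wrapped in a loop over the distinct ref values. This is a sketch under the assumption that df2 is small enough that collecting its ref values to the driver is cheap:

# Build one masked DataFrame per ref value with the same cross-join pattern.
ref_values = [row['ref'] for row in df2.select('ref').distinct().collect()]
out_dfs = {
    ref: (df1.alias('df1')
          .crossJoin(df2.filter(F.col('ref') == ref).drop('ref').alias('df2'))
          .select(*[F.when(F.col('df2.' + c) != 0, F.col('df1.' + c)).alias(c) for c in df1.columns]))
    for ref in ref_values
}
out_dfs['A'].show()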