Is there a way to conditionally apply a filter to a window function in PySpark? For every group in col1 I want to keep only the rows that have X in col2. If a group doesn't have X in col2, I want to keep all rows in that group.

+------+------+
| col1 | col2 |
+------+------+
| A    |      |
+------+------+
| A    | X    |
+------+------+
| A    |      |
+------+------+
| B    |      |
+------+------+
| B    |      |
+------+------+
| B    |      |
+------+------+
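
For the sample above, the expected result keeps only the 'X' row for group A and all rows for group B:

+------+------+
| col1 | col2 |
+------+------+
| A    | X    |
+------+------+
| B    |      |
+------+------+
| B    |      |
+------+------+
| B    |      |
+------+------+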

2 Answers

You can do this with a max window function (partitioned by col1) that marks every group containing 'X' in col2 with an identifier (1 in this case). Groups that don't have 'X' get null. Then just filter the intermediate DataFrame to get the desired result.

from pyspark.sql import Window
from pyspark.sql.functions import max, when

w = Window.partitionBy(df.col1)
# 1 for every row of a group that contains at least one 'X'; null otherwise
df_1 = df.withColumn('x_exists', max(when(df.col2 == 'X', 1)).over(w))
# keep only the 'X' rows in flagged groups, and all rows in unflagged groups
df_2 = df_1.filter(((df_1.x_exists == 1) & (df_1.col2 == 'X')) | df_1.x_exists.isNull())
df_2.show()
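
To try this out, the sample data can be recreated with something like the following (a minimal sketch; it assumes a local SparkSession and nullable string columns):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [('A', None), ('A', 'X'), ('A', None),
     ('B', None), ('B', None), ('B', None)],
    ['col1', 'col2'])

Running the snippet above on this df keeps only the ('A', 'X') row for group A and all three rows for group B.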

An alternative using collect_list with more SQL syntax: NULL values are skipped by collect_list, so we use if(col2='X',1,NULL) as the list item; when a group has no 'X' in col2, the size of the resulting collect_list is zero:

from pyspark.sql.functions import expr

# has_X is true only for groups (partitioned by col1) containing at least one 'X' in col2;
# collect_list skips the NULLs produced by if(col2='X',1,NULL)
df_new = df.withColumn('has_X', expr("size(collect_list(if(col2='X',1,NULL)) OVER (partition by col1))>0")) \
           .filter("col2 = 'X' OR !has_X")
