I need to applying a window function is PySpark but have to ignore certain rows while doing it.
I have tried the below code.
from pyspark.sql import functions as F
from pyspark.sql.window import Window
df = (sc.parallelize([
{"id":"900","service":"MM", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-09-13 13:38:17.229" },
{"id":"900","service":"MM", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-09-13 13:38:17.242" },
{"id":"1527","service":"RA", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-10-17 14:52:02.331" },
{"id":"1527","service":"RT", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-10-17 14:52:02.490" },
{"id":"1527","service":"RP", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-10-17 14:52:02.647" },
{"id":"1504","service":"RA", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-10-17 22:28:25.095" },
{"id":"1504","service":"RT", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-10-17 22:28:25.253" },
{"id":"1504","service":"RP", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-10-17 22:28:25.372" },
{"id":"1504","service":"RV", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-10-17 22:28:25.732" },
{"id":"1504","service":"RA", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-11-09 02:05:53.445" },
{"id":"1504","service":"MT", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-11-09 02:05:53.643" },
{"id":"1504","service":"RA", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-11-09 02:05:53.924" },
{"id":"1504","service":"RT", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-11-09 02:05:54.094" },
{"id":"1504","service":"RP", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-11-09 02:05:54.243" },
{"id":"1504","service":"RV", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-11-09 02:05:54.732" },
{"id":"1504","service":"RA", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-11-11 20:52:30.764" },
{"id":"1504","service":"RV", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-11-11 20:52:31.099" },
{"id":"1504","service":"RT", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-11-11 20:52:33.363" },
{"id":"1504","service":"RV", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-11-11 20:52:33.677" },
{"id":"1504","service":"RP", "guid":"43158A8E-3DF2-4FD2-90C9-B73411BBE683" ,"time":"2018-11-11 20:52:39.572" }
]).toDF()
)
(
df
.withColumn
(
'rank',
F.when
(
(F.col('id') != 900),
F.row_number()
.over
(
Window.partitionBy
(
#F.when
#(
# (
# (F.col('id') != 90000)
#),
F.col('guid')
#)
)
.orderBy
(
F.col('time').asc()
)
)
)
)
.select
(
'id',
'service',
'guid',
'time',
'rank'
)
.show(truncate = False)
)
I almost have it but the row_numbers need to start from 1 instead of three in this case. So in the rank column the number after the two nulls should be 1 instead of 3.
