I have this table
df = spark.createDataFrame(
[
(1, 12345, "[email protected]", "2020-01-01"),
(1, 12345, "[email protected]", "2020-01-02"),
(1, 23456, "[email protected]", "2020-01-03"),
(1, 34567, "[email protected]", "2020-01-04"),
(1, 12345, "[email protected]", "2020-01-05"),
(1, 45678, "[email protected]", "2020-01-06"),
(1, 45678, "[email protected]", "2020-01-07"),
(2, 56789, "[email protected]", "2020-01-01"),
(2, 56789, "[email protected]", "2020-01-02"),
(2, 56789, "[email protected]", "2020-01-03"),
(2, 67890, "[email protected]", "2020-01-04"),
(2, 67890, "[email protected]", "2020-01-05"),
(3, 78901, "[email protected]", "2020-01-01"),
(3, 78901, "[email protected]", "2020-01-02"),
(3, 78901, "[email protected]", "2020-01-03"),
],
["id", "phone_number", "email", "date"],
)
From it, I want to select all rows that are either the first date for their ID or in which the phone number or email address has changed since the previous date.
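To make the rule concrete, here is a minimal plain-Python sketch (no Spark) of the selection I am after. The sample rows and the helper name `changed_rows` are hypothetical and only for illustration. It assumes each row is compared with the immediately preceding row of the same ID when ordered by date.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical sample rows: (id, phone_number, email, date)
rows = [
    (1, 111, "a@example.com", "2020-01-01"),
    (1, 111, "a@example.com", "2020-01-02"),
    (1, 222, "a@example.com", "2020-01-03"),
    (1, 222, "b@example.com", "2020-01-04"),
    (2, 333, "c@example.com", "2020-01-01"),
    (2, 333, "c@example.com", "2020-01-02"),
]


def changed_rows(rows):
    """Keep the first row per id, plus rows whose phone or email
    differs from the previous row (by date) of the same id."""
    kept = []
    # Sort by id, then date (ISO date strings sort chronologically).
    ordered = sorted(rows, key=itemgetter(0, 3))
    for _, group in groupby(ordered, key=itemgetter(0)):
        previous = None
        for row in group:
            # row[1:3] is the (phone_number, email) pair.
            if previous is None or row[1:3] != previous[1:3]:
                kept.append(row)
            previous = row
    return kept


result = changed_rows(rows)
for row in result:
    print(row)
```

With this sample, the second row of each ID is dropped because nothing changed, while the rows where the phone number or the email changes are kept.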
I have achieved this by creating a temp view and then executing a raw SQL query on it like so:
df.createOrReplaceTempView("df")
df = spark.sql(
"""
SELECT a.*
FROM (SELECT ROW_NUMBER() OVER (PARTITION BY id ORDER BY date) AS row, id, phone_number, email, date FROM df) AS a
LEFT JOIN (SELECT ROW_NUMBER() OVER (PARTITION BY id ORDER BY date) AS row, id, phone_number, email, date FROM df) AS b
ON a.row = b.row + 1 AND a.id = b.id
WHERE a.phone_number != b.phone_number OR b.phone_number IS NULL OR a.email != b.email OR b.email IS NULL
"""
)
However, I would prefer to use pure PySpark functions to achieve the same result. How can I translate this SQL query into PySpark?
This is what I have tried so far:
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
a = df.withColumn("row", F.row_number().over(W.partitionBy("id").orderBy("date")))
b = df.withColumn("row", F.row_number().over(W.partitionBy("id").orderBy("date")))
df = a.join(b, on=[a.row == b.row + 1, a.id == b.id], how="left").where(
(a.phone_number != b.phone_number)
| (b.phone_number.isNull())
| (a.email != b.email)
| (b.email.isNull())
)
By "first date", do you mean the oldest?