
I have this DataFrame:

df = spark.createDataFrame(
    [
        (1, 12345, "[email protected]", "2020-01-01"),
        (1, 12345, "[email protected]", "2020-01-02"),
        (1, 23456, "[email protected]", "2020-01-03"),
        (1, 34567, "[email protected]", "2020-01-04"),
        (1, 12345, "[email protected]", "2020-01-05"),
        (1, 45678, "[email protected]", "2020-01-06"),
        (1, 45678, "[email protected]", "2020-01-07"),
        (2, 56789, "[email protected]", "2020-01-01"),
        (2, 56789, "[email protected]", "2020-01-02"),
        (2, 56789, "[email protected]", "2020-01-03"),
        (2, 67890, "[email protected]", "2020-01-04"),
        (2, 67890, "[email protected]", "2020-01-05"),
        (3, 78901, "[email protected]", "2020-01-01"),
        (3, 78901, "[email protected]", "2020-01-02"),
        (3, 78901, "[email protected]", "2020-01-03"),
    ],
    ["id", "phone_number", "email", "date"],
)

from which I want to select all rows that are either the first date per ID, or where the phone number or email address has changed since the previous date.

I have achieved this by creating a temp view and then executing a raw SQL query on it like so:

df.createOrReplaceTempView("df")

df = spark.sql(
    """
    SELECT  a.*
    FROM (SELECT ROW_NUMBER() OVER (PARTITION BY id ORDER BY date) AS row, id, phone_number, email, date FROM df) AS a
    LEFT JOIN (SELECT ROW_NUMBER() OVER (PARTITION BY id ORDER BY date) AS row, id, phone_number, email, date FROM df) AS b
    ON a.row = b.row + 1 AND a.id = b.id
    WHERE a.phone_number != b.phone_number OR b.phone_number IS NULL OR a.email != b.email OR b.email IS NULL
    """
)

However, I would prefer to use pure PySpark functions to achieve the same result. How can I translate this SQL query into PySpark?

This is what I have tried so far:

from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

a = df.withColumn("row", F.row_number().over(W.partitionBy("id").orderBy("date")))
b = df.withColumn("row", F.row_number().over(W.partitionBy("id").orderBy("date")))

df = a.join(b, on=[a.row == b.row + 1, a.id == b.id], how="left").where(
    (a.phone_number != b.phone_number)
    | (b.phone_number.isNull())
    | (a.email != b.email)
    | (b.email.isNull())
)
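
If the column references turn out to be ambiguous (a and b are built from the same lineage), aliasing both sides and using qualified column names might be a safer direct translation. An untested sketch; the aliases and the final select("a.*") are my own additions mirroring the SQL above:

ranked = df.withColumn("row", F.row_number().over(W.partitionBy("id").orderBy("date")))

result = (
    ranked.alias("a")
    .join(
        ranked.alias("b"),
        # same join condition as the SQL: previous row within the same id
        on=[F.col("a.row") == F.col("b.row") + 1, F.col("a.id") == F.col("b.id")],
        how="left",
    )
    .where(
        (F.col("a.phone_number") != F.col("b.phone_number"))
        | F.col("b.phone_number").isNull()
        | (F.col("a.email") != F.col("b.email"))
        | F.col("b.email").isNull()
    )
    .select("a.*")
)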
  • By "first date", do you mean the oldest? Commented Nov 4, 2020 at 17:20
  • @Steven yes, the oldest record per ID, plus all the records where the phone number or email changed since the previous date Commented Nov 4, 2020 at 17:22

1 Answer


I would do it a bit differently: rather than following your SQL, apply your business rules directly:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("id").orderBy("date")

df.withColumn(
    "rnk", F.row_number().over(w)
).withColumn(
    # previous row's (phone_number, email) for the same id, ordered by date
    "old", F.lag(F.struct(F.col("phone_number"), F.col("email"))).over(w)
).where(
    (F.col("rnk") == 1)  # first date per id: no previous row to compare against
    | (F.col("phone_number") != F.col("old.phone_number"))
    | (F.col("email") != F.col("old.email"))
).show()

+---+------------+-----------+----------+---+--------------------+
| id|phone_number|      email|      date|rnk|                 old|
+---+------------+-----------+----------+---+--------------------+
|  1|       12345|[email protected]|2020-01-01|  1|                null|
|  1|       23456|[email protected]|2020-01-03|  3|[12345, [email protected]]|
|  1|       34567|[email protected]|2020-01-04|  4|[23456, [email protected]]|
|  1|       12345|[email protected]|2020-01-05|  5|[34567, [email protected]]|
|  1|       45678|[email protected]|2020-01-06|  6|[12345, [email protected]]|
|  3|       78901|[email protected]|2020-01-01|  1|                null|
|  2|       56789|[email protected]|2020-01-01|  1|                null|
|  2|       56789|[email protected]|2020-01-03|  3|[56789, [email protected]]|
|  2|       67890|[email protected]|2020-01-04|  4|[56789, [email protected]]|
+---+------------+-----------+----------+---+--------------------+

NB: you can replace the test on rnk with a test on F.col("old").isNull(), and therefore you do not have to compute rnk at all.
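
A minimal sketch of that simplification, same logic as above, just dropping the row number (the drop("old") is my addition so the result keeps only the original columns):

w = Window.partitionBy("id").orderBy("date")

df.withColumn(
    "old", F.lag(F.struct("phone_number", "email")).over(w)
).where(
    F.col("old").isNull()  # first row per id has no previous row
    | (F.col("phone_number") != F.col("old.phone_number"))
    | (F.col("email") != F.col("old.email"))
).drop("old").show()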
