
I would like to filter two ordered arrays in a struct that has the fields dates and values. An example DataFrame is below, followed by an explanation and an example of what I am trying to do.

from pyspark.sql import Row
import datetime

rows = [
    Row(
        id='1111',
        A=Row(
            dates=[datetime.datetime(2015, 7, 29, 14, 27), datetime.datetime(2015, 7, 31, 14, 27)],
            values=[20.0, 100.0]),
        B=Row(
            dates=[datetime.datetime(2015, 4, 18, 17, 52)],
            values=[12.58])
    ),
    Row(
        id='2222',
        A=Row(
            dates=[datetime.datetime(2011, 4, 28, 14, 27), datetime.datetime(2011, 4, 28, 14, 27)],
            values=[100.0, None]),
        B=Row(
            dates=[datetime.datetime(2011, 4, 18, 17, 52)],
            values=[None])
    ),
    Row(
        id='3333',
        A=None,
        B=None)
]

df = spark.createDataFrame(rows)
df.show(10, False)

+----+-----------------------------------------------------------+--------------------------------+
|id  |A                                                          |B                               |
+----+-----------------------------------------------------------+--------------------------------+
|1111|[[2015-07-29 14:27:00, 2015-07-31 14:27:00], [20.0, 100.0]]|[[2015-04-18 17:52:00], [12.58]]|
|2222|[[2011-04-28 14:27:00, 2011-04-28 14:27:00], [100.0,]]     |[[2011-04-18 17:52:00], []]     |
|3333|null                                                       |null                            |
+----+-----------------------------------------------------------+--------------------------------+

df.printSchema()

root
 |-- id: string (nullable = true)
 |-- A: struct (nullable = true)
 |    |-- dates: array (nullable = true)
 |    |    |-- element: timestamp (containsNull = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |-- B: struct (nullable = true)
 |    |-- dates: array (nullable = true)
 |    |    |-- element: timestamp (containsNull = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = true)

Here dates and values are parallel arrays: the i-th element of dates corresponds to the i-th element of values, so the second value in values goes with the second date in dates.
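
To make that pairing concrete, zipping the two arrays puts each date next to its value (shown only for illustration; A_pairs is an arbitrary name):

import pyspark.sql.functions as F

# Pair up the parallel arrays: element i of A.dates sits with element i of A.values.
# Rows where A is null simply produce a null array.
df.select('id', F.arrays_zip('A.dates', 'A.values').alias('A_pairs')).show(truncate=False)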

I want to filter the df so that it removes the null values and their corresponding dates, and if values contains only nulls, the whole struct becomes null. Like this:

new_df.show(10, False)

+----+-----------------------------------------------------------+--------------------------------+
|id  |A                                                          |B                               |
+----+-----------------------------------------------------------+--------------------------------+
|1111|[[2015-07-29 14:27:00, 2015-07-31 14:27:00], [20.0, 100.0]]|[[2015-04-18 17:52:00], [12.58]]|
|2222|[[2011-04-28 14:27:00], [100.0]]                           |null                            |
|3333|null                                                       |null                            |
+----+-----------------------------------------------------------+--------------------------------+

1 Answer

You can use arrays_zip together with filter to drop the null values, then unzip the arrays back to their original layout, taking care of empty arrays and null columns along the way:

import pyspark.sql.functions as F

df2 = df.withColumn(
    'A',
    # zip dates with values and keep only the pairs whose value is not null
    F.expr("filter(arrays_zip(A.dates, A.values), x -> x.values is not null)")
).withColumn(
    'A',
    # unzip back into a (dates, values) struct; when the filtered array is
    # empty or null, the unmatched `when` leaves the column null
    F.when(
        (F.size('A') != 0) & (F.col('A').isNotNull()),
        F.struct(F.col('A.0').alias('dates'), F.col('A.1').alias('values'))
    )
).withColumn(
    'B',
    # same treatment for B
    F.expr("filter(arrays_zip(B.dates, B.values), x -> x.values is not null)")
).withColumn(
    'B',
    F.when(
        (F.size('B') != 0) & (F.col('B').isNotNull()),
        F.struct(F.col('B.0').alias('dates'), F.col('B.1').alias('values'))
    )
)

df2.show(truncate=False)
+----+-----------------------------------------------------------+--------------------------------+
|id  |A                                                          |B                               |
+----+-----------------------------------------------------------+--------------------------------+
|1111|[[2015-07-29 13:27:00, 2015-07-31 13:27:00], [20.0, 100.0]]|[[2015-04-18 16:52:00], [12.58]]|
|2222|[[2011-04-28 13:27:00], [100.0]]                           |null                            |
|3333|null                                                       |null                            |
+----+-----------------------------------------------------------+--------------------------------+

df2.printSchema()
root
 |-- id: string (nullable = true)
 |-- A: struct (nullable = true)
 |    |-- dates: array (nullable = true)
 |    |    |-- element: timestamp (containsNull = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |-- B: struct (nullable = true)
 |    |-- dates: array (nullable = true)
 |    |    |-- element: timestamp (containsNull = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
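
One caveat: the A.0 / A.1 accessors rely on arrays_zip naming the zipped struct fields positionally, which depends on the Spark version (newer releases keep the original field names dates and values instead). If those accessors don't resolve for you, here is a sketch in the same spirit that names the fields explicitly; it assumes Spark 3.1+ for the Python higher-order functions zip_with, filter and transform, and clean is just an illustrative helper name:

import pyspark.sql.functions as F

def clean(col_name):
    # Pair each date with its value using explicit field names, so nothing
    # depends on how arrays_zip happens to name the zipped struct fields.
    zipped = F.filter(
        F.zip_with(
            F.col(col_name + ".dates"),
            F.col(col_name + ".values"),
            lambda d, v: F.struct(d.alias("date"), v.alias("value")),
        ),
        lambda x: x["value"].isNotNull(),
    )
    # Unzip the surviving pairs; when nothing survives (or the struct was
    # null to begin with), the unmatched `when` yields null.
    return F.when(
        F.size(zipped) > 0,
        F.struct(
            F.transform(zipped, lambda x: x["date"]).alias("dates"),
            F.transform(zipped, lambda x: x["value"]).alias("values"),
        ),
    )

df2 = df.withColumn("A", clean("A")).withColumn("B", clean("B"))

This should give the same rows and the same dates/values struct layout as the output above.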