
I want to modify/filter a property inside a struct. Say I have a DataFrame with the following column:

#+------------------------------------------+
#|                 arrayCol                 |
#+------------------------------------------+
#| {"a" : "some_value", "b" : [1, 2, 3]}    |
#+------------------------------------------+

Schema:

struct<a:string, b:array<int>>

I want to filter out some values in the 'b' property where the value inside the array == 1.

The desired result is the following:

#+------------------------------------------+
#|                 arrayCol                 |
#+------------------------------------------+
#| {"a" : "some_value", "b" : [2, 3]}       |
#+------------------------------------------+

Is it possible to do this without extracting the property, filtering the values, and rebuilding another struct?
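For reproducibility, a minimal sketch (assuming PySpark, which both answers below use) that builds a one-row DataFrame with this schema:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# One row whose single struct column matches struct<a:string, b:array<int>>.
df = spark.createDataFrame(
    [(("some_value", [1, 2, 3]),)],
    "arrayCol struct<a:string, b:array<int>>",
)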


2 Answers


Update:

For Spark 3.1+, withField can be used to update the struct column without having to recreate the whole struct. In your case, you can update the field b, using the filter function to filter the array values, like this:

import pyspark.sql.functions as F

df1 = df.withColumn(
    'arrayCol',
    # Replace field `b` inside the struct with a filtered copy of the array.
    F.col('arrayCol').withField('b', F.filter(F.col("arrayCol.b"), lambda x: x != 1))
)

df1.show()
#+--------------------+
#|            arrayCol|
#+--------------------+
#|{some_value, [2, 3]}|
#+--------------------+
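Since withField returns a Column, calls can also be chained to update several fields in one pass. A small sketch (the F.upper transformation on a is purely illustrative, not part of the question):

df2 = df.withColumn(
    'arrayCol',
    F.col('arrayCol')
    .withField('b', F.filter(F.col('arrayCol.b'), lambda x: x != 1))
    .withField('a', F.upper(F.col('arrayCol.a')))  # hypothetical extra update
)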

For older versions, Spark doesn't support adding or updating fields in nested structures directly. To update a struct column, you'll need to create a new struct from the existing fields plus the updated ones:

import pyspark.sql.functions as F

df1 = df.withColumn(
    "arrayCol",
    F.struct(
        F.col("arrayCol.a").alias("a"),                        # carry over the untouched field
        F.expr("filter(arrayCol.b, x -> x != 1)").alias("b")   # filtered array
    )
)
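If the struct has many fields, rebuilding it by hand gets tedious. A sketch (assuming the field names can be read from the schema at runtime) that copies every field unchanged and only replaces b:

# Rebuild the struct generically: keep each field as-is except `b`,
# which is replaced by the filtered array.
field_names = df.schema["arrayCol"].dataType.fieldNames()
df1 = df.withColumn(
    "arrayCol",
    F.struct(*[
        F.expr("filter(arrayCol.b, x -> x != 1)").alias(name) if name == "b"
        else F.col(f"arrayCol.{name}").alias(name)
        for name in field_names
    ]),
)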

1 Comment

This does the job, but what I was looking for was a way to do it without recreating the struct object, modifying the property directly inside the struct.

One way would be to define a UDF:

Example:

import ast
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, MapType


def remove_value(col):
    # `col` arrives as a dict (MapType); `b` is stored as a string,
    # so parse it, drop every 1, and serialize it back.
    col["b"] = str([x for x in ast.literal_eval(col["b"]) if x != 1])
    return col


if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [
            {
                "arrayCol": {
                    "a": "some_value",
                    "b": "[1, 2, 3]",
                },
            },
        ]
    )
    remove_value_udf = spark.udf.register(
        "remove_value_udf", remove_value, MapType(StringType(), StringType())
    )
    df = df.withColumn(
        "result",
        remove_value_udf(F.col("arrayCol")),
    )
    df.printSchema()
    df.show(truncate=False)

Result:

root
 |-- arrayCol: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- result: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+---------------------------------+------------------------------+              
|arrayCol                         |result                        |
+---------------------------------+------------------------------+
|{a -> some_value, b -> [1, 2, 3]}|{a -> some_value, b -> [2, 3]}|
+---------------------------------+------------------------------+
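Note that for the struct schema from the question (rather than a map of strings), the UDF could return the struct type directly. A hedged sketch, with remove_one as a hypothetical name:

from pyspark.sql.types import (
    ArrayType, IntegerType, StringType, StructField, StructType
)

# Return type mirroring the question's schema: struct<a:string, b:array<int>>.
result_type = StructType([
    StructField("a", StringType()),
    StructField("b", ArrayType(IntegerType())),
])

@F.udf(returnType=result_type)
def remove_one(col):
    # Struct values arrive as Row objects; return a tuple in field order.
    return (col["a"], [x for x in col["b"] if x != 1])

df = df.withColumn("result", remove_one(F.col("arrayCol")))

Keep in mind that a Python UDF serializes every row through a Python worker, so the built-in filter approach from the other answer will generally be faster.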
