
I have a column in a PySpark df that contains an array of maps like the one below:

[{"address": "Fadden", "city": "", "country": "", "note": "", "stateProvince": "Queensland"}]

df.printSchema() returns the following for the column:

 |-- constituencies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- note: string (nullable = true)
 |    |    |-- stateProvince: string (nullable = true)

And I want to nullify all those empty strings, so I thought this would be a perfect problem to solve with F.transform(col, f).

So I created the function and then used it in the transform expression, like below:

def nullify_vals(d):
  def nullify_string(str_):
    if str_.strip() == "":
      return None
    return str_.strip()
  
  return (
    dict((k, nullify_string(v)) for k, v in d.items())  
  )

Note that the above works when tested on a dictionary:

dd = {"my": "map", "is": "", "not": "   ", "entirely": "  empty , right?"}
d_cln = nullify_vals(dd)  
d_cln["not"] is None # returns True

But when I then use it in PySpark, it gives me an error:

import pyspark.sql.functions as F
result = kyclean.select(F.transform("constituencies", nullify_vals))

TypeError: 'Column' object is not callable

These are the last lines of the stacktrace:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File <command-899394298900126>:1, in <module>
----> 1 result = kyclean.select(F.transform("constituencies", nullify_vals))

File /databricks/spark/python/pyspark/sql/functions.py:4260, in transform(col, f)
   4214 def transform(col, f):
   4215     """
   4216     Returns an array of elements after applying a transformation to each element in the input array.
   4217 
   (...)
   4258     +--------------+
   4259     """
-> 4260     return _invoke_higher_order_function("ArrayTransform", [col], [f])

File /databricks/spark/python/pyspark/sql/functions.py:4209, in _invoke_higher_order_function(name, cols, funs)
   4206 expr = getattr(expressions, name)
   4208 jcols = [_to_java_column(col).expr() for col in cols]
-> 4209 jfuns = [_create_lambda(f) for f in funs]
   4211 return Column(sc._jvm.Column(expr(*jcols + jfuns)))

2 Answers


Your function nullify_vals should take a Column object, since your array elements are structs (StructType). But you're treating its input as a normal Python dict.

Try rewriting it like this instead:

from typing import List

from pyspark.sql import functions as F, Column

def nullify_vals(struct_col: Column, fields: List[str]) -> Column:
    for f in fields:
        # if the trimmed field value is an empty string, replace it with null
        struct_col = struct_col.withField(
            f,
            F.when(F.trim(struct_col[f]) == "", None).otherwise(struct_col[f])
        )

    return struct_col

For each field in the inner struct, we use the Column.withField method to update it: if the trimmed value is an empty string, we set it to null.

Applied to your input example:

json_str = '{"constituencies":[{"address":"Fadden","city":"","country":"","note":"","stateProvince":"Queensland"}]}'
df = spark.read.json(spark.sparkContext.parallelize([json_str]))

You can get the list of constituencies struct fields from dataframe schema:

constituencies_fields = df.selectExpr("inline(constituencies)").columns
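
As an alternative, the same list of field names can be read straight from the DataFrame schema (a minimal sketch, assuming the same df as above):

# Pull the struct field names out of the array's element type
constituencies_fields = [f.name for f in df.schema["constituencies"].dataType.elementType.fields]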

df1 = df.withColumn(
    "constituencies",
    F.transform("constituencies", lambda x: nullify_vals(x, constituencies_fields))
)

df1.show(truncate=False)
#+----------------------------------------+
#|constituencies                          |
#+----------------------------------------+
#|[{Fadden, null, null, null, Queensland}]|
#+----------------------------------------+
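
To double-check that the empty strings really became nulls, you can explode the structs into one row per element (a quick sanity check on the df1 above):

# inline() expands the array of structs into one column per struct field
df1.selectExpr("inline(constituencies)").show()
# expected: address=Fadden, stateProvince=Queensland, and city/country/note all null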

2 Comments

I see. Let me try.
THANK YOU. This was extremely educational for me thanks :)

I'm still looking into the error you got and I'll update the post when I figure out what's wrong. In the meantime, you can do something like this to work around it:

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = ArrayType(
    StructType([
      StructField('address', StringType()),
      StructField('city', StringType()),
      StructField('country', StringType()),
      StructField('note', StringType()),
      StructField('stateProvince', StringType()),
  ]), True)
# UDF: for each struct in the array, replace blank / whitespace-only strings with None
nullify_udf = udf(lambda arr: [[(v if v.strip() != "" else None) for v in area] for area in arr], schema)

result = kyclean.withColumn('constituencies', nullify_udf('constituencies'))
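
If you'd rather keep something closer in shape to your original nullify_vals, the UDF can also build dicts keyed by field name instead of relying on field order. This is only a sketch under the same schema assumption, and nullify_vals_row / nullify_udf2 are just illustrative names:

def nullify_vals_row(row):
    # Row -> dict, turning blank/whitespace-only strings into None
    return {k: (None if v is None or v.strip() == "" else v.strip())
            for k, v in row.asDict().items()}

nullify_udf2 = udf(lambda arr: [nullify_vals_row(r) for r in arr] if arr is not None else None, schema)
result = kyclean.withColumn('constituencies', nullify_udf2('constituencies'))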

The specific error you got is saying that d is actually a Column when transform calls your function, so d.items is itself a Column (a nested-field reference) and can't be called like a function; the function you pass in really needs to work on the Column object d using Column methods.

The description of pyspark.sql.functions.transform says, "Returns an array of elements after applying a transformation to each element in the input array."

But inside the description of the accepted function, f, it says, "...and can use methods of Column, functions defined in pyspark.sql.functions and Scala UserDefinedFunctions. Python UserDefinedFunctions are not supported (SPARK-27052)." So it can't take in custom Python UserDefinedFunctions yet, which is sort of what you were trying to do.
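
For comparison, on a plain array of strings the callback you pass to transform would be built entirely out of Column expressions, something like this (a minimal sketch; the column name "tags" is just for illustration):

import pyspark.sql.functions as F

# The lambda receives a Column for each array element and must return a Column,
# built with Column methods / pyspark.sql.functions, not plain Python string ops.
result = df.withColumn(
    "tags",
    F.transform("tags", lambda x: F.when(F.trim(x) == "", None).otherwise(F.trim(x)))
)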

2 Comments

But I am applying transform on an array column, an array of maps.
Yeah, sorry, it wasn't clear from the original question description. I'm taking a look at it again now.
