I have a column in a pyspark df that contains an array of maps like the below:
[{"address": "Fadden", "city": "", "country": "", "note": "", "stateProvince": "Queensland"}]
df.printSchema() returns the following for the column:
|-- constituencies: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- address: string (nullable = true)
| | |-- city: string (nullable = true)
| | |-- country: string (nullable = true)
| | |-- note: string (nullable = true)
| | |-- stateProvince: string (nullable = true)
And I want to nullify all those empty strings. So I thought this would be a perfect problem to solve with F.transform(col, f)
So I created the function, and then I use it in the transform expression like below:
def nullify_vals(d):
def nullify_string(str_):
if str_.strip() == "":
return None
return str_.strip()
return (
dict((k, nullify_string(v)) for k, v in d.items())
)
Note that the above works when tested on a dictionary:
dd = {"my": "map", "is": "", "not": " ", "entierly": " empty , right?"}
d_cln = nullify_vals(dd)
d_cln["not"] is None # returns True
But when I then use it in Pyspark, it gives me an error:
import pyspark.sql.functions as F
result = kyclean.select(F.transform("constituencies", nullify_vals))
TypeError: 'Column' object is not callable
These are the last lines of the stacktrace:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
File <command-899394298900126>:1, in <module>
----> 1 result = kyclean.select(F.transform("constituencies", nullify_vals))
File /databricks/spark/python/pyspark/sql/functions.py:4260, in transform(col, f)
4214 def transform(col, f):
4215 """
4216 Returns an array of elements after applying a transformation to each element in the input array.
4217
(...)
4258 +--------------+
4259 """
-> 4260 return _invoke_higher_order_function("ArrayTransform", [col], [f])
File /databricks/spark/python/pyspark/sql/functions.py:4209, in _invoke_higher_order_function(name, cols, funs)
4206 expr = getattr(expressions, name)
4208 jcols = [_to_java_column(col).expr() for col in cols]
-> 4209 jfuns = [_create_lambda(f) for f in funs]
4211 return Column(sc._jvm.Column(expr(*jcols + jfuns)))