I am trying to write a user-defined function in PySpark that determines whether a given entry in a dataframe is bad (null or NaN). I can't seem to figure out what I am doing wrong in this function:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

def is_bad(value):
    if (value != value | (value.isNull())):
        return True
    else:
        return False

isBadEntry = UserDefinedFunction(lambda x: is_bad(x),BooleanType())

df_test = sql.createDataFrame([(1,1,None ), (1,2, 5), (1,3, None), (1,4, None), (1,5, 10), (1,6,None )], ('session',"timestamp", "id"))
df_test =df_test.withColumn("testing", isBadEntry(df_test.id)).show()

This is crashing with a cryptic error:

Py4JJavaErrorTraceback (most recent call last)
<ipython-input-379-b4109047ba40> in <module>()
  1 df_test = sql.createDataFrame([(1,1,None ), (1,2, 5), (1,3, None), (1,4, None), (1,5, 10), (1,6,None )], ('session',"timestamp", "id"))
  2 #df_test.show()
----> 3 df_test =df_test.withColumn("testing", isBadEntry(df_test.id)).show()

/usr/local/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate)
285         +---+-----+
286         """
--> 287         print(self._jdf.showString(n, truncate))
288 
289     def __repr__(self):

/usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

 /usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
 61     def deco(*a, **kw):
 62         try:
---> 63             return f(*a, **kw)
 64         except py4j.protocol.Py4JJavaError as e:
 65             s = e.java_exception.toString()

/usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317                 raise Py4JJavaError(
318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
320             else:
321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o29884.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 43.0 failed 4 times, most recent failure: Lost task 0.3 in stage 43.0 (TID 167, 172.16.193.79): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
 func = lambda _, it: map(mapper, it)
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
mapper = lambda a: udf(*a)
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
 File "<ipython-input-378-2aac40340a6c>", line 14, in <lambda>
 File "<ipython-input-378-2aac40340a6c>", line 9, in is_bad
AttributeError: 'NoneType' object has no attribute 'isNull'

Could someone please help?

Comment (Psidom, Nov 3, 2017): change value.isNull() to value is None.

1 Answer


As Psidom implies in the comment, in Python, the NULL object is the singleton None (source); changing the function as follows works OK:

def is_bad(value):
    if (value != value) | (value is None):
        return True
    else:
        return False

isBadEntry = UserDefinedFunction(lambda x: is_bad(x), BooleanType())
df_test.withColumn("testing", isBadEntry(df_test.id)).show()
# +-------+---------+----+-------+ 
# |session|timestamp|  id|testing|
# +-------+---------+----+-------+
# |      1|        1|null|   true|
# |      1|        2|   5|  false|
# |      1|        3|null|   true|
# |      1|        4|null|   true|
# |      1|        5|  10|  false|
# |      1|        6|null|   true|
# +-------+---------+----+-------+
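The value != value part of the check is what handles NaN: under IEEE 754 semantics, NaN is the only value that compares unequal to itself, while None compares equal to itself (which is why the separate is None test is still needed). A quick check in plain Python:

nan = float('nan')
nan != nan    # True  - only NaN is unequal to itself
5.0 != 5.0    # False
None != None  # False - hence the separate "is None" test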

And indeed it works with NaNs as well:

from pyspark.sql import Row

# toy data:
df = spark.createDataFrame([Row(1.0, 7., None),
                            Row(2., 4., float('nan')),
                            Row(3., 3., 5.0),
                            Row(4., 1., 4.0),
                            Row(5., 1., 1.0)],
                           ["col_1", "col_2", "col_3"])

df.withColumn("testing", isBadEntry(df.col_3)).show()
# +-----+-----+-----+-------+ 
# |col_1|col_2|col_3|testing|
# +-----+-----+-----+-------+ 
# |  1.0|  7.0| null|   true|
# |  2.0|  4.0|  NaN|   true|
# |  3.0|  3.0|  5.0|  false|
# |  4.0|  1.0|  4.0|  false|
# |  5.0|  1.0|  1.0|  false|
# +-----+-----+-----+-------+
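As a side note, the same check can be expressed with Spark's built-in column functions isnull and isnan, which avoids the Python UDF overhead entirely. A minimal sketch (note that isnan is only defined for float/double columns, as in the toy data above):

from pyspark.sql import functions as F

# isnull catches None/null entries, isnan catches NaN entries
df.withColumn("testing", F.isnull(df.col_3) | F.isnan(df.col_3)).show()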