
I am trying to write a Python utility function that accepts an object of a locally defined class and uses one of that class's methods as a user-defined function (UDF) in a PySpark DataFrame withColumn call. The utility function signature is:

def spark_analyze(lp: LogProcessor):

In the LogProcessor class, I have a method that I'd like to use as a UDF. The method definition is:

from pyspark.sql.types import IntegerType, StructField, StructType

schema = StructType([
    StructField("total", IntegerType(), False),
    StructField("other", IntegerType(), False)
])

def ProcessLog(self, log_file):
    self.PrepareForLog()
    for event in pyspark_utils.spark_events_from_file(log_file):
        self.ProcessEvent(event)
    # Return the counters once all events have been processed
    # (ProcessEvent accumulates self.total and self.other).
    return [self.total, self.other]

In spark_analyze I do the following, where lp is the passed-in LogProcessor object:

@udf(lp.schema)
def lpf(lcm_file):
    return lp.ProcessLog(lcm_file)

return (df.withColumn('results', lpf(col('logfile_dir')))
...

This produces a long Python stack trace, which starts like this:

/home/david/libs.zip/pyspark_utils.py in spark_analyze(lp)
    132     def lpf(lcm_file):
    133         lp.ProcessLog(lcm_file)
--> 134     return (df.withColumn('results', lpf(col('logfile_dir')))
    135             .withColumn('log name', spark_get_dataset_name(col('logfile_dir')))
    136             .select('log name', 'results.*')

/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/functions.py in wrapper(*args)
   1955     @functools.wraps(f)
   1956     def wrapper(*args):
-> 1957         return udf_obj(*args)
   1958
   1959     wrapper.func = udf_obj.func

and ends with:

/home/david/libs.zip/pyspark_utils.py in spark_analyze(lp)
    132     def lpf(lcm_file):
    133         lp.ProcessLog(lcm_file)
--> 134     return (df.withColumn('results', lpf(col('logfile_dir')))
    135             .withColumn('log name', spark_get_dataset_name(col('logfile_dir')))
    136             .select('log name', 'results.*')

/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/functions.py in wrapper(*args)
   1955     @functools.wraps(f)
   1956     def wrapper(*args):
-> 1957         return udf_obj(*args)
   1958
   1959     wrapper.func = udf_obj.func

I did some testing and found that things work fine if I define the UDF right above the place where I use it in withColumn. I also tried redefining ProcessLog to just return [0, 0], and the problem did not go away. So the problem seems to be that I'm using a passed-in object's method as a UDF. Is there another way to have a UDF be a method of a class? Thanks for any help!

    Have you registered this UDF defined in class? , something like this ` sparkContext.udf.register("ProcessLog",LogProcessor.ProcessLog,"Return Type") ` and then later you should be able to invoke it as ` df.withColumn("Result","ProcessLog(col('logfile_dir')") . And also change the method defintion to static. Try this if it works for you. I was able to invoke udf like above. Commented Oct 2, 2018 at 3:22
  • Ah, that is a good idea! I ended up switching to a procedural style and just passing the function definition, which worked. I suspect your version would work too! Thank you for the reply. Commented Oct 2, 2018 at 20:43
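For reference, here is a minimal sketch of the registration approach from the first comment. It assumes Spark 2.x, that ProcessLog is turned into a @staticmethod taking only log_file (so no instance needs to be serialized), and the registered name process_log is illustrative:

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()

# Register the static method as a SQL function; LogProcessor.schema is
# the StructType shown in the question.
spark.udf.register("process_log", LogProcessor.ProcessLog, LogProcessor.schema)

# Invoke it by name inside a SQL expression instead of closing over an
# object in Python.
df2 = df.withColumn("results", expr("process_log(logfile_dir)"))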

1 Answer


The approach that Usman Azhar suggested may work. I ended up solving this by simply passing the definition of my UDF as an argument to my library function, along the lines of the sketch below.
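A minimal sketch of that fix, assuming the caller defines a plain module-level function; the name process_log and the counting logic are illustrative:

from pyspark.sql.functions import col, udf

# Library side: accept the row-processing function and its result schema
# instead of a LogProcessor instance.
def spark_analyze(df, process_fn, schema):
    lpf = udf(process_fn, schema)
    return df.withColumn('results', lpf(col('logfile_dir')))

# Caller side: a plain function, so Spark pickles only the function and
# what it references, not a whole locally defined class instance.
def process_log(log_file):
    total = 0
    other = 0
    for event in pyspark_utils.spark_events_from_file(log_file):
        total += 1  # illustrative counting logic
    return [total, other]

result_df = spark_analyze(df, process_log, schema)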
