
Problem

I would like to create a User-Defined Function in Java that can be called as a Java method within a chain of Apache Spark operators. I'm having trouble finding Java examples that don't require the UDF to be used inside a SQL query.

Versions

  • Java 8
  • Scala 2.10.6
  • Apache Spark 1.6.0 Pre-built for Hadoop 2.6.0

What I've Tried That Works

I can successfully create a UDF in Java. However, I can only use it inside a SQL query:

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;

sqlContext.udf().register("udfUppercase",
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = ...; // a simple DataFrame with a "name" column
oldDF.registerTempTable("df");
DataFrame newDF = sqlContext.sql("SELECT udfUppercase(name) AS name_upper FROM df");

Where I'm Stuck

I would expect a non-SQL method-call-style UDF in Java to look something like this:

import static org.apache.spark.sql.functions.udf;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

UserDefinedFunction udfUppercase = udf(
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = ...; // a simple DataFrame with a "name" column
DataFrame newDF = oldDF.withColumn("name_upper", udfUppercase(oldDF.col("name")));

Compiling this produces an error on the line beginning with "UserDefinedFunction", so my guess at the right signature is evidently wrong:

error: no suitable method found for udf((String st[...]ase(),DataType)
    UserDefinedFunction udfUppercase = udf((String string) -> string.toUpperCase(), DataTypes.StringType);
method functions.<RT#1>udf(Function0<RT#1>,TypeTags.TypeTag<RT#1>) is not applicable
    (cannot infer type-variable(s) RT#1
    (argument mismatch; Function0 is not a functional interface
    multiple non-overriding abstract methods found in interface Function0))

The error continues with similar details for each udf() overload the compiler tried.
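The key phrase in the error is "Function0 is not a functional interface": a Java 8 lambda can only target an interface with exactly one abstract method, and, as the message reports, the Scala 2.10 `Function0` interface has several. A minimal plain-Java sketch of the difference, with hypothetical interface names and no Spark dependency:

```java
public class SamDemo {
    // Exactly one abstract method, so a Java 8 lambda can implement it.
    // (Similar in shape to Spark's org.apache.spark.sql.api.java.UDF1, whose
    // call method additionally declares "throws Exception".)
    @FunctionalInterface
    public interface OneArg<T, R> {
        R call(T t);
    }

    // Two abstract methods: NOT a functional interface, so no lambda can target it.
    // This is the situation the compiler reports for Scala 2.10's Function0.
    public interface TwoAbstract<R> {
        R apply();
        R applyAgain(); // hypothetical second abstract method
    }

    public static <T, R> R invoke(OneArg<T, R> f, T arg) {
        return f.call(arg);
    }

    public static void main(String[] args) {
        System.out.println(invoke((String s) -> s.toUpperCase(), "spark")); // SPARK

        // Uncommenting the next line fails to compile, reporting that
        // TwoAbstract is not a functional interface:
        // TwoAbstract<String> broken = () -> "x";

        // An anonymous class still works for interfaces with several abstract methods.
        TwoAbstract<String> ok = new TwoAbstract<String>() {
            @Override public String apply() { return "a"; }
            @Override public String applyAgain() { return "b"; }
        };
        System.out.println(ok.apply() + ok.applyAgain()); // ab
    }
}
```

This is why passing a lambda straight to the Scala-oriented `udf` overloads fails, while `sqlContext.udf().register(...)` accepts the same lambda through its Java `UDF1` overload.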

What I Need

I need to fix the Java code so that I can define and use the udfUppercase UDF without embedding it in a SQL query. I feel like I'm missing something very simple, fundamental, and possibly syntactic, but I could be completely off base.

Working Solution (courtesy of zero323 below)

In Spark 1.6 there's no good way to define and call a Java UDF as a Java method, but a UDF registered in the SQLContext can be inserted into a chain of operators with callUDF().

import static org.apache.spark.sql.functions.callUDF;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;

sqlContext.udf().register("udfUppercase",
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = ...; // a simple DataFrame with a "name" column
DataFrame newDF = oldDF.withColumn("name_upper", callUDF("udfUppercase", oldDF.col("name")));

Also, be sure to use callUDF() rather than the deprecated callUdf(), which has a different method signature.

  • First off, I don't understand what you mean by "non-SQL". Clearly, even in the last example you have a valid sqlContext -- you can't work with a DataFrame without a valid sqlContext. So why not try calling sqlContext.register() on the udf? Commented Mar 27, 2016 at 14:23
  • By non-SQL, I mean a UDF that can be used as a Java method call in pure Java code, not embedded in a SQL query. I already understand how to create a UDF that works in SQL, as shown in the working code I provided. Commented Mar 27, 2016 at 15:33
  • Like I said, try registering the udf like you did where it works. Commented Mar 27, 2016 at 15:45
  • That approach doesn't work. Registering the UDF in the SQLContext with register() makes it available within that context for SQL queries, but doesn't promote it to a full-fledged Java-callable method. After the change, compilation fails on the line where I try to call udfUppercase() within the code. Commented Mar 27, 2016 at 15:56
  • Brilliant. Absolutely brilliant. Master engineering at its very best. Commented Jan 21, 2020 at 22:48

1 Answer


Spark >= 2.3

SPARK-22945 (add Java UDF APIs in the functions object) adds a simplified udf API, similar to the Scala and Python ones:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

UserDefinedFunction udfUppercase = udf(
  (String s) -> s.toUpperCase(), DataTypes.StringType
);

df.select(udfUppercase.apply(col("name")));
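A bare lambda compiles here because the Java-facing `udf` overloads take single-method interfaces (`UDF0`, `UDF1`, ...) plus a `DataType`, and javac keeps only the overload whose interface arity matches the lambda's parameter count. The following plain-Java sketch reproduces that overload resolution; `Fn1`, `Fn2` and the `String` standing in for a `DataType` are hypothetical, and Spark is not involved:

```java
public class OverloadDemo {
    // Hypothetical stand-ins for Spark's Java UDF1 / UDF2 interfaces.
    @FunctionalInterface
    public interface Fn1<A, R> { R call(A a); }

    @FunctionalInterface
    public interface Fn2<A, B, R> { R call(A a, B b); }

    // Hypothetical analogue of udf(UDF1, DataType) / udf(UDF2, DataType).
    // The String parameter stands in for a DataType; each overload returns
    // its arity so we can see which one javac selected.
    public static <A, R> int udf(Fn1<A, R> f, String returnType) { return 1; }

    public static <A, B, R> int udf(Fn2<A, B, R> f, String returnType) { return 2; }

    public static void main(String[] args) {
        // A one-parameter lambda is only compatible with Fn1, so the first overload wins.
        int one = udf((String s) -> s.toUpperCase(), "StringType");
        // A two-parameter lambda is only compatible with Fn2.
        int two = udf((String a, String b) -> a + b, "StringType");
        System.out.println(one + " " + two); // 1 2
    }
}
```

The explicit parameter type in `(String s) -> ...` also lets javac infer the input type without casts, which is why the Spark >= 2.3 snippet needs no `UDF1<String, String>` annotation.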

Spark < 2.3

Long story short, the functions.udf methods are not designed for Java interoperability. All variants require TypeTags, and while it is possible to generate these manually (I am pretty sure I've seen Daniel Darabos show how to do it on SO), it is something you probably want to avoid.

If for some reason you want to avoid writing the UDF in Scala, the simplest option is to register the UDF and call it by name:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.types.DataTypes;

sqlContext.udf().register("udfUppercase",
  (String string) -> string.toUpperCase(), DataTypes.StringType);

df.select(callUDF("udfUppercase", col("name")));
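Calling by name works because callUDF() never needs a Java reference to the function: it refers to the UDF by the name it was registered under, and Spark resolves that name against the functions registered in the SQLContext. As a rough plain-Java analogy (a hypothetical UdfRegistryDemo class, no Spark involved, and not how Spark actually evaluates expressions), a name-keyed registry looks like this:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class UdfRegistryDemo {
    // Hypothetical name -> function table, playing the role of the
    // registry that sqlContext.udf().register(...) writes into.
    private final Map<String, Function<Object, Object>> udfs = new HashMap<>();

    // Store the typed function behind an untyped wrapper, keyed by name.
    @SuppressWarnings("unchecked")
    public <T, R> void register(String name, Function<T, R> f) {
        udfs.put(name, value -> f.apply((T) value));
    }

    // Look the function up by name and apply it to every value in a "column".
    public List<Object> callUDF(String name, List<?> column) {
        Function<Object, Object> f = udfs.get(name);
        if (f == null) {
            throw new IllegalArgumentException("Undefined function: " + name);
        }
        List<Object> result = new ArrayList<>();
        for (Object value : column) {
            result.add(f.apply(value));
        }
        return result;
    }

    public static void main(String[] args) {
        UdfRegistryDemo registry = new UdfRegistryDemo();
        registry.register("udfUppercase", (String s) -> s.toUpperCase());
        System.out.println(registry.callUDF("udfUppercase", Arrays.asList("alice", "bob")));
        // prints [ALICE, BOB]
    }
}
```

The trade-off is the same as with callUDF(): the name is just a string, so a typo or a missing registration surfaces at run time rather than at compile time.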

Comments

callUDF() was the piece I was missing. Thanks!
