
I have data in a DataFrame with the following columns:

  1. The file format is CSV.
  2. All of the columns below are of type String.

    employeeid,pexpense,cexpense

Now I need to create a new DataFrame with a new column called expense, which is calculated from the columns pexpense and cexpense.

The tricky part is that the calculation algorithm is not a UDF I created myself, but an external function that needs to be imported from a Java library. It takes primitive types as arguments - in this case pexpense and cexpense - to calculate the value required for the new column.

This is the function signature, which comes from an external Java jar:

public class MyJava {
    public Double calculateExpense(Double pexpense, Double cexpense) {
        // calculation
    }
}

So how can I invoke that external function to create a new calculated column? Can I register that external function as a UDF in my Spark application?


3 Answers


You can create a UDF from the external method similar to the following (illustrated using the Scala REPL):

// From a Linux shell prompt:

vi MyJava.java
public class MyJava {
    public Double calculateExpense(Double pexpense, Double cexpense) {
        return pexpense + cexpense;
    }
}
:wq

javac MyJava.java
jar -cvf MyJava.jar MyJava.class

spark-shell --jars /path/to/jar/MyJava.jar

// From within the Spark shell

val df = Seq(
  ("1", "1.0", "2.0"), ("2", "3.0", "4.0")
).toDF("employeeid", "pexpense", "cexpense")

val myJava = new MyJava

val myJavaUdf = udf(
  myJava.calculateExpense _
)

val df2 = df.withColumn("totalexpense", myJavaUdf($"pexpense", $"cexpense") )

df2.show
+----------+--------+--------+------------+
|employeeid|pexpense|cexpense|totalexpense|
+----------+--------+--------+------------+
|         1|     1.0|     2.0|         3.0|
|         2|     3.0|     4.0|         7.0|
+----------+--------+--------+------------+
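
To answer the registration part of the question: yes, you can also register the wrapped method under a name so it is callable from Spark SQL. A minimal sketch, assuming the same myJava instance and df as above (the view name expenses is just for illustration):

// Register the wrapped Java method as a named UDF for use in SQL
spark.udf.register("calculateExpense", myJava.calculateExpense _)

df.createOrReplaceTempView("expenses")
spark.sql("SELECT employeeid, calculateExpense(pexpense, cexpense) AS totalexpense FROM expenses").show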

1 Comment

I was about to post the same thing, but anyway, thanks for the answer!

You can simply "wrap" the given method in a UDF by passing it as an argument to the udf function in org.apache.spark.sql.functions:

import org.apache.spark.sql.functions._
import spark.implicits._

val myUdf = udf(calculateExpense _)
val newDF = df.withColumn("expense", myUdf($"pexpense", $"cexpense"))

This assumes the pexpense and cexpense columns are both of type Double.
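
The question states that the columns are strings, though; in that case you can cast them explicitly before applying the UDF. A minimal sketch, assuming the df and myUdf from above:

// Cast the String columns to double before passing them to the UDF
val newDF = df.withColumn(
  "expense",
  myUdf($"pexpense".cast("double"), $"cexpense".cast("double"))
)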

2 Comments

As I mentioned, calculateExpense is an external function which is part of a class named MyJava. I need to instantiate this class and invoke the method through an object reference. Is your solution still valid?
(Was offline.) The answer is yes: you just have to instantiate a MyJava instance and use it to reference the method, as @leo-c showed in a similar answer.
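
In code, the instantiation described in the comment above looks like this (a minimal sketch):

val myJava = new MyJava
val myUdf = udf(myJava.calculateExpense _)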

Below is an example of summing two columns:

import org.apache.spark.sql.functions.{col, udf}

val somme = udf((a: Double, b: Double) => a + b)

val df_new = df.select(
  col("employeeid"),
  col("pexpense"),
  col("cexpense"),
  somme(col("pexpense"), col("cexpense")) as "expense"
)

1 Comment

The function is an external Java function, not a UDF which I defined in my application.
