
How do I call the UDF below with multiple arguments (currying) on a Spark DataFrame, as shown below?

Read the file and get a List[String]:

val data = sc.textFile("file.csv").flatMap(line => line.split("\n")).collect.toList

Register the UDF:

val getValue = udf(Udfnc.getVal(_: Int, _: String, _: String)(_: List[String]))

Call the UDF on the DataFrame below:

df.withColumn("value",
     getValue(df("id"),
        df("string1"),
        df("string2"))).show()

Here I am missing the List[String] argument, and I am really not sure how I should pass this argument.

  • What is panel_df? Please post sample input, expected output, and the full code of the UDF function Commented Jun 18, 2018 at 5:30
  • @RameshMaharjan, that was a typo. It's df and not panel_df Commented Jun 18, 2018 at 6:26
  • And what about the rest of the comments about samples? Commented Jun 18, 2018 at 6:47

2 Answers


I can make the following assumptions about your requirement based on your question:

a] The UDF should accept a parameter other than a DataFrame column

b] The UDF should take multiple columns as parameters

Let's say you want to concatenate the values from all the columns along with a specified parameter. Here is how you can do it:

import org.apache.spark.sql.functions._

// The outer function takes the non-column parameter; the returned UDF takes the three columns
def uDF(strList: List[String]) = udf[String, Int, String, String] {
  (value1: Int, value2: String, value3: String) =>
    value1.toString + "_" + value2 + "_" + value3 + "_" + strList.mkString("_")
}

val df = spark.sparkContext.parallelize(Seq((1,"r1c1","r1c2"),(2,"r2c1","r2c2"))).toDF("id","str1","str2")

scala> df.show
+---+----+----+
| id|str1|str2|
+---+----+----+
|  1|r1c1|r1c2|
|  2|r2c1|r2c2|
+---+----+----+

val dummyList = List("dummy1","dummy2")
val result = df.withColumn("new_col", uDF(dummyList)(df("id"),df("str1"),df("str2")))



scala> result.show(2, false)
+---+----+----+-------------------------+
|id |str1|str2|new_col                  |
+---+----+----+-------------------------+
|1  |r1c1|r1c2|1_r1c1_r1c2_dummy1_dummy2|
|2  |r2c1|r2c2|2_r2c1_r2c2_dummy1_dummy2|
+---+----+----+-------------------------+
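Applied back to the setup in the question, the same currying idea would look roughly like this. This is only a sketch: it assumes Udfnc.getVal has the curried signature (Int, String, String)(List[String]) => String and returns a String, as implied by the question.

// Build the driver-side list as in the question, then fix it up front so the
// resulting UDF only needs the three column arguments (Udfnc.getVal's
// signature is assumed here, not taken from a real API)
val data = spark.sparkContext.textFile("file.csv").collect.toList

val getValue = udf((id: Int, s1: String, s2: String) => Udfnc.getVal(id, s1, s2)(data))

df.withColumn("value", getValue(df("id"), df("string1"), df("string2"))).show()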

2 Comments

Can you please share the Java equivalent?
Is it possible to register a curried UDF with spark.udf.register?
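Regarding the comment about spark.udf.register: a minimal sketch is to register a function that closes over the list (the SQL function name concat_with_list is made up here); on Spark 2.2+ the UserDefinedFunction returned by uDF(dummyList) can also be passed to register directly.

// Register a plain three-argument function that closes over the list
spark.udf.register("concat_with_list",
  (id: Int, s1: String, s2: String) =>
    id.toString + "_" + s1 + "_" + s2 + "_" + dummyList.mkString("_"))

// On Spark 2.2+ the curried UDF itself can also be registered:
// spark.udf.register("concat_with_list", uDF(dummyList))

// The registered function is then usable from SQL expressions
df.selectExpr("id", "concat_with_list(id, str1, str2) AS new_col").show(false)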

Defining a UDF with multiple parameters:

val enrichUDF: UserDefinedFunction = udf((jsonData: String, id: Long) => {
  // Splice a site_refresh_stats_id field into the JSON string at its last '}'
  val lastOccurence = jsonData.lastIndexOf('}')
  val sid = ",\"site_refresh_stats_id\":" + id + " }]"
  val enrichedJson = jsonData.patch(lastOccurence, sid, sid.length)
  enrichedJson
})

Calling the UDF on an existing DataFrame:

val enrichedDF = EXISTING_DF
  .withColumn("enriched_column",
    enrichUDF(col("jsonData")
      , col("id")))

The following imports are also required:

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}
