
I have a variable declared like this:

val jobnameSeq = Seq(("42409245", "name12"), ("42461545", "name4"), ("42409291", "name1"), ("42413872", "name3"), ("42417044", "name2"))

I want to be able to create a function usable in Spark SQL to substitute 42461545 with name4 in a SQL query.
I tried to declare this function:

val jobnameDF = jobnameSeq.toDF("jobid","jobname")
sqlContext.udf.register("getJobname", (id: String) => (
     jobnameDF.filter($"jobid" === id).select($"jobname")
    )
)

To be used like this in SQL:

select getjobname(jobid), other, field from table  

But jobnameDF.filter($"jobid" === id).select($"jobname") returns a DataFrame, not a String, and I can't figure out how to simply convert this value to a String, as there will only be one result each time.

If a Seq is not the right object to use in this case, I'm open to suggestions.

Edit:
Though the suggested answer works, here's what I did exactly to make this work:

// Convert my Seq to a hash map
val jobMap = jobnameSeq.toMap
// Declare a SQL function so I can use it in Spark SQL (it needs to be accessible to people who don't know Scala)
sqlContext.udf.register("getJobname", (id: String) => (
    jobMap(id)
    )
)
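One caveat with the Map approach above: jobMap(id) throws a NoSuchElementException when the id is missing, which would fail the whole query. A safer sketch using getOrElse (the "unknown" fallback value is my own choice):

```scala
// Same lookup as above, but with a fallback for unknown job ids,
// so the registered UDF never throws at query time.
val jobnameSeq = Seq(("42409245", "name12"), ("42461545", "name4"),
                     ("42409291", "name1"), ("42413872", "name3"), ("42417044", "name2"))
val jobMap: Map[String, String] = jobnameSeq.toMap

def getJobname(id: String): String = jobMap.getOrElse(id, "unknown")
```

Register `getJobname _` with `sqlContext.udf.register` exactly as before.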
  • My goal is just to have a function that returns the jobname based on the id provided; I don't want to replace any value. @RameshMaharjan I tried to add a bit more context there. Commented Jun 8, 2018 at 12:48
  • @RameshMaharjan Do realize I don't have full knowledge of the Spark framework; I would be delighted to learn a better / new way to do things. Commented Jun 8, 2018 at 12:57
  • All I am saying is you don't need Spark for your requirement. Is your data big? Your requirement suggests it would be faster and more efficient to do it in a hash map. Commented Jun 8, 2018 at 12:58
  • The sequence is indeed small, so it is not integrated into HDFS. The table on which I need to substitute the value several times is indeed big. Commented Jun 8, 2018 at 13:03
  • Then update the question with the substitution part as well. You will certainly get an answer. Commented Jun 8, 2018 at 13:05

2 Answers


You can do this in many ways:

val jobnameSeq = Seq(("42409245", "name12"), ("42461545", "name4"),
                     ("42409291", "name1"), ("42413872", "name3"), ("42417044", "name2"))
val jobIdDF = Seq("42409245", "42409291", "42409231").toDF("jobID")
jobIdDF.createOrReplaceTempView("JobView")

Just use plain Scala's toMap function on the jobname sequence:

sqlContext.udf.register("jobNamelookUp", (jobID: String) =>  
                                            jobnameSeq.toMap.getOrElse(jobID,"null"))
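Note that jobnameSeq.toMap sits inside the lambda body, so the Map is rebuilt on every UDF invocation; building it once outside the closure avoids that (a sketch):

```scala
val jobnameSeq = Seq(("42409245", "name12"), ("42461545", "name4"),
                     ("42409291", "name1"), ("42413872", "name3"), ("42417044", "name2"))

// Built once; the lambda below only captures the finished Map.
val jobnameMap: Map[String, String] = jobnameSeq.toMap
val jobNameLookUp: String => String = jobID => jobnameMap.getOrElse(jobID, "null")
```

The function value can then be registered with `sqlContext.udf.register("jobNamelookUp", jobNameLookUp)`.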

//OR

If the input is an RDD, use collectAsMap:

val jobnameMap = sc.parallelize(jobnameSeq).collectAsMap
sqlContext.udf.register("lookupJobName",(jobID:String) => 
                                            jobnameMap.getOrElse(jobID,"null"))

//OR

If this lookup is going to happen on a cluster, you can broadcast it:

val jobnameMapBC = sc.broadcast(jobnameMap)
sqlContext.udf.register("lookupJobNameBC",(jobID:String) => 
                                                jobnameMapBC.value.getOrElse(jobID,"null")) 

spark.sql("""select jobID, jobNamelookUp(jobID) as jobNameUsingMap,
                    lookupJobNameBC(jobID) as jobNameUsingBC,
                    lookupJobName(jobID) as jobNameUsingRDDMap
             from JobView""")
  .show()

+--------+---------------+--------------+------------------+
|   jobID|jobNameUsingMap|jobNameUsingBC|jobNameUsingRDDMap|
+--------+---------------+--------------+------------------+
|42409245|         name12|        name12|            name12|
|42409291|          name1|         name1|             name1|
|42409231|           null|          null|              null|
+--------+---------------+--------------+------------------+    

As suggested by Raphael, using a broadcast join:

import org.apache.spark.sql.functions._
val jobnameSeqDF = jobnameSeq.toDF("jobID","name")
jobIdDF.join(broadcast(jobnameSeqDF), Seq("jobID"),"leftouter").show(false)

+--------+------+
|jobID   |name  |
+--------+------+
|42409245|name12|
|42409291|name1 |
|42409231|null  |
+--------+------+

2 Comments

Another way would be to convert jobnameSeq to a DataFrame and simply join it on the table (using a broadcast join)
Added it, Raphael. Thanks

As far as I can understand from your question, you should create a Map from your sequence and look the job name up directly:

val simpleMap = jobnameSeq.toMap

println(simpleMap("42461545"))

which should give you name4

Now, if you want to do it with a DataFrame, you can do the following:

val jobnameDF = jobnameSeq.toDF("jobid","jobname")

val jobName = jobnameDF.filter($"jobid" === "42461545").select("jobname").first().getAs[String]("jobname")

println(jobName)

which should print name4

1 Comment

great to hear that @Kiwy :)
