
I am using Spark 1.6. I tried to broadcast an RDD (collected into a map) and am not sure how to access the broadcast variable from the DataFrames.

I have two DataFrames, employee and department.

Employee DataFrame

Emp Id | Emp Name | Emp Age
-------+----------+--------
1      | john     | 25
2      | David    | 35

Department DataFrame

Dept Id | Dept Name | Emp Id
--------+-----------+-------
1       | Admin     | 1
2       | HR        | 2

import scala.collection.Map

val df_emp = hiveContext.sql("select * from emp")
val df_dept = hiveContext.sql("select * from dept")

val rdd = df_emp.rdd.map(row => (row.getInt(0), row.getString(1)))
val lkp = rdd.collectAsMap()
val bc = sc.broadcast(lkp)

print(bc.value.get(1).get)

// The statement below doesn't work:
val combinedDF = df_dept.withColumn("emp_name", bc.value.get($"emp_id").get)

  1. How do I refer to the broadcast variable in the combinedDF statement above?
  2. How do I handle the case where lkp doesn't return any value?
  3. Is there a way to return multiple records from lkp? (Let's assume there are 2 records for emp_id=1 in the lookup; I would like to get both records.)
  4. How do I return more than one value (emp_name and emp_age) from the broadcast?

1 Answer


How do I refer to the broadcast variable in the combinedDF statement above?

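Use a udf. Assuming emp_id is an Int:

import org.apache.spark.sql.functions.udf

// bc.value.get returns an Option[String]; Spark stores None as null
val f = udf((emp_id: Int) => bc.value.get(emp_id))

df_dept.withColumn("emp_name", f($"emp_id"))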
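Putting the pieces together, here is a self-contained sketch of the udf lookup. It assumes a local Spark 1.6-style setup (SQLContext instead of the question's HiveContext) and builds the two tables inline with assumed column names emp_id, emp_name, emp_age, dept_id and dept_name; the third department row (emp_id 99) is hypothetical and only shows what happens to an unmatched key.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf

// Local setup (assumption); in spark-shell reuse the existing sc and contexts instead
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("bc-udf-lookup"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Sample data mirroring the question; column names and the emp_id 99 row are hypothetical
val df_emp = Seq((1, "john", 25), (2, "David", 35)).toDF("emp_id", "emp_name", "emp_age")
val df_dept = Seq((1, "Admin", 1), (2, "HR", 2), (3, "IT", 99)).toDF("dept_id", "dept_name", "emp_id")

// Collect the small table to the driver as Map[emp_id -> emp_name] and broadcast it
val lkp = df_emp.rdd.map(row => (row.getInt(0), row.getString(1))).collectAsMap()
val bc = sc.broadcast(lkp)

// The udf reads bc.value on the executors; returning Option[String] turns misses into null
val lookupName = udf((emp_id: Int) => bc.value.get(emp_id))

val combinedDF = df_dept.withColumn("emp_name", lookupName($"emp_id"))

// Pull (dept_id, emp_name) back to the driver, turning null into None
val result = combinedDF
  .select("dept_id", "emp_name")
  .orderBy("dept_id")
  .collect()
  .map(r => (r.getInt(0), Option(r.getString(1))))

sc.stop()
```

Calling combinedDF.show() instead of collecting prints the same three rows, with null in emp_name for department 3.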

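How do I handle the case where lkp doesn't return any value?

Don't call .get on the result, as the question's code does: on a missing key that throws java.util.NoSuchElementException. The udf above returns the Option from bc.value.get, so a missing emp_id simply becomes null in emp_name.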
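The difference between calling .get on the Option and keeping the Option can be seen with a plain Scala Map standing in for bc.value. This is a sketch without Spark; the sample map and the "unknown" default are hypothetical.

```scala
// A plain Map standing in for the broadcast lookup (bc.value); sample data is hypothetical
val lkp: Map[Int, String] = Map(1 -> "john", 2 -> "David")

// Map.get returns an Option, so a missing key yields None rather than an exception
val found: Option[String] = lkp.get(1)
val missing: Option[String] = lkp.get(99)

// Calling .get on that None is what fails in the question's code
val failsOnMissing: Boolean = scala.util.Try(lkp.get(99).get).isFailure

// Alternative for a udf body: substitute a default value instead of producing null
val withDefault: String = lkp.getOrElse(99, "unknown")
```

In Spark, the corresponding udf would be udf((emp_id: Int) => bc.value.getOrElse(emp_id, "unknown")) if a default is preferred over null.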

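Is there a way to return multiple records from lkp?

Use groupByKey, converting each group to a List. The Iterable that groupByKey produces has no Spark SQL schema, so a udf returning it fails with java.lang.UnsupportedOperationException:

val lkp = rdd.groupByKey.mapValues(_.toList).collectAsMap()
val bc = sc.broadcast(lkp)

Then define f as above (so it reads the new bc) and explode the result:

import org.apache.spark.sql.functions.explode
df_dept.withColumn("emp_name", f($"emp_id")).withColumn("emp_name", explode($"emp_name"))

Note that explode drops rows whose lookup returned null.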
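What the grouped lookup and explode do can be modelled with plain Scala collections. This is a sketch without Spark; the duplicate name "johnny" for emp_id 1 and the unmatched emp_id 99 are hypothetical sample data. Spark's groupByKey does not guarantee the order of values within a group, whereas groupBy on a local Seq keeps it.

```scala
// Hypothetical (emp_id, emp_name) pairs where emp_id 1 appears twice
val pairs = Seq((1, "john"), (1, "johnny"), (2, "David"))

// Local analogue of rdd.groupByKey.mapValues(_.toList).collectAsMap():
// each key maps to a List of all its values
val lkp: Map[Int, List[String]] =
  pairs.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).toList }

// Hypothetical department rows as (dept_id, emp_id); emp_id 99 has no match
val depts = Seq((1, 1), (2, 2), (3, 99))

// Local analogue of the udf followed by explode: one output row per matching name;
// a department whose lookup is empty produces no rows, just as explode drops it
val exploded: Seq[(Int, Int, String)] = for {
  (deptId, empId) <- depts
  name <- lkp.getOrElse(empId, Nil)
} yield (deptId, empId, name)
```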

Or skip all of these steps and use a broadcast join:

import org.apache.spark.sql.functions._

// Mark the small employee table for broadcast and left-join it onto departments
df_dept.join(broadcast(df_emp), Seq("emp_id"), "left")
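For the multi-column case (emp_name and emp_age), here is a sketch of the broadcast join. It assumes a local Spark 1.6-style setup (SQLContext instead of HiveContext) and builds both tables inline with assumed column names emp_id, emp_name, emp_age, dept_id and dept_name; the department row with emp_id 99 is a hypothetical unmatched key.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.broadcast

// Local setup (assumption); in spark-shell reuse the existing sc and contexts instead
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("bc-join-lookup"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Sample data mirroring the question; column names and the emp_id 99 row are hypothetical
val df_emp = Seq((1, "john", 25), (2, "David", 35)).toDF("emp_id", "emp_name", "emp_age")
val df_dept = Seq((1, "Admin", 1), (2, "HR", 2), (3, "IT", 99)).toDF("dept_id", "dept_name", "emp_id")

// broadcast() marks df_emp as small enough to copy to every executor, so the join
// can run as a broadcast hash join without shuffling df_dept
val combinedDF = df_dept.join(broadcast(df_emp), Seq("emp_id"), "left")

// Both lookup columns arrive together; unmatched departments get nulls
val result = combinedDF
  .select("dept_id", "emp_name", "emp_age")
  .orderBy("dept_id")
  .collect()
  .map { r =>
    val age = if (r.isNullAt(2)) None else Some(r.getInt(2))
    (r.getInt(0), Option(r.getString(1)), age)
  }

sc.stop()
```

Because the join keeps every matching employee row, the same call also covers question 3: an emp_id with two employee records produces two output rows.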

3 Comments

Thanks for the response. For returning multiple records from the lookup, I did the following: val lkp = rdd.groupByKey.collectAsMap(); val bc = sc.broadcast(lkp); val f = udf((emp_id: Int) => bc.value.get(emp_id)). I get java.lang.UnsupportedOperationException.
Also, if the lookup has multiple columns, how do I return more than one value (emp_name and emp_age) from the broadcast?
There was a mistake in the udf call, which is now fixed. With multiple values it is best to use a broadcast join (the last part).
