
I am reading two different .csv files, each of which has only one column, as below:

    val dF1 = sqlContext.read.csv("some.csv").select($"ID")
    val dF2 = sqlContext.read.csv("other.csv").select($"PID")

I am trying to check whether each dF2("PID") exists in dF1("ID"):

    val getIdUdf = udf((x:String)=>{dF1.collect().map(_(0)).toList.contains(x)})
    val dfFinal = dF2.withColumn("hasId", getIdUdf($"PID"))

This gives me a NullPointerException, but if I collect dF1 into a list outside the UDF and use that list inside it, it works:

    val dF1 = sqlContext.read.csv("some.csv").select($"ID").collect().map(_(0)).toList
    val getIdUdf = udf((x:String)=>{dF1.contains(x)})
    val dfFinal = dF2.withColumn("hasId", getIdUdf($"PID"))

I know I can use a join to get this done, but I want to know the reason for the NullPointerException here.

Thanks.

  • I think putting a collect inside a UDF is not good practice and may be the source of the error. Consider that the function will be called many times, so it would perform a collect on each call. Have you thought about extracting that collect outside the UDF and broadcasting the data (see the sketch below)? Commented Nov 3, 2017 at 9:15
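
A minimal sketch of the broadcast approach the comment suggests, assuming the dF1 and dF2 from the question and a SparkContext available as sc:

    import org.apache.spark.sql.functions.udf

    // Collect the IDs once on the driver...
    val idSet = dF1.collect().map(_.getString(0)).toSet
    // ...and broadcast them so each executor gets a single read-only copy.
    val idSetBc = sc.broadcast(idSet)

    // The UDF closes over the broadcast handle, not over a DataFrame.
    val getIdUdf = udf((x: String) => idSetBc.value.contains(x))
    val dfFinal = dF2.withColumn("hasId", getIdUdf($"PID"))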

1 Answer


Please check this question about accessing a dataframe inside the transformation of another dataframe. This is exactly what you are doing with your UDF, and it is not possible in Spark: a DataFrame only exists on the driver, so the dF1 reference captured in your UDF's closure is null by the time the UDF runs on the executors, hence the NullPointerException. The solution is either to use a join, or to collect outside the transformation and broadcast the result.
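
For reference, a minimal sketch of the join alternative, assuming the dF1 and dF2 from the question (a left outer join against the distinct IDs, then a null check on the joined column):

    import org.apache.spark.sql.functions.col

    // Left-join against the distinct IDs; a non-null ID after the join
    // means the PID was found in dF1.
    val dfFinal = dF2
      .join(dF1.distinct(), dF2("PID") === dF1("ID"), "left_outer")
      .withColumn("hasId", col("ID").isNotNull)
      .drop("ID")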
