
I have a Java client class (used as a dependency Jar with spark-shell) that responds to an API call - let's call the class SomeAPIRequester.

In plain Java, it returns the desired results with the sample code below:

SomeAPIRequester requester = SomeAPIRequester.builder().name("abc").build();  // build the class
System.out.println(requester.getSomeItem("id123"));  // result: {"id123": "item123"}

I want to call this API in a distributed manner over an RDD of IDs stored in a Spark dataframe (in Scala):

val inputIdRdd = sc.parallelize(List("id1", "id2", "id3"...))  // sample RDD of IDs I want to call the API for

and I define my UDF as:

val test: UserDefinedFunction = udf((id: String, requester: SomeAPIRequester) => {
   requester.getSomeItem(id)
})

and call this UDF as:

inputIdRdd.toDF("ids").withColumn("apiResult", test(col("ids"), requester))  // requester as built with SomeAPIRequester.builder()....

// or directly with RDD ? udf, or a plain scala function .. 
inputIdRdd.foreach{ id => test(id, requester) }

When I run a .show() or .take() on the result, I get a NullPointerException on the requester Java class.

I also tried sending in literals (lit), and I read about typedLit in Scala, but I could not convert the Java requester class into any of the allowed typedLit types.

Is there a way to call this Java class object through UDFs and get the result from the API?

Edit:

I also tried to initialize the requester class inside the RDD's foreach block:

inputIdRdd.foreach { x =>
  val apiRequester = SomeAPIRequester.builder()...(argPool).build()

  try {
    apiRequester.getSomeItem(x)
  } catch {
    case ex: Exception => ex.printStackTrace(); ""
  }
}

But this returns no response ("cannot initialize class", etc.).

Thanks!

  • You probably need to post the entire structure of your code. It's hard to guess where what is declared and when what is being used. Besides, you're using a UDF with RDDs, which is strange. Commented Sep 27, 2020 at 7:44
  • Either way, df.withColumn("newCol", udf(lit(x), requester)) also throws a similar error to my rdd.foreach{x => udf(x, requester)} for the UDF that I defined. Updating my question with some more details. Commented Sep 27, 2020 at 7:47

1 Answer


Working with custom classes in Spark requires some knowledge of how Spark works under the hood. Don't put your instance as a parameter of the udf: udf parameters are extracted from the rows of the dataframe, so the null pointer exception is understandable in this case. You can try the following options:

  1. First, put the instance in the scope of the udf:

    val requester: SomeAPIRequester = ???
    
    val test: UserDefinedFunction = udf((id: String) => {
         requester.getSomeItem(id)
    })
    

At this point you will need to mark your class as Serializable if possible, otherwise you will get a NotSerializableException.

  2. If your class is not Serializable because it comes from a third party, you can mark your instance as a lazy transient val, as described in https://mengdong.github.io/2016/08/16/spark-serialization-memo/ or https://medium.com/@swapnesh.chaubal/writing-to-logentries-from-apache-spark-35831282f53d.

  3. If you work in the RDD world, you can use mapPartitions to create just one instance per partition.
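For illustration, the last two options could be sketched roughly as follows. This is a sketch, not a tested implementation: SomeAPIRequester and its builder arguments are the placeholder names from the question, so it will only compile against your actual jar.

    // Option 2 (sketch): wrap the non-serializable requester in a serializable
    // holder. The @transient lazy val is not shipped with the closure; it is
    // re-created on first use inside each executor JVM.
    object RequesterHolder extends Serializable {
      @transient lazy val requester: SomeAPIRequester =
        SomeAPIRequester.builder().name("abc").build()  // placeholder builder args
    }

    val test = udf((id: String) => RequesterHolder.requester.getSomeItem(id))

    // Option 3 (sketch): stay in the RDD world and build one requester
    // per partition instead of one per record.
    val results = inputIdRdd.mapPartitions { ids =>
      val requester = SomeAPIRequester.builder().name("abc").build()
      ids.map(id => (id, requester.getSomeItem(id)))
    }
    results.toDF("id", "apiResult").show()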


2 Comments

Thanks for these suggestions, let me try this out!
Edit: This works like a charm! Thanks for the mapPartitions idea!
