I have a Java client class (used as a dependency jar with spark-shell) that makes an API call - let's call the class SomeAPIRequester.
In plain Java, it returns the desired result with the following sample code -
SomeAPIRequester requester = SomeAPIRequester.builder().name("abc").build(); // build the class
System.out.println(requester.getSomeItem("id123")); // result: {"id123": "item123"}
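The class is also available in spark-shell through the dependency jar, and building it on the driver there looks like this (just a sketch - the jar name below is a placeholder) -
// started as: spark-shell --jars some-api-client.jar (jar name is hypothetical)
val requester = SomeAPIRequester.builder().name("abc").build()
println(requester.getSomeItem("id123")) // {"id123": "item123"}, same as in plain Java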
I want to call this API in a distributed manner over my RDD of IDs, stored in a Spark DataFrame (in Scala) -
val inputIdRdd = sc.parallelize(List("id1", "id2", "id3"...)) // sample RDD of IDs i want to call the API for
and I define my UDF as -
val test: UserDefinedFunction = udf((id: String, requester: SomeAPIRequester) => {
requester.getSomeItem(id)
})
and call this UDF as -
inputIdRdd.toDf("ids").withColumn("apiResult", test(col("ids"), requester) // requester as built with SomeAPIRequester.builder()....
// or directly with RDD ? udf, or a plain scala function ..
inputIdRdd.foreach{ id => test(id, requester) }
When I run a .show() or .take() on the result, I get a NullPointerException from the requester Java class.
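For what it's worth, the column plumbing itself is fine when the UDF only takes the String column - a minimal sanity check I can run in spark-shell (the echo function is just a stand-in, not my real requester) -
import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._

val echo = udf((id: String) => s"echo-$id") // stand-in for the real API call
inputIdRdd.toDF("ids").withColumn("apiResult", echo(col("ids"))).show()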
I also tried passing the requester as a literal (lit), and I read about typedLit in Scala, but I could not convert the Java requester class into any of the types typedLit accepts.
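Roughly the shape of that attempt (sketch only - lit accepts Any at compile time, but a plain Java object is not a Catalyst-supported literal type, so this cannot work) -
// the requester is not a supported literal type, so Spark rejects it at runtime
inputIdRdd.toDF("ids")
  .withColumn("apiResult", test(col("ids"), lit(requester)))
  .show()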
Is there a way to call this Java class object through UDFs and get the result from the API?
Edit:
I also tried initializing the requester class inside the RDD's foreach block -
inputIdRdd.foreach { x =>
  val apiRequester = SomeAPIRequester.builder()...(argPool).build()
  try {
    apiRequester.getSomeItem(x)
  } catch {
    case ex: Exception => ex.printStackTrace(); ""
  }
}
But this returns no response and fails with errors like "cannot initialize class", etc.
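For reference, this is the shape I am ultimately trying to get working (just a sketch, assuming the builder can actually be instantiated on an executor; builder arguments as in the first example) - build one requester per partition and return the results instead of printing them -
val resultsRdd = inputIdRdd.mapPartitions { ids =>
  // one requester per partition, built on the executor (assumes the builder works there)
  val apiRequester = SomeAPIRequester.builder().name("abc").build()
  ids.map(id => (id, apiRequester.getSomeItem(id)))
}
resultsRdd.take(3).foreach(println)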
Thanks!
df.withColumn("newCol", udf(lit(x), requester))also throws similar error as myrdd.foreach{x => udf(x, requester)}does.. for the UDF that i defined. Updating my questions with some more details..