
I've been searching for a while for a way to use a Scala class in PySpark, and I haven't found any documentation or guide on the subject.

Let's say I create a simple class in Scala that uses some Apache Spark libraries, something like:

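import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.col

class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) {
  // Returns a DataFrame containing only the requested column
  def exe(): DataFrame = {
    import sqlContext.implicits._

    df.select(col(column))
  }
}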
  • Is there any way to use this class in PySpark?
  • Is it too tough?
  • Do I have to create a .py file?
  • Is there any guide that shows how to do that?

By the way, I also looked at the Spark source code and felt a bit lost; I was unable to replicate its functionality for my own purposes.


2 Answers


Yes, it is possible, although it can be far from trivial. Typically you want a Java-friendly wrapper so you don't have to deal with Scala features which cannot be easily expressed in plain Java and, as a result, don't play well with the Py4J gateway.

Assuming your class is in the package com.example and you have a Python DataFrame called df

df = ... # Python DataFrame

you'll have to:

  1. Build a jar using your favorite build tool.

  2. Include it in the driver classpath, for example using the --driver-class-path argument for the PySpark shell / spark-submit. Depending on the exact code, you may have to pass it using --jars as well.

  3. Extract JVM instance from a Python SparkContext instance:

    jvm = sc._jvm
    
  4. Extract Scala SQLContext from a SQLContext instance:

    ssqlContext = sqlContext._ssql_ctx
    
  5. Extract Java DataFrame from the df:

    jdf = df._jdf
    
  6. Create new instance of SimpleClass:

    simpleObject = jvm.com.example.SimpleClass(ssqlContext, jdf, "v")
    
  7. Call the exe method and wrap the result using a Python DataFrame:

    from pyspark.sql import DataFrame
    
    # The Python DataFrame wrapper expects the Python SQLContext, not the Scala one
    DataFrame(simpleObject.exe(), sqlContext)
    

The result should be a valid PySpark DataFrame. You can of course combine all the steps into a single call.
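
As a rough illustration of combining the steps, here is a minimal sketch of a driver-side helper. The function name run_simple_class and its wrap parameter are hypothetical (not part of PySpark); wrap exists only so the helper can be tried with stand-in objects, and it defaults to pyspark.sql.DataFrame. The sketch passes the Python SQLContext to the DataFrame wrapper, since that is what the Python-side constructor keeps a reference to.

```python
def run_simple_class(sc, sqlContext, df, column, wrap=None):
    """Call com.example.SimpleClass.exe() on the driver and wrap the result.

    `wrap` is a hypothetical hook, not a PySpark feature; it defaults to
    pyspark.sql.DataFrame.
    """
    if wrap is None:
        # Imported lazily so the helper can be imported without Spark installed.
        from pyspark.sql import DataFrame
        wrap = DataFrame

    jvm = sc._jvm                       # Py4J view of the driver JVM
    ssqlContext = sqlContext._ssql_ctx  # Scala SQLContext behind the Python one
    jdf = df._jdf                       # Java DataFrame behind the Python one

    # Instantiate the Scala class through the gateway and call its method.
    simple_object = jvm.com.example.SimpleClass(ssqlContext, jdf, column)
    return wrap(simple_object.exe(), sqlContext)
```

In a PySpark shell started with the jar on the classpath, this would be called as run_simple_class(sc, sqlContext, df, "v"). Like the individual steps, it only works on the driver.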

Important: This approach is possible only if the Python code is executed solely on the driver. It cannot be used inside a Python action or transformation. See How to use Java/Scala function from an action or a transformation? for details.


1 Comment

What happens if the Scala class also has alternative constructors? Is it supposed to work?

As an update to @zero323's answer, given that Spark's APIs have evolved over the last six years, a recipe that works in Spark 3.2 is as follows:

  1. Compile your Scala code into a JAR file (e.g. using sbt assembly)
  2. Include the JAR file in the --jars argument to spark-submit together with any --py-files arguments needed for local package definitions
  3. Extract the JVM instance within Python:

    jvm = spark._jvm

  4. Extract a Java representation of the SparkSession:

    jSess = spark._jsparkSession

  5. Extract the Java handle for the PySpark DataFrame "df" that you want to pass into the Scala method:

    jdf = df._jdf

  6. Create a new instance of SimpleClass from within PySpark (this assumes its constructor has been adapted to take a SparkSession rather than the SQLContext shown in the question):

    simpleObject = jvm.com.example.SimpleClass(jSess, jdf, "v")

  7. Call the exe method and convert its output into a PySpark DataFrame:

    from pyspark.sql import DataFrame

    result = DataFrame(simpleObject.exe(), spark)
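
If you would rather hide the JVM handles from calling code, the recipe above can be folded into a small Python wrapper class. The following is only a sketch: SimpleClassWrapper and its wrap parameter are hypothetical names introduced for illustration, and it assumes the Scala SimpleClass constructor accepts a SparkSession, as the call in the recipe implies.

```python
class SimpleClassWrapper:
    """Python-side facade over the Scala class com.example.SimpleClass.

    `wrap` is a hypothetical hook, not a PySpark feature; it defaults to
    pyspark.sql.DataFrame when exe() is called.
    """

    def __init__(self, spark, df, column, wrap=None):
        self._spark = spark
        self._wrap = wrap
        # JVM view, Java SparkSession, and Java DataFrame are all obtained
        # on the driver and handed to the Scala constructor through Py4J.
        self._jobj = spark._jvm.com.example.SimpleClass(
            spark._jsparkSession, df._jdf, column
        )

    def exe(self):
        wrap = self._wrap
        if wrap is None:
            # Imported lazily so the class can be imported without Spark.
            from pyspark.sql import DataFrame
            wrap = DataFrame
        # Convert the returned Java DataFrame back into a PySpark DataFrame.
        return wrap(self._jobj.exe(), self._spark)
```

Usage would then look like result = SimpleClassWrapper(spark, df, "v").exe(), with the JAR supplied through --jars as in step 2.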

If you need to pass additional parameters, such as a Python dictionary, PySpark may automatically convert them into corresponding Java types before they reach your Scala methods. Scala provides the JavaConverters object to help translate these into more natural Scala datatypes. For example, a Python dictionary could be passed into a Scala method and immediately converted from a Java HashMap into a Scala (mutable) Map:

def processDict(spark: SparkSession, jparams: java.util.Map[String, Any]): Unit = {
  import scala.collection.JavaConverters._
  // Wrap the incoming Java map as a mutable Scala Map
  val params = jparams.asScala
  ...
}
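
On the Python side, the corresponding call might look like the sketch below. It assumes processDict lives in a Scala object named com.example.DictHelper, which is a hypothetical name (the snippet above does not say where the method is defined), and it relies on the automatic dictionary-to-java.util.Map conversion described above. The function name call_process_dict is likewise made up for illustration.

```python
def call_process_dict(spark, params):
    """Pass a Python dict to a Scala method that accepts java.util.Map.

    com.example.DictHelper is a hypothetical Scala object assumed to
    define processDict(spark: SparkSession, jparams: java.util.Map[String, Any]).
    """
    # Coerce keys to str so they match the String keys in the Scala signature.
    jparams = {str(key): value for key, value in params.items()}

    helper = spark._jvm.com.example.DictHelper
    # The dict is handed to the gateway as-is; the conversion to a Java map
    # is expected to happen on the way into the JVM.
    return helper.processDict(spark._jsparkSession, jparams)
```

Declaring the Scala parameter as java.util.Map rather than a Scala Map is what lets the call resolve; the conversion to a Scala collection then happens inside the method.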

5 Comments

Thanks for sharing this. But how do I do type conversion when passing parameters? For example, I have a Scala class with a method like read(paraA: String, paraB: Map[String, String]). When I call the method like read("abc", {"cde":"fgh"}), it gives me the following error: Method read([class java.lang.String, class java.util.HashMap]) does not exist. I guess it is because a Python dictionary is converted to a HashMap, but the original method requires a Scala immutable Map?
Thanks, Gary - I've added an example of how you might be able to do this.
Thank you @rwp for providing an updated Spark API answer. Can you please elaborate in step 5, where df is defined? Is it in scala or python? I am having difficulties following your step trying to understand what steps are done in which runtime stack. TIA!
Thanks, @geekyj - I've amended the description to try to make it clearer that "df" is whatever dataframe you want to pass into your Scala library, and that this step is executed within PySpark rather than Scala Spark.
