
My sbt project has the following dependency configuration:

val scyllaConnector  = "com.datastax.spark" %% "spark-cassandra-connector" % "3.2.0"
val sparkHadoopCloud = "org.apache.spark"   %% "spark-hadoop-cloud"        % "3.3.0"
val sparkSql         = "org.apache.spark"   %% "spark-sql"                 % "3.3.0"
val sparkSqlKafka    = "org.apache.spark"   %% "spark-sql-kafka-0-10"      % "3.3.0"

val customService    = // group, name and version of custom service

The customService includes a critical dependency:

val kubernetesClient = "io.kubernetes" % "client-java" % "19.0.1"

which requires Gson 2.10.1 specifically.

When running in Spark cluster mode, Gson 2.2.4 from Spark's jars is being used instead. I've confirmed this via:

  1. Runtime path checking:

    val path = classOf[Gson].getProtectionDomain.getCodeSource.getLocation.getPath
    log.info("Gson loaded from: " + path)
    

returns:

Gson loaded from: /opt/app-root/spark/jars/gson-2.2.4.jar
  2. Error stack traces showing method signatures that only exist in 2.2.4:

    The exceptions point to line numbers that match the 2.2.4 implementation of the Gson methods, not the 2.10.1 one.
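
For reference, both checks can be combined into a single runtime probe. This is a minimal sketch that reuses the log from the snippet above; the JsonParser.parseString test assumes that method appeared around Gson 2.8.6, so it should be missing in 2.2.4:

    import com.google.gson.Gson

    // Where the Gson class was actually loaded from, plus the jar's manifest version (if any).
    val gsonClass = classOf[Gson]
    val gsonPath  = gsonClass.getProtectionDomain.getCodeSource.getLocation.getPath
    val gsonVer   = Option(gsonClass.getPackage)
      .flatMap(p => Option(p.getImplementationVersion))
      .getOrElse("unknown")
    log.info(s"Gson loaded from: $gsonPath (Implementation-Version: $gsonVer)")

    // Reflective check for an API that newer Gson releases have but 2.2.4 does not.
    val hasModernApi =
      try { Class.forName("com.google.gson.JsonParser").getMethod("parseString", classOf[String]); true }
      catch { case _: NoSuchMethodException => false }
    log.info(s"Gson 2.8.x+ API available: $hasModernApi")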

So the runtime dependency tree for Gson seems to be:

 |-spark-sql -|
 |            |- gson-2.2.4
 |           
 |-customService -|
                  |-- client-java -|
                                   |-- gson-2.10.1

(Interestingly, this does not match the dependencyTreeBrowser output, in which all of the dependencies above resolve Gson to 2.10.1.)
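
(To cross-check the build-time view, and assuming the sbt dependency-tree plugin is enabled via addDependencyTreePlugin in project/plugins.sbt, which the tree output above suggests, something like the following shows which dependencies pull Gson in and at which version:)

    sbt "events_loader/dependencyTree"
    sbt "events_loader/whatDependsOn com.google.code.gson gson 2.10.1"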

As I understand it, Spark's own classloaders replace every Gson version in the project with its bundled 2.2.4.

I tried to work around it by:

  1. Excluding Gson from all project dependencies and then adding the target Gson version explicitly (a build-wide pin via dependencyOverrides is sketched after this list):

    val gson = "com.google.code.gson" % "gson" % "2.10.1"

    Seq(
      scyllaConnector,
      sparkHadoopCloud,
      sparkSql,
      sparkSqlKafka,
      customService
    ).map(_.excludeAll(ExclusionRule(organization = "com.google.code.gson", name = "gson"))) ++ Seq(gson)
    
  2. Shading the Gson classes of the target version:

    lazy val shadingSettings = Seq(
      assembly / assemblyShadeRules := Seq(
        ShadeRule
          .rename("com.google.gson.**" -> "my_shaded.gson.@1")
          .inLibrary("com.google.code.gson" % "gson" % "2.10.1")
          .inAll,

        // Drop any Gson classes that were not shaded
        ShadeRule.zap("com.google.gson.**").inAll
      ),
      // Merge conflicting files from different jars
      assembly / assemblyMergeStrategy := {
        case PathList("META-INF", "services", _*) => MergeStrategy.concat
        case PathList("META-INF", _*)             => MergeStrategy.discard
        case _                                    => MergeStrategy.first
      }
    )
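
For completeness, the build-wide pin mentioned in point 1: sbt's dependencyOverrides key forces the version chosen during conflict resolution. A minimal sketch; note this only affects what the build resolves, not the gson-2.2.4.jar that Spark already ships on the cluster:

    // Force 2.10.1 whenever Gson appears anywhere in the dependency graph.
    dependencyOverrides += "com.google.code.gson" % "gson" % "2.10.1"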
    

Neither approach works. What am I missing?

Additionally:

Full build.sbt

import com.eed3si9n.jarjarabrams.ShadeRule
import sbtassembly.AssemblyPlugin.autoImport.*

lazy val commonSettings = Seq(
  tpolecatScalacOptions ++= Set(
    ScalacOptions.release("11"),
    ScalacOptions.warnOption("nonunit-statement")
  )
)

lazy val shadingSettings = Seq(
  assembly / assemblyShadeRules := Seq(
    ShadeRule
      .rename("com.google.gson.**" -> "my_shaded.gson.@1")
      .inLibrary("com.google.code.gson" % "gson" % "2.10.1")
      .inAll,

    // Drop any Gson classes that were not shaded
    ShadeRule.zap("com.google.gson.**").inAll
  ),
  // Merge conflicting files from different jars
  assembly / assemblyMergeStrategy := {
    case PathList("META-INF", "services", _*) => MergeStrategy.concat
    case PathList("META-INF", _*)             => MergeStrategy.discard
    case _                                    => MergeStrategy.first
  }
)


inThisBuild(
  Seq(
    scalaVersion := "2.13.10",
    javacOptions ++= Seq("-encoding", "UTF-8", "-source", "11"),
    Compile / compile / javacOptions ++= Seq( // need to split this 'cause javadoc :(
      "-target",
      "11",
      "-Xlint:deprecation" // Print deprecation warning details.
    ),
    autoAPIMappings := true, // will use external ScalaDoc links for managed dependencies
    updateOptions   := updateOptions.value.withCachedResolution(true),
    addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
  )
)

lazy val root =
  Project(id = "project", base = file("."))
    .enablePlugins(GitBranchPrompt, AssemblyPlugin)
    .aggregate(
      sparkJob
    )
    .settings(
      commonSettings,
      shadingSettings,
      crossScalaVersions := Nil,
      cleanFiles += baseDirectory.value / "dist"
    )

lazy val sparkJob = Project(id = "events_loader", base = file("events_loader"))
  .configs(IntegrationTest)
  .settings(
    commonSettings,
    shadingSettings,
    inConfig(IntegrationTest)(Defaults.testSettings),
    libraryDependencies ++= Dependencies.sparkJob.value
  )

Dependencies.scala

object Dependencies {
  object Version {
    val ScyllaConnector = "3.2.0"
    val Spark           = "3.3.0"
    val Gson            = "2.10.1"
    val HadoopAws       = "3.3.2"
  }

  object Libraries {
    val gson            = "com.google.code.gson" %  "gson"                      % Version.Gson
    val scyllaConnector = "com.datastax.spark"   %% "spark-cassandra-connector" % Version.ScyllaConnector
    val sparkHadoopCloud =
      ("org.apache.spark" %% "spark-hadoop-cloud" % Version.Spark).exclude("org.apache.hadoop", "hadoop-aws")
    val sparkHadoopAws = "org.apache.hadoop" %  "hadoop-aws"           % Version.HadoopAws % Provided
    val sparkSql       = "org.apache.spark"  %% "spark-sql"            % Version.Spark     % Provided
    val sparkSqlKafka  = "org.apache.spark"  %% "spark-sql-kafka-0-10" % Version.Spark
    val customService  = // group, name and version of custom service
  }

  val sparkJob = Def.setting {
    import Libraries.*
    Seq(
      scyllaConnector,
      sparkHadoopCloud,
      sparkSql,
      sparkSqlKafka,
      sparkHadoopAws
    ).map(_.excludeAll(ExclusionRule(organization = "com.google.code.gson", name = "gson"))) ++ Seq(gson)
  }
}
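
To check whether the shading rules actually took effect, the contents of the assembly jar can be listed with a small script like the one below (a sketch; the jar path is a guess based on the project layout and needs adjusting to the real artifact name):

    import java.util.jar.JarFile
    import scala.jdk.CollectionConverters._

    // List Gson-related entries: after shading there should be my_shaded/gson/... classes
    // and no remaining com/google/gson/... classes.
    val assemblyJar = new JarFile("events_loader/target/scala-2.13/events_loader-assembly.jar")
    assemblyJar.entries().asScala
      .map(_.getName)
      .filter(n => n.startsWith("com/google/gson/") || n.startsWith("my_shaded/gson/"))
      .foreach(println)
    assemblyJar.close()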

Updated:

I tried to set the classloader priority with userClassPathFirst as follows:

 val conf = new SparkConf()
  .setAppName(sparkConfig.appName)
  .set("spark.driver.userClassPathFirst", "true")
  .set("spark.executor.userClassPathFirst", "true")

But I got an error:

 org.apache.spark.sql.AnalysisException:  Cannot modify the value of a Spark config: spark.driver.userClassPathFirst. See also 'https://spark.apache.org/docs/latest/sql-migration-guide.html#ddl-statements'
  • You should probably configure Spark to use your classpath first. That's what we do. Package as usual and make sure priority is given to our classpath, not the one from Spark. userClasspathFirst. (Commented Jun 4 at 20:12)
  • I also tried it, and got an error: "org.apache.spark.sql.AnalysisException: Cannot modify the value of a Spark config: spark.driver.userClassPathFirst". I updated my question with this attempt. (Commented Jun 4 at 20:33)
  • I would think the userClasspathFirst has to be given beforehand when launching the app. In the code it's already too late. (Commented Jun 5 at 17:39)

2 Answers


I found a solution that involves removing all Gson artifacts from /opt/app-root/spark/jars/.

Since the Spark application runs in a Kubernetes cluster environment, I implemented this through a simple script in the Dockerfile:

  RUN rm /opt/app-root/spark/jars/gson*.jar

This is combined with excluding Gson from all project dependencies while explicitly adding the required gson 2.10.1 (as demonstrated in my question above).

As I understand it, Spark uses the following classloader hierarchy:
Bootstrap → Extension → System ClassLoader → Executor ClassLoader → User ClassLoader

If Spark cannot find gson*.jar in the Executor ClassLoader paths (due to the script above), it will then look in the User ClassLoader and find the target version 2.10.1.

I recognize this approach might seem somewhat crude, as Spark may rely on a specific Gson version, requiring additional verification to ensure nothing breaks.

However, this method (changing the order for just one JAR) appears significantly less risky compared to the userClassPathFirst setting, which globally changes the load order for all JARs.
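
A quick way to confirm the change took effect is to rerun the path check from the question after deploying the rebuilt image. A sketch; the /opt/app-root/spark/jars prefix is specific to this image layout:

    // After gson*.jar is removed from Spark's jars directory, Gson should resolve
    // from the application's classpath (the assembly jar), not from Spark's jars.
    val gsonPath = classOf[com.google.gson.Gson].getProtectionDomain.getCodeSource.getLocation.getPath
    require(!gsonPath.startsWith("/opt/app-root/spark/jars"), s"Unexpected Gson location: $gsonPath")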


You can pass the options spark.driver.userClassPathFirst and spark.executor.userClassPathFirst in the following way:

/path/to/spark-3.3.0-bin-hadoop3-scala2.13/bin/spark-submit --class App --conf spark.driver.userClassPathFirst=true --conf spark.executor.userClassPathFirst=true target/scala-2.13/sparkdemo2-assembly-0.1.0-SNAPSHOT.jar 

I suspect the significant one here is the first, spark.driver.userClassPathFirst.


I used a simpler reproduction:

project/build.properties

sbt.version = 1.11.2

project/plugins.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.3.1")

build.sbt

ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.13.11"
//ThisBuild / scalaVersion := "2.13.10"

lazy val root = (project in file("."))
  .settings(
    name := "sparkdemo2",
    libraryDependencies ++= Seq(
      "com.datastax.spark" %% "spark-cassandra-connector" % "3.4.1" /*"3.2.0"*/, 
      ("org.apache.spark"  %% "spark-hadoop-cloud"        % "3.3.0")
        .exclude("org.apache.hadoop", "hadoop-aws"),
      "org.apache.spark"   %% "spark-sql"                 % "3.3.0" % Provided,
      "org.apache.hadoop"  %  "hadoop-aws"                % "3.3.2" % Provided,
      "org.apache.spark"   %% "spark-sql-kafka-0-10"      % "3.3.0",
      "io.kubernetes" % "client-java" % "19.0.1",
      "com.google.code.gson" % "gson" % "2.10.1",
    ),
    javaOptions += "--add-opens=java.base/jdk.internal.loader=ALL-UNNAMED",
  )

ThisBuild / assemblyMergeStrategy := {
  case PathList("mozilla", "public-suffix-list.txt")      => MergeStrategy.first
  case PathList("META-INF", "okio.kotlin_module")         => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".class"     => MergeStrategy.first
  case x =>
    val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
    oldStrategy(x)
}

fork := true

I've noticed a couple of issues with your build.sbt.

  • spark-cassandra-connector 3.2.0 can't be used with Scala 2.13; it is published only for Scala 2.12. The first version published for Scala 2.13 is 3.4.1:

https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector

  • For Scala 2.13.10 with your dependencies I get the following error:
[error] (scalaInstance) Expected `root / scalaVersion` to be 2.13.11 or later, but found 2.13.10.
[error] To support backwards-only binary compatibility (SIP-51), the Scala 2.13 compiler
[error] should not be older than scala-library on the dependency classpath.
[error] 
[error] Upgrade the `scalaVersion` to fix the build. If upgrading the Scala compiler version is
[error] not possible (for example due to a regression in the compiler or a missing dependency),
[error] this error can be demoted by setting `allowUnsafeScalaLibUpgrade := true`.

https://docs.scala-lang.org/sips/drop-stdlib-forwards-bin-compat.html

So I had to use Scala 2.13.11+. The latest Scala 2.13 is currently 2.13.16.


You can investigate classloaders at runtime with something like

import com.google.gson.Gson
import java.net.{URL, URLClassLoader}

object App {
  def main(args: Array[String]): Unit = {
    val path = classOf[Gson].getProtectionDomain.getCodeSource.getLocation.getPath
    println(s"path=$path")
    println()

    println("java.class.path:")
    System.getProperty("java.class.path")
      .split(System.getProperty("path.separator"))
      .foreach(println)

    println()

    def printUrls(classLoader: ClassLoader): Unit = {
      var cl = classLoader
      while (cl != null) {
        println(s"classloader: ${cl.getClass.getName}")
        cl match {
          case cl: URLClassLoader =>
            println("URLClassLoader urls:")
            cl.getURLs.foreach(println)
          //javaOptions += "--add-opens=java.base/jdk.internal.loader=ALL-UNNAMED"
//          case cl: jdk.internal.loader.BuiltinClassLoader =>
//            println("BuiltinClassLoader urls:")
//            val ucpField = classOf[jdk.internal.loader.BuiltinClassLoader].getDeclaredField("ucp")
//            ucpField.setAccessible(true)
//            val ucp = ucpField.get(cl)
//            if (ucp == null) println("null")
//            else ucp.asInstanceOf[jdk.internal.loader.URLClassPath].getURLs.foreach(println)
          case cl if System.getProperty("java.version").split('.').head.toInt >= 9 &&
            Class.forName("jdk.internal.loader.BuiltinClassLoader").isAssignableFrom(cl.getClass) =>
            println("BuiltinClassLoader urls:")
            val ucpField = Class.forName("jdk.internal.loader.BuiltinClassLoader").getDeclaredField("ucp")
            ucpField.setAccessible(true)
            val ucp = ucpField.get(cl)
            if (ucp == null) println("null")
            else Class.forName("jdk.internal.loader.URLClassPath").getMethod("getURLs").invoke(ucp).asInstanceOf[Array[URL]].foreach(println)
          case _ =>
            println("not URLClassLoader or BuiltinClassLoader")
        }
        cl = cl.getParent
      }
    }

    println("class loader".toUpperCase)
    printUrls(getClass.getClassLoader)
    println
    println("system classloader".toUpperCase)
    printUrls(ClassLoader.getSystemClassLoader)
    println
    println("thread classloader".toUpperCase)
    printUrls(Thread.currentThread().getContextClassLoader)
  }
}

--add-opens=java.base/jdk.internal.loader=ALL-UNNAMED should be added to the command line via --driver-java-options (--conf "spark.executor.extraJavaOptions=..." alone may not be enough, see https://spark.apache.org/docs/latest/configuration.html):

/path/to/spark-3.3.0-bin-hadoop3-scala2.13/bin/spark-submit --driver-java-options "--add-opens=java.base/jdk.internal.loader=ALL-UNNAMED" --class App --conf spark.driver.userClassPathFirst=true --conf spark.executor.userClassPathFirst=true target/scala-2.13/sparkdemo2-assembly-0.1.0-SNAPSHOT.jar
