15

This error has been the hardest to trace, and I am not sure what is going on. I am running a Spark cluster on my local machine, so the entire cluster is on one host (127.0.0.1), and I run it in standalone mode.

JavaPairRDD<byte[], Iterable<CassandraRow>> cassandraRowsRDD= javaFunctions(sc).cassandraTable("test", "hello" )
   .select("rowkey", "col1", "col2", "col3")
   .spanBy(new Function<CassandraRow, byte[]>() {
        @Override
        public byte[] call(CassandraRow v1) {
            return v1.getBytes("rowkey").array();
        }
    }, byte[].class);

Iterable<Tuple2<byte[], Iterable<CassandraRow>>> listOftuples = cassandraRowsRDD.collect(); //ERROR HAPPENS HERE
Tuple2<byte[], Iterable<CassandraRow>> tuple = listOftuples.iterator().next();
byte[] partitionKey = tuple._1();
for(CassandraRow cassandraRow: tuple._2()) {
    System.out.println("************START************");
    System.out.println(new String(partitionKey));
    System.out.println("************END************");
}

It clearly happens at cassandraRowsRDD.collect(), and I don't know why:

16/10/09 23:36:21 ERROR Executor: Exception in task 2.3 in stage 0.0 (TID 21)
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Here are the versions I use:

Scala code runner version 2.11.8  // when I run scala -version or even ./spark-shell


compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.0.0'
compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.0.0'
compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.0.0'
compile group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.0.0-M3'

My build.gradle looks like this after introducing a configuration called "provided", which doesn't seem to exist in Gradle by default, but Google said to create one:

group 'com.company'
version '1.0-SNAPSHOT'

apply plugin: 'java'
apply plugin: 'idea'

repositories {
    mavenCentral()
    mavenLocal()
}

configurations {
    provided
}
sourceSets {
    main {
        compileClasspath += configurations.provided
        test.compileClasspath += configurations.provided
        test.runtimeClasspath += configurations.provided
    }
}

idea {
    module {
        scopes.PROVIDED.plus += [ configurations.provided ]
    }
}

dependencies {
    compile 'org.slf4j:slf4j-log4j12:1.7.12'
    provided group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.0.0'
    provided group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.0.0'
    provided group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.0.0'
    provided group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.0.0-M3'
}



jar {
    from { configurations.provided.collect { it.isDirectory() ? it : zipTree(it) } }
   // with jar
    from sourceSets.test.output
    manifest {
        attributes 'Main-Class': "com.company.batchprocessing.Hello"
    }
    exclude 'META-INF/*.RSA', 'META-INF/*.SF', 'META-INF/*.DSA'
    zip64 true
}
1

6 Answers

18

I had the same issue and resolved it by adding my application's jar to Spark's classpath with:

spark = SparkSession.builder()
        .appName("Foo")
        .config("spark.jars", "target/scala-2.11/foo_2.11-0.1.jar")
        .getOrCreate()

3 Comments

This worked for me, but why is this necessary? Does anyone know? I would expect spark to automatically figure this out, if it was really needed.
I think that whenever you do any kind of map operation using a lambda that refers to methods or classes of your project, you need to supply them as an additional jar. Spark serializes the lambda itself but does not pull in its dependencies. I'm not sure why the error message is so uninformative.
I fixed it by separating Main app code from uber JAR and providing it via extraClassPath: .config("spark.executor.extraClassPath","/app/lib/original-SparkApp-1.0-SNAPSHOT.jar")
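
The point made in the comments above, that the lambda is serialized but the code it refers to is not, can be seen with plain Java serialization and no Spark at all. The following is only a sketch under assumptions: LambdaSerializationDemo, SerFunction and shout are hypothetical names invented for this illustration, and SerFunction merely stands in for Spark's serializable function interfaces. It serializes a lambda and checks that the bytes mention classes by name only, which suggests why the receiving JVM still needs the application's classes on its own classpath.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical demo class; none of these names come from Spark or from the question.
final class LambdaSerializationDemo {

    // A serializable function type, standing in for Spark's Java function interfaces.
    interface SerFunction<T, R> extends Function<T, R>, Serializable {}

    // A project method that the lambda calls; its bytecode lives only in the class file.
    static String shout(String s) {
        return s.toUpperCase() + "!";
    }

    // Returns a serializable lambda that depends on shout().
    static SerFunction<String, String> makeLambda() {
        return s -> shout(s);
    }

    // Plain Java serialization, the mechanism visible in the question's stack trace.
    static byte[] serialize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // True if the serialized bytes contain the given ASCII text (class names are stored as text).
    static boolean mentions(byte[] data, String text) {
        return new String(data, StandardCharsets.ISO_8859_1).contains(text);
    }

    public static void main(String[] args) {
        byte[] data = serialize(makeLambda());
        System.out.println("serialized size in bytes: " + data.length);
        System.out.println("mentions SerializedLambda: "
                + mentions(data, "java.lang.invoke.SerializedLambda"));
        System.out.println("mentions capturing class:  "
                + mentions(data, "LambdaSerializationDemo"));
    }
}
```

The serialized form is a small descriptor that names the capturing class and the implementation method. Whoever deserializes it has to load that class itself, which is consistent with the fixes in this thread that ship the application jar to the executors.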
6

I have hit the same exception and have dug into multiple related Jira issues (9219, 12675, 18075).

I believe that the exception name is confusing, and that the real problem is inconsistent environment settings between the Spark cluster and the driver application.

For example, I started my Spark cluster with the following line in conf/spark-defaults.conf:

spark.master                     spark://master:7077

while I started my driver program (even though the program is started with spark-submit) with the line:

sparkSession.master("spark://<master ip>:7077")

Here <master ip> is the correct IP address of the master node, but the program still fails because of this simple inconsistency.

As a result, I recommend starting all driver applications with spark-submit and not duplicating any configuration in the driver code (unless you need to override some setting). In other words, let spark-submit set up your environment the same way as the running Spark cluster.


3

In my case I had to add the spark-avro jar (I put it into the /lib folder next to the main jar):

SparkSession spark = SparkSession.builder().appName("myapp").getOrCreate();
...
spark.sparkContext().addJar("lib/spark-avro_2.11-4.0.0.jar");

1 Comment

I had to use spark.sparkContext.addJar("lib/spark-avro_2.11-4.0.0.jar"); in my case instead (brackets removed).
1

Your call() method should return byte[], as below.

@Override
public byte[] call(CassandraRow v1) {
  return v1.getBytes("rowkey").array();
}

If you still get the issue, check the versions of your dependencies, as mentioned in the Jira issue https://issues.apache.org/jira/browse/SPARK-9219

5 Comments

Hi! Sorry, I did have the .array() and I just updated the question. Looks like I screwed up somewhere in pasting my code here but it should be good now.
I saw that link as well and couldn't figure out what is going on there; that's why I pasted all the versions I am using. I am using Java 8, so I don't really know Scala, and I don't understand what marking a library as "provided" means.
I tested your code and it works fine in standalone mode with Spark 2.0.0. Try cleaning your build environment, then rebuild and test. A "provided" dependency means the jar is expected to be available at run time (here, from the Spark installation) rather than bundled with your application. Please check maven.apache.org/guides/introduction/…
Did you mark them as provided? If so, which of the libraries above did you mark as provided?
I am running the Spark Java program in Eclipse using Maven dependencies, so I am not marking them as provided. If you want to run your built jar on a cluster using spark-submit and use the jars that Spark provides, then you can mark them as provided. Please check the jar files and versions in your build environment and in your cluster environment.
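
One way to act on the "check the jar files" advice above is to confirm that the jar you hand to Spark really contains your application classes. The following is a minimal sketch, not part of any Spark tooling: JarClassCheck and its methods are hypothetical names invented here, and it relies only on java.util.jar.JarFile from the JDK. The class name in the usage line is the Main-Class from the question's build.gradle.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.JarFile;

// Hypothetical helper; not part of Spark or Gradle.
final class JarClassCheck {

    // Converts "com.company.batchprocessing.Hello" to "com/company/batchprocessing/Hello.class".
    static String entryNameFor(String className) {
        return className.replace('.', '/') + ".class";
    }

    // True if the jar at the given path has an entry for the given fully qualified class name.
    static boolean jarContainsClass(Path jar, String className) {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            return jarFile.getEntry(entryNameFor(className)) != null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: java JarClassCheck.java <path-to-jar> <fully.qualified.ClassName>");
            return;
        }
        boolean found = jarContainsClass(Paths.get(args[0]), args[1]);
        System.out.println(args[1] + (found ? " is" : " is NOT") + " in " + args[0]);
    }
}
```

For example, it could be run against the built jar with com.company.batchprocessing.Hello as the second argument. This only checks that an entry is present; it says nothing about whether the Scala or Spark versions inside the jar match the cluster.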
0

Check your code. In IntelliJ: Analyze... -> Inspect Code. If you have deprecated methods related to serialisation, fix them. Or simply try lowering the Spark or Scala version. In my case I lowered the Scala version to 2.10 and everything worked.


0

I had the same issue while running my job in Eclipse on one of the nodes of a Spark cluster, which is an Ubuntu box. I created the UDF as a separate Java class. Running Spark locally works fine, but switching to YARN throws the same exception as in the question.

I solved it by adding the path of the generated classes, which include the UDF class, to the Spark classpath, similar to what Holger Brandl advised.

I created a variable for the classpath:

String cpVar = "..../target/classes";

and added it to Spark as a config variable:

.config("spark.driver.extraClassPath", cpVar)
.config("spark.executorEnv.CLASSPATH", cpVar)

EDIT:

Adding the path to the classpath solves the problem only for the driver node; the other nodes in the cluster may still hit the same error. The final solution I arrived at is to put the generated classes on HDFS after every build and point Spark at that HDFS folder, as below.

sparkSession.sparkContext().addJar("hdfs:///user/.../classes");

Please see the answer of TheMP
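
Why a fix on the driver alone does not help the other nodes can be imitated in plain Java, without Spark. This is only an analogy under stated assumptions: MissingClassDemo, MyUdf and canDeserialize are hypothetical names, and the "node without the class" is simulated by overriding ObjectInputStream.resolveClass. The sketch also fails with a ClassNotFoundException, whereas the question shows a ClassCastException, so it illustrates the general mechanism rather than reproducing the exact error.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical demo; the "remote node" is simulated inside a single JVM.
final class MissingClassDemo {

    // Stand-in for an application class such as a UDF.
    static final class MyUdf implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name = "my-udf";
    }

    // What the sending side does: turn the object into bytes.
    static byte[] serialize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // What a receiving side does, pretending that missingClassName is not on its classpath.
    // Returns false if deserialization fails because that class cannot be resolved.
    static boolean canDeserialize(byte[] data, String missingClassName) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                if (desc.getName().equals(missingClassName)) {
                    throw new ClassNotFoundException(desc.getName());
                }
                return super.resolveClass(desc);
            }
        }) {
            in.readObject();
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize(new MyUdf());
        System.out.println("node that has the class:   " + canDeserialize(data, "no.such.Class"));
        System.out.println("node that lacks the class: " + canDeserialize(data, MyUdf.class.getName()));
    }
}
```

The same bytes deserialize on one "node" and fail on the other; the only difference is which classes the receiver can load. That matches the observation above that every node needs access to the generated classes.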

2 Comments

Do you know if specifying a Maven repo would work instead of creating a JAR every time?
The classes are regenerated on every submit, so you need to replace them every time.
