5

I'm trying to do a spark streaming job with Kafka but I have a problem when I execute my class using Eclipse

In my main class "JavaDirectKafkaWordCount.class" I created my JavaInputDStream with my kafka params and I'm trying to count the number of words readed from kafka topic

    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(ConsumerRecord::value);
    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
        .reduceByKey((i1, i2) -> i1 + i2);
lines.print();
    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }

I'm getting this error

17/11/13 00:20:33 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)java.io.IOException: unexpected exception type 
at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1582)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1154)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)                                Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1148)    Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
at start.JavaDirectKafkaWordCount.$deserializeLambda$(JavaDirectKafkaWordCount.java:1)
... 37 more

How can I solve this problem?

5
  • Can you check this :SPACE.split(x) . Should not it be x.split(" ") Commented Nov 13, 2017 at 2:11
  • Thank you Sourav for you comment I change it but I still have the same error ! Commented Nov 13, 2017 at 7:31
  • what spark version are you using? Commented Nov 13, 2017 at 22:46
  • I have installed spark version version 2.2.0 , in my maven dependances I'm using spark-core_2.11 and spark-streaming_2.11 version 2.1.1 Commented Nov 14, 2017 at 4:27
  • It is showing error on line 1 "JavaDirectKafkaWordCount.java:1" . Could you please post the complete java file? Commented Nov 14, 2017 at 9:53

2 Answers 2

6

Change this line:

JavaDStream<String> lines = messages.map(ConsumerRecord::value);

to

JavaDStream<String> lines = messages.map(x -> x.value());
Sign up to request clarification or add additional context in comments.

1 Comment

add some description
1

Build a Uber jar with all the dependencies, Below is the pom.xml for spark 2.2.0, if you have a different version, change spark.version property accordingly.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.venk.exercise</groupId>
    <artifactId>test_exercise</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <spark.version>2.2.0</spark.version>
        <spark.kafka.version>2.2.0</spark.kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>

    </build>
</project>

Once you change your pom.xml, run the command mvn clean compile assembly:single and then submit the job with the below command

bin/spark-submit  --class edu.hw.test.SparkStreamingKafkaConsumer jar/test_exercise-0.0.1-SNAPSHOT-jar-with-dependencies.jar <application-arguments>

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.