
I am using Spark 2.3.0 and Kafka 1.0.0.3. I have created a Spark read stream:

df = spark.readStream. \
        format("kafka"). \
        option("kafka.bootstrap.servers", "localhost.cluster.com:6667"). \
        option("subscribe", "test_topic"). \
        load(). \
        selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(timestamp as TIMESTAMP)")

It runs successfully, and then I create the write stream:

df_write = df \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)",  "CAST(timestamp as TIMESTAMP)") \
        .writeStream \
        .format("csv") \
        .option("path", "/test_streaming_data") \
        .option("checkpointLocation", "test_streaming_data/checkpoint") \
        .start()

But when I run this

df_write.awaitTermination()

It throws the following error:

    Py4JJavaError: An error occurred while calling o264.awaitTermination.
: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
=== Streaming Query ===
Identifier: [id = c140e21c-f827-4b1d-9182-b3f68a405fad, runId = 47d4b5cb-f223-4235-bef1-84871a2f85c8]
Current Committed Offsets: {}
Current Available Offsets: {KafkaSource[Subscribe[test_topic]]: {"test_topic":{"0":31300}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [cast(key#21 as string) AS key#124, cast(value#22 as string) AS value#125, cast(timestamp#23 as timestamp) AS timestamp#126]
+- Project [cast(key#7 as string) AS key#21, cast(value#8 as string) AS value#22, cast(timestamp#12 as timestamp) AS timestamp#23]
   +- StreamingExecutionRelation KafkaSource[Subscribe[test_topic]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
    at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:131)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    ... 1 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 11, localhost.cluster.com, executor 2): java.lang.ClassNotFoundException: org.apache.spark.sql.kafka010.KafkaSourceRDDPartition

Can anyone please help me sort out this issue?

I have tried replacing the JAR libraries with updated ones, but the issue persists.

1 Answer

"tried replacing the jar libraries with updated ones"

It's unclear what you are doing here, but you should never modify any JAR files directly.

Use the --packages option when you submit the app. For the latest Spark 2.3.x, you need this package:

spark-submit --master=local \
  --packages='org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.4' \
  app.py

I have a Jupyter example here - https://github.com/OneCricketeer/docker-stacks/blob/master/hadoop-spark/spark-notebooks/kafka-sql.ipynb
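If you are launching from a notebook (as in the linked example) rather than through spark-submit, a minimal sketch of the same idea is to set spark.jars.packages before the SparkSession is created (the app name below is just a placeholder):

# spark.jars.packages must be set before the SparkSession (and its JVM) starts,
# so the Kafka source JAR is resolved and shipped to the executors as well.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("kafka-structured-streaming") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.4") \
    .getOrCreate()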
