I'm trying to use Spark Structured Streaming with Kafka, and I have a problem when I run spark-submit. The consumer still receives data from the producer, but Spark Structured Streaming fails with an error. Please help me find the issue in my code. Here is my code in test.py:

from kafka import KafkaProducer
from pyspark.sql import SparkSession
import random

spark = SparkSession.builder.appName('stream_test').getOrCreate()

# Produce 100 random values to the 'test' topic with kafka-python.
producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
for i in range(0, 100):
    lg_value = str(random.uniform(5000, 10000))
    producer.send(topic='test', value=bytes(lg_value, encoding='utf-8'))
    producer.flush()

# Define a streaming DataFrame over the same topic; load() is where the
# Kafka source provider is resolved, and where the error below is raised.
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test").load()
df_to_string = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print("done")

When I run spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 test.py, the terminal outputs:

20/07/12 19:39:09 INFO Executor: Starting executor ID driver on host 192.168.31.129
20/07/12 19:39:09 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 38885.
20/07/12 19:39:09 INFO NettyBlockTransferService: Server created on 192.168.31.129:38885
20/07/12 19:39:09 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/07/12 19:39:09 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.31.129, 38885, None)
20/07/12 19:39:09 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.31.129:38885 with 413.9 MiB RAM, BlockManagerId(driver, 192.168.31.129, 38885, None)
20/07/12 19:39:09 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.31.129, 38885, None)
20/07/12 19:39:09 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.31.129, 38885, None)
20/07/12 19:39:11 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/home/thoaint2/spark-warehouse').
20/07/12 19:39:11 INFO SharedState: Warehouse path is 'file:/home/thoaint2/spark-warehouse'.
Traceback (most recent call last):
  File "/home/thoaint2/test.py", line 15, in <module>
    df = spark.readStream.format("kafka").option('kafka.bootstrap.servers','localhost:9092') \
  File "/home/thoaint2/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 420, in load
  File "/home/thoaint2/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/home/thoaint2/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 131, in deco
  File "/home/thoaint2/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.load.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer
	at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:557)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:325)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:70)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:220)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:112)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:112)
	at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:35)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:205)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)

1 Answer

NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer

This class is part of the kafka-clients JAR, which you'll want to add to your --packages, e.g. spark-submit ... --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,org.apache.kafka:kafka-clients:<<version>>


Also note that Spark can act as a Kafka producer as well, so you don't need a separate Python Kafka library.
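
For example, here is a minimal sketch of the producer half rewritten with Spark's batch Kafka sink instead of kafka-python (assuming the same local broker and 'test' topic as in the question; the app name is made up, and it needs the same --packages on the spark-submit command line):

import random
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('kafka_write_demo').getOrCreate()

# Kafka expects a string or binary 'value' column.
rows = [(str(random.uniform(5000, 10000)),) for _ in range(100)]
df = spark.createDataFrame(rows, ['value'])

# Batch-write the rows to the 'test' topic; no separate producer library needed.
df.write.format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('topic', 'test') \
    .save()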

If you simply want to process Kafka streams without using a JVM, then look into Faust.
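
For instance, a pure-Python sketch with Faust (hypothetical module name stream_app.py; start it with faust -A stream_app worker):

import faust

app = faust.App('stream-test', broker='kafka://localhost:9092')
topic = app.topic('test', value_type=bytes)

# The agent consumes the 'test' topic asynchronously, with no JVM involved.
@app.agent(topic)
async def process(values):
    async for value in values:
        print(float(value))  # the question's producer sends stringified floats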
