
This is my code.

import findspark

findspark.init()
import os

# Pull in the Kafka source package before the JVM starts
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 pyspark-shell"
)
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *


appName = "PySpark Example - JSON file to Spark Data Frame"
master = "local"
time_format = "dd/MM/yyyy HH:mm:ss"

spark = (
    SparkSession.builder.appName("Spark Kafka Streaming").master(master).getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")

# Schema of the JSON payload on the raw-data topic
schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("tarih", StringType(), True),
        StructField("time", TimestampType(), True),
        StructField("temperature", FloatType(), True),
        StructField("pressure", FloatType(), True),
        StructField("vibration_x", FloatType(), True),
        StructField("vibration_y", FloatType(), True),
        StructField("vibration_motor", FloatType(), True),
    ]
)

# Read the Kafka topic and parse each message value as JSON.
# (Note: "multiLine" is a JSON file-source option; the Kafka source ignores it.)
lines = (
    spark.readStream.option("multiLine", True)
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "raw-data")
    .option("startingOffsets", "earliest")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
    .select(col("parsed_value.*"))
)
df = lines.select("*")

# Split "tarih" into org_unit4 and asset, then reformat the timestamp
df2 = (
    df.withColumn("org_unit4", split(col("tarih"), ",").getItem(0))
    .withColumn("asset", split(col("tarih"), ",").getItem(1))
    .drop("tarih")
    .withColumn("time", date_format(col("time"), time_format))
    .select(
        "id",
        "org_unit4",
        "asset",
        "time",
        "temperature",
        "pressure",
        "vibration_x",
        "vibration_y",
        "vibration_motor",
    )
)

# Write the parsed stream to the console for inspection
query = (
    df2.writeStream.option("truncate", False)
    .outputMode("append")
    .format("console")
    .start()
)

query.awaitTermination()
This is the error I get when I run it inside the Docker container:

root@115ec4500b0e:/usr/spark-2.3.1/sparks/deneme# python deneme3.py
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/spark-2.3.1/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b06fc574-6246-4c40-8953-72f8b41bcbaf;1.0
        confs: [default]
        found org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.1 in central
        found org.apache.kafka#kafka-clients;0.10.0.1 in central
        found net.jpountz.lz4#lz4;1.3.0 in central
        found org.xerial.snappy#snappy-java;1.1.2.6 in central
        found org.slf4j#slf4j-api;1.7.16 in central
        found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 312ms :: artifacts dl 9ms
        :: modules in use:
        net.jpountz.lz4#lz4;1.3.0 from central in [default]
        org.apache.kafka#kafka-clients;0.10.0.1 from central in [default]
        org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.1 from central in [default]
        org.slf4j#slf4j-api;1.7.16 from central in [default]
        org.spark-project.spark#unused;1.0.0 from central in [default]
        org.xerial.snappy#snappy-java;1.1.2.6 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   6   |   0   |   0   |   0   ||   6   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-b06fc574-6246-4c40-8953-72f8b41bcbaf
        confs: [default]
        0 artifacts copied, 6 already retrieved (0kB/11ms)
2022-10-02 12:24:22 WARN  NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "deneme3.py", line 42, in <module>
    .option("startingOffsets", "earliest")
  File "/usr/spark-2.3.1/python/pyspark/sql/streaming.py", line 403, in load
    return self._df(self._jreader.load())
  File "/usr/spark-2.3.1/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/spark-2.3.1/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/spark-2.3.1/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o40.load.
: org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
        at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
        at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

The code works on my laptop when I set SPARK_HOME, but I need to move it into a Docker environment.
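For reference, the laptop setup amounts to pointing findspark at an explicit Spark home before creating the session. A minimal sketch, assuming the /usr/spark-2.3.1 layout of the image linked below:

import findspark

# Path matches the gettyimages/spark image layout; adjust if Spark
# lives elsewhere in your container.
findspark.init(spark_home="/usr/spark-2.3.1")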

This is the Docker image I am using: https://hub.docker.com/layers/gettyimages/spark/2.3.1-hadoop-3.0/images/sha256-0bc08017eb4da02b7d6260ca3e5fdff921944c4a598283f1787521c58cf368c6?context=explore (Note: the Spark version I am using is 2.3.1. Even if I set up the environment somewhere else, it has to stay at version 2.3.1.)

1 Answer


You can work around this by passing the option below, but I would also suggest upgrading your Spark version and Kafka dependency, because this may be a bug: RangeAssignor should already be the default.

.option("kafka.partition.assignment.strategy", 
    "org.apache.kafka.clients.consumer.RangeAssignor")

You could also try including a newer version of kafka-clients in PYSPARK_SUBMIT_ARGS.
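A sketch of what that could look like; the kafka-clients version pinned here is an assumption, so pick whichever release resolves cleanly against spark-sql-kafka-0-10_2.11:2.3.1:

import os

# Assumed version pin for illustration; --packages accepts a
# comma-separated list of Maven coordinates.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,"
    "org.apache.kafka:kafka-clients:0.10.2.2 pyspark-shell"
)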
