
I am trying to use Kafka as the streaming source and Spark to process the data.

Config:

  • Python 3.9

  • Kubuntu 21.10

  • echo $JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64

  • echo $SPARK_HOME: /opt/spark

  • Spark version: 3.2.0

  • PySpark version: pyspark-3.2.1-py2.py3

  • downloaded Kafka version: kafka_2.13-3.1.0.tgz

Kafka status:

    :~$ sudo systemctl status kafka
    kafka.service - Apache Kafka Server
    Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: enabled)
    Active: active (running) since Sat 2022-01-29 19:02:18 +0330; 4s ago
    Docs: http://kafka.apache.org/documentation.html
    Main PID: 5271 (java)
    Tasks: 74 (limit: 19017)
    Memory: 348.7M
    CPU: 5.188s

My Python program:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
import os
import findspark as fs
fs.init()


spark_version = '3.2.0'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_3.1.0:{}'.format(spark_version)
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

# os.environ['PYSPARK_SUBMIT_ARGS'] = "--master local[2] pyspark-shell"

kafka_topic_name = "bdCovid19"

kafka_bootstrap_servers = 'localhost:9092'

if __name__ == "__main__":
    print("Welcome to DataMaking !!!")
    print("Stream Data Processing Application Started ...")
    print(time.strftime("%Y-%m-%d %H:%M:%S"))

    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka and Message Format as JSON") \
        .master("local[*]") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    # Construct a streaming DataFrame that reads from test-topic
    orders_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_name) \
        .option("startingOffsets", "latest") \
        .load()
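
For completeness, once load() works I only plan to dump the stream to the console while testing; a minimal sketch of that sink (console output plus awaitTermination, nothing else):

    # Minimal console sink for local testing; only runs once the kafka source loads.
    orders_query = orders_df \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
        .writeStream \
        .format("console") \
        .outputMode("append") \
        .option("truncate", "false") \
        .start()

    orders_query.awaitTermination()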

I am running this in PyCharm.

Error:

raise RuntimeError("Java gateway process exited before sending its port number") RuntimeError: Java gateway process exited before sending its port number

in this line: spark = SparkSession \

If I remove the os.environ lines from the code, that error disappears, but then I get this:

raise converted from None pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

in this line: orders_df = spark \
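
As far as I understand the integration guide, the kafka source has to be pulled in either with --packages on spark-submit or through spark.jars.packages; setting it on the builder would look roughly like this (the coordinate below is my guess at the one matching a Spark 3.2.0 build on Scala 2.12):

    # Alternative to PYSPARK_SUBMIT_ARGS: declare the connector on the session builder.
    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka and Message Format as JSON") \
        .master("local[*]") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0") \
        .getOrCreate()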

I have read these:

  1. Pyspark: Exception: Java gateway process exited before sending the driver its port number

  2. Creating sparkContext on Google Colab gives: RuntimeError: Java gateway process exited before sending its port number

  3. Spark + Python - Java gateway process exited before sending the driver its port number?

  4. Exception: Java gateway process exited before sending the driver its port number #743

  5. Pyspark: Exception: Java gateway process exited before sending the driver its port number

  6. Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)

  7. pyspark.sql.utils.AnalysisException: Failed to find data source: kafka

None of them worked for me. Any suggestions?

Comments:

  • 1) PYSPARK_SUBMIT_ARGS should be assigned before findspark and pyspark are imported. 2) You're using the wrong version of the Kafka package; it should match your Spark version. 3) You can add the --packages option when you actually run spark-submit, or set PYSPARK_SUBMIT_ARGS through PyCharm's environment variables rather than in code. (See the sketch below.) Commented Jan 30, 2022 at 8:01
  • Could you tell me which version of the Kafka package I should use? Commented Jan 30, 2022 at 20:55
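
A sketch of what the first comment suggests, assuming the default Spark 3.2.0 download is built against Scala 2.12, so the matching connector coordinate would be org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 (the coordinate tracks the Spark version, not the downloaded Kafka broker version):

import os

# Must be set before findspark/pyspark are imported; the artifact's Scala suffix
# and version have to match the Spark build (here: Scala 2.12, Spark 3.2.0).
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'
)
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

import findspark
findspark.init()

from pyspark.sql import SparkSession

Alternatively, PYSPARK_SUBMIT_ARGS can be set in the PyCharm run configuration's environment variables instead of in code, or the package can be passed with --packages when launching via spark-submit.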
