
I have been working on a project in which I need to send a data stream through Kafka to a local Spark instance that processes the incoming data. However, I cannot display and use the DataFrame in the right format. How can I resolve this issue? Here is the code for Kafka:

import pandas as pd
import time
import json
from confluent_kafka import Producer

# Read CSV
df = pd.read_csv('timeseries.csv')
df = df.astype(str)

# Number of rows to send in each batch
batch_size = 100

# Kafka settings
kafka_bootstrap_servers = 'localhost:9092'
kafka_topic = 'T7'

# Create a Kafka producer
producer_config = {'bootstrap.servers': kafka_bootstrap_servers}
producer = Producer(producer_config)

# Total number of rows in the dataset
total_rows = len(df)

# Loop through the dataset and send 100 rows at a time
for i in range(0, total_rows, batch_size):
    batch = df.iloc[i:i + batch_size]

    # Select the "id", "date", "value" and "label" columns
    selected_columns = batch[['id', 'date', 'value', 'label']]

    # Convert the selected columns to a JSON string
    json_data = selected_columns.to_json(orient='records')

    try:
        # Send the batch to the Kafka topic
        producer.produce(kafka_topic, key=str(batch['id'].iloc[0]), value=json_data.encode('utf-8'))
    except Exception as e:
        print(f"Error sending batch: {e}")

    # Introduce a delay of 1 second between sending each batch
    time.sleep(1)

# Wait for any outstanding messages to be delivered and delivery reports received
producer.flush()

And this is the code I use for Spark:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, from_json, split, explode
from pyspark.sql.types import StringType, StructType

# Define your Spark session
spark = SparkSession.builder \
    .appName("SparkConsumeKafka") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0") \
    .getOrCreate()

# Set log level to ERROR
spark.sparkContext.setLogLevel("ERROR")

# Read from the Kafka topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "T7") \
    .load()

my_schema = (
    StructType()
    .add("id", StringType())
    .add("date", StringType())
    .add("value", StringType())
    .add("label", StringType())
)

# Deserialize the JSON data using the defined schema
df = df.select(
    explode(split(col("value").cast("string"), "\n")).alias("values"), 
    "timestamp"
)

df = df.select(
    from_json(col("values"), my_schema).alias("datas"), 
    "timestamp"
)

def show_batch(df, epoch_id):
    df = df.select(
        "datas.id",
        "datas.date",
        "datas.value",
        "datas.label",
        "timestamp"
    )

    df.show(truncate=False)
    

# Print each micro-batch to the console through show_batch
query = df.writeStream.foreachBatch(show_batch).start()

# Await termination
query.awaitTermination()

# Stop the Spark session
spark.stop()

This was the best output I could get from show() on the Spark DataFrame.
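A quick way to inspect what actually reaches Spark, without running Kafka or Spark, is to apply the producer's serialization step to a couple of rows. The sample values below are made up and only assumed to resemble timeseries.csv; the to_json(orient='records') call is the same one the producer uses.

```python
import json

import pandas as pd

# Hypothetical sample rows standing in for timeseries.csv (same four columns, all strings)
batch = pd.DataFrame({
    "id": ["1", "2"],
    "date": ["2024-01-01", "2024-01-02"],
    "value": ["0.5", "0.7"],
    "label": ["0", "1"],
})

# Same call as the producer: the whole batch becomes ONE JSON array string
json_data = batch[["id", "date", "value", "label"]].to_json(orient="records")
print(json_data)

# The payload decodes to a list of objects, not a single object, and has no newlines
records = json.loads(json_data)
print(type(records).__name__, len(records), "\n" in json_data)
```

Because each Kafka message value is one JSON array of records rather than a single object, the record-level StructType my_schema does not describe it, which would explain why the parsed columns do not come out as expected. Two ways to make the shapes agree are wrapping the schema in ArrayType and exploding, or changing the producer so that each message holds one object.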

Comment: Why do you need to split the data on new lines? (May 19, 2024 at 3:25)
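The point behind this question can be checked directly. Splitting the producer's payload on "\n" leaves the whole JSON array in one piece, whereas newline-delimited JSON (the format pandas writes with to_json(orient="records", lines=True)) splits into one object per line. This sketch uses only the standard json module and hypothetical sample records.

```python
import json

# Hypothetical records in the shape of the producer's selected columns
records = [
    {"id": "1", "date": "2024-01-01", "value": "0.5", "label": "0"},
    {"id": "2", "date": "2024-01-02", "value": "0.7", "label": "1"},
]

# What the producer currently sends: one JSON array with no newlines
array_payload = json.dumps(records)
# Newline-delimited JSON: one object per line
ndjson_payload = "\n".join(json.dumps(r) for r in records)

# Splitting on "\n", as the Spark job does with split(value, "\n")
array_pieces = array_payload.split("\n")
ndjson_pieces = ndjson_payload.split("\n")

print(len(array_pieces))   # the whole array stays in a single piece
print(len(ndjson_pieces))  # one piece per record

# Each NDJSON piece is a single object, which matches a record-level schema
parsed = [json.loads(p) for p in ndjson_pieces]
print(parsed == records)
```

If the producer switched to lines=True, the existing split/explode/from_json chain would receive one object per element. Depending on the pandas version, that output may end with a trailing newline; splitting it would add an empty element that from_json would likely turn into a null row, so stripping the string before sending is a cheap safeguard.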

1 Answer


Printing the schema of your DataFrame (for example with printSchema()) will tell you more about your input. I haven't debugged your problem, but this might help:

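from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import ArrayType

# Each message is a JSON array of records, so wrap the record schema in ArrayType
json_schema = ArrayType(my_schema)

# kafka_stream is the DataFrame returned by spark.readStream ... .load()
kafka_values = kafka_stream.selectExpr("CAST(value AS STRING) as json_string")
kafka_values.printSchema()
json_df = kafka_values.select(from_json(col("json_string"), json_schema).alias("data"))

# explode() emits one row per record in the array
exploded_df = json_df.select(explode(col("data")).alias("record"))
flattened_df = exploded_df.select("record.id", ...)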
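Another option, sketched here as an assumption rather than as the answerer's method, is to change the producer so that each Kafka message carries a single record. to_messages is a hypothetical helper; in the real producer its input would come from selected_columns.to_dict(orient="records"), and each returned pair would be passed to producer.produce(kafka_topic, key=key, value=value).

```python
import json
from typing import Dict, Iterable, List, Tuple


def to_messages(records: Iterable[Dict[str, str]]) -> List[Tuple[str, bytes]]:
    """Hypothetical helper: one Kafka message per record instead of one per batch.

    Returns (key, value) pairs; the key is the record's id and the value is a
    single JSON object encoded as UTF-8.
    """
    messages = []
    for record in records:
        key = str(record["id"])
        value = json.dumps(record).encode("utf-8")
        messages.append((key, value))
    return messages


# Hypothetical sample; in the producer this would be
# selected_columns.to_dict(orient="records")
sample = [
    {"id": "1", "date": "2024-01-01", "value": "0.5", "label": "0"},
    {"id": "2", "date": "2024-01-02", "value": "0.7", "label": "1"},
]

for key, value in to_messages(sample):
    print(key, value.decode("utf-8"))
```

With one object per message, the Spark side can parse the value directly with from_json(col("value").cast("string"), my_schema), with no split or explode step. The trade-off is sending more, smaller messages per batch.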