I have been trying to complete a project in which I need to send a data stream through Kafka to a local Spark job that processes the incoming data. However, I cannot get the resulting DataFrame into the right format to show and work with it. How can I resolve this? Here is the Kafka producer code:
import pandas as pd
import time
import json
from confluent_kafka import Producer
# Read CSV
df = pd.read_csv('timeseries.csv')
df = df.astype(str)
# Number of rows to send in each batch
batch_size = 100
# Kafka settings
kafka_bootstrap_servers = 'localhost:9092'
kafka_topic = 'T7'
# Create a Kafka producer
producer_config = {'bootstrap.servers': kafka_bootstrap_servers}
producer = Producer(producer_config)
# Total number of rows in the dataset
total_rows = len(df)
# Loop through the dataset and send batch_size rows at a time
for i in range(0, total_rows, batch_size):
    batch = df.iloc[i:i + batch_size]
    # Select the columns to publish
    selected_columns = batch[['id', 'date', 'value', 'label']]
    # Convert the selected columns to a JSON string (a single JSON array)
    json_data = selected_columns.to_json(orient='records')
    try:
        # Send the batch to the Kafka topic
        producer.produce(kafka_topic, key=str(batch['id'].iloc[0]), value=json_data.encode('utf-8'))
    except Exception as e:
        print(f"Error sending batch: {e}")
    # Wait 1 second between batches
    time.sleep(1)

# Wait for any outstanding messages to be delivered and delivery reports received
producer.flush()
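Note that to_json(orient='records') serializes each batch as one JSON array, while the Spark code below splits the payload on newlines. If the intent is one JSON object per line, the loop could be rewritten as in this sketch (a drop-in replacement for the loop above; lines=True is pandas' newline-delimited output, and the delivery_report callback name is mine, not part of the original code):

def delivery_report(err, msg):
    # Hypothetical callback; confluent_kafka invokes it from poll()/flush()
    if err is not None:
        print(f"Delivery failed: {err}")

for i in range(0, total_rows, batch_size):
    batch = df.iloc[i:i + batch_size]
    selected_columns = batch[['id', 'date', 'value', 'label']]
    # One JSON object per line instead of one JSON array per batch
    json_data = selected_columns.to_json(orient='records', lines=True)
    producer.produce(kafka_topic,
                     key=str(batch['id'].iloc[0]),
                     value=json_data.encode('utf-8'),
                     callback=delivery_report)
    producer.poll(0)  # serve delivery reports
    time.sleep(1)

producer.flush()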
And this is the code I use on the Spark side:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, from_json, split, explode
from pyspark.sql.types import StringType, StructType
# Define your Spark session
spark = SparkSession.builder \
    .appName("SparkConsumeKafka") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0") \
    .getOrCreate()
# Set log level to ERROR
spark.sparkContext.setLogLevel("ERROR")
# Read from the Kafka topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "T7") \
    .load()
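# The Kafka source exposes fixed columns: key and value (binary), topic,
# partition, offset, timestamp, and timestampType; the payload is in "value".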
my_schema = (
    StructType()
    .add("id", StringType())
    .add("date", StringType())
    .add("value", StringType())
    .add("label", StringType())
)
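# The schema assumes each parsed string is one flat JSON object, e.g.
# {"id": "1", "date": "2017-01-01", "value": "0.5", "label": "normal"}
# (an illustrative shape; the actual values come from timeseries.csv)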
# Split the payload into lines, then deserialize each line with the schema
df = df.select(
    explode(split(col("value").cast("string"), "\n")).alias("values"),
    "timestamp"
)
df = df.select(
    from_json(col("values"), my_schema).alias("datas"),
    "timestamp"
)
def show_batch(df, epoch_id):
    # Flatten the struct fields for display
    df = df.select(
        "datas.id",
        "datas.date",
        "datas.value",
        "datas.label",
        "timestamp"
    )
    df.show(truncate=False)
# Print each micro-batch to the console via foreachBatch
query = df.writeStream.foreachBatch(show_batch).start()
# Await termination
query.awaitTermination()
# Stop the Spark session
spark.stop()
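Alternatively, if the producer keeps sending one JSON array per message, the payload could be parsed with an ArrayType schema instead of splitting on newlines. A minimal sketch under that assumption, replacing the two df.select(...) steps above:

from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import ArrayType

# Parse the whole payload as an array of records, then explode to one row each
df = df.select(
    explode(from_json(col("value").cast("string"), ArrayType(my_schema))).alias("datas"),
    "timestamp"
)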
