
I'm using Structured Streaming. I have a PySpark dataframe which I need to write to Kafka.

Schema of the dataframe is shown below:

root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = false)
 |    |-- end: timestamp (nullable = false)
 |-- processedAlarmCnt: integer (nullable = false)
 |-- totalAlarmCnt: integer (nullable = false)

In my current code, I'm converting the PySpark DataFrame to pandas, looping over each row, and adding the data to a hashmap:

import json
from datetime import datetime

def writeCountToKafka(df):
    if df.count() > 0:
        hm = {}
        df_pandas = df.toPandas()
        for _, row in df_pandas.iterrows():
            hm["window"] = [datetime.timestamp(row["window"]["start"]),
                            datetime.timestamp(row["window"]["end"])]
            hm["processedAlarmCnt"] = row["processedAlarmCnt"]
            hm["totalAlarmCnt"] = row["totalAlarmCnt"]

            # Python Kafka producer (from kafka import KafkaProducer)
            kafka_producer.send(topic_count, json.dumps(hm).encode('utf-8'))
            kafka_producer.flush()

A few questions:

  1. How do I make this code more efficient, ideally without having to loop over every row to extract the values and store them in a hashmap?

  2. Does it make sense to use the Structured Streaming Kafka sink instead of the Python KafkaProducer (import: from kafka import KafkaProducer)? The Structured Streaming Kafka sink requires a "value" column, and it seems I cannot cast the window (struct) column to it, so I'm not sure what should be put as "value"?

What is the best way to design/code this?

Thanks in advance!

1 Answer

  1. You don't need pandas. Spark should be able to do everything you need to transform your data; using loops over DataFrame rows is almost always a sign you've done something wrong.

  2. No, don't import the KafkaProducer library; in fact, you don't need any other Python library installed to produce to Kafka. As written in the Spark Structured Streaming documentation, your dataframe only needs to contain a value column of type bytes or str (the key / topic / timestamp columns are all optional); see the sketch just below.
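As a minimal sketch of the built-in Kafka sink, assuming a streaming dataframe df_with_value (an illustrative name, built further below) that already has a string "value" column; the broker address, topic name, and checkpoint path here are placeholders:

    # Write a streaming dataframe with a string "value" column to Kafka
    # using Spark's built-in sink -- no extra Python library required.
    query = (df_with_value.writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker1:9092")       # placeholder broker
        .option("topic", "topic_count")                          # placeholder topic
        .option("checkpointLocation", "/tmp/alarm-count-chk")    # required for streaming sinks
        .start())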

You need to define a UDF that accepts the struct column and serializes the three root columns into a single value (as a JSON string, or any other type).
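For example, here is a sketch of such a UDF matching the schema in the question (serialize_value and df_with_value are illustrative names, and df is the dataframe from the question); inside the UDF, the window struct arrives as a Row whose start/end fields are Python datetimes:

    import json
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    # Serialize the three root columns into one JSON string, mirroring the
    # dict the question builds row by row.
    @F.udf(returnType=StringType())
    def serialize_value(window, processedAlarmCnt, totalAlarmCnt):
        return json.dumps({
            "window": [window["start"].timestamp(), window["end"].timestamp()],
            "processedAlarmCnt": processedAlarmCnt,
            "totalAlarmCnt": totalAlarmCnt,
        })

    df_with_value = df.select(
        serialize_value("window", "processedAlarmCnt", "totalAlarmCnt").alias("value")
    )

If plain JSON output is acceptable, the built-in F.to_json(F.struct(...)) produces the value column without a Python UDF at all, though timestamps are then rendered as ISO-8601 strings rather than the epoch seconds built above.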


2 Comments

Thanks for the input. I was able to use a UDF to serialize the struct column into a JSON string. I agree, that seems to be a better approach in this context.
Please advise on this ticket as well: stackoverflow.com/questions/71243726/…
