
I'm trying to implement a continuous data generator from Databricks to an Event Hub.

My idea was to generate some data in a .csv file and then create a DataFrame from it. In a loop, I call a function that starts a query to stream that data to the Event Hub. I'm not sure whether this is a good approach, whether Spark can handle writing repeatedly from the same DataFrame, or whether I've understood correctly how queries work.

The code looks like this:

import time

from pyspark.sql import DataFrame


def write_to_event_hub(
    df: DataFrame,
    topic: str,
    bootstrap_servers: str,
    config: str,
    checkpoint_path: str,
):
    return (
        df.writeStream.format("kafka")
        .option("topic", topic)
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.jaas.config", config)
        .option("checkpointLocation", checkpoint_path)
        .trigger(once=True)
        .start()
    )


while True:
    query = write_to_event_hub(
        streaming_df,
        topic,
        bootstrap_servers,
        sasl_jaas_config,
        "/checkpoint",
    )
    query.awaitTermination()
    print("Wrote once")
    time.sleep(5)


For reference, this is how I read the data from the CSV file (it's in DBFS), and I have a schema defined for it:

streaming_df = (
    spark.readStream.format("csv")
    .option("header", "true")
    .schema(location_schema)
    .load(f"{path}")
)

It looks like no data is written even though the message "Wrote once" is printed. Any ideas how to handle this? Thank you!

1 Answer


The problem is that you're using readStream to get the CSV data, so it will wait until new data is pushed to the directory with CSV files. But really, you don't need readStream/writeStream at all - the Kafka connector works just fine in batch mode, so your code should be:

df = read_csv_file()
while True:
  write_to_kafka(df)
  sleep(5)
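For reference, here is a minimal sketch of what that batch version could look like, assuming the same topic, bootstrap_servers, sasl_jaas_config, path, and location_schema variables from the question; the rows are serialized to JSON because the Kafka sink expects a string or binary value column:

import time

from pyspark.sql.functions import struct, to_json

# Plain batch read of the CSV file - no readStream needed
df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(location_schema)
    .load(path)
)

# The Kafka sink requires a "value" column (string or binary),
# so pack all columns into a JSON string
kafka_df = df.select(to_json(struct("*")).alias("value"))

while True:
    (
        kafka_df.write.format("kafka")
        .option("topic", topic)
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.jaas.config", sasl_jaas_config)
        .save()
    )
    print("Wrote once")
    time.sleep(5)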

2 Comments

It works, but now I'm trying to figure out: if I read the data back in the Kafka format in another notebook, how do I convert it from the Kafka format back into my schema?
It depends on which format you used when writing the data into Kafka - often people use JSON, Avro, or something else. Kafka itself doesn't know anything about the data: both keys and values are just binary, and the format of the actual payload is a contract between producers and consumers.
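As a hedged illustration of that contract, assuming the values were written as JSON strings (as in the sketch above): a consumer notebook can read the topic back, cast the binary value to a string, and parse it with the original schema using from_json. The topic and connection variables are placeholders carried over from the question:

from pyspark.sql.functions import col, from_json

# Batch read from the Kafka topic
# (the same kafka.sasl.* / kafka.security.protocol options from the
# question would also apply here)
raw_df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .load()
)

# Kafka delivers keys and values as binary; cast the value to a string
# and parse the JSON back into the original schema
parsed_df = raw_df.select(
    from_json(col("value").cast("string"), location_schema).alias("data")
).select("data.*")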
