
I want to extract streaming data from a Kafka cluster in batches of one hour, so I run a script every hour with writeStream set to .trigger(once=True) and startingOffsets set to earliest, like this:

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", 
        config.get("kafka_servers")) \
    .option("subscribe", config.get("topic_list")) \
    .option("startingOffsets", "earliest") \
    .load()

df.writeStream \
    .format("parquet") \
    .option("checkpointLocation", config.get("checkpoint_path")) \
    .option("path", config.get("s3_path_raw")) \
    .trigger(once=True) \
    .partitionBy('date', 'hour') \
    .start()

But every time the script is triggered, it only writes to S3 the messages that are arriving from the Kafka cluster at that precise moment, instead of picking up all the messages from the last hour as I was expecting it to do.

What might be the problem?

Edit: I should mention that the Kafka cluster retention is set to 24 hours.

1 Answer


Option 1:

.trigger(once=True) is supposed to process only one batch of data and then stop.

Please try replacing it with .trigger(availableNow=True) (available since Spark 3.3), which processes all the data that is available when the query starts, possibly as multiple batches, and then stops.
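A minimal sketch of the write side with that trigger, assuming Spark 3.3+ and the same config keys as in your question:

# consume everything available when the job starts, then shut down
df.writeStream \
    .format("parquet") \
    .option("checkpointLocation", config.get("checkpoint_path")) \
    .option("path", config.get("s3_path_raw")) \
    .trigger(availableNow=True) \
    .partitionBy('date', 'hour') \
    .start()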

Option 2:

To keep a continuously running job with a 1-hour processing interval, use .trigger(processingTime='60 minutes').
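A sketch of that variant, again assuming your config keys; the query stays up, so awaitTermination() is used to keep the driver alive:

# long-running query that writes a new micro-batch every 60 minutes
df.writeStream \
    .format("parquet") \
    .option("checkpointLocation", config.get("checkpoint_path")) \
    .option("path", config.get("s3_path_raw")) \
    .trigger(processingTime='60 minutes') \
    .partitionBy('date', 'hour') \
    .start() \
    .awaitTermination()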

Also, since your topic retention is 24 hours, offsets that expire before they are processed would otherwise make the query fail with a data-loss error, so you will need to set the following option while reading the stream:

.option("failOnDataLoss", "false")