I want to extract streaming data from a Kafka cluster in one-hour batches, so I run a script every hour with the writeStream trigger set to .trigger(once=True) and startingOffsets set to earliest, like this:
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", config.get("kafka_servers")) \
    .option("subscribe", config.get("topic_list")) \
    .option("startingOffsets", "earliest") \
    .load()

df.writeStream \
    .format("parquet") \
    .option("checkpointLocation", config.get("checkpoint_path")) \
    .option("path", config.get("s3_path_raw")) \
    .trigger(once=True) \
    .partitionBy('date', 'hour') \
    .start()
But every time the script is triggered, it only writes to S3 the messages that are arriving from the Kafka cluster at that precise moment, instead of picking up all the messages from the last hour as I was expecting it to.
What might be the problem?
Edit: I should mention that the Kafka cluster's retention is set to 24 hours.
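As a sanity check, separate from the hourly job, the same topics can be read with Spark's batch Kafka source to see how much data is actually still retained on the brokers. This is only a rough sketch: it reuses the spark session and config object from the snippet above, and check_df is just an illustrative name.

# Batch read of the same topics, from the earliest to the latest retained offsets.
check_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", config.get("kafka_servers")) \
    .option("subscribe", config.get("topic_list")) \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

# With 24-hour retention, this count should cover roughly the last day of messages.
print(check_df.count())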