I am trying to use a continuous trigger with a Spark Structured Streaming query. I get errors saying that the Spark Kafka consumers cannot find an appropriate offset while processing data. Without this trigger the query runs normally, as expected.
What I do:
Reading data from a Kafka topic:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.DataStreamReader

val inputStream: DataStreamReader = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "input_topic")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")

val inputDF: DataFrame = inputStream.load()
Writing data to a Kafka topic:
import org.apache.spark.sql.Row
import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}

val outputStream: DataStreamWriter[Row] = inputDF
  .writeStream
  .trigger(Trigger.Continuous("1 second"))
  .queryName("some_name_fresh_each_run")
  .option("checkpointLocation", "path_to_dir_fresh_each_run")
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("failOnDataLoss", value = false)
  .option("startingOffsets", "latest")
  .option("topic", "output_topic")

val streamingQuery = outputStream.start()
So I am basically doing nothing special: I just pipe the input data through to the output topic, without any transformations or invalid operations.
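(To be explicit about what "no transformations" means: the Kafka source already exposes binary key and value columns, and the Kafka sink consumes exactly those columns, so writing inputDF as-is is equivalent to the trivial projection below. The snippet is only an illustration; my actual job does not contain it.)

// Illustration only: an explicit projection equivalent to the pass-through.
// The Kafka source provides key/value as binary columns and the Kafka sink
// reads exactly those columns, so this select changes nothing.
val passthroughDF: DataFrame = inputDF.select("key", "value")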
What I get:
In the executor logs I see lots of messages like these:
21/10/06 14:16:55 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/10/06 14:16:55 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor-1, groupId=spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor] Resetting offset for partition input_topic-7 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
21/10/06 14:16:55 INFO KafkaConsumer: [Consumer clientId=consumer-spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor-2, groupId=spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor] Seeking to offset 0 for partition input_topic-11
21/10/06 14:16:55 WARN KafkaDataConsumer: Some data may be lost. Recovering from the earliest offset: 0
21/10/06 14:16:55 WARN KafkaDataConsumer:
The current available offset range is AvailableOffsetRange(0,0).
Offset 0 is out of range, and records in [0, 9223372036854775807) will be
skipped (GroupId: spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor, TopicPartition: input_topic-7).
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you want your streaming query to fail on such cases, set the source
option "failOnDataLoss" to "true"
And messages like this, despite startingOffsets being set to "latest":
21/10/06 14:16:55 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor-3, groupId=spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor] Seeking to EARLIEST offset of partition input_topic-5
I know that failOnDataLoss errors usually appear when something is wrong with the Kafka topic, its offsets are broken, or data really has been lost. But I deleted all data from my local Kafka broker, recreated all of the topics involved (input_topic and output_topic) from scratch, verified that they contained no data at all, and the error still appears.
What did I try (many times, in different orders):
- Change/delete the checkpoint location of the streaming query.
- Fully clean the Kafka topics and even the whole broker data (see the commands sketched after this list).
- Fully delete all consumer groups.
- Manually reset offsets to latest in the freshly created topics by running this:
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group <copied_group_name_from_logs> \
--reset-offsets --execute --to-latest --topic input_topic
- Set a different interval in the trigger.
- Just wait, hoping that the messages would disappear (i.e. that the consumer would eventually reach a valid offset).
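For reference, the topic and consumer-group cleanup from the list above boiled down to commands along these lines (the partition count matches my topics; the replication factor of 1 is just what I use for the local broker):

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic input_topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic input_topic \
  --partitions 20 --replication-factor 1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --delete --group <copied_group_name_from_logs>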
Nothing helps :( The most confusing thing is that the topics are empty: there is no data in them, and the only available offset is 0.
What I do as a workaround:
Do not use the continuous trigger :(
With the same settings, same topics, same query, same Spark cluster, and so on, I run my application without .trigger(Trigger.Continuous("1 second")) (i.e. with no explicit trigger, so Spark uses the default one) and everything works like a charm.
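For reference, this is the working variant: identical to the writer above, just with the .trigger line removed, so Spark falls back to its default micro-batch trigger (each micro-batch starts as soon as the previous one finishes):

// Same writer as above, only without the continuous trigger.
// With no .trigger(...) Spark runs the query in micro-batch mode.
val workingStream: DataStreamWriter[Row] = inputDF
  .writeStream
  .queryName("some_name_fresh_each_run")
  .option("checkpointLocation", "path_to_dir_fresh_each_run")
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("failOnDataLoss", value = false)
  .option("startingOffsets", "latest")
  .option("topic", "output_topic")

val workingQuery = workingStream.start()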
Why do I need this:
Well, I want to achieve the promised ~1 ms latency when processing streaming data with Spark.
Environment:
- Spark version: 3.0.1
- Scala version: 2.12.10
- Kafka version (runs locally): 2.7.0
- Number of partitions in the topics: 20
I know that this is an experimental feature, but I really hope that it can be used as promised. Please help!