I am trying to use a continuous trigger with a Spark Structured Streaming query. I get an error saying that the Spark consumers can't find an appropriate offset while processing data. Without this trigger the query runs normally (as expected).

What I do:

Reading data from Kafka topic:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.DataStreamReader

val inputStream: DataStreamReader = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:port")
      .option("subscribe", "input_topic")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")

val inputDF: DataFrame = inputStream.load()

Writing data to Kafka topic:

import org.apache.spark.sql.Row
import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}

val outputStream: DataStreamWriter[Row] = inputDF
      .writeStream
      .trigger(Trigger.Continuous("1 second"))
      .queryName("some_name_fresh_each_run")
      .option("checkpointLocation", "path_to_dir_fresh_each_run")
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:port")
      .option("failOnDataLoss", value = false)
      .option("startingOffsets", "latest")
      .option("topic", "output_topic")

val streamingQuery = outputStream.start()

So I basically do nothing special - just transport input data to output topic without any transformations or invalid operations.

What I get:

In executor logs I see lots of these messages:

21/10/06 14:16:55 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894

21/10/06 14:16:55 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor-1, groupId=spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor] Resetting offset for partition input_topic-7 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.

21/10/06 14:16:55 INFO KafkaConsumer: [Consumer clientId=consumer-spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor-2, groupId=spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor] Seeking to offset 0 for partition input_topic-11

21/10/06 14:16:55 WARN KafkaDataConsumer: Some data may be lost. Recovering from the earliest offset: 0

21/10/06 14:16:55 WARN KafkaDataConsumer: 
The current available offset range is AvailableOffsetRange(0,0).
 Offset 0 is out of range, and records in [0, 9223372036854775807) will be
 skipped (GroupId: spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor, TopicPartition: input_topic-7). 
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you want your streaming query to fail on such cases, set the source
 option "failOnDataLoss" to "true"

And messages like this despite the latest offset setting:

21/10/06 14:16:55 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor-3, groupId=spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor] 
Seeking to EARLIEST offset of partition input_topic-5

I know that failOnDataLoss errors usually appear when something is wrong with the Kafka topic, its offsets are broken, or data is really lost. But I deleted all data in my local Kafka broker, recreated all the topics involved (input_topic and output_topic) from scratch, and checked that there was no data in them at all, and the error still appears.

What I tried (many times, in different order):

  1. Change/delete checkpoint locations for the streaming query.
  2. Fully clean the Kafka topics and even the whole broker data.
  3. Fully delete all consumer groups.
  4. Manually reset offsets to latest in the freshly created topics by running this:
bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group <copied_group_name_from_logs> \
  --reset-offsets --execute --to-latest --topic input_topic
  5. Set different intervals in the trigger.
  6. Just wait, hoping that the messages would disappear (that is, that the consumer would eventually reach a valid offset).

Nothing helps :( The most confusing thing is that topics are empty, there is no data and the only available offset is 0.

What I do as workaround:

Do not use continuous trigger :(

With the same settings, same topics, same query, same Spark cluster, ... and so on, I run my application without .trigger(Trigger.Continuous("1 second")) (just no trigger which means Spark will use the default one) and everything works like a charm.

Why do I need this:

Well, I want to achieve the promised latency of ~1 ms when processing streaming data with Spark.

Environment:

  • Spark version: 3.0.1
  • Scala version: 2.12.10
  • Kafka version (runs locally): 2.7.0
  • Number partitions in topics: 20

I know that this is an experimental feature, but I really hope that it can be used as promised. Please help!

1 Answer

The problem is that you are running this example on your local machine and getting this warning:

21/10/06 14:16:55 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894

Continuous processing requires that the KafkaDataConsumer can run in an UninterruptibleThread. The Caveats section of the Structured Streaming Programming Guide says:

"Continuous processing engine launches multiple long-running tasks that continuously read data from sources, process it and continuously write to sinks. The number of tasks required by the query depends on how many partitions the query can read from the sources in parallel. Therefore, before starting a continuous processing query, you must ensure there are enough cores in the cluster to all the tasks in parallel. For example, if you are reading from a Kafka topic that has 10 partitions, then the cluster must have at least 10 cores for the query to make progress."

Apparently you need a proper cluster to test out this experimental feature.
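
The rule from the quoted caveat can be written down as a small pre-flight check. This is only a sketch: `ContinuousCoreCheck` and `hasEnoughCores` are hypothetical names, not part of Spark, and the core count has to come from your own cluster configuration. The 20 partitions are taken from the question and the 6 cores from my machine.

```scala
// Hypothetical helper, not a Spark API: encodes the rule from the Caveats
// that a continuous query needs at least one core per source partition.
object ContinuousCoreCheck {

  /** True when every source partition can get its own long-running task. */
  def hasEnoughCores(sourcePartitions: Int, availableCores: Int): Boolean = {
    require(sourcePartitions > 0, "sourcePartitions must be positive")
    require(availableCores >= 0, "availableCores must not be negative")
    availableCores >= sourcePartitions
  }

  def main(args: Array[String]): Unit = {
    // 20 partitions (the topics in the question) on a 6-core machine.
    println(hasEnoughCores(sourcePartitions = 20, availableCores = 6)) // false
    // A single-partition topic on the same machine satisfies the rule.
    println(hasEnoughCores(sourcePartitions = 1, availableCores = 6))  // true
  }
}
```

Passing this check only covers the core-count caveat; it does not by itself guarantee that the continuous query will make progress.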

As a note, I tested the identical setup locally with source and sink topics that have only one partition each, but I still get the "KafkaDataConsumer is not running in UninterruptibleThread" warning, although my CPU has 6 cores. Apparently that does not work either, so probably only a cluster will help.

2 Comments

Thanks a lot for your reply! I will try another setup as you suggest. What do you mean by "only a cluster can help"? Do you mean a Spark cluster whose workers have enough cores? Or a Kafka cluster?
Yes @roseaysina I meant a Spark cluster where the tasks are running on different physical/virtual workers.
