
I want to create a Structured Streaming job in Databricks with a Kafka source. I followed the instructions as described here. My script starts, but it fails on the first element of the stream. The stream itself works fine and produces results (in Databricks) when I use confluent_kafka, so there seems to be a different issue I am missing:

After the initial stream is processed, the script times out:

java.util.concurrent.TimeoutException: Stream Execution thread for stream [id = 80afdeed-9266-4db4-85fa-66ccf261aee4,
runId = b564c626-9c74-42a8-8066-f1f16c7ab53d] failed to stop within 36000 milliseconds (specified by spark.sql.streaming.stopTimeout). See the cause on what was being executed in the streaming query thread.

WHAT I TRIED: searching SO and finding this answer, based on which I added spark.conf.set("spark.sql.streaming.stopTimeout", 36000) to my setup - which changed nothing.
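For reference, the property is interpreted in milliseconds (as the error message above shows), so 36000 is only 36 seconds. A sketch of raising it further, with an arbitrary value rather than a recommended setting:

# stopTimeout is read in milliseconds; 600000 gives the stream thread 10 minutes to shut down
spark.conf.set("spark.sql.streaming.stopTimeout", 600000)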

Any input is highly appreciated!

from pyspark.sql import functions as F
from pyspark.sql.types import *

# Define a data schema
schema = StructType() \
           .add('PARAMETERS_TEXTVALUES_070_VALUES', StringType())\
           .add('ID', StringType())\
           .add('PARAMETERS_TEXTVALUES_001_VALUES', StringType())\
           .add('TIMESTAMP', TimestampType())


# Read from Kafka; the bootstrap servers option already carries host and port
df = spark \
    .readStream \
    .format("kafka") \
    .option('kafka.bootstrap.servers', 'stream.xxx.com:12345') \
    .option('subscribe', 'stream_test.json') \
    .option("startingOffsets", "earliest") \
    .load()

# Parse the JSON payload with the schema defined above
df_word = df.select(F.col('key').cast('string'),
                    F.from_json(F.col('value').cast('string'), schema).alias("parsed_value"))

# Write parsed records to Parquet, tracking progress in the checkpoint location
df_word \
      .writeStream \
      .format("parquet") \
      .option("path", "dbfs:/mnt/streamfolder/stream/") \
      .option("checkpointLocation", "dbfs:/mnt/streamfolder/check/") \
      .outputMode("append") \
      .start()
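For completeness, the handle returned by start() can be awaited so the driver surfaces the stream's real exception instead of a generic timeout (a sketch; `query` is a hypothetical name for that handle):

# keep the handle from start(), or fetch the running query from the session
query = spark.streams.active[0]
query.awaitTermination()  # blocks and re-raises the streaming thread's exception on failure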

My stream output data looks like this:

"PARAMETERS_TEXTVALUES_070_VALUES":'something'
"ID":"47575963333908"
"PARAMETERS_TEXTVALUES_001_VALUES":12345
"TIMESTAMP": "2020-10-22T15:06:42.507+02:00"

Furthermore, the stream and check folders are filled with 0-byte files, except for metadata, which includes the id from the error above.

Thanks and stay safe.

  • how much data is in the Kafka topic? did you also call awaitTermination at the end of your code? (it is not visible in your question) Commented Oct 23, 2020 at 10:28
  • hi @mike - as I showed, there are 4 columns with some strings in the kafka topic. I tried awaitTermination - no change, therefore I omitted it. Thank you :) Commented Oct 23, 2020 at 10:38

1 Answer


I had the same problem. I checked the driver logs and discovered this exception in the stacktrace:

org.apache.spark.SparkException: Failed to store executor broadcast spark_join_relation_3540_1455983219 (size = Some(67371008)) in BlockManager with storageLevel=StorageLevel(memory, deserialized, 1 replicas)

Based on this recommendation I raised the driver memory (from 16 GB to 32 GB in my case) and it solved the issue.

This answer on StackOverflow explains why it works.
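For reference, a sketch of where that setting lives: on Databricks the driver size is chosen via the cluster's driver node type, while spark.driver.memory only takes effect if it is set before the driver JVM starts, e.g. when you build the session yourself:

from pyspark.sql import SparkSession

# only effective when set before the driver JVM launches (local or spark-submit setups);
# on Databricks, pick a larger driver node type in the cluster configuration instead
spark = (SparkSession.builder
         .config("spark.driver.memory", "32g")
         .getOrCreate())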
