
I use Databricks Auto Loader to ingest files that contain data with different schemas and want to write them to the corresponding Delta tables using update mode.

There may be many (>15) different message types in a stream, so I'd have to write an output stream for every one of them. There is an "upsert" function for every table.

Can this be condensed using a function (example given below) that will save a few keystrokes?

upload_path = '/example'

# Set up the stream to begin reading incoming files from the
# upload_path location.
df = spark.readStream.format('cloudFiles') \
  .option('cloudFiles.format', 'avro') \
  .load(upload_path)

# filter messages and apply JSON schema
table1_df = filter_and_transform(df, json_schema1)
table2_df = filter_and_transform(df, json_schema2)
table3_df = filter_and_transform(df, json_schema3)

# each table has its own upsert function
def create_output_stream(df, table_name, upsert_function):
    # Build the stream writer and return it; the caller starts it.
    return df.writeStream \
         .trigger(once=True) \
         .format("delta") \
         .foreachBatch(upsert_function) \
         .queryName(f"autoLoader_query_{table_name}") \
         .option("checkpointLocation", f"dbfs:/delta/somepath/{table_name}") \
         .outputMode("update")

output_stream1 = create_output_stream(table1_df, "table_name1", upsert_function1).start() # start stream in outer environment
output_stream2 = create_output_stream(table2_df, "table_name2", upsert_function2).start()
output_stream3 = create_output_stream(table3_df, "table_name3", upsert_function3).start()
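
For reference, `filter_and_transform` and the per-table upsert functions are used above but not shown. A minimal sketch of what they might look like, assuming the Avro records carry a `messageType` column and a JSON string column `body`, and that `table_name1` is keyed on `id` (these column and table names are assumptions, not taken from the actual job):

from pyspark.sql import functions as F
from delta.tables import DeltaTable

def filter_and_transform(df, json_schema, message_type="type1"):
    # Keep only rows of one message type and parse the JSON payload.
    # Column names here are assumptions; adjust to the real Avro schema.
    return (df
            .filter(F.col("messageType") == message_type)
            .withColumn("payload", F.from_json(F.col("body"), json_schema))
            .select("payload.*"))

def upsert_function1(batch_df, epoch_id):
    # MERGE the micro-batch into the target Delta table on its key column.
    target = DeltaTable.forName(spark, "table_name1")  # hypothetical table name
    (target.alias("t")
           .merge(batch_df.alias("s"), "t.id = s.id")  # "id" is an assumed key
           .whenMatchedUpdateAll()
           .whenNotMatchedInsertAll()
           .execute())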


1 Answer

Yes, of course it's possible to do it this way - it's quite a standard pattern.

But you need to take one thing into consideration: if your input data isn't partitioned by message type, you will scan the same files multiple times (once per message type). An alternative is to perform the filtering & upserts for all message types inside a single foreachBatch, like this:

df = spark.readStream.format('cloudFiles') \
  .option('cloudFiles.format', 'avro') \
  .load(upload_path)

def do_all_upserts(df, epoch_id):
  # cache the micro-batch so each filter doesn't re-read the input files
  df.cache()
  table1_df = filter_and_transform(df, json_schema1)
  table2_df = filter_and_transform(df, json_schema2)
  table3_df = filter_and_transform(df, json_schema3)
  # you can also run these writes in parallel, e.g. with multithreading
  # (see the threaded sketch after this example)
  do_upsert(table1_df)
  do_upsert(table2_df)
  ...
  # free resources
  df.unpersist()

df.writeStream \
         .trigger(once=True) \
         .format("delta") \
         .foreachBatch(do_all_upserts) \
         .option("checkpointLocation", "dbfs:/delta/somepath") \
         .start()  # one checkpoint location for the single combined stream
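
The multithreading remark in the code above could look roughly like this sketch. It reuses the same `do_upsert` placeholder and assumes each upsert targets a different table, so the writes can safely run concurrently; Spark will schedule the jobs submitted from separate threads in parallel:

from concurrent.futures import ThreadPoolExecutor

def do_all_upserts(df, epoch_id):
  df.cache()
  # build one filtered DataFrame per message type
  tables = [
    filter_and_transform(df, json_schema1),
    filter_and_transform(df, json_schema2),
    filter_and_transform(df, json_schema3),
  ]
  # run the upserts concurrently; each thread submits its own Spark job
  with ThreadPoolExecutor(max_workers=len(tables)) as pool:
    list(pool.map(do_upsert, tables))
  df.unpersist()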

2 Comments

Thanks for pointing out the recomputation issue. What about the checkpointLocation? Using one function to upsert many tables, one would only need one checkpointLocation for the output stream, not for each table, right?
yes, one per output stream
