Similar to this question: how can I do the same thing to write different groups of a DataFrame to different Delta Live Tables? I want something like the following, where I am not limited to just a pandas DataFrame, i.e. apply would hand my aggregate function either a Spark DataFrame for the group or a SparkSession.
# Desired (pseudocode): apply passes each group either as an iterator plus a SparkSession,
# or as a Spark DataFrame built internally from the group, instead of a pandas DataFrame.
def mycustomNotPandaAgg(key, iterator, spark):
    temp_df = spark.createDataFrame(iterator)   # I could apply a schema here
    temp_df.createOrReplaceTempView("temp_df")
    spark.sql(f"INSERT INTO {key} SELECT * FROM temp_df")  # key is the target table name

# or, if apply handed over a Spark DataFrame created from each group:
#     group_df.write.insertInto(key)

my_df.groupBy("table_name").apply(mycustomNotPandaAgg)
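For reference, as far as I can tell there is no groupBy().apply() variant that hands each group over as a Spark DataFrame or gives the function a SparkSession (the function runs on executors, where no session exists). The closest native API seems to be applyInPandas, which still materializes each group as a pandas DataFrame on one task. Below is a minimal sketch of that route, under assumptions that are not part of the question: the standalone `deltalake` (delta-rs) package is installed on the workers, the executors have credentials for the target storage, and `source_table` / `BASE_PATH` are hypothetical placeholders.

```python
from pyspark.sql import SparkSession
import pandas as pd
from deltalake import write_deltalake   # delta-rs writer; must be installed on the workers

spark = SparkSession.builder.getOrCreate()
my_df = spark.table("source_table")          # hypothetical source
BASE_PATH = "s3://my-bucket/tables"          # hypothetical target location, one sub-path per key

def write_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # Each group arrives as a pandas DataFrame on an executor; no SparkSession is available here,
    # so the write bypasses Spark and goes straight to the Delta path for that key.
    table_name = pdf["table_name"].iloc[0]
    write_deltalake(f"{BASE_PATH}/{table_name}",
                    pdf.drop(columns=["table_name"]),
                    mode="append")
    # applyInPandas must return a pandas DataFrame matching the declared schema.
    return pd.DataFrame({"table_name": [table_name], "rows_written": [len(pdf)]})

summary = (
    my_df.groupBy("table_name")
         .applyInPandas(write_group, schema="table_name string, rows_written long")
)
summary.collect()   # one job; each group is shuffled to one task and written exactly once
```

The trade-off is that each group still has to fit in a single task's memory as pandas, which is exactly the skew problem, so this only helps if even the largest group is manageable.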
PS - I have already tried the filter approach: I filter the same DataFrame once per table, get N DataFrames (one per table), and save them. It is not efficient because the data is skewed per key, and even if I persist the DataFrame before filtering, Spark still launches a separate job for each filter.
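One single-pass alternative I have been considering (my own assumption, not something from the linked question): write everything once into one Delta table partitioned by the key, so the source is scanned in a single job, and then expose each key as a view for downstream readers. `source_table`, `my_schema.all_groups`, and the view names are hypothetical.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
my_df = spark.table("source_table")   # hypothetical source, same shape as above

# One scan, one job: Delta lays the data out by key instead of Spark filtering N times.
(
    my_df.write
         .format("delta")
         .mode("overwrite")
         .partitionBy("table_name")
         .saveAsTable("my_schema.all_groups")
)

# Optional: per-key views so consumers still see "one table per group".
keys = [r["table_name"] for r in my_df.select("table_name").distinct().collect()]
for k in keys:
    spark.sql(
        f"CREATE OR REPLACE VIEW my_schema.{k} AS "
        f"SELECT * FROM my_schema.all_groups WHERE table_name = '{k}'"
    )
}
```

Because the table is partitioned by `table_name`, reads through the views should prune down to a single partition, but this does change the layout from N physical tables to one partitioned table plus views, which may or may not be acceptable for my use case.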