I'm posting here to describe my problem:
I'm currently working on a Python repository for time series analysis. In my repo, I create different transform functions.
In one particular piece of code, I have an input dataset holding a huge amount of data (almost 2 TB so far). What I can say is that this dataset (or dataframe) is written with the following code:
merged = spark.createDataFrame(df_data.rdd, union_schema)
merged_output.write_dataframe(
    merged.repartitionByRange('data_id').sortWithinPartitions('data_id', 'timestamp'),
    output_format='soho',
    options={'noho': 'true'},
)
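For context, repartitionByRange('data_id') range-partitions the rows by data_id, and sortWithinPartitions only orders rows inside each partition, so each stored file is sorted locally but the dataset has no global order. A minimal local sketch of that behavior (the small dataframe and session setup here are purely illustrative, not my real pipeline):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').getOrCreate()

df = spark.createDataFrame(
    [(2, 20), (1, 11), (1, 10), (2, 21)],
    ['data_id', 'timestamp'],
)

# Nearby data_id values land in the same partition, and each
# partition is sorted on its own; there is no global ordering.
laid_out = (
    df.repartitionByRange('data_id')
      .sortWithinPartitions('data_id', 'timestamp')
)
laid_out.show()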
When using a preview of this recorded dataset, I can see that it is split into 5 billion+ files.
My issue is that when I use this recorded dataframe as the input of my transform function and just want to filter it to get, for instance, the min and max timestamps, it seems to give me wrong values (as if it were computed only on the last file (or partition?)). I know the values are wrong because I get the true values when using an SQL preview module.
The code I'm using to get those timestamp values:
from pyspark.sql import functions as F

start_time_input_df = input_df.select('timestamp').sort(F.col('timestamp')).head(1)[0][0]
end_time_input_df = input_df.select('timestamp').sort(F.col('timestamp').desc()).head(1)[0][0]
I expect to get the true min and max timestamp values over the whole dataframe, but that is not the case.
Do you know what is going wrong? Thanks a lot in advance!
EDIT: here is the solution I found for this issue:
result = input_df.agg(
    F.min("timestamp").alias("min_timestamp"),
    F.max("timestamp").alias("max_timestamp"),
).collect()[0]
min_timestamp = result["min_timestamp"]
max_timestamp = result["max_timestamp"]
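For completeness, a minimal self-contained sketch of the same single-pass aggregation on a tiny illustrative dataframe (the session setup and sample rows are assumptions for demonstration, not my actual transform):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master('local[*]').getOrCreate()

# Tiny stand-in for the real 2 TB input
df = spark.createDataFrame(
    [('a', 3), ('a', 1), ('b', 7)],
    ['data_id', 'timestamp'],
)

# One job scans the data once and returns both extremes together
row = df.agg(
    F.min('timestamp').alias('min_timestamp'),
    F.max('timestamp').alias('max_timestamp'),
).collect()[0]

print(row['min_timestamp'], row['max_timestamp'])  # 1 7

Since agg computes both extremes in a single scan, it also avoids the two full sorts of the original approach.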