
I'd like to describe my problem:

I'm currently working on a Python repository for time-series analysis. In my repo, I create different transform functions.

In one particular piece of code, my input is a dataset with a huge amount of data (almost 2 TB so far). What I can say is that this dataset (or dataframe) is written with the following code:

merged = spark.createDataFrame(df_data.rdd, union_schema)
merged_output.write_dataframe(
        merged.repartitionByRange('data_id').sortWithinPartitions('data_id', 'timestamp'),
        output_format='soho',
        options={'noho': 'true'})

When previewing this recorded dataset, I can see that it is split into 5 billion+ files.

My issue is that when I use this recorded dataframe as the input of my transform function and simply try to get, for instance, the min and max timestamps, I seem to get wrong values (as if the computation only ran on the last file, or partition?). I know the values are wrong because I get the correct ones when using an SQL preview module.

The code I'm using to get those timestamp values:

start_time_input_df = input_df.select('timestamp').sort(F.col('timestamp')).head(1)[0][0]
end_time_input_df = input_df.select('timestamp').sort(F.col('timestamp').desc()).head(1)[0][0]

I expect to get the true min and max timestamp values over the whole dataframe, but that is not the case.

Do you know what is going wrong? Thanks a lot in advance!

EDIT: I found a solution to this issue:

result = input_df.agg(F.min("timestamp").alias("min_timestamp"),
                      F.max("timestamp").alias("max_timestamp")).collect()[0]
min_timestamp = result["min_timestamp"]
max_timestamp = result["max_timestamp"]
  • This is going to be difficult for SO users to reproduce, so you'll probably need to debug further with your data. What happens if you get the min and max timestamp for each data_id? Commented Jun 7 at 0:08
  • Hello Derek, thank you for your feedback. I have read some documents related to my working environment (transform, incremental, timeseries soho format, etc.). It seems things should be transparent when dealing with such an environment. I have isolated the part of my code that doesn't work (where I filter a time-series partitioned dataframe over a range of time) into another transform Python file with the same configuration, and strangely, it works! So now I'm trying to find out why it doesn't work in my main code. Commented Jun 13 at 10:36
  • So to summarize, I have two issues: 1) the min and max values of a partitioned dataframe were not as expected; this was corrected by doing: result = input_df.agg(F.min("timestamp").alias("min_timestamp"), F.max("timestamp").alias("max_timestamp")).collect()[0]; min_timestamp = result["min_timestamp"]; max_timestamp = result["max_timestamp"]. 2) I try to filter a partitioned dataframe over a given time range doing: input_filtered_df = input_df.filter((F.col("timestamp") >= start_time) & (F.col("timestamp") <= end_time)); it doesn't work all the time. Commented Jun 13 at 11:54
  • Can you edit your question with this additional information? Thanks Commented Jun 13 at 17:47
