
I have a big PySpark DataFrame that looks like this:

from pyspark.sql.functions import col, to_timestamp

data = [('2010-09-12 0', 'x1', 13), 
        ('2010-09-12 0', 'x2', 12), 
        ('2010-09-12 2', 'x3', 23), 
        ('2010-09-12 4', 'x1', 22), 
        ('2010-09-12 4', 'x2', 32), 
        ('2010-09-12 4', 'x3', 7), 
        ('2010-09-12 6', 'x3', 24),
        ('2010-09-12 16', 'x3', 34),]

columns = ['timestamp', 'category', 'value']
df = spark.createDataFrame(data=data, schema=columns)
df = df.withColumn('ts', to_timestamp(col('timestamp'), 'yyyy-MM-dd H')).drop(col('timestamp'))
df.show()

+--------+-----+-------------------+
|category|value|                 ts|
+--------+-----+-------------------+
|      x1|   13|2010-09-12 00:00:00|
|      x2|   12|2010-09-12 00:00:00|
|      x3|   23|2010-09-12 02:00:00|
|      x1|   22|2010-09-12 04:00:00|
|      x2|   32|2010-09-12 04:00:00|
|      x3|    7|2010-09-12 04:00:00|
|      x3|   24|2010-09-12 06:00:00|
|      x3|   34|2010-09-12 16:00:00|
+--------+-----+-------------------+

The timestamp in column ts increases in exact 2-hour steps (e.g. hours 0, 2, ..., 22).

I want to extract the average, min, max and median of column value grouped by the ts timestamp, and put these statistics into a pandas DataFrame like the following:

import pandas as pd
import datetime

start_ts = datetime.datetime(year=2010, month=2, day=1, hour=0)
end_ts = datetime.datetime(year=2022, month=6, day=1, hour=22)
ts                      average   min    max   median 
...
2010-09-12 00:00:00     12.5      12     13    12.5
2010-09-12 02:00:00     23        23     23    23
2010-09-12 04:00:00     20.3      7      32    22
2010-09-12 06:00:00     24        24     24    24
2010-09-12 16:00:00     34        34     34    34
...

What would be an economical way to do this, minimizing the number of passes over the PySpark DataFrame?

2 Answers


Aggregate then convert the result into pandas:

from pyspark.sql import functions as F

df1 = df.groupby("ts").agg(
    F.avg("value").alias("average"),
    F.min("value").alias("min"),
    F.max("value").alias("max"),
    F.percentile_approx("value", 0.5).alias("median")
)

result = df1.toPandas()

#                    ts    average  min  max  median
# 0 2010-09-12 00:00:00  12.500000   12   13      12
# 1 2010-09-12 02:00:00  23.000000   23   23      23
# 2 2010-09-12 04:00:00  20.333333    7   32      22
# 3 2010-09-12 06:00:00  24.000000   24   24      24
# 4 2010-09-12 16:00:00  34.000000   34   34      34
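Note that percentile_approx returns an actual value from the group rather than interpolating, which is why the median of the 00:00 group shows as 12 instead of 12.5. If an exact, interpolated median is acceptable for your group sizes, a sketch (assuming Spark 3.4+, where F.median is available; on older versions F.expr('percentile(value, 0.5)') gives the same exact result):

from pyspark.sql import functions as F

df1 = df.groupby("ts").agg(
    F.avg("value").alias("average"),
    F.min("value").alias("min"),
    F.max("value").alias("max"),
    F.median("value").alias("median"),  # exact, interpolated median (Spark 3.4+)
)

result = df1.toPandas()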

1 Comment

Thanks. Both solutions look similar and I appreciate that your response was the first. There was some additional useful info in ZygD's response that I could use.

The following calculates the exact median, but you shouldn't compute an exact median over very large groups of data.

Also, you can filter the data without using the datetime module:

from pyspark.sql import functions as F
df = (df
    .filter(F.col('ts').between('2010-02-01', '2022-06-01'))
    .groupBy('ts').agg(
        F.round(F.mean('value'), 1).alias('average'),
        F.min('value').alias('min'),
        F.max('value').alias('max'),
        F.expr('percentile(value, .5)').alias('median'),
    )
)
pdf = df.toPandas()
print(pdf)
#                    ts  average  min  max  median
# 0 2010-09-12 02:00:00     23.0   23   23    23.0
# 1 2010-09-12 00:00:00     12.5   12   13    12.5
# 2 2010-09-12 06:00:00     24.0   24   24    24.0
# 3 2010-09-12 16:00:00     34.0   34   34    34.0
# 4 2010-09-12 04:00:00     20.3    7   32    22.0
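The rows of a groupBy result come back in no particular order, so if the pandas frame should read chronologically like the desired output, sort it after conversion, e.g.:

# Sort by timestamp and use it as the index
pdf = pdf.sort_values('ts').set_index('ts')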

