
I have the following PySpark dataframe and I want to find percentile row-wise.

value   col_a     col_b  col_c
row_a   5.0        0.0   11.0
row_b   3394.0     0.0   4543.0
row_c   136111.0   0.0   219255.0
row_d   0.0        0.0   0.0
row_e   0.0        0.0   0.0
row_f   42.0       0.0   54.0

I want to add a new column in the main dataframe like below.

value   col_a     col_b  col_c      25%
row_a   5.0        0.0   11.0        2.5
row_b   3394.0     0.0   4543.0      1697.0
row_c   136111.0   0.0   219255.0    68055.5
row_d   0.0        0.0   0.0         0.0
row_e   0.0        0.0   0.0         0.0
row_f   42.0       0.0   54.0        21.0

In pandas, I could use this:

df['25%'] = df.quantile(0.25, axis=1)

1 Answer


You could do it using Spark's pandas_udf, the vectorized (and hence well-performing) kind of UDF.

from pyspark.sql import functions as F
import pandas as pd

@F.pandas_udf('double')
def quantile(q: pd.Series, s: pd.DataFrame) -> pd.Series:
    # q is the quantile passed in via F.lit (constant within each batch);
    # the struct column arrives in the pandas UDF as a pandas DataFrame,
    # so pandas' own row-wise quantile can do the work.
    return s.quantile(q[0], axis=1)

Test:

df = spark.createDataFrame(
    [('row_a', 5.0, 0.0, 11.0),
     ('row_b', 3394.0, 0.0, 4543.0),
     ('row_c', 136111.0, 0.0, 219255.0),
     ('row_d', 0.0, 0.0, 0.0),
     ('row_e', 0.0, 0.0, 0.0),
     ('row_f', 42.0, 0.0, 54.0)],
    ['value', 'col_a', 'col_b', 'col_c']
)

df = df.withColumn('25%', quantile(F.lit(0.25), F.struct('col_a', 'col_b', 'col_c')))

df.show()
df.explain()
# +-----+--------+-----+--------+-------+
# |value|   col_a|col_b|   col_c|    25%|
# +-----+--------+-----+--------+-------+
# |row_a|     5.0|  0.0|    11.0|    2.5|
# |row_b|  3394.0|  0.0|  4543.0| 1697.0|
# |row_c|136111.0|  0.0|219255.0|68055.5|
# |row_d|     0.0|  0.0|     0.0|    0.0|
# |row_e|     0.0|  0.0|     0.0|    0.0|
# |row_f|    42.0|  0.0|    54.0|   21.0|
# +-----+--------+-----+--------+-------+
#
# == Physical Plan ==
# *(2) Project [value#526, col_a#527, col_b#528, col_c#529, pythonUDF0#563 AS 25%#535]
# +- ArrowEvalPython [quantile(0.25, struct(col_a, col_a#527, col_b, col_b#528, col_c, col_c#529))], [pythonUDF0#563], 200
#    +- *(1) Scan ExistingRDD[value#526,col_a#527,col_b#528,col_c#529]
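
Because q is passed in as an ordinary column via F.lit, the same UDF can be reused for any other quantile without changes. For example, adding a (hypothetical) 75% column:

df = df.withColumn('75%', quantile(F.lit(0.75), F.struct('col_a', 'col_b', 'col_c')))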



Or create the quantile function from scratch using native Spark column expressions (no UDF of any kind).

from pyspark.sql import functions as F
import math

def quantile(q, *cols):
    if q < 0 or q > 1:
        raise ValueError("Parameter q should be 0 <= q <= 1")

    if not cols:
        raise ValueError("List of columns should be provided")

    # Position of the quantile within the sorted row values, split into
    # the two neighbouring indices and the fraction between them
    # (NumPy/pandas-style linear interpolation).
    idx = (len(cols) - 1) * q
    i = math.floor(idx)
    j = math.ceil(idx)
    fraction = idx - i

    # Sort the row's values and interpolate between the two neighbours.
    arr = F.array_sort(F.array(*cols))

    return arr.getItem(i) + (arr.getItem(j) - arr.getItem(i)) * fraction

df = spark.createDataFrame(
    [('row_a', 5.0, 0.0, 11.0),
     ('row_b', 3394.0, 0.0, 4543.0),
     ('row_c', 136111.0, 0.0, 219255.0),
     ('row_d', 0.0, 0.0, 0.0),
     ('row_e', 0.0, 0.0, 0.0),
     ('row_f', 42.0, 0.0, 54.0)],
    ['value', 'col_a', 'col_b', 'col_c']
)

df = df.withColumn('0.25%', quantile(0.25, 'col_a', 'col_b', 'col_c'))

df.show()
df.explain()
# +-----+--------+-----+--------+-------+
# |value|   col_a|col_b|   col_c|  0.25%|
# +-----+--------+-----+--------+-------+
# |row_a|     5.0|  0.0|    11.0|    2.5|
# |row_b|  3394.0|  0.0|  4543.0| 1697.0|
# |row_c|136111.0|  0.0|219255.0|68055.5|
# |row_d|     0.0|  0.0|     0.0|    0.0|
# |row_e|     0.0|  0.0|     0.0|    0.0|
# |row_f|    42.0|  0.0|    54.0|   21.0|
# +-----+--------+-----+--------+-------+

# == Physical Plan ==
# Project [value#564, col_a#565, col_b#566, col_c#567, (array_sort(array(col_a#565, col_b#566, col_c#567), lambdafunction(if ((isnull(lambda left#573) AND isnull(lambda right#574))) 0 else if (isnull(lambda left#573)) 1 else if (isnull(lambda right#574)) -1 else if ((lambda left#573 < lambda right#574)) -1 else if ((lambda left#573 > lambda right#574)) 1 else 0, lambda left#573, lambda right#574, false))[0] + ((array_sort(array(col_a#565, col_b#566, col_c#567), lambdafunction(if ((isnull(lambda left#575) AND isnull(lambda right#576))) 0 else if (isnull(lambda left#575)) 1 else if (isnull(lambda right#576)) -1 else if ((lambda left#575 < lambda right#576)) -1 else if ((lambda left#575 > lambda right#576)) 1 else 0, lambda left#575, lambda right#576, false))[1] - array_sort(array(col_a#565, col_b#566, col_c#567), lambdafunction(if ((isnull(lambda left#577) AND isnull(lambda right#578))) 0 else if (isnull(lambda left#577)) 1 else if (isnull(lambda right#578)) -1 else if ((lambda left#577 < lambda right#578)) -1 else if ((lambda left#577 > lambda right#578)) 1 else 0, lambda left#577, lambda right#578, false))[0]) * 0.5)) AS 0.25%#572]
# +- *(1) Scan ExistingRDD[value#564,col_a#565,col_b#566,col_c#567]
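
For intuition, here is the same interpolation arithmetic traced in plain Python for row_b; this mirrors the default "linear" interpolation that pandas' quantile uses, which is what the formula above assumes:

import math

values = sorted([3394.0, 0.0, 4543.0])  # [0.0, 3394.0, 4543.0]
idx = (len(values) - 1) * 0.25          # 0.5
i, j = math.floor(idx), math.ceil(idx)  # 0, 1
fraction = idx - i                      # 0.5
print(values[i] + (values[j] - values[i]) * fraction)  # 1697.0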

