35

I am working on a PySpark DataFrame with n columns. I have a set of m columns (m < n) and my task is choose the column with max values in it.

For example:

Input: PySpark DataFrame containing :

col_1 = [1,2,3], col_2 = [2,1,4], col_3 = [3,2,5]

Ouput :

col_4 = max(col1, col_2, col_3) = [3,2,5]

There is something similar in pandas as explained in this question.

Is there any way of doing this in PySpark or should I change convert my PySpark df to Pandas df and then perform the operations?

1
  • 1
    if the question is about getting the max value of each column, then it looks like the expected output should be [max(col_1), max(col_2), max(col_3)] = [3, 4, 5] Commented Sep 22, 2018 at 21:17

5 Answers 5

33

You can reduce using SQL expressions over a list of columns:

from pyspark.sql.functions import max as max_, col, when
from functools import reduce

def row_max(*cols):
    return reduce(
        lambda x, y: when(x > y, x).otherwise(y),
        [col(c) if isinstance(c, str) else c for c in cols]
    )

df = (sc.parallelize([(1, 2, 3), (2, 1, 2), (3, 4, 5)])
    .toDF(["a", "b", "c"]))

df.select(row_max("a", "b", "c").alias("max")))

Spark 1.5+ also provides least, greatest

from pyspark.sql.functions import greatest

df.select(greatest("a", "b", "c"))

If you want to keep name of the max you can use `structs:

from pyspark.sql.functions import struct, lit

def row_max_with_name(*cols):
    cols_ = [struct(col(c).alias("value"), lit(c).alias("col")) for c in cols]
    return greatest(*cols_).alias("greatest({0})".format(",".join(cols)))

 maxs = df.select(row_max_with_name("a", "b", "c").alias("maxs"))

And finally you can use above to find select "top" column:

from pyspark.sql.functions import max

((_, c), ) = (maxs
    .groupBy(col("maxs")["col"].alias("col"))
    .count()
    .agg(max(struct(col("count"), col("col"))))
    .first())

df.select(c)
Sign up to request clarification or add additional context in comments.

1 Comment

this is very helpful! how do you find second largest instead? I want to get the name of the second largest column
33

We can use greatest

Creating DataFrame

df = spark.createDataFrame(
    [[1,2,3], [2,1,2], [3,4,5]], 
    ['col_1','col_2','col_3']
)
df.show()
+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    1|    2|    3|
|    2|    1|    2|
|    3|    4|    5|
+-----+-----+-----+

Solution

from pyspark.sql.functions import greatest
df2 = df.withColumn('max_by_rows', greatest('col_1', 'col_2', 'col_3'))

#Only if you need col
#from pyspark.sql.functions import col
#df2 = df.withColumn('max', greatest(col('col_1'), col('col_2'), col('col_3')))
df2.show()

+-----+-----+-----+-----------+
|col_1|col_2|col_3|max_by_rows|
+-----+-----+-----+-----------+
|    1|    2|    3|          3|
|    2|    1|    2|          2|
|    3|    4|    5|          5|
+-----+-----+-----+-----------+

1 Comment

If you need the minimum from two columns, use least()
13

You can also use the pyspark built-in least:

from pyspark.sql.functions import least, col
df = df.withColumn('min', least(col('c1'), col('c2'), col('c3')))

2 Comments

I think OP wants the opposite of this. Is there an equivalent most function?
Ah it's greatest - see @ansev answer below
0

Another simple way of doing it. Let us say that the below df is your dataframe

df = sc.parallelize([(10, 10, 1 ), (200, 2, 20), (3, 30, 300), (400, 40, 4)]).toDF(["c1", "c2", "c3"])
df.show()

+---+---+---+
| c1| c2| c3|
+---+---+---+
| 10| 10|  1|
|200|  2| 20|
|  3| 30|300|
|400| 40|  4|
+---+---+---+

You can process the above df as below to get the desited results

from pyspark.sql.functions import lit, min

df.select( lit('c1').alias('cn1'), min(df.c1).alias('c1'),
           lit('c2').alias('cn2'), min(df.c2).alias('c2'),
           lit('c3').alias('cn3'), min(df.c3).alias('c3')
          )\
         .rdd.flatMap(lambda r: [ (r.cn1, r.c1), (r.cn2, r.c2), (r.cn3, r.c3)])\
         .toDF(['Columnn', 'Min']).show()

+-------+---+
|Columnn|Min|
+-------+---+
|     c1|  3|
|     c2|  2|
|     c3|  1|
+-------+---+

1 Comment

You are doing min(col1), whereas I want min(row1), min(row2).. and so on...
0

Scala solution:

df = sc.parallelize(Seq((10, 10, 1 ), (200, 2, 20), (3, 30, 300), (400, 40, 4))).toDF("c1", "c2", "c3"))  

df.rdd.map(row=>List[String](row(0).toString,row(1).toString,row(2).toString)).map(x=>(x(0),x(1),x(2),x.min)).toDF("c1","c2","c3","min").show    

+---+---+---+---+  
| c1| c2| c3|min|  
+---+---+---+---+  
| 10| 10|  1|  1|    
|200|  2| 20|  2|  
|  3| 30|300|  3|  
|400| 40|  4|  4|  
+---+---+---+---+  

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.