I have the following Spark SQL query:

val subquery = 
        "( select garment_group_name , prod_name, " +
        "row_number() over (partition by garment_group_name order by count(prod_name) desc) as seqnum " +
        "from articles a1 " +
        "group by garment_group_name, prod_name )"


val query = "SELECT garment_group_name, prod_name " +
            "FROM " + subquery +
            " WHERE seqnum = 1 "


val query3 = spark.sql(query)

I am trying to do the exact same thing using the DataFrame API. I wanted to first concentrate on the subquery part, so I did something like this:

import org.apache.spark.sql.expressions.Window // imports the needed Window object
import org.apache.spark.sql.functions.row_number

val windowSpec = Window.partitionBy("garment_group_name")

articlesDF.withColumn("row_number", row_number.over(windowSpec))
    .show()

However I get the following error

org.apache.spark.sql.AnalysisException: Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:42)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowOrder$$anonfun$apply$33.applyOrElse(Analyzer.scala:2207)......... and so on.

I see that I need to include an orderBy clause, but how can I do this when I first need to count from a group by on two columns, and only then order by that count?

The error gives the example: SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table, but I do not know how to express this with the DataFrame API, and I can't find it online.

1 Answer

The solution is to first compute count("prod_name") over a window partitioned by both "garment_group_name" and "prod_name", and then use that count as the ordering column in windowSpec.

Starting with some example data:

import spark.implicits._ // needed for toDF

val df = List(
  ("a", "aa1"), ("a", "aa2"), ("a", "aa3"), ("b", "bb")
).toDF("garment_group_name", "prod_name")

df.show(false)

gives:

+------------------+---------+
|garment_group_name|prod_name|
+------------------+---------+
|a                 |aa1      |
|a                 |aa2      |
|a                 |aa3      |
|b                 |bb       |
+------------------+---------+

and the two window specifications we need:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, row_number}

val countWindowSpec = Window.partitionBy("garment_group_name", "prod_name")
val windowSpec      = Window.partitionBy(col("garment_group_name")).orderBy(col("count").desc)

We can then use them:

df
    // create the `count` column to be used by `windowSpec`
    .withColumn("count", count(col("prod_name")).over(countWindowSpec))
    .withColumn("seqnum", row_number().over(windowSpec))
    // take only the first row of each partition
    .filter(col("seqnum") === 1)
    // select only the rows we care about
    .select("garment_group_name", "prod_name")
    .show(false)

which gives:

+------------------+---------+
|garment_group_name|prod_name|
+------------------+---------+
|a                 |aa1      |
|b                 |bb       |
+------------------+---------+

Comparing this to your SQL implementation, using the same df:

df.createOrReplaceTempView("a1")

val subquery = 
        "( select garment_group_name , prod_name, " +
        "row_number() over (partition by garment_group_name order by count(prod_name) desc) as seqnum " +
        "from a1 " +
        "group by garment_group_name, prod_name )"

val query = "SELECT garment_group_name, prod_name " +
            "FROM " + subquery +
            " WHERE seqnum = 1 "

spark.sql(query).show(false)

we get the same result of:

+------------------+---------+
|garment_group_name|prod_name|
+------------------+---------+
|a                 |aa1      |
|b                 |bb       |
+------------------+---------+
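
If you prefer a translation that mirrors the SQL's GROUP BY more directly, you could also aggregate first and only then apply the ranking window. This is a sketch of the same top-1-per-group logic, assuming the same df as above (the column name "cnt" is just an illustrative choice):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, row_number}

// Rank each group's products by their aggregated count, highest first.
val rankSpec = Window.partitionBy("garment_group_name").orderBy(col("cnt").desc)

df
  // collapse duplicates, exactly as the SQL GROUP BY on both columns does
  .groupBy("garment_group_name", "prod_name")
  .agg(count("prod_name").as("cnt"))
  .withColumn("seqnum", row_number().over(rankSpec))
  .filter(col("seqnum") === 1)
  .select("garment_group_name", "prod_name")
  .show(false)
```

Because the aggregation runs before the window, each (garment_group_name, prod_name) pair appears only once when row_number() is assigned, which matches the semantics of the SQL subquery.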

1 Comment

Thank you for explaining it. This a great solution!
