
Does anyone know the best way in Apache Spark SQL to achieve the same results as the standard SQL QUALIFY + RANK or ROW_NUMBER statements?

For example:

  • I have a Spark Dataframe called statement_data with 12 monthly records each for 100 unique account_numbers, therefore 1200 records in total
  • Each monthly record has a field called "statement_date" that can be used for determining the most recent record

I want my final result to be a new Spark Dataframe with the 3 most recent records (as determined by statement_date descending) for each of the 100 unique account_numbers, therefore 300 final records in total.

In standard Teradata SQL, I can do the following:

select * from statement_data
qualify row_number()
over (partition by acct_id order by statement_date desc) <= 3

Apache Spark SQL does not have a standalone qualify function that I'm aware of; maybe I'm getting the syntax wrong, or I can't find documentation showing that qualify exists.

It is fine if I need to do this in two steps, as long as those two steps are:

  • A select query or alternative method to assign rank/row numbering for each account_number's records
  • A select query selecting all records with rank <= 3 (i.e. choosing the 1st, 2nd, and 3rd most recent records); a sketch of such a two-step approach is shown below.
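For illustration, here is a minimal sketch of that two-step approach using the DataFrame API and window functions (available in Spark 1.4+). The variable statement_data follows the example above and is hypothetical; a HiveContext-backed DataFrame is assumed, since window functions require a HiveContext before Spark 2.0:

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.*;

// Window over each account, most recent statement first
WindowSpec w = Window.partitionBy("acct_id").orderBy(col("statement_date").desc());

// Step 1: assign a row number within each account's partition
// (rowNumber() is the Spark 1.x name; it was renamed row_number() in 2.0)
DataFrame ranked = statement_data.withColumn("rank_num", rowNumber().over(w));

// Step 2: keep only the three most recent statements per account
DataFrame top3 = ranked.filter(col("rank_num").leq(3)).drop("rank_num");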

EDIT 1 - 7/23 2:09pm: The initial solution provided by zero323 was not working for me in Spark 1.4.1 with the Spark SQL 1.4.1 dependency installed.

EDIT 2 - 7/23 3:24pm: It turns out the error was related to using a SQL Context object for my query instead of a Hive Context. I am now able to run the below solution correctly after adding the following code to create and use a Hive Context:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.hive.test.TestHive$;

final JavaSparkContext sc2;
final HiveContext hc2;
DataFrame df;
// Window functions require a HiveContext in Spark 1.4, not a plain SQLContext
hc2 = TestHive$.MODULE$;
sc2 = new JavaSparkContext(hc2.sparkContext());
....
// Initial Spark/SQL contexts to set up Dataframes
SparkConf conf = new SparkConf().setAppName("Statement Test");
...
// Rank each account's statements, then keep only the top 3 per account
DataFrame stmtSummary =
    hc2.sql("SELECT * FROM (SELECT acct_id, stmt_end_dt, stmt_curr_bal, row_number() over (partition by acct_id order by stmt_curr_bal DESC) rank_num FROM stmt_data) tmp WHERE rank_num <= 3");

3 Answers


There is no qualify (it is usually useful to check the parser source), but you can use a subquery like this:

SELECT * FROM (
    SELECT *, row_number() OVER (
        PARTITION BY acct_id ORDER BY statement_date DESC
    ) rank FROM df
) tmp WHERE rank <= 3
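A minimal sketch of invoking this from Java, assuming a HiveContext hc and a DataFrame df as in the question's edit (both names are placeholders):

// Expose the DataFrame to SQL, then run the subquery workaround
df.registerTempTable("df");
DataFrame top3 = hc.sql(
    "SELECT * FROM (" +
    "  SELECT *, row_number() OVER (" +
    "    PARTITION BY acct_id ORDER BY statement_date DESC" +
    "  ) rank FROM df" +
    ") tmp WHERE rank <= 3");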

See also SPARK : failure: ``union'' expected but `(' found


3 Comments

This was not working for me on Spark 1.3.1. I read that window functions (partition/order by/desc) were added in 1.4.0; I'm working on installing that, then I will give this a shot.
See my edits above for additional details on getting window functions to work using a Hive Context. I am accepting this answer, but feel free to add notes about Hive Context vs. SQL Context for clarity.
But I'm using Cassandra NoSQL.

There is a QUALIFY statement:

SELECT * 
FROM df
QUALIFY row_number() OVER (PARTITION BY acct_id ORDER BY statement_date DESC) = 1 
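To match the question's requirement, the same pattern with <= 3 instead of = 1 keeps the three most recent statements per account.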

2 Comments

Which versions of Spark is this available in? It looks to be Databricks only.
Here is the open ticket to add QUALIFY support to Spark SQL, so all Spark implementations can benefit: issues.apache.org/jira/browse/SPARK-31561

There is currently no QUALIFY in Spark SQL, but we really need it. As mentioned in a comment above, there is a JIRA for it, but it does not seem to be getting any traction of late: https://issues.apache.org/jira/browse/SPARK-31561
