I want to do this kind of query with the Scala DataFrame API.
tl;dr As of Spark 2.1.0 this is not possible, and there are currently no plans to add such an operator to the Dataset API.
Spark SQL supports the following so-called multi-dimensional aggregate operators:
rollup operator
cube operator
GROUPING SETS clause (only in SQL mode)
grouping() and grouping_id() functions
NOTE: GROUPING SETS is only available in SQL mode. The Dataset API does not support it.
GROUPING SETS
val sales = Seq(
("Warsaw", 2016, 100),
("Warsaw", 2017, 200),
("Boston", 2015, 50),
("Boston", 2016, 150),
("Toronto", 2017, 50)
).toDF("city", "year", "amount")
sales.createOrReplaceTempView("sales")
// equivalent to rollup("city", "year")
val q = sql("""
SELECT city, year, sum(amount) as amount
FROM sales
GROUP BY city, year
GROUPING SETS ((city, year), (city), ())
ORDER BY city DESC NULLS LAST, year ASC NULLS LAST
""")
scala> q.show
+-------+----+------+
|   city|year|amount|
+-------+----+------+
| Warsaw|2016|   100|
| Warsaw|2017|   200|
| Warsaw|null|   300|
|Toronto|2017|    50|
|Toronto|null|    50|
| Boston|2015|    50|
| Boston|2016|   150|
| Boston|null|   200|
|   null|null|   550|  <-- grand total across all cities and years
+-------+----+------+
// equivalent to cube("city", "year")
// note the additional (year) grouping set
val q = sql("""
SELECT city, year, sum(amount) as amount
FROM sales
GROUP BY city, year
GROUPING SETS ((city, year), (city), (year), ())
ORDER BY city DESC NULLS LAST, year ASC NULLS LAST
""")
scala> q.show
+-------+----+------+
|   city|year|amount|
+-------+----+------+
| Warsaw|2016|   100|
| Warsaw|2017|   200|
| Warsaw|null|   300|
|Toronto|2017|    50|
|Toronto|null|    50|
| Boston|2015|    50|
| Boston|2016|   150|
| Boston|null|   200|
|   null|2015|    50|  <-- total across all cities in 2015
|   null|2016|   250|  <-- total across all cities in 2016
|   null|2017|   250|  <-- total across all cities in 2017
|   null|null|   550|
+-------+----+------+
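The two grouping sets used above happen to coincide with rollup and cube, so these particular queries can be written with the Dataset API even though arbitrary GROUPING SETS cannot. A minimal sketch, assuming Spark 2.1.0 or later; the explicit session setup (the `local[*]` master and the app name) is an assumption for running it standalone, and in spark-shell `getOrCreate()` should simply return the existing session:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

// Assumption: a local session, for running outside spark-shell.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("rollup-cube-sketch")
  .getOrCreate()
import spark.implicits._

val sales = Seq(
  ("Warsaw", 2016, 100),
  ("Warsaw", 2017, 200),
  ("Boston", 2015, 50),
  ("Boston", 2016, 150),
  ("Toronto", 2017, 50)
).toDF("city", "year", "amount")

// Same grouping sets as GROUPING SETS ((city, year), (city), ())
val viaRollup = sales
  .rollup("city", "year")
  .agg(sum("amount").as("amount"))
  .orderBy($"city".desc_nulls_last, $"year".asc_nulls_last)

// Same grouping sets as GROUPING SETS ((city, year), (city), (year), ())
val viaCube = sales
  .cube("city", "year")
  .agg(sum("amount").as("amount"))
  .orderBy($"city".desc_nulls_last, $"year".asc_nulls_last)

viaRollup.show()
viaCube.show()
```

Any grouping set that is neither a rollup nor a cube of the columns, for example only (city) and (year), still has to go through SQL.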
A null in a column of the result does not necessarily mean that the column was aggregated away on that row. If the column contains nulls in the original table, a null in the aggregated result may simply be a null value carried over from the source data. Use the grouping function to check whether a column was aggregated on a given row.
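The ambiguity can be made visible with a small sketch. The data below is hypothetical (it is not the sales table above): one row has a genuinely unknown year, so the result contains two rows for Warsaw with a null year, and only the `grouping` flag tells them apart. The column names `year_aggregated` and `gid` and the session setup are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{grouping, grouping_id, sum}

// Assumption: a local session, for running outside spark-shell.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("grouping-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical data: the last row has a null year in the source itself.
val salesWithNull = Seq(
  ("Warsaw", Option(2016), 100),
  ("Warsaw", Option(2017), 200),
  ("Warsaw", Option.empty[Int], 30)
).toDF("city", "year", "amount")

val flagged = salesWithNull
  .rollup("city", "year")
  .agg(
    sum("amount").as("amount"),
    // 1 when year was aggregated away on this row, 0 when the value
    // (including a null) comes from the data
    grouping("year").as("year_aggregated"),
    // bit vector over all grouping columns, here (city, year)
    grouping_id().as("gid")
  )

flagged.show()

// Rows where the null year comes from the source data, not from aggregation
val sourceNulls = flagged.filter($"year".isNull && $"year_aggregated" === 0)
```

The same functions are available in SQL mode, as `grouping(year)` and `grouping_id()` in the SELECT list of a query that uses GROUPING SETS, ROLLUP, or CUBE.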