
Now I have a table and the following task:

  1. Group by DepartmentID and EmployeeID.
  2. Within each group, order the rows by (ArrivalDate, ArrivalTime) and pick the latest one: if the two dates differ, pick the newer date; if the dates are the same, pick the newer time.

I am trying this kind of approach:

input.select("DepartmenId","EmolyeeID", "ArrivalDate", "ArrivalTime", "Word")
  .agg(here will be the function that handles logic from 2)
  .show()

What is the syntax to aggregate here?

Thank you in advance.

// +-----------+---------+-----------+-----------+--------+
// |DepartmenId|EmolyeeID|ArrivalDate|ArrivalTime|   Word |
// +-----------+---------+-----------+-----------+--------+
// |     D1    |   E1    |  20170101 |    0730   |  "YES" |
// |     D1    |   E1    |  20170102 |    1530   |  "NO"  |
// |     D1    |   E2    |  20170101 |    0730   |  "ZOO" |
// |     D1    |   E2    |  20170102 |    0330   |  "BOO" |
// |     D2    |   E1    |  20170101 |    0730   |  "LOL" |
// |     D2    |   E1    |  20170101 |    1830   |  "ATT" |
// |     D2    |   E2    |  20170105 |    1430   |  "UNI" |
// +-----------+---------+-----------+-----------+--------+


// output should be

// +-----------+---------+-----------+-----------+--------+
// |DepartmenId|EmolyeeID|ArrivalDate|ArrivalTime|   Word |
// +-----------+---------+-----------+-----------+--------+
// |     D1    |   E1    |  20170102 |    1530   |  "NO"  |
// |     D1    |   E2    |  20170102 |    0330   |  "BOO" |
// |     D2    |   E1    |  20170101 |    1830   |  "ATT" |
// |     D2    |   E2    |  20170105 |    1430   |  "UNI" |
// +-----------+---------+-----------+-----------+--------+

2 Answers


You can use max on a new struct column that contains all the non-grouping columns, with ArrivalDate first and ArrivalTime second: the ordering of that struct matches your requirement (latest date first, then latest time among equal dates), so taking the maximum produces the record you're after.

Then, you can use a select operation to "split" the struct back into separate columns.

import spark.implicits._
import org.apache.spark.sql.functions._

// df is the input DataFrame shown above (column names as in the sample data)
df.groupBy($"DepartmenId", $"EmolyeeID")
  // max over a struct compares ArrivalDate first, then ArrivalTime
  .agg(max(struct("ArrivalDate", "ArrivalTime", "Word")) as "struct")
  // split the struct back into separate columns
  .select($"DepartmenId", $"EmolyeeID",
    $"struct.ArrivalDate" as "ArrivalDate",
    $"struct.ArrivalTime" as "ArrivalTime",
    $"struct.Word" as "Word"
  )

2 Comments

This works! But is there a way to avoid having "Word" in the struct()? In my real example there is more than just one "Word" column; imagine there are "Word1", "Word2", "Word3", and so on. Is there a way I can just add those columns in the .select statement?
This technique only works if you "combine" all the columns you're interested in into one struct - otherwise, calling max on just some of the columns would not fetch the right values for the others. If you have many such columns, you can either try the alternative Window approach suggested by @leo-c, or use some filtering over df.columns to dynamically generate the list of columns included in the struct so that you don't have to list them explicitly (see the sketch below).
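A minimal sketch of that dynamic-struct idea, assuming the grouping columns are DepartmenId and EmolyeeID as in the sample data; the helper names groupCols, orderCols and payloadCols are illustrative, not from the original answer:

import spark.implicits._
import org.apache.spark.sql.functions._

// Columns used for grouping; everything else goes into the struct.
val groupCols = Seq("DepartmenId", "EmolyeeID")

// Put the ordering columns first so max() compares by date, then time,
// and append all remaining columns automatically.
val orderCols   = Seq("ArrivalDate", "ArrivalTime")
val payloadCols = df.columns.filterNot(c => groupCols.contains(c) || orderCols.contains(c))
val structCols  = (orderCols ++ payloadCols).map(col)

val result = df
  .groupBy(groupCols.map(col): _*)
  .agg(max(struct(structCols: _*)) as "latest")
  // flatten the struct back into top-level columns
  .select(groupCols.map(col) ++ (orderCols ++ payloadCols).map(c => col(s"latest.$c") as c): _*)

The same flattening select then works no matter how many "Word" columns the table has.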

One approach would be to use a Spark Window function:

import spark.implicits._
import org.apache.spark.sql.functions._

val df = Seq(
  ("D1", "E1", "20170101", "0730", "YES"),
  ("D1", "E1", "20170102", "1530", "NO"),
  ("D1", "E2", "20170101", "0730", "ZOO"),
  ("D1", "E2", "20170102", "0330", "BOO"),
  ("D2", "E1", "20170101", "0730", "LOL"),
  ("D2", "E1", "20170101", "1830", "ATT"),
  ("D2", "E2", "20170105", "1430", "UNI")
).toDF(
  "DepartmenId", "EmolyeeID", "ArrivalDate", "ArrivalTime", "Word"
)

import org.apache.spark.sql.expressions.Window

// rank the rows within each (DepartmenId, EmolyeeID) group, newest arrival first
val df2 = df.withColumn("rowNum", row_number().over(
    Window.partitionBy("DepartmenId", "EmolyeeID").
      orderBy($"ArrivalDate".desc, $"ArrivalTime".desc)
  )).
  where($"rowNum" === 1).
  select("DepartmenId", "EmolyeeID", "ArrivalDate", "ArrivalTime", "Word").
  orderBy("DepartmenId", "EmolyeeID")

df2.show
+-----------+---------+-----------+-----------+----+
|DepartmenId|EmolyeeID|ArrivalDate|ArrivalTime|Word|
+-----------+---------+-----------+-----------+----+
|         D1|       E1|   20170102|       1530|  NO|
|         D1|       E2|   20170102|       0330| BOO|
|         D2|       E1|   20170101|       1830| ATT|
|         D2|       E2|   20170105|       1430| UNI|
+-----------+---------+-----------+-----------+----+

2 Comments

Thanks! Is using Window.partitionBy better than doing groupBy? I want to know if there is a tradeoff in speed and performance.
@vincwng, I haven't done any benchmarking of Window functions vs. groupBy, but I find Window functions often useful for reducing the number of nested queries (e.g. self-joins), in which case they give better performance (see the sketch below for the kind of self-join a Window function replaces).
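For comparison, here is a rough sketch of the groupBy-plus-self-join alternative that the Window approach avoids; it assumes the same df as above and is only an illustration, not a benchmark:

import spark.implicits._
import org.apache.spark.sql.functions._

// first find the latest (ArrivalDate, ArrivalTime) per group ...
val latest = df
  .groupBy("DepartmenId", "EmolyeeID")
  .agg(max(struct($"ArrivalDate", $"ArrivalTime")) as "latest")
  .select($"DepartmenId", $"EmolyeeID",
    $"latest.ArrivalDate" as "ArrivalDate",
    $"latest.ArrivalTime" as "ArrivalTime")

// ... then join back to the original rows to recover the remaining columns
val viaJoin = df.join(latest,
  Seq("DepartmenId", "EmolyeeID", "ArrivalDate", "ArrivalTime"))

The extra join here is exactly the kind of nested query that the single Window pass replaces.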
