I have a Spark DataFrame that looks like this:

root
|-- employeeName: string (nullable = true)
|-- employeeId: string (nullable = true)
|-- employeeEmail: string (nullable = true)
|-- company: struct (nullable = true)
|    |-- companyName: string (nullable = true)
|    |-- companyId: string (nullable = true)
|    |-- details: struct (nullable = true)
|    |    |-- founded: string (nullable = true)
|    |    |-- address: string (nullable = true)
|    |    |-- industry: string (nullable = true)

What I want to do is group by companyId and get an array of employees per company, like this:

root
|-- company: struct (nullable = true)
|    |-- companyName: string (nullable = true)
|    |-- companyId: string (nullable = true)
|    |-- details: struct (nullable = true)
|    |    |-- founded: string (nullable = true)
|    |    |-- address: string (nullable = true)
|    |    |-- industry: string (nullable = true)
|-- employees: array (nullable = true)     
|    |-- employee: struct (nullable = true)           
|    |    |-- employeeName: string (nullable = true)
|    |    |-- employeeId: string (nullable = true)
|    |    |-- employeeEmail: string (nullable = true)

Of course, I can easily do that if I just had a pair of (company, employee): (String, String) using map and reduceByKey. But with all the different nested information, I'm not sure what approach to take.

Should I try to flatten everything first? An example of something similar would be very helpful.

1 Answer

You can do this with a typed Dataset, using groupByKey followed by mapGroups:

// declaring data types
case class Company(cName: String, cId: String, details: String)
case class Employee(name: String, id: String, email: String, company: Company)

// setting up example data
val e1 = Employee("n1", "1", "[email protected]", Company("c1", "1", "d1"))
val e2 = Employee("n2", "2", "[email protected]", Company("c1", "1", "d1"))
val e3 = Employee("n3", "3", "[email protected]", Company("c1", "1", "d1"))
val e4 = Employee("n4", "4", "[email protected]", Company("c2", "2", "d2"))
val e5 = Employee("n5", "5", "[email protected]", Company("c2", "2", "d2"))
val e6 = Employee("n6", "6", "[email protected]", Company("c2", "2", "d2"))
val e7 = Employee("n7", "7", "[email protected]", Company("c3", "3", "d3"))
val e8 = Employee("n8", "8", "[email protected]", Company("c3", "3", "d3"))
val employees = Seq(e1, e2, e3, e4, e5, e6, e7, e8)
// assumes spark-shell, where `sc` is defined and spark.implicits._ is already imported
val ds = sc.parallelize(employees).toDS()

// actual query to achieve what is mentioned in the question
val result = ds.groupByKey(e => e.company).mapGroups((k, itr) => (k, itr.toList))
result.collect

Results in:

Array(

(Company(c1,1,d1),WrappedArray(Employee(n1,1,[email protected],Company(c1,1,d1)), Employee(n2,2,[email protected],Company(c1,1,d1)), Employee(n3,3,[email protected],Company(c1,1,d1)))),

(Company(c2,2,d2),WrappedArray(Employee(n4,4,[email protected],Company(c2,2,d2)), Employee(n5,5,[email protected],Company(c2,2,d2)), Employee(n6,6,[email protected],Company(c2,2,d2)))), 

(Company(c3,3,d3),WrappedArray(Employee(n7,7,[email protected],Company(c3,3,d3)), Employee(n8,8,[email protected],Company(c3,3,d3)))))

The key point is that mapGroups accepts any function, so you can shape each group's output however you need.
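For instance, the function passed to mapGroups can drop the repeated company from each employee so the output is closer to the schema asked for in the question. This is only a sketch: EmployeeInfo, CompanyEmployees, the groupEmployees helper and the example.com addresses are names and data I made up for illustration, and it runs on a local SparkSession rather than in spark-shell.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Same case classes as in the example above
case class Company(cName: String, cId: String, details: String)
case class Employee(name: String, id: String, email: String, company: Company)

// Hypothetical output types (not part of the original example)
case class EmployeeInfo(name: String, id: String, email: String)
case class CompanyEmployees(company: Company, employees: Seq[EmployeeInfo])

object GroupByCompanySketch {
  // Groups employees by their company and keeps only the employee fields per group
  def groupEmployees(ds: Dataset[Employee]): Dataset[CompanyEmployees] = {
    import ds.sparkSession.implicits._
    ds.groupByKey(_.company)
      .mapGroups { (company, rows) =>
        CompanyEmployees(company, rows.map(e => EmployeeInfo(e.name, e.id, e.email)).toList)
      }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("group-by-company-sketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample data
    val ds = Seq(
      Employee("n1", "1", "n1@example.com", Company("c1", "1", "d1")),
      Employee("n2", "2", "n2@example.com", Company("c1", "1", "d1")),
      Employee("n4", "4", "n4@example.com", Company("c2", "2", "d2"))
    ).toDS()

    val result = groupEmployees(ds)
    result.printSchema()
    result.show(truncate = false)

    spark.stop()
  }
}
```

Note that mapGroups hands you an iterator over the whole group, so each company's employees are materialized together in one task; that is fine for modest group sizes but worth keeping in mind for very large companies.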

Hope this helps.
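If you would rather stay with the untyped DataFrame from the question, something along these lines should also work: group on the company struct and collect the employee fields with collect_list(struct(...)). Treat it as a sketch under assumptions: the case classes, the employeesPerCompany helper and the sample rows are hypothetical and exist only to build a DataFrame with the question's schema.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, struct}

// Hypothetical case classes used only to build a DataFrame shaped like the question's schema
case class Details(founded: String, address: String, industry: String)
case class CompanyRow(companyName: String, companyId: String, details: Details)
case class EmployeeRow(employeeName: String, employeeId: String,
                       employeeEmail: String, company: CompanyRow)

object CollectEmployeesSketch {
  // One output row per distinct company struct, with its employees gathered into an array
  def employeesPerCompany(df: DataFrame): DataFrame =
    df.groupBy(col("company"))
      .agg(
        collect_list(
          struct(col("employeeName"), col("employeeId"), col("employeeEmail"))
        ).as("employees")
      )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("collect-employees-sketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample data
    val c1 = CompanyRow("c1", "1", Details("1990", "1 Main St", "tools"))
    val c2 = CompanyRow("c2", "2", Details("2005", "2 High St", "retail"))
    val df = Seq(
      EmployeeRow("n1", "1", "n1@example.com", c1),
      EmployeeRow("n2", "2", "n2@example.com", c1),
      EmployeeRow("n4", "4", "n4@example.com", c2)
    ).toDF()

    val grouped = employeesPerCompany(df)
    grouped.printSchema()
    grouped.show(truncate = false)

    spark.stop()
  }
}
```

Grouping on the whole company struct only merges rows whose company fields are all identical. If rows for the same companyId can differ in other fields, group on col("company.companyId") instead and pick one company value per group, for example with first(col("company")).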
