To keep it simple, let's assume we have a DataFrame containing the following data:

+----------+---------+----------+----------+
|firstName |lastName |Phone     |Address   |
+----------+---------+----------+----------+
|firstName1|lastName1|info1     |info2     |
|firstName1|lastName1|myInfo1   |dummyInfo2|
|firstName1|lastName1|dummyInfo1|myInfo2   |
+----------+---------+----------+----------+

How can I merge all rows, grouping by (firstName, lastName), and keep in the Phone and Address columns only the data starting with "my", to get the following:

+----------+---------+----------+----------+
|firstName |lastName |Phone     |Address   |
+----------+---------+----------+----------+
|firstName1|lastName1|myInfo1   |myInfo2   |
+----------+---------+----------+----------+

Should I perhaps use the agg function with a custom UDAF? And if so, how would I implement it?

Note: I'm using Spark 2.2 along with Scala 2.11.

2 Answers

You can use groupBy with the collect_set aggregation function, then a udf function to pick the first string that starts with "my":

import org.apache.spark.sql.functions._

// picks the first collected value that starts with "my"
val myudf = udf((array: Seq[String]) => array.filter(_.startsWith("my")).head)

df.groupBy("firstName", "lastName")
  .agg(myudf(collect_set("Phone")).as("Phone"), myudf(collect_set("Address")).as("Address"))
  .show(false)

which should give you

+----------+---------+-------+-------+
|firstName |lastName |Phone  |Address|
+----------+---------+-------+-------+
|firstName1|lastName1|myInfo1|myInfo2|
+----------+---------+-------+-------+
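
Note that .head throws a NoSuchElementException when a group has no value starting with "my". A minimal sketch of a safer variant (the name myudfSafe is mine, not part of the answer), assuming a null result is acceptable in that case:

// returns null instead of throwing when nothing starts with "my"
val myudfSafe = udf((array: Seq[String]) => array.find(_.startsWith("my")).orNull)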

I hope the answer is helpful


If only two columns are involved, filtering and a join can be used instead of a UDF:

import spark.implicits._ // for toDF and the $"col" syntax (spark is your SparkSession)

val df = List(
  ("firstName1", "lastName1", "info1", "info2"),
  ("firstName1", "lastName1", "myInfo1", "dummyInfo2"),
  ("firstName1", "lastName1", "dummyInfo1", "myInfo2")
).toDF("firstName", "lastName", "Phone", "Address")

// keep only the rows whose Phone / Address starts with "my"
val myPhonesDF = df.filter($"Phone".startsWith("my"))
val myAddressDF = df.filter($"Address".startsWith("my"))

// join the two filtered sets back together on the name key
val result = myPhonesDF.alias("Phones")
  .join(myAddressDF.alias("Addresses"), Seq("firstName", "lastName"))
  .select("firstName", "lastName", "Phones.Phone", "Addresses.Address")
result.show(false)

Output:

+----------+---------+-------+-------+
|firstName |lastName |Phone  |Address|
+----------+---------+-------+-------+
|firstName1|lastName1|myInfo1|myInfo2|
+----------+---------+-------+-------+
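
Note that this is an inner join, so a (firstName, lastName) pair that has no "my" Phone or no "my" Address is dropped entirely. A sketch of an outer-join variant that keeps such pairs with nulls instead:

val resultOuter = myPhonesDF.alias("Phones")
  .join(myAddressDF.alias("Addresses"), Seq("firstName", "lastName"), "full_outer")
  .select("firstName", "lastName", "Phones.Phone", "Addresses.Address")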

For many columns, when only one row per group is expected, a construction like this can be used:

val columnsForSearch = List("Phone", "Address")
// per column: keep the value that starts with "my" (non-matching values become null and are ignored by min)
val minExpressions = columnsForSearch.map(c => min(when(col(c).startsWith("my"), col(c)).otherwise(null)).alias(c))
df.groupBy("firstName", "lastName").agg(minExpressions.head, minExpressions.tail: _*)

The output is the same.
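
The column list doesn't have to be written by hand; a minimal sketch (keyColumns and searchColumns are my names), assuming every column except the grouping keys should be scanned:

val keyColumns = Seq("firstName", "lastName")
val searchColumns = df.columns.filterNot(keyColumns.contains).toList
val expressions = searchColumns.map(c => min(when(col(c).startsWith("my"), col(c))).alias(c))
df.groupBy(keyColumns.map(col): _*).agg(expressions.head, expressions.tail: _*)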

An example of a UDF with two parameters:

// concatenates two column values into a single string
val twoParamFunc = (firstName: String, phone: String) => firstName + ": " + phone
val twoParamUDF = udf(twoParamFunc)
df.select(twoParamUDF($"firstName", $"Phone")).show(false)

3 Comments

It was just an example to keep things simple, but in reality my DataFrame contains more than 40 columns ... But thank you anyway
Here we apply the min function to get only one value. What if I have multiple values for the same column starting with "my", and I'm supposed to throw an exception or apply some logic to choose which one to keep? (The logic is the same for all the columns.)
For custom choice logic a UDF is required, if "min" or similar ("max", etc.) are not applicable.
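
A minimal sketch of such a UDF (strictPick is a hypothetical name), assuming the rule is to throw when more than one distinct value in a group starts with "my":

// fails on ambiguity, returns null when nothing matches
val strictPick = udf { (values: Seq[String]) =>
  val matches = values.filter(_.startsWith("my"))
  if (matches.size > 1)
    throw new IllegalArgumentException(s"Ambiguous values: ${matches.mkString(", ")}")
  matches.headOption.orNull
}

df.groupBy("firstName", "lastName")
  .agg(strictPick(collect_set("Phone")).as("Phone"), strictPick(collect_set("Address")).as("Address"))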
