
I know we can replace values in a dataframe column and return a new dataframe with the updated values using the method below:

dataframe.withColumn("col1", when(col("col1").equalTo("this"), "that").otherwise(col("col1")))

but this operates on the whole column, replacing the value wherever the condition matches.

Now I have a slightly complex dataframe:

+------------------+-------+
|        colleagues|   name|
+------------------+-------+
|[guy1, guy2, guy3]|Thisguy|
|[guy4, guy5, guy6]|Thatguy|
|[guy7, guy8, guy9]|Someguy|
+------------------+-------+

Here the 'colleagues' column holds arrays, and I want to replace a specific element of an array, e.g. instead of 'guy2' in the first row I want 'guy10' in my new dataframe. How can I achieve that? Please help.

1 Answer


Introduction

There are a couple of questions to be answered before offering a final solution (e.g. the ordering of the elements in the colleagues array after replacing some), but I don't want to drag this out too long. Let's have a look at a very common approach to problems like this.

Solution

Since the colleagues column is an array column (and Spark is very effective at queries over rows), you should first explode (or posexplode) it. With one row per array element you can make the necessary changes and, at the end, use collect_list to get the array column back.

explode(e: Column): Column Creates a new row for each element in the given array or map column.

posexplode(e: Column): Column Creates a new row for each element with position in the given array or map column.

Let's use the following names dataset:

val names = Seq((Array("guy1", "guy2", "guy3"), "Thisguy")).toDF("colleagues", "name")
scala> names.show
+------------------+-------+
|        colleagues|   name|
+------------------+-------+
|[guy1, guy2, guy3]|Thisguy|
+------------------+-------+
scala> names.printSchema
root
 |-- colleagues: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- name: string (nullable = true)

Let's explode, do the changes and in the end collect_list.

val elements = names.withColumn("elements", explode($"colleagues"))
scala> elements.show
+------------------+-------+--------+
|        colleagues|   name|elements|
+------------------+-------+--------+
|[guy1, guy2, guy3]|Thisguy|    guy1|
|[guy1, guy2, guy3]|Thisguy|    guy2|
|[guy1, guy2, guy3]|Thisguy|    guy3|
+------------------+-------+--------+

That's what Spark SQL can handle with ease. Let's use regexp_replace (What? regexp?! And now you've got two problems :)).

val replaced = elements.withColumn("replaced", regexp_replace($"elements", "guy2", "guy10"))
scala> replaced.show
+------------------+-------+--------+--------+
|        colleagues|   name|elements|replaced|
+------------------+-------+--------+--------+
|[guy1, guy2, guy3]|Thisguy|    guy1|    guy1|
|[guy1, guy2, guy3]|Thisguy|    guy2|   guy10|
|[guy1, guy2, guy3]|Thisguy|    guy3|    guy3|
+------------------+-------+--------+--------+
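One caveat behind the joke above: regexp_replace matches substrings, so an element like "guy20" would also be rewritten to "guy100". If you need exact, whole-value matches, a when/otherwise sketch (assuming the elements dataframe and spark.implicits._ from the session above) avoids that:

```scala
import org.apache.spark.sql.functions.when

// The exact-match rule as a plain function, for clarity:
// only the value "guy2" itself becomes "guy10", "guy20" is left alone.
def exactReplace(e: String): String = if (e == "guy2") "guy10" else e

// The same rule expressed over the `elements` dataframe from above:
val replacedExact = elements.withColumn(
  "replaced",
  when($"elements" === "guy2", "guy10").otherwise($"elements"))
```

The rest of the walkthrough works the same with replacedExact in place of replaced.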

In the end, let's group by the initial array column and use collect_list grouping function.

val solution = replaced
  .groupBy($"colleagues" as "before")
  .agg(
    collect_list("replaced") as "after",
    first("name") as "name")
scala> solution.show
+------------------+-------------------+-------+
|            before|              after|   name|
+------------------+-------------------+-------+
|[guy1, guy2, guy3]|[guy1, guy10, guy3]|Thisguy|
+------------------+-------------------+-------+
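On the ordering question from the introduction: collect_list after groupBy gives no guarantee about element order. A posexplode-based sketch (assuming the names dataset and implicits from above, and that sort_array can order the (pos, element) structs in your Spark version) restores the original order explicitly:

```scala
import org.apache.spark.sql.functions.{collect_list, first, posexplode, regexp_replace, sort_array, struct}

// posexplode keeps each element's position alongside its value.
val withPos = names.select(
  $"colleagues", $"name",
  posexplode($"colleagues") as Seq("pos", "element"))

val replacedPos = withPos.withColumn(
  "element", regexp_replace($"element", "guy2", "guy10"))

// Collect (pos, element) pairs, restore the order by sorting on pos,
// then keep just the element field.
val ordered = replacedPos
  .groupBy($"colleagues" as "before")
  .agg(
    sort_array(collect_list(struct($"pos", $"element"))) as "pairs",
    first("name") as "name")
  .withColumn("after", $"pairs.element")
  .drop("pairs")
```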

Alternative Solutions

User-Defined Function (UDF)

Alternatively, you could write a custom user-defined function, but that would not benefit from as many optimizations as the solution above, so I'd not recommend it.

Custom Logical Operator

The best approach would be to write a custom logical operator (a LogicalPlan) that would do all this and participate in optimizations, but avoid exchanges (introduced by groupBy). That'd however be a fairly advanced Spark development that I have not done yet.


2 Comments

solutions for withColumn(..., explode()) are clear. What about posexplode? your issues.apache.org/jira/browse/SPARK-20174 has no updates so far.
nevermind, I found a way: df selectExpr("*", "posexplode(s) as (p,c)") drop("s")
