3

I have a dataframe:

+--------------------+------+
|people              |person|
+--------------------+------+
|[[jack, jill, hero]]|joker |
+--------------------+------+

Its schema:

root
 |-- people: struct (nullable = true)
 |    |-- person: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- person: string (nullable = true)

Here, root--person is a string, so I can update this field with a UDF:

import org.apache.spark.sql.functions.{col, udf}

// Prepend "Mr. " to a single string value.
def updateString = udf((s: String) => {
    "Mr. " + s
})
df.withColumn("person", updateString(col("person"))).select("person").show(false)

output:

+---------+
|person   |
+---------+
|Mr. joker|
+---------+

I want to do the same operation on the root--people--person column, which contains an array of persons. How can I achieve this using a UDF?

def updateArray = udf((arr: Seq[Row]) => ???)
df.withColumn("people", updateArray(col("people.person"))).select("people").show(false)

expected:

+------------------------------+
|people                        |
+------------------------------+
|[Mr. hero, Mr. jack, Mr. jill]|
+------------------------------+

Edit: I also want to preserve its schema after updating root--people--person.

Expected schema of people:

df.select("people").printSchema()

root
 |-- people: struct (nullable = false)
 |    |-- person: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

Thanks,

3 Answers

1

The problem here is that people is a struct with only one field. Your UDF needs to return a Tuple1, and you then need to cast the UDF's output so that the field name stays correct (a Tuple1 field would otherwise be named _1):

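// The UDF receives the whole struct as a Row; field 0 is the person array.
def updateArray = udf((r: Row) => Tuple1(r.getAs[Seq[String]](0).map(x => "Mr." + x)))

// Cast the result so the struct field is named "person" instead of "_1".
val newDF = df
  .withColumn("people", updateArray($"people").cast("struct<person:array<string>>"))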

newDF.printSchema()
newDF.show()

gives

root
 |-- people: struct (nullable = true)
 |    |-- person: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- person: string (nullable = true)


+--------------------+------+
|              people|person|
+--------------------+------+
|[[Mr.jack, Mr.jil...| joker|
+--------------------+------+
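
The body of the UDF above can also be written as a plain function, which makes it easy to check without a Spark session. The following is only a sketch: the object and method names are hypothetical, the "Mr. " prefix (with a space) follows the question's expected output rather than this answer's, and the null handling is an assumption about what you want for missing values.

```scala
// Hypothetical helper names; plain Scala, no Spark needed for this part.
object PrefixLogic {
  // Prefix every name. A null array stays null, and null elements stay null.
  def prefixAll(names: Seq[String], prefix: String): Seq[String] =
    if (names == null) null
    else names.map(n => if (n == null) null else prefix + n)

  // Wrap the result in Tuple1 so that a UDF returning it yields a
  // single-field struct, as described in the answer above.
  def asSingleFieldStruct(names: Seq[String]): Tuple1[Seq[String]] =
    Tuple1(prefixAll(names, "Mr. "))
}
```

Inside the UDF you could then call PrefixLogic.asSingleFieldStruct(r.getAs[Seq[String]](0)). Note that this still assumes the people struct itself (the Row r) is not null.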

1 Comment

Can you answer it in PySpark as well?
1

You just need to update your function; everything else remains the same. Here is the code snippet:

scala> df2.show
+------+------------------+
|people|            person|
+------+------------------+
| joker|[jack, jill, hero]|
+------+------------------+
// Only the order is changed (here person holds the array and people holds the string).
I updated your function to take Seq[String] instead of Row:

scala> def updateArray = udf((arr: Seq[String]) => arr.map(x=>"Mr."+x))
scala> df2.withColumn("test",updateArray($"person")).show(false)
+------+------------------+---------------------------+
|people|person            |test                       |
+------+------------------+---------------------------+
|joker |[jack, jill, hero]|[Mr.jack, Mr.jill, Mr.hero]|
+------+------------------+---------------------------+
// All columns are kept for testing purposes; drop the ones you don't need.

Let me know if you want to know more about this.
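
If the struct from the question needs to be kept, one possible approach is to apply a Seq[String] UDF to the inner array and rebuild the struct around the result with struct(...).as(...). This is a sketch rather than a definitive solution: it assumes Spark SQL is on the classpath and uses the question's original schema (people as a struct with a single person array), not the df2 shown here. The object, case class and method names are hypothetical, and the "Mr. " prefix follows the question's expected output.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct, udf}

// Hypothetical names throughout; assumes Spark SQL is available.
object PreserveSchemaSketch {
  // Mirrors the question's schema: people is a struct with one array field.
  case class People(person: Seq[String])

  // Build a one-row sample DataFrame like the one in the question.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq((People(Seq("jack", "jill", "hero")), "joker")).toDF("people", "person")
  }

  // UDF over the inner array only; a null array is passed through as null.
  val updateArray = udf((arr: Seq[String]) =>
    if (arr == null) null else arr.map(x => "Mr. " + x))

  // Rebuild the struct around the UDF result so the field keeps its name.
  def rewrap(df: DataFrame): DataFrame =
    df.withColumn("people", struct(updateArray(col("people.person")).as("person")))
}
```

With a DataFrame df shaped like the question's, PreserveSchemaSketch.rewrap(df).select("people").printSchema() lets you compare the resulting schema with the expected one.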

2 Comments

Thank you @Mahesh, this works like a charm, but I also want to preserve its schema. I have updated the question. Could you please take a look and update the answer?
Your input is [jack, jill, hero] and you want the output to be [Mr. hero, Mr. jack, Mr. jill]; is that correct?
0

Let's create data for testing

scala> val data = Seq((List(Array("ja", "ji", "he")), "person")).toDF("people", "person")
data: org.apache.spark.sql.DataFrame = [people: array<array<string>>, person: string]

scala> data.printSchema
root
 |-- people: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- person: string (nullable = true)

Create a UDF for our requirements

scala> def arrayConcat(array:Seq[Seq[String]], str: String) = array.map(_.map(str + _))
arrayConcat: (array: Seq[Seq[String]], str: String)Seq[Seq[String]]

scala> val arrayConcatUDF = udf(arrayConcat _)
arrayConcatUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,ArrayType(ArrayType(StringType,true),true),Some(List(ArrayType(ArrayType(StringType,true),true), StringType)))

Apply the UDF

scala> data.withColumn("dasd", arrayConcatUDF($"people", lit("Mr."))).show(false)
+--------------------------+------+-----------------------------------+
|people                    |person|dasd                               |
+--------------------------+------+-----------------------------------+
|[WrappedArray(ja, ji, he)]|person|[WrappedArray(Mr.ja, Mr.ji, Mr.he)]|
+--------------------------+------+-----------------------------------+

You may need to tweak it a bit (I think hardly any tweaking is required), but this contains most of what you need to solve your problem.

1 Comment

this is not the correct schema (of your input data)
