
I have a very complex DataFrame in Spark. I'm trying to use a UDF that takes two columns and runs a function over the corresponding rows of each column at the same time.

Each column has the following identical schema:

root
 |-- A: array (nullable = true)
 |    |-- element: double (containsNull = true)

In some cases the array will be empty; in other cases it will have elements, and the count will vary.

When I call .dtypes on a column I get:

test: Array[(String, String)] = Array((A,ArrayType(DoubleType,true)))

When I do a take(1) on one of the columns I get:

Array[org.apache.spark.sql.Row] = Array([WrappedArray(1234, 4567, 789, 1346)])

When I simply run a select on a column I get:

org.apache.spark.sql.DataFrame = [A: array<double>]

My goal is to run a function over the corresponding elements of the two columns.

def inRange = udf((A: ???, B: ??? ) => {
   //iterate over the array and run coolFunction(A(0),B(0))
 })

I'm invoking the UDF like this:

df.withColumn("coolFunction", inRange(df("A"), df("B")))

2 Answers


You can define your udf function using collection.mutable.WrappedArray[Double] as the parameter type:

def inRange = udf((A: collection.mutable.WrappedArray[Double], B: collection.mutable.WrappedArray[Double]) => {
  //iterate over the array and run coolFunction(A(0),B(0))
})

Or you can use the parent classes of WrappedArray, which are IndexedSeq and Seq:

def inRange = udf((A: collection.mutable.IndexedSeq[Double], B: collection.mutable.IndexedSeq[Double]) => {
  //iterate over the array and run coolFunction(A(0),B(0))
})

Or

def inRange = udf((A: Seq[Double], B: Seq[Double]) => {
  //iterate over the array and run coolFunction(A(0),B(0))
})

6 Comments

I'm going to try each one out; are there any pros or cons to each?
I get an error of java.lang.UnsupportedOperationException: Schema for type Unit is not supported for each of them, when running df.withColumn("coolFunction", coolFunction(df("A"), df("B")))
I think that error is saying my function is returning nothing... let me test it a few times.
You have to return a value from the udf function to create a new column or replace an existing one.
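To illustrate the commenter's point in plain Scala (Spark session omitted): a lambda whose last expression is a side effect has type Unit, which Spark cannot map to any column type, and that is exactly what produces the "Schema for type Unit is not supported" exception. Ending with an expression that produces a value fixes it.

```scala
// Returns Unit: the last expression is a side-effecting foreach, so
// wrapping this in udf(...) would fail with
// "Schema for type Unit is not supported"
val bad = (a: Seq[Double], b: Seq[Double]) =>
  a.zip(b).foreach { case (x, y) => println(x + y) }

// Returns Seq[Double]: the last expression is a value, so the UDF's
// result can become a column of type array<double>
val good = (a: Seq[Double], b: Seq[Double]) =>
  a.zip(b).map { case (x, y) => x + y }
```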

Should be:

def inRange = udf((A: Seq[Double], B: Seq[Double]) => {
    //iterate over the array and run coolFunction(A(0),B(0))
})

Reference: https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types
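Per that type mapping, ArrayType(DoubleType) arrives in the UDF as a Seq[Double], which may be empty, as the question notes. A defensive sketch (the helper name is hypothetical) that guards the empty case before pairing elements:

```scala
// Guard against empty arrays before pairing corresponding elements;
// note that zip truncates to the length of the shorter sequence
def safePair(a: Seq[Double], b: Seq[Double]): Seq[(Double, Double)] =
  if (a.isEmpty || b.isEmpty) Seq.empty
  else a.zip(b)
```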

