
I've written the following UDF in Scala:

import java.io.ByteArrayInputStream
import java.util.zip.GZIPInputStream

def Decompress(compressed: Array[Byte]): String = {
  val inputStream = new GZIPInputStream(new ByteArrayInputStream(compressed))
  // Assumes the payload is UTF-8 text; read the whole stream, then close it
  try scala.io.Source.fromInputStream(inputStream, "UTF-8").mkString
  finally inputStream.close()
}

val decompressUdf = (compressed: Array[Byte]) => {
  Decompress(compressed)
}

spark.udf.register("Decompress", decompressUdf)
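The decompression logic itself can be checked without Spark, which helps isolate the problem to how the function is called. A minimal sketch, assuming the payloads are gzip-compressed UTF-8 text; `GzipRoundTrip` and `compress` are hypothetical names introduced only for this check:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object GzipRoundTrip {
  // Hypothetical helper: gzip a UTF-8 string to build test input
  def compress(text: String): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(bytes)
    try gz.write(text.getBytes(StandardCharsets.UTF_8))
    finally gz.close() // close() writes the gzip trailer, so read the bytes afterwards
    bytes.toByteArray
  }

  // Mirrors the question's Decompress; the charset is passed explicitly
  def decompress(compressed: Array[Byte]): String = {
    val in = new GZIPInputStream(new ByteArrayInputStream(compressed))
    try scala.io.Source.fromInputStream(in, "UTF-8").mkString
    finally in.close()
  }
}
```

If the round trip returns the original text, the function is fine on raw bytes, and the compile error can only come from what it is being called with.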

I'm then attempting to call the UDF with the following:

import org.apache.spark.sql.functions.{col, current_timestamp}

val sessionsRawDF =
  sessionRawDF
    .withColumn("WebsiteSession", decompressUdf(sessionRawDF("body")))
    .select(
      current_timestamp().alias("ingesttime"),
      current_timestamp().cast("date").alias("p_ingestdate"),
      col("partition"),
      col("enqueuedTime"),
      col("WebsiteSession").alias("Json")
    )

When I run this, I get the following error:

command-130062350733681:9: error: type mismatch;
 found   : org.apache.spark.sql.Column
 required: Array[Byte]
 decompressUdf(col("WebsiteSession")).alias("Json")

I was under the impression that Spark would implicitly extract the value and convert it from the Spark type to Array[Byte] in this case.

Would someone please help me understand what's going on? I've been fighting this for a while and I'm not sure what else to try.

Answer


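The compile error comes from calling the plain Scala function decompressUdf on a Column. Its parameter type is Array[Byte], and Scala will not turn a Column (an unevaluated expression, not row data) into a byte array. You need to wrap the Scala function with udf (from org.apache.spark.sql.functions, which you have to import) to get a Spark UserDefinedFunction, whose apply method accepts Column arguments. For example,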

val decompressUdf = udf(Decompress _)

spark.udf.register("Decompress", decompressUdf)
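Applied to the question's pipeline, a minimal end-to-end sketch might look like the following. It assumes Spark 3.x, a local SparkSession, and UTF-8 gzip payloads; `DecompressUdfExample`, `gzip`, and the two sample rows are hypothetical stand-ins for the real input, and the select list is trimmed:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, current_timestamp, udf}

// Serializable so Spark can ship the UDF closure if it captures this object
object DecompressUdfExample extends Serializable {
  def decompress(compressed: Array[Byte]): String = {
    val in = new GZIPInputStream(new ByteArrayInputStream(compressed))
    try scala.io.Source.fromInputStream(in, "UTF-8").mkString
    finally in.close()
  }

  // Hypothetical helper that gzips sample text for the demo rows
  def gzip(text: String): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new GZIPOutputStream(bytes)
    try out.write(text.getBytes(StandardCharsets.UTF_8))
    finally out.close()
    bytes.toByteArray
  }

  // udf(...) lifts Array[Byte] => String into a UserDefinedFunction,
  // whose apply method takes Column arguments instead of raw bytes
  val decompressUdf: UserDefinedFunction = udf(decompress _)

  def run(spark: SparkSession): Seq[String] = {
    import spark.implicits._

    val sessionRawDF = Seq(
      (0, gzip("""{"id":1}""")),
      (1, gzip("""{"id":2}"""))
    ).toDF("partition", "body")

    val sessionsRawDF = sessionRawDF
      .withColumn("WebsiteSession", decompressUdf(sessionRawDF("body")))
      .select(
        current_timestamp().alias("ingesttime"),
        col("partition"),
        col("WebsiteSession").alias("Json")
      )

    sessionsRawDF.orderBy("partition").select("Json").as[String].collect().toSeq
  }
}
```

Calling `DecompressUdfExample.run(spark)` with a session from `SparkSession.builder().master("local[1]").getOrCreate()` returns the two decompressed JSON strings in partition order. No registration is involved, because the UDF is used only through the DataFrame API.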

In fact, there is no need to register the UDF if you're just using it in the DataFrame API. You can simply run the first line and use decompressUdf. Registering is only needed if you want to call the UDF by name from SQL strings, for example in spark.sql or selectExpr.
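For the SQL side, here is a sketch under the same assumptions (Spark 3.x, a local session, UTF-8 gzip payloads). `DecompressSqlExample`, `gzip`, and the temp view name `session_raw` are hypothetical. Registration is what makes the name `Decompress` resolvable when Spark parses a SQL string:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Serializable so Spark can ship the UDF closure if it captures this object
object DecompressSqlExample extends Serializable {
  def decompress(compressed: Array[Byte]): String = {
    val in = new GZIPInputStream(new ByteArrayInputStream(compressed))
    try scala.io.Source.fromInputStream(in, "UTF-8").mkString
    finally in.close()
  }

  // Hypothetical helper that gzips sample text for the demo row
  def gzip(text: String): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new GZIPOutputStream(bytes)
    try out.write(text.getBytes(StandardCharsets.UTF_8))
    finally out.close()
    bytes.toByteArray
  }

  def run(spark: SparkSession): Seq[String] = {
    import spark.implicits._

    // Registration exposes the UDF under a name that SQL strings can resolve
    spark.udf.register("Decompress", udf(decompress _))

    val sessionRawDF = Seq((0, gzip("""{"id":1}"""))).toDF("partition", "body")
    sessionRawDF.createOrReplaceTempView("session_raw") // hypothetical view name

    // Both calls below are parsed as SQL, so they look the function up by name
    val viaSelectExpr = sessionRawDF.selectExpr("Decompress(body) AS Json").as[String].collect().toSeq
    val viaSql = spark.sql("SELECT Decompress(body) AS Json FROM session_raw").as[String].collect().toSeq
    viaSelectExpr ++ viaSql
  }
}
```

Without the `register` call, both `selectExpr` and `spark.sql` would fail at analysis time with an undefined-function error, while the DataFrame-API call in the previous answer code would still work.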
