
I am reading an Avro file that contains a field as a binary string. I need to convert it to a java.lang.String to pass it to another library (spark-xml-util). How do I do this conversion efficiently? This is the code I have so far:

    val df = sqlContext.read.format("com.databricks.spark.avro").load("filePath/fileName.avro")
    df.select("myField").collect().mkString

The last line gives me the following exception:

    Exception in thread "main" java.lang.ClassCastException: [B cannot be cast to java.lang.String
        at org.apache.spark.sql.Row$class.getString(Row.scala:255)
        at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:165)

The df schema is:

    root
     |-- id: string (nullable = true)
     |-- myField: binary (nullable = true)
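A binary column comes back as Array[Byte] at the Row level, which is why getString throws. A minimal sketch of reading it row by row, assuming the bytes are plain UTF-8 text (which may not hold if they are compressed):

    val strings = df.select("myField").collect().map { row =>
      // getAs[Array[Byte]] matches the column's actual runtime type
      new String(row.getAs[Array[Byte]]("myField"), "UTF-8")
    }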

3 Answers


Considering the state of the API right now (2.2.0), your best bet is to create a UDF to do the conversion and replace the column:

import org.apache.spark.sql.functions.udf

// Interpret the raw bytes as a string (platform default charset)
val toString = udf((payload: Array[Byte]) => new String(payload))
df.withColumn("myField", toString(df("myField")))

or, if (as you seem to imply) the data is compressed with GZIP, you can:

import java.io.ByteArrayInputStream
import java.util.zip.GZIPInputStream
import org.apache.spark.sql.functions.udf

val toString = udf((payload: Array[Byte]) => {
  // Decompress the GZIP payload, then read the stream as a string
  val inputStream = new GZIPInputStream(new ByteArrayInputStream(payload))
  scala.io.Source.fromInputStream(inputStream).mkString
})
df.withColumn("myField", toString(df("myField")))

3 Comments

I tried this, but all it gives me is junk values. This, however, works fine: val inputStream = new GZIPInputStream(new ByteArrayInputStream(binPayldRec.getAs[Array[Byte]]("myfield"))); val xmlString = scala.io.Source.fromInputStream(inputStream).mkString. I don't understand why one works and the other doesn't.
Well, as you wrote, it seems the Array[Byte] is not a plain string but a GZIP-compressed string; just use the same code as a UDF and it should work.
With fully qualified classes: val gzipBinaryToString = udf((payload: Array[Byte]) => { val inputStream = new java.util.zip.GZIPInputStream(new java.io.ByteArrayInputStream(payload)); scala.io.Source.fromInputStream(inputStream).mkString })
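That comment laid out as a runnable block (same code; the udf import is the only addition):

import org.apache.spark.sql.functions.udf

// Fully qualified class names, so no java.io / java.util.zip imports are needed
val gzipBinaryToString = udf((payload: Array[Byte]) => {
  val inputStream = new java.util.zip.GZIPInputStream(new java.io.ByteArrayInputStream(payload))
  scala.io.Source.fromInputStream(inputStream).mkString
})
val decoded = df.withColumn("myField", gzipBinaryToString(df("myField")))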

In Spark 3.0 you can cast between BINARY and STRING data.

scala> val df = sc.parallelize(Seq("ABC", "BCD", "CDE", "DEF")).toDF("value")
df: org.apache.spark.sql.DataFrame = [value: string]

scala> df.select($"value", $"value".cast("BINARY"), 
$"value".cast("BINARY").cast("STRING")).show()
+-----+----------+-----+
|value|     value|value|
+-----+----------+-----+
|  ABC|[41 42 43]|  ABC|
|  BCD|[42 43 44]|  BCD|
|  CDE|[43 44 45]|  CDE|
|  DEF|[44 45 46]|  DEF|
+-----+----------+-----+

I don't have your data to test with, but you should be able to do:

df.select($"myField".cast("STRING"))

This obviously depends on the actual data (e.g. don't cast a JPEG to STRING), but assuming it's UTF-8 encoded, this should work.
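If you'd rather name the charset explicitly than rely on the cast, the built-in decode function in org.apache.spark.sql.functions takes one; a sketch assuming UTF-8:

import org.apache.spark.sql.functions.decode

// decode reads a binary column as a string in the given charset
df.select(decode($"myField", "UTF-8").alias("myField"))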



In the previous solution, the code new String(payload) did not work for me on true binary data.

Ultimately the solution was a little more involved, with the length of the binary data required as a 2nd parameter.

import org.apache.spark.sql.functions.udf

def binToString(payload: Array[Byte], payloadLength: Int): String = {
  // Convert the first payloadLength bytes, one char per byte
  Range(0, payloadLength).map(i => payload(i).toChar).mkString
}

val binToStringUDF = udf(binToString(_: Array[Byte], _: Int): String)
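A usage sketch, assuming the built-in length function (which returns the byte count of a binary column) supplies the second argument:

import org.apache.spark.sql.functions.length

df.withColumn("myField", binToStringUDF(df("myField"), length(df("myField"))))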

