
I am concatenating two columns in Spark of type Array[Map[String,String]], which results in a new column of type Array[Array[Map[String,String]]]. However, I would like to flatten that column so that I end up with a single column of type Array[Map[String,String]] containing the values of both original columns.

I have read that from Spark 2.4 it is possible to apply flatten directly to the concatenation of the columns, something like this:

df.withColumn("concatenation", flatten(array($"colArrayMap1", $"colArrayMap2")))

However, I am still on Spark 2.2, so I need to use a UDF for that. This is what I wrote:

def flatten_collection(arr: Array[Array[Map[String,String]]]) = {
    if(arr == null)
        null
    else
        arr.flatten
}
  
val flatten_collection_udf = udf(flatten_collection _)

df.withColumn("concatenation", array($"colArrayMap1", $"colArrayMap2")).withColumn("concatenation", flatten_collection_udf($"concatenation")).show(false)

But I am getting the following error:

Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<array<map<string,string>>>) => array<map<string,string>>)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:835)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:835)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:380)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [[Lscala.collection.immutable.Map;

I assume the cast error is happening in the UDF, but why does it occur and how can I avoid it?

Also, if someone knows a solution for Spark 2.2 that doesn't require a UDF, even better.


1 Answer


Adapted from the answer here; the key change is that Seq is needed instead of Array.

def concat_arr(
    arr1: Seq[Map[String,String]],
    arr2: Seq[Map[String,String]]
): Seq[Map[String,String]] =
  arr1 ++ arr2
val concatUDF = udf(concat_arr _)

val df2 = df.withColumn("concatenation", concatUDF($"colArrayMap1", $"colArrayMap2"))

df2.show(false)
+--------------------+--------------------+----------------------------------------+
|colArrayMap1        |colArrayMap2        |concatenation                           |
+--------------------+--------------------+----------------------------------------+
|[[a -> b], [c -> d]]|[[a -> b], [c -> d]]|[[a -> b], [c -> d], [a -> b], [c -> d]]|
+--------------------+--------------------+----------------------------------------+
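Note that, unlike the UDF in the question, concat_arr has no null handling. If either column can be null, a guard can be added along the same lines. A minimal sketch under that assumption (concat_arr_safe is a hypothetical name; here a null column is treated as an empty array, which differs slightly from the question's UDF, which returns null):

// Null-tolerant variant (sketch): a null column is replaced by an empty Seq
// before concatenating, so the UDF never dereferences null.
def concat_arr_safe(
    arr1: Seq[Map[String,String]],
    arr2: Seq[Map[String,String]]
): Seq[Map[String,String]] =
  Option(arr1).getOrElse(Seq.empty) ++ Option(arr2).getOrElse(Seq.empty)

val concatSafeUDF = udf(concat_arr_safe _)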

1 Comment

A regular Array is a Java array, so it does not actually support everything a Seq does. There is an implicit conversion to WrappedArray that makes it behave like a Seq, but Spark does not apply that conversion automatically when passing values to the UDF; you have to force it by declaring the parameter as Seq.
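Following that explanation, the original flatten approach from the question should also work once the UDF parameter is declared with Seq instead of Array. A minimal sketch under that assumption (flatten_collection_seq is a hypothetical name):

// Spark hands the nested array column to the UDF as a WrappedArray (a Seq),
// so declaring the parameter as Seq avoids the ClassCastException.
def flatten_collection_seq(arr: Seq[Seq[Map[String,String]]]): Seq[Map[String,String]] =
  if (arr == null) null else arr.flatten

val flatten_collection_seq_udf = udf(flatten_collection_seq _)

df.withColumn("concatenation",
  flatten_collection_seq_udf(array($"colArrayMap1", $"colArrayMap2"))).show(false)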
