
I'm trying to filter the elements of WrappedArray columns in a DataFrame against an external list of elements.

Looking for a solution I found this question. It is very similar, but it does not seem to work for me. I'm using Spark 2.4.0. This is my code:

import org.apache.spark.sql.functions.{col, udf}

val df = sc.parallelize(Array((1, Seq("s", "v", "r")), (2, Seq("r", "a", "v")), (3, Seq("s", "r", "t")))).toDF("foo", "bar")


def filterItems(flist: Seq[String]) = udf {
  (recs: Seq[String]) => recs match {
    case null => Seq.empty[String]
    case recs => recs.intersect(flist)
  }
}

df.withColumn("filtercol", filterItems(Seq("s", "v"))(col("bar"))).show(5)

My expected result would be:

+---+---------+---------+
|foo|      bar|filtercol|
+---+---------+---------+
|  1|[s, v, r]|   [s, v]|
|  2|[r, a, v]|      [v]|
|  3|[s, r, t]|      [s]|
+---+---------+---------+

But I'm getting this error:

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

2 Answers


You can actually use the built-in array_intersect function in Spark 2.4 without much effort:

import org.apache.spark.sql.functions.{array_intersect, array, lit}

val df = sc.parallelize(Array((1, Seq("s", "v", "r")),(2, Seq("r", "a", "v")),(3, Seq("s", "r", "t")))).toDF("foo","bar")

// Wrap each filter value in a literal Column
val ar = Seq("s", "v").map(lit(_))
df.withColumn("filtercol", array_intersect($"bar", array(ar: _*))).show

Output:

+---+---------+---------+
|foo|      bar|filtercol|
+---+---------+---------+
|  1|[s, v, r]|   [s, v]|
|  2|[r, a, v]|      [v]|
|  3|[s, r, t]|      [s]|
+---+---------+---------+

The only tricky part is Seq("s", "v").map(lit(_)), which maps each string to a lit(...) column. array_intersect accepts two arrays: the first is the value of the bar column, and the second is built on the fly with array(ar:_*), which combines the lit columns into a single array column.
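For a short, fixed filter list you can also spell the literals out by hand; the following is just a sketch equivalent to the snippet above, using the same df and imports:

// Same result: each filter value becomes a literal column, combined by array(...)
df.withColumn("filtercol", array_intersect($"bar", array(lit("s"), lit("v")))).show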




If you pass an ArrayType column into a UDF, it arrives as an instance of WrappedArray, which is not a List. So you should change the type of recs to Seq, IndexedSeq, or WrappedArray; normally I just use a plain Seq:

def filterItems(flist: List[String]) = udf {
  (recs: Seq[String]) => recs match {
    case null => Seq.empty[String]
    case recs => recs.intersect(flist)
  }
}
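As an illustration only, here is a minimal sketch of applying this UDF to the df from the question (assuming a spark-shell session where the definitions above are in scope):

import org.apache.spark.sql.functions.col

// recs arrives as a WrappedArray, which satisfies the Seq[String] parameter type
df.withColumn("filtercol", filterItems(List("s", "v"))(col("bar"))).show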

1 Comment

You are right about this, but it doesn't solve my problem. I could change all the List types to Seq types and the error would still be the same. I have edited the question so this is not the issue. Thanks for your help.
