
I have a Dataframe A that contains a column of array string.

...
 |-- browse: array (nullable = true)
 |    |-- element: string (containsNull = true)
...

For example three sample rows would be

+---------+--------+---------+
| column 1|  browse| column n|
+---------+--------+---------+
|     foo1| [X,Y,Z]|     bar1|
|     foo2|   [K,L]|     bar2|
|     foo3|     [M]|     bar3|

And another Dataframe B that contains a column of string

|-- browsenodeid: string (nullable = true)

Some sample rows for it would be

+------------+
|browsenodeid|
+------------+
|           A|
|           Z|
|           M|

How can I filter A so that I keep all the rows whose browse contains any of the values of browsenodeid from B? In terms of the above examples the result would be:

+---------+--------+---------+
| column 1|  browse| column n|
+---------+--------+---------+
|     foo1| [X,Y,Z]|     bar1| <- because Z is a value of B.browsenodeid
|     foo3|     [M]|     bar3| <- because M is a value of B.browsenodeid

If I had a single value then I would use something like

A.filter(array_contains(A("browse"), single_value))

But what do I do with a list or DataFrame of values?

3 Answers


I found an elegant solution for this, without the need to cast DataFrames/Datasets to RDDs.

Assuming you have a DataFrame dataDF:

+---------+--------+---------+
| column 1|  browse| column n|
+---------+--------+---------+
|     foo1| [X,Y,Z]|     bar1|
|     foo2|   [K,L]|     bar2|
|     foo3|     [M]|     bar3|

and an array b containing the values you want to match in browse

val b: Array[String] = Array("M", "Z")

Implement the udf:

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
import scala.collection.mutable.WrappedArray

def array_contains_any(s: Seq[String]): UserDefinedFunction = {
  udf((c: WrappedArray[String]) => c.toList.intersect(s).nonEmpty)
}

and then simply use the filter or where function (with a little bit of fancy currying :P) to do the filtering like:

dataDF.where(array_contains_any(b)($"browse"))



In Spark >= 2.4.0 you can use arrays_overlap:

import org.apache.spark.sql.functions.{array, arrays_overlap, lit}

val df = Seq(
  ("foo1", Seq("X", "Y", "Z"), "bar1"),
  ("foo2", Seq("K", "L"), "bar2"),
  ("foo3", Seq("M"), "bar3")
).toDF("col1", "browse", "coln")

val b = Seq("M", "Z")
val searchArray = array(b.map(lit): _*) // wrap each value in lit, then build a Spark array column

df.where(arrays_overlap($"browse", searchArray)).show()

// +----+---------+----+
// |col1|   browse|coln|
// +----+---------+----+
// |foo1|[X, Y, Z]|bar1|
// |foo3|      [M]|bar3|
// +----+---------+----+
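For non-null inputs, arrays_overlap reduces to an ordinary set-overlap test; here is that test on plain Scala collections as a sanity check (a sketch only: the SQL function additionally has three-valued null semantics that this ignores):

```scala
object ArraysOverlapDemo {
  def main(args: Array[String]): Unit = {
    val b = Set("M", "Z") // values to match, as in the answer above

    // Equivalent of arrays_overlap for non-null arrays: do the two
    // collections share at least one element?
    def overlaps(browse: Seq[String]): Boolean = browse.exists(b)

    println(overlaps(Seq("X", "Y", "Z"))) // true  -> foo1 is kept
    println(overlaps(Seq("K", "L")))      // false -> foo2 is dropped
    println(overlaps(Seq("M")))           // true  -> foo3 is kept
  }
}
```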



Assume the input data in Dataframe A is:

browse
200,300,889,767,9908,7768,9090
300,400,223,4456,3214,6675,333
234,567,890
123,445,667,887

and you have to match it with Dataframe B

browsenodeid (flattened into a comma-separated string): 123,200,300

val matchSet = "123,200,300".split(",").toSet
val rawrdd = sc.textFile("D:\\Dataframe_A.txt")
rawrdd.map(_.split("\\|")) // split takes a regex, so the pipe must be escaped
      .map(arr => arr(0).split(",").toSet.intersect(matchSet).mkString(","))
      .foreach(println)
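Note that String.split takes a regular expression, not a literal delimiter: a bare "|" is regex alternation, which matches the empty string at every position and splits the line into single characters, so the pipe must be escaped as "\\|". A quick plain-Scala demonstration:

```scala
object SplitPitfallDemo {
  def main(args: Array[String]): Unit = {
    val line = "foo1|X,Y,Z|bar1"

    // Unescaped: "|" is regex alternation, so split breaks between every char.
    println(line.split("|").take(5).mkString(",")) // f,o,o,1,|

    // Escaped: splits on the literal pipe character, as intended.
    println(line.split("\\|").mkString(" ")) // foo1 X,Y,Z bar1
  }
}
```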

Your output:

300,200
300
123

Updated

val matchSet = "A,Z,M".split(",").toSet

val rawrdd = sc.textFile("/FileStore/tables/mvv45x9f1494518792828/input_A.txt")

rawrdd.map(_.split("\\|")) // escape the pipe: split takes a regex
      .filter(r => r(1).split(",").toSet.intersect(matchSet).nonEmpty)
      .map(r => org.apache.spark.sql.Row(r(0), r(1), r(2)))
      .collect.foreach(println)

Output is

[foo1,X,Y,Z,bar1]
[foo3,M,bar3]

2 Comments

Manish thanks for your response but this is not quite what I am looking for. I have added some examples in the description to clarify what I have and what I want to achieve
Manish thanks for your answer. I wanted a solution that could be just plugged in to the Dataset's filter/where functions so that it is more readable and more easily integrated to the existing codebase (mostly written around DataFrames rather than RDDs). Check my answer above and if you like it upvote it for me!
