
I'm having trouble removing rows from a DataFrame based on a two-column list of items to filter out. For example, for this DataFrame:

df = spark.createDataFrame([(100, 'A', 304), (200, 'B', 305), (300, 'C', 306)], ['number', 'letter', 'id'])
df.show()
# +------+------+---+
# |number|letter| id|
# +------+------+---+
# |   100|     A|304|
# |   200|     B|305|
# |   300|     C|306|
# +------+------+---+

I can easily remove rows using isin on a single column:

from pyspark.sql.functions import array, col, lit

df.where(~col('number').isin([100, 200])).show()
# +------+------+---+
# |number|letter| id|
# +------+------+---+
# |   300|     C|306|
# +------+------+---+

But when I try to remove rows by two columns, I get an exception:

df.where(~array('number', 'letter').isin([(100, 'A'), (200, 'B')])).show()
Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [100, A]
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57)
    at org.apache.spark.sql.functions$.lit(functions.scala:101)
    at org.apache.spark.sql.functions.lit(functions.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

After some investigation, I realized that the root cause of the problem is that Spark cannot create literals from non-primitive types. I tried the following in PySpark:

lit((100, 'A'))
lit([100, 'A'])

and the following in Scala:

lit((100, "A"))
lit(List(100, "A"))
lit(Seq(100, "A"))
lit(Array(100, "A"))

but with no luck. Does anyone know a way to create an array literal in Spark/PySpark? Or is there another way to filter a DataFrame by two columns?
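To be clear about the intent, here is a plain-Python (non-Spark) sketch of the filter I'm after — keep only the rows whose (number, letter) pair is not in the exclusion list:

```python
# Plain-Python sketch of the desired two-column filter (not Spark code).
rows = [(100, 'A', 304), (200, 'B', 305), (300, 'C', 306)]
choices = {(100, 'A'), (200, 'B')}  # set for O(1) membership tests

# Keep only rows whose (number, letter) pair is NOT in choices.
kept = [row for row in rows if (row[0], row[1]) not in choices]
print(kept)  # [(300, 'C', 306)]
```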


3 Answers


First of all, you probably want struct, not array. Remember that Spark SQL doesn't support heterogeneous arrays, so array(1, 'a') is cast to array<string>.

So the query could look like this:

from pyspark.sql.functions import lit, struct

choices = [(100, 'A'), (200, 'B')]

target = [
    struct(
        lit(number).alias("number").cast("long"),
        lit(letter).alias("letter").cast("string"))
    for number, letter in choices]

query = struct("number", "letter").isin(target)

This seems to generate a valid expression:

query
Column<b'(named_struct(NamePlaceholder(), number, NamePlaceholder(), letter) IN (named_struct(col1, CAST(100 AS `number` AS BIGINT), col2, CAST(A AS `letter` AS STRING)), named_struct(col1, CAST(200 AS `number` AS BIGINT), col2, CAST(B AS `letter` AS STRING))))'>

But for some reason it fails in the analyzer:

df.where(~query)
AnalysisException                         Traceback (most recent call last)
...
AnalysisException: "cannot resolve '(named_struct('number', `number`, 'letter', `letter`) IN (named_struct('col1', CAST(100 AS BIGINT), 'col2', CAST('A' AS STRING)), named_struct('col1', CAST(200 AS BIGINT), 'col2', CAST('B' AS STRING))))' due to data type mismatch: Arguments must be same type;;\n'Filter NOT named_struct(number, number#0L, letter, letter#1) IN (named_struct(col1, cast(100 as bigint), col2, cast(A as string)),named_struct(col1, cast(200 as bigint), col2, cast(B as string)))\n+- LogicalRDD [number#0L, letter#1, id#2L]\n"

Strangely enough, the following SQL fails as well:

df.createOrReplaceTempView("df")

spark.sql("SELECT * FROM df WHERE struct(letter, letter) IN (struct(CAST(1 AS bigint), 'a'))")
AnalysisException: "cannot resolve '(named_struct('letter', df.`letter`, 'letter', df.`letter`) IN (named_struct('col1', CAST(1 AS BIGINT), 'col2', 'a')))' due to data type mismatch: Arguments must be same type; line 1 pos 46;\n'Project [*]\n+- 'Filter named_struct(letter, letter#1, letter, letter#1) IN (named_struct(col1, cast(1 as bigint), col2, a))\n   +- SubqueryAlias df\n      +- LogicalRDD [number#0L, letter#1, id#2L]\n"

but when replaced with literals on both sides:

spark.sql("SELECT * FROM df WHERE struct(CAST(1 AS bigint), 'a') IN (struct(CAST(1 AS bigint), 'a'))")
DataFrame[number: bigint, letter: string, id: bigint]

it works fine, so it looks like a bug.
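One workaround that needs neither struct literals nor a join is to OR together one equality condition per pair, e.g. `~reduce(operator.or_, [(col('number') == n) & (col('letter') == l) for n, l in choices])`. Since the fold itself is plain Python, here is the same pattern sketched without Spark (dict rows standing in for Rows, plain booleans for Columns):

```python
from functools import reduce
import operator

choices = [(100, 'A'), (200, 'B')]
rows = [
    {'number': 100, 'letter': 'A', 'id': 304},
    {'number': 200, 'letter': 'B', 'id': 305},
    {'number': 300, 'letter': 'C', 'id': 306},
]

def matches_any(row):
    # OR-fold of per-pair AND conditions, mirroring the Column expression above.
    return reduce(operator.or_,
                  [(row['number'] == n) and (row['letter'] == l)
                   for n, l in choices])

kept = [r for r in rows if not matches_any(r)]
print([r['id'] for r in kept])  # [306]
```

In Spark the same fold works because Column overloads `|` and `&`, so reduce combines the per-pair conditions into a single boolean column.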

That being said, a left anti join should work just fine here:

from pyspark.sql.functions import broadcast

df.join(
    broadcast(spark.createDataFrame(choices, ("number", "letter"))),
    ["number", "letter"],
    "leftanti"
).show()
# +------+------+---+
# |number|letter| id|
# +------+------+---+
# |   300|     C|306|
# +------+------+---+



To create an array literal in Spark you need to build an array from a series of columns, where each column is created with the lit function:

scala> array(lit(100), lit("A"))
res1: org.apache.spark.sql.Column = array(100, A)

3 Comments

The question was about PySpark, not Scala.
There's a hint in this answer, though: from pyspark.sql.functions import *; array(lit(100), lit("A")) works in PySpark as well.
That's hard-coding; can this be dynamic?
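On the last comment: yes, it can be made dynamic by unpacking a list comprehension, i.e. array(*[lit(x) for x in values]) in PySpark. The unpacking pattern itself is plain Python; here it is sketched with stand-in lit/array functions (the tuples below are illustrative, not Spark's actual representation):

```python
# Stand-ins for pyspark.sql.functions.lit/array, just to show the pattern.
def lit(value):
    return ('lit', value)

def array(*columns):
    # Varargs, like Spark's array(): accepts any number of columns.
    return ('array', list(columns))

values = [100, 'A']  # any dynamic list
result = array(*[lit(v) for v in values])
print(result)  # ('array', [('lit', 100), ('lit', 'A')])
```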

Spark 3.4+

F.lit([5, 7])

Full example:

from pyspark.sql import functions as F

df = spark.range(2)
df = df.withColumn("c1", F.lit([5, 7]))

df.show()
# +---+------+
# | id|    c1|
# +---+------+
# |  0|[5, 7]|
# |  1|[5, 7]|
# +---+------+

