
I'm trying to use DataFrame.subtract() in Spark 1.6.1 to remove rows from a dataframe based on a column from another dataframe. Let's use an example:

from pyspark.sql import Row

df1 = sqlContext.createDataFrame([
    Row(name='Alice', age=2),
    Row(name='Bob', age=1),
]).alias('df1')

df2 = sqlContext.createDataFrame([
    Row(name='Bob'),
])

df1_with_df2 = df1.join(df2, 'name').select('df1.*')
df1_without_df2 = df1.subtract(df1_with_df2)

Since I want all rows from df1 that don't have name='Bob', I expect Row(age=2, name='Alice'). But I also get Bob:

print(df1_without_df2.collect())
# [Row(age=1, name='Bob'), Row(age=2, name='Alice')]

After various experiments to get down to this MCVE, I found out that the issue is with the age key. If I omit it:

df1_noage = sqlContext.createDataFrame([
    Row(name='Alice'),
    Row(name='Bob'),
]).alias('df1_noage')

df1_noage_with_df2 = df1_noage.join(df2, 'name').select('df1_noage.*')
df1_noage_without_df2 = df1_noage.subtract(df1_noage_with_df2)
print(df1_noage_without_df2.collect())
# [Row(name='Alice')]

Then I only get Alice as expected. The weirdest observation I made is that it's possible to add keys, as long as they come after the join key in lexicographical order:

df1_zage = sqlContext.createDataFrame([
    Row(zage=2, name='Alice'),
    Row(zage=1, name='Bob'),
]).alias('df1_zage')

df1_zage_with_df2 = df1_zage.join(df2, 'name').select('df1_zage.*')
df1_zage_without_df2 = df1_zage.subtract(df1_zage_with_df2)
print(df1_zage_without_df2.collect())
# [Row(name='Alice', zage=2)]

I correctly get Alice (with her zage)! In my real examples, I'm interested in all columns, not only the ones that sort after name.

1 Answer


Well, there are some bugs here (the first issue looks related to the same problem as SPARK-6231), and filing a JIRA sounds like a good idea, but SUBTRACT / EXCEPT is not the right choice for partial matches.
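
To see why, here is a minimal sketch (reusing the sqlContext from the question) of what SUBTRACT actually does: it compares entire rows, so a row is removed only when every column matches, not just the join key.

from pyspark.sql import Row

left = sqlContext.createDataFrame([Row(name='Bob', age=1)])
right = sqlContext.createDataFrame([Row(name='Bob', age=99)])

# Bob survives because the ages differ, even though the names match.
print(left.subtract(right).collect())
# [Row(age=1, name='Bob')]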

Instead, as of Spark 2.0, you can use an anti-join:

df1.join(df1_with_df2, ["name"], "leftanti").show()
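
For the example data above this keeps exactly the rows from df1 whose name has no match on the right side; a quick sketch of the expected result:

print(df1.join(df1_with_df2, ["name"], "leftanti").collect())
# expected: only the Alice row (age=2); Bob is dropped because his name
# has a match in df1_with_df2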

In 1.6 you can do pretty much the same thing with a standard left outer join:

import pyspark.sql.functions as F

ref = df1_with_df2.select("name").alias("ref")

(df1
    .join(ref, ref.name == df1.name, "leftouter")
    .filter(F.isnull("ref.name"))  # keep only rows with no match in ref
    .drop(F.col("ref.name")))      # remove the helper column from the result
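
With the question's data this again leaves only Alice: the left outer join attaches ref.name where a match exists and null where it doesn't, so filtering on isnull keeps exactly the unmatched rows. A quick check (a sketch, reusing the dataframes defined earlier):

without_bob = (df1
    .join(ref, ref.name == df1.name, "leftouter")
    .filter(F.isnull("ref.name"))
    .drop(F.col("ref.name")))

print(without_bob.collect())
# expected: [Row(age=2, name='Alice')]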