
I have a PySpark 2.0 DataFrame that I'm trying to filter based on a relatively short list, maybe 50-100 items.

filterList = ['A','B','C']

I'd like to broadcast that list out to each of my nodes and use it to remove records where one of two columns isn't in my list.

This operation works:

filter_df = df.where((df['Foo'].isin(filterList)) | (df['Bar'].isin(filterList)))

but as soon as I broadcast out the list I get an error:

filterListB = sc.broadcast(filterList)

filter_df = df.where((df['Foo'].isin(filterListB)) | (df['Bar'].isin(filterListB)))

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-99-1b972cf29148> in <module>()
----> 1 filter_df= df.where((df['Foo'].isin(filterListB)) | (df['Bar'].isin(filterListB)))

/usr/local/spark/python/pyspark/sql/column.pyc in isin(self, *cols)
    284         if len(cols) == 1 and isinstance(cols[0], (list, set)):
    285             cols = cols[0]
--> 286         cols = [c._jc if isinstance(c, Column) else _create_column_from_literal(c) for c in cols]
    287         sc = SparkContext._active_spark_context
    288         jc = getattr(self._jc, "isin")(_to_seq(sc, cols))

/usr/local/spark/python/pyspark/sql/column.pyc in _create_column_from_literal(literal)
     33 def _create_column_from_literal(literal):
     34     sc = SparkContext._active_spark_context
---> 35     return sc._jvm.functions.lit(literal)
     36 
     37 

/usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1122 
   1123     def __call__(self, *args):
-> 1124         args_command, temp_args = self._build_args(*args)
   1125 
   1126         command = proto.CALL_COMMAND_NAME +\

/usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in _build_args(self, *args)
   1092 
   1093         args_command = "".join(
-> 1094             [get_command_part(arg, self.pool) for arg in new_args])
   1095 
   1096         return args_command, temp_args

/usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py in get_command_part(parameter, python_proxy_pool)
    287             command_part += ";" + interface
    288     else:
--> 289         command_part = REFERENCE_TYPE + parameter._get_object_id()
    290 
    291     command_part += "\n"

AttributeError: 'Broadcast' object has no attribute '_get_object_id'

Any thoughts on how I should filter a PySpark 2.0 DataFrame based on a broadcast list?

Comments:
  • If the list is only 50-100 items long, there's no need to broadcast it; you can use it directly as a regular variable. Only large objects need to be broadcast. (Sep 25, 2018)
  • How is it different for a 100-item list versus a 10,000-item list? As I understand it, the list gets resolved by the Catalyst optimizer and the values are inlined directly into the physical plan (== Physical Plan ==). (Aug 19, 2021)

1 Answer

You can't pass a Broadcast object directly to DataFrame functions. Instead, use its value attribute to access the underlying list that was broadcast.

So, modify your code as below:

filterListB = sc.broadcast(filterList)
filter_df = df.where((df['Foo'].isin(filterListB.value)) | (df['Bar'].isin(filterListB.value)))

Reference: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-broadcast.html
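To make the row-level semantics concrete, here is a minimal plain-Python sketch (with hypothetical data, no Spark required) of what the corrected filter computes: a row is kept when either the Foo or the Bar column value appears in the broadcast list. The `.value` call on the Broadcast object simply hands back the original Python list, which is why `isin` then works.

```python
# Hypothetical stand-in data; in Spark these would be DataFrame rows.
filter_list = ['A', 'B', 'C']  # what filterListB.value returns

rows = [
    {'Foo': 'A', 'Bar': 'Z'},   # kept: Foo is in the list
    {'Foo': 'X', 'Bar': 'B'},   # kept: Bar is in the list
    {'Foo': 'X', 'Bar': 'Z'},   # dropped: neither column matches
]

# isin(...) on each column, combined with |, is equivalent to this predicate:
kept = [r for r in rows
        if r['Foo'] in filter_list or r['Bar'] in filter_list]
```

The Spark version distributes the same check across partitions; `isin` takes plain Python values (a list, set, or varargs), which is why the raw Broadcast wrapper fails but `filterListB.value` succeeds.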
