
I am new to Spark Streaming and pandas UDFs. I am working on a PySpark consumer for Kafka; the payload is in XML format, and I am trying to parse the incoming XML by applying a pandas UDF:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("col1 string, col2 string", PandasUDFType.GROUPED_MAP)
def test_udf(df):
    import xmltodict
    xml_str = df.iloc[0, 0]
    df_col = ['col1', 'col2']
    doc = xmltodict.parse(xml_str, dict_constructor=dict)
    extract_needed_fields = {k: doc[k] for k in df_col}
    # hard-coded row returned for now while testing the pipeline
    return pd.DataFrame([{'col1': 'abc', 'col2': 'def'}], index=[0], dtype="string")

data=df.selectExpr("CAST(value AS STRING) AS value") 
data.groupby("value").apply(test_udf).writeStream.format("console").start()

I get the error below:

  File "pyarrow/array.pxi", line 859, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 215, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 104, in pyarrow.lib._handle_arrow_array_protocol
ValueError: Cannot specify a mask or a size when passing an object that is converted with the __arrow_array__ protocol.


Is this the right approach? What am I doing wrong?


2 Answers


This looks more like an undocumented limitation than a bug. You cannot use any pandas dtype that is stored in an array object with an __arrow_array__ method, because PySpark always passes a mask. The string dtype you used is stored in a StringArray, which is exactly such a case. After I converted the string dtype to object, the error went away.
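As an illustration, here is a minimal sketch of how that change might look applied to the UDF from the question (keeping its hard-coded return row):

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("col1 string, col2 string", PandasUDFType.GROUPED_MAP)
def test_udf(df):
    # Sketch: return the result with the plain NumPy-backed "object" dtype
    # (the default for Python strings) instead of the pandas "string"
    # extension dtype, so pyarrow does not go through the __arrow_array__
    # protocol when PySpark converts the batch.
    return pd.DataFrame([{'col1': 'abc', 'col2': 'def'}], index=[0], dtype="object")

Omitting the dtype argument entirely has the same effect, since a DataFrame built from Python strings defaults to object dtype.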



While converting a pandas DataFrame to a PySpark one, I stumbled upon this error as well:

Cannot specify a mask or a size when passing an object that is converted with the __arrow_array__ protocol

My pandas DataFrame had datetime-like values that I tried to convert to "string". I initially used the astype("string") method, which looked like this:

df["time"] = (df["datetime"].dt.time).astype("string")

When I checked the info of this DataFrame, it seemed like the column was indeed converted to a string type:

df.info(verbose=True)
> ...
>  #   Column    Non-Null Count   Dtype
> ...
>  6   time      295452 non-null  string

But the error kept coming back to me.

Solution

To avoid it, I instead used the apply(str) method:

df["time"] = (df["datetime"].dt.time).apply(str)

This gave me a dtype of object:

df.info(verbose=True)
> ...
>  #   Column    Non-Null Count   Dtype
> ...
>  6   time      295452 non-null  object

After that, the conversion was successful

spark.createDataFrame(df)
# DataFrame[datetime: string, date: string, year: bigint, month: bigint, day: bigint, day_name: string, time: string, hour: bigint, minute: bigint]
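For what it's worth, the difference between the two conversions can be checked in plain pandas, without Spark involved: astype("string") produces a StringArray, which implements __arrow_array__, while apply(str) leaves a plain object-dtype column. A small standalone sketch:

import pandas as pd

s = pd.Series(["a", "b"])

# Extension "string" dtype: backed by StringArray, which implements __arrow_array__
print(s.astype("string").dtype)                               # string
print(hasattr(s.astype("string").array, "__arrow_array__"))   # True

# Plain object dtype: no __arrow_array__ protocol involved
print(s.apply(str).dtype)                                      # object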

1 Comment

Had the same issue, and this seemed to work: using .apply(str) rather than .astype("string").
