0

I have a file contains bytes information like below with delimiter : '#*'

b'\x00\x00V\x97'#*b'2%'#*b'\x00\x00'#*b'\xc5'#*b'\t'#*b'\xc0'

I want to read this data using Pyspark and each column has different conversion methods to convert into ascii.

I have read this file using String Type() but when I'm doing conversion it is throwing error

'Attribute Error: 'str' object has no attribute 'decode''

customSchema = StructType(
    [
        StructField("Col1", StringType(), True),
        StructField("Col2", StringType(), True),
        StructField("Col3", StringType(), True),
    ]
)

df = (
    spark.read.format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .schema(customSchema)
    .option("sep", "#*")
    .load("/FileStore/tables/EbcdicTextData.txt")
)

df.show()

#  col1           col2      col3
# |b'\x00\x01d0'|b'I\x08'|b'\x00\x00'|


def unpack_ch_or_zd(bytes: bytearray) -> str:
    ascii_text = bytes.decode("cp037").replace("\x00", "").rstrip()
    return ascii_text if ascii_text.isascii() else "Non-ASCII"


def unpack_pd_or_pd_plus(bytes) -> str:
    ascii_text = (
        "" if bytes.hex()[-1:] != "d" and bytes.hex()[-1:] != "b" else "-"
    ) + bytes.hex()[:-1]
    return ascii_text if ascii_text.isascii() else "Non-ASCII"


def unpack_pd_or_pd_plus_dec(bytes, decimal: int) -> str:
    ascii_text = (
        "" if bytes.hex()[-1:] != "d" and bytes.hex()[-1:] != "b" else "-"
    ) + bytes.hex()[:-1]
    ascii_text = (
        ascii_text[:-decimal] + "." + ascii_text[-decimal:]
        if ascii_text.isascii()
        else "Non-ASCII"
    )
    return ascii_text


def unpack_bi_or_biplus_no_dec(bytes) -> str:
    # elif (type.lower() == "bi" or ( type.lower() == "bi+" and bytes.hex() <= HighestPositive[:len(bytes) * 2])) and decimal == 0:
    a = str(int("0x" + bytes.hex(), 0))
    return a if a.isascii() else "Non-ASCII"


unpack_ch_or_zd_UDF = udf(lambda x: unpack_ch_or_zd(x), StringType())
unpack_pd_or_pd_plus_UDF = udf(lambda x: unpack_pd_or_pd_plus(x), StringType())
unpack_pd_or_pd_plus_dec_UDF = udf(lambda x: unpack_pd_or_pd_plus_dec(x), StringType())

Layout_df is a dataframe that Contains column names and conversion Type of the particular column:

for row in layout_df.collect():
    column_name = row["Field_name"]
    conversion_type = row["Python_Data_type"]
    if conversion_type.lower() == "ch" or conversion_type.lower() == "zd":
        df = df.withColumn(column_name, unpack_ch_or_zd_UDF(col(column_name)))
    elif (
        conversion_type.lower() == "pd" or conversion_type.lower() == "pd+"
    ) and decimal == 0:
        df = df.withColumn(column_name, unpack_pd_or_pd_plus_UDF(col(column_name)))
    elif (
        conversion_type.lower() == "pd" or conversion_type.lower() == "pd+"
    ) and decimal > 0:
        df = df.withColumn(column_name, unpack_pd_or_pd_plus_dec_UDF(col(column_name)))
3
  • Please add a Minimal, Reproducible Example. Include the creation of the dataframe, the UDF, the imports and the example should return the same error. Commented Feb 6, 2023 at 9:09
  • @Steven Can you please check now. Commented Feb 6, 2023 at 9:22
  • Your current example is not enough. How are we supposed to executed all this without the file /FileStore/tables/EbcdicTextData.txt ? Either paste the content of the file or use this post to understand how to create a manual dataframe : stackoverflow.com/questions/57959759/… Commented Feb 6, 2023 at 10:05

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.