I have a file containing byte information like the example below, with the delimiter '#*':
b'\x00\x00V\x97'#*b'2%'#*b'\x00\x00'#*b'\xc5'#*b'\t'#*b'\xc0'
I want to read this data using PySpark; each column has a different conversion method to convert it into ASCII.
I have read this file using StringType(), but when I perform the conversion it throws the error:
AttributeError: 'str' object has no attribute 'decode'
# Schema for the three '#*'-delimited byte-string columns; every field is
# read as nullable text.
customSchema = StructType(
    [StructField(field, StringType(), True) for field in ("Col1", "Col2", "Col3")]
)
# Read the '#*'-separated file with the explicit schema.
# NOTE(review): Spark delivers these columns as str, not bytes — the literal
# "b'...'" text is part of each value, which is why .decode() fails later.
reader = spark.read.format("csv")
reader = reader.option("inferSchema", "true").option("header", "true")
reader = reader.option("sep", "#*").schema(customSchema)
df = reader.load("/FileStore/tables/EbcdicTextData.txt")
df.show()
# col1 col2 col3
# |b'\x00\x01d0'|b'I\x08'|b'\x00\x00'|
def unpack_ch_or_zd(data: bytes) -> str:
    """Decode EBCDIC (cp037) CH/ZD field bytes into an ASCII string.

    NUL padding bytes are removed and trailing whitespace is stripped.
    Returns "Non-ASCII" when the decoded text contains non-ASCII characters.
    """
    # Parameter renamed from `bytes` to `data` so the builtin type is not shadowed.
    ascii_text = data.decode("cp037").replace("\x00", "").rstrip()
    return ascii_text if ascii_text.isascii() else "Non-ASCII"
def unpack_pd_or_pd_plus(data: bytes) -> str:
    """Unpack a COMP-3 (packed decimal) field that has no decimal places.

    The low nibble of the last byte is the sign: 0xD or 0xB means negative;
    anything else is treated as positive. The sign nibble is dropped from
    the digit string.
    """
    digits = data.hex()
    # Membership test replaces the double != comparison of the original.
    sign = "-" if digits[-1:] in ("d", "b") else ""
    ascii_text = sign + digits[:-1]
    return ascii_text if ascii_text.isascii() else "Non-ASCII"
def unpack_pd_or_pd_plus_dec(data: bytes, decimal: int) -> str:
    """Unpack a COMP-3 (packed decimal) field and insert an implied decimal point.

    `decimal` is the number of digits after the point.

    BUG FIX: with decimal == 0 the original slicing evaluated
    s[:-0] + "." + s[-0:], i.e. "." + s, producing a spurious leading dot;
    decimal <= 0 now returns the digit string unchanged.
    """
    digits = data.hex()
    sign = "-" if digits[-1:] in ("d", "b") else ""
    ascii_text = sign + digits[:-1]
    if not ascii_text.isascii():
        return "Non-ASCII"
    if decimal <= 0:
        return ascii_text
    return ascii_text[:-decimal] + "." + ascii_text[-decimal:]
def unpack_bi_or_biplus_no_dec(data: bytes) -> str:
    """Unpack a big-endian binary (BI/BI+) field with no decimal places."""
    # int.from_bytes replaces the int("0x" + hex, 0) round-trip and also
    # handles empty input (returns 0 instead of raising ValueError).
    value = str(int.from_bytes(data, "big"))
    # str(int) is always ASCII; the guard is kept for symmetry with the
    # other unpackers.
    return value if value.isascii() else "Non-ASCII"
# Register the unpackers as Spark UDFs (plain function references; the
# pass-through lambdas added nothing).
unpack_ch_or_zd_UDF = udf(unpack_ch_or_zd, StringType())
unpack_pd_or_pd_plus_UDF = udf(unpack_pd_or_pd_plus, StringType())
# BUG FIX: unpack_pd_or_pd_plus_dec takes (data, decimal) but the original
# lambda forwarded only one argument, so the UDF raised TypeError as soon as
# it executed. The decimal count is now an explicit second parameter
# (defaulted to 0 so a single-column call no longer crashes); pass it as
# unpack_pd_or_pd_plus_dec_UDF(col(name), lit(decimal)).
unpack_pd_or_pd_plus_dec_UDF = udf(
    lambda x, d=0: unpack_pd_or_pd_plus_dec(x, d), StringType()
)
layout_df is a DataFrame that contains the column names and the conversion type of each column:
# Apply the matching unpacker UDF to every column listed in the layout table.
for row in layout_df.collect():
    column_name = row["Field_name"]
    # Lowercase once instead of calling .lower() in every comparison.
    conversion_type = row["Python_Data_type"].lower()
    # NOTE(review): `decimal` is not defined anywhere in this snippet —
    # presumably it should be read from the layout row as well (e.g.
    # row["Decimal"]); confirm the layout schema. Also note that
    # unpack_pd_or_pd_plus_dec requires a decimal-count argument that is
    # not forwarded by the UDF call below.
    if conversion_type in ("ch", "zd"):
        df = df.withColumn(column_name, unpack_ch_or_zd_UDF(col(column_name)))
    elif conversion_type in ("pd", "pd+") and decimal == 0:
        df = df.withColumn(column_name, unpack_pd_or_pd_plus_UDF(col(column_name)))
    elif conversion_type in ("pd", "pd+") and decimal > 0:
        df = df.withColumn(column_name, unpack_pd_or_pd_plus_dec_UDF(col(column_name)))
What is in /FileStore/tables/EbcdicTextData.txt? Either paste the content of the file, or use this post to understand how to create a manual DataFrame: stackoverflow.com/questions/57959759/…