
We have Parquet files generated with two different schemas, each containing ID and AMOUNT fields.
file1.snappy.parquet
ID: INT
AMOUNT: DECIMAL(15,6)
Content:
1,19500.00
2,198.34


file2.snappy.parquet
ID: INT
AMOUNT: DECIMAL(15,2)
Content:
1,19500.00
3,198.34

When I load both files together with df3 = spark.read.parquet("output/") and try to read the data, Spark applies the inferred DECIMAL(15,6) schema to the file whose AMOUNT is actually DECIMAL(15,2), and that file's data gets read incorrectly. Is there a way to retrieve the data properly in this case?

Final output I see after executing df3.show():
+---+------------+
| ID|      AMOUNT|
+---+------------+
|  1|    1.950000|
|  3|    0.019834|
|  1|19500.000000|
|  2|  198.340000|
+---+------------+

As you can see, the amounts in the first and second rows (the rows from file2) were decoded incorrectly; the sketch below shows why.
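
A minimal sketch of the likely mechanism, assuming Parquet's standard encoding of a DECIMAL as an unscaled integer plus a scale stored in the file metadata (plain Python, values taken from file2 above):

# 19500.00 at DECIMAL(15,2) is stored as the unscaled integer 1950000.
# A reader that applies the other file's scale of 6 divides by 10**6
# instead of 10**2 and recovers the wrong value.
unscaled = 1950000

print(unscaled / 10**2)   # 19500.0  -> correct at scale 2
print(unscaled / 10**6)   # 1.95     -> shows up as 1.950000 in df3.show()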

Looking for some suggestions on this. I know that regenerating the files with the same schema will make this issue go away, but that requires regenerating and replacing files that were already delivered. Is there any temporary workaround we can use in the meantime while we work on regenerating those files?

~R, Krish

2 Comments
  • I have a point here: if Spark allows us to load this data, there should also have been an option to load the data correctly. Correct me if I am wrong here. Commented Jul 15, 2020 at 3:47
  • Raised an issue with the Spark community on this: issues.apache.org/jira/browse/SPARK-32317 Commented Jul 25, 2020 at 12:24

3 Answers


You can try setting the mergeSchema option to true. So instead of

df3 = spark.read.parquet("output/") 

Try this:

df3 = spark.read.option("mergeSchema","true").parquet("output/")

But this will give inconsistent records if the two Parquet files were written by different versions of Spark. In that case, the newer version of Spark should set the following property to true: spark.sql.parquet.writeLegacyFormat
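
A minimal sketch of setting this property before rewriting a file, assuming an existing SparkSession named spark and a DataFrame df holding the data to be rewritten (the output path is illustrative):

# Write decimals in the legacy Parquet layout so that files produced by
# different Spark versions stay compatible with each other.
spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")

df.write.mode("overwrite").parquet("output/file1_legacy")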


3 Comments

It didn't work; I had already tried that. It gives an incompatible schema merge error: "Caused by: org.apache.spark.SparkException: Failed to merge fields 'AMOUNT' and 'AMOUNT'. Failed to merge decimal types with incompatible scala 6 and 2"
@Krish did you find a solution? I am stuck on a similar problem.
Go through the defect that I raised, issues.apache.org/jira/browse/SPARK-32317; you can see some code pushed for it. See if that helps!

Try reading the decimal column as a string by providing the schema manually while loading the files:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Read AMOUNT as a plain string so Spark does not force a single decimal scale
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("AMOUNT", StringType(), True)
])

spark.read.format("parquet").schema(schema).load("output/")

2 Comments

I had already thought of this; it will be my last option, and I have kept it as my fallback solution. But I am looking for options within Spark for this. Spark should be able to provide an option here, as this is a very generic mistake that can happen (like it did for us).
I was under the impression that this workaround would hold until we replace the files, but even for this Spark throws the error "org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file file*****.snappy.parquet. Column: [AMOUNT], Expected: string, Found: INT64". Did it work for you?

The following worked for me:

df = spark.read.parquet("data_file/")

# Cast each column to a string after reading
for c in df.columns:
    df = df.withColumn(c, df[c].cast("string"))
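
If numeric values are needed again downstream, one possible follow-up, assuming decimal(15,2) is the precision and scale you ultimately want (taken from file2 in the question):

from pyspark.sql.functions import col

# Illustrative only: cast the string column back to an explicit decimal type
df = df.withColumn("AMOUNT", col("AMOUNT").cast("decimal(15,2)"))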
