
I am using Databricks Runtime 4.3 (which includes Apache Spark 2.3.1 and Scala 2.11) with Python 3.5.

I have a Spark data frame df_spark, and I ran a pandas grouped map UDF on it to get a new Spark data frame df_spark_2, which has only one column, of string type. When I show the head of df_spark_2, I get this error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 12.0 failed 4 times, most recent failure: Lost task 18.3 in stage 12.0 (TID 1973, 10.96.133.5, executor 0): java.lang.IllegalArgumentException: requirement failed: Decimal precision 8 exceeds max precision 7

I tested the grouped map UDF on a pandas data frame and it works well. The code is:

sample = df[df.acct_id==10030255388]
reformat.func(sample)
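
For readers without my data, here is a self-contained toy version of that pandas-side check; the column values (and the single made-up account id) are stand-ins, but the transformation is the same one the UDF body performs on each group:

```python
import pandas as pd

# Toy stand-in for one acct_id group; all values are made up.
sample = pd.DataFrame({
    'acct_id': [10030255388, 10030255388],
    'amount': [10.5, 20.0],
    'charges': [1.0, 2.0],
    'organization': ['org_a', 'org_b'],
    'rowColor': ['red', 'blue'],
})

# Serialize the detail columns to a JSON string, then build a
# one-row frame keyed by the group's account id.
donation = sample[['amount', 'charges', 'organization', 'rowColor']].to_json(orient='records')
result = pd.DataFrame.from_dict({sample.acct_id.unique().item(): donation},
                                orient='index', columns=['Donation'])
print(result)
```

Run in plain pandas like this, it produces a one-row, one-column frame as expected, which is why I believe the UDF body itself is fine.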

I can also successfully show the schema of df_spark_2:

df_spark_2.schema

I got: StructType(List(StructField(Donation,StringType,true)))

Below is my code:

df_spark = spark.createDataFrame(df)

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('Donation string', PandasUDFType.GROUPED_MAP)  # first argument is the schema of the output DataFrame
def reformat(df):
    Donation = df[['amount', 'charges', 'organization', 'rowColor']].to_json(orient='records')
    temp_dict = {}
    temp_dict[df.acct_id.unique().item()] = Donation
    temp_df = pd.DataFrame.from_dict(data=temp_dict, orient='index', columns=['Donation'])
    return temp_df

df_spark_2 = df_spark.groupby("acct_id").apply(reformat)
# output: df_spark_2: pyspark.sql.dataframe.DataFrame = [Donation: string]

df_spark_2.head()
# this is where I get the error
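
From the stack trace, the failing requirement comes from Spark's Decimal.set, which rejects any value that needs more significant digits than the column's declared precision, and the check fires inside ArrowColumnVector while the grouped input is read back. The digit count it enforces can be sketched with the standard library (significant_digits is my own helper for illustration, not Spark code):

```python
from decimal import Decimal

def significant_digits(value):
    """Count the decimal digits needed to represent this value."""
    return len(Decimal(str(value)).as_tuple().digits)

# A column declared decimal(7, s) holds at most 7 significant digits,
# so a value like 12345678 (8 digits) trips the
# "Decimal precision 8 exceeds max precision 7" requirement.
print(significant_digits(1234567))   # fits a decimal(7, 0) column
print(significant_digits(12345678))  # exceeds it
```

If that is the cause, casting any decimal columns of df_spark to double (or widening the declared precision) before the grouped map may avoid the check, though I have not confirmed this.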

Below are the details of my error message:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-2141407> in <module>()
----> 1 df_spark_2.head()

/databricks/spark/python/pyspark/sql/dataframe.py in head(self, n)
   1193         """
   1194         if n is None:
-> 1195             rs = self.head(1)
   1196             return rs[0] if rs else None
   1197         return self.take(n)

/databricks/spark/python/pyspark/sql/dataframe.py in head(self, n)
   1195             rs = self.head(1)
   1196             return rs[0] if rs else None
-> 1197         return self.take(n)
   1198 
   1199     @ignore_unicode_prefix

/databricks/spark/python/pyspark/sql/dataframe.py in take(self, num)
    520         [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
    521         """
--> 522         return self.limit(num).collect()
    523 
    524     @since(1.3)

/databricks/spark/python/pyspark/sql/dataframe.py in collect(self)
    479         # Default path used in OSS Spark / for non-DF-ACL clusters:
    480         with SCCallSiteSync(self._sc) as css:
--> 481             sock_info = self._jdf.collectToPython()
    482         return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
    483 

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o332.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 44 in stage 3.0 failed 4 times, most recent failure: Lost task 44.3 in stage 3.0 (TID 329, 10.96.134.14, executor 1): java.lang.IllegalArgumentException: requirement failed: Decimal precision 8 exceeds max precision 7
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.sql.types.Decimal.set(Decimal.scala:114)
    at org.apache.spark.sql.types.Decimal$.apply(Decimal.scala:453)
    at org.apache.spark.sql.types.Decimal.apply(Decimal.scala)
    at org.apache.spark.sql.vectorized.ArrowColumnVector$DecimalAccessor.getDecimal(ArrowColumnVector.java:360)
    at org.apache.spark.sql.vectorized.ArrowColumnVector.getDecimal(ArrowColumnVector.java:105)
    at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getDecimal(MutableColumnarRow.java:130)

    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:64)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:70)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:497)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:48)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3236)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3234)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3334)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:89)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:175)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:84)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:126)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3333)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3234)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: requirement failed: Decimal precision 8 exceeds max precision 7
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.sql.types.Decimal.set(Decimal.scala:114)
    at org.apache.spark.sql.types.Decimal$.apply(Decimal.scala:453)
    at org.apache.spark.sql.types.Decimal.apply(Decimal.scala)
    at org.apache.spark.sql.vectorized.ArrowColumnVector$DecimalAccessor.getDecimal(ArrowColumnVector.java:360)
    at org.apache.spark.sql.vectorized.ArrowColumnVector.getDecimal(ArrowColumnVector.java:105)
    at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getDecimal(MutableColumnarRow.java:130)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:620)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:384)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

1 Answer

I think the error starts where you trigger df_spark_2.count().collect(). If you just want the row count of the DataFrame, use count() by itself:

df_spark_2.count()

A simple explanation of both functions is below.

count() - Returns the number of rows in this DataFrame.

collect() - Returns all the elements of the DataFrame as a list to the driver program. This is usually only useful after a filter or other operation that returns a sufficiently small subset of the data.


1 Comment

I changed it from df_spark_2.count().collect() to df_spark_2.count() and I still got the same error. I also ran df_spark_2.head() which gave me the same error too.
