5

For a DataFrame in pyspark, if a column is initialized with F.lit(1) (or any other value) and is then assigned some values inside a pandas_udf (using shift() in this case, but it can happen with other functions too), this causes the "Value at index is null" error.

Could anyone provide a hint as to why this happens? Is it a bug in pyspark?

See the code and errors below.

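from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.appName('test').getOrCreate()
df = spark.createDataFrame([Row(id=1, name='a', c=3),
                            Row(id=2, name='b', c=6),
                            Row(id=3, name='a', c=2),
                            Row(id=4, name='b', c=9),
                            Row(id=5, name='c', c=7)])

# F.lit(1) creates a non-nullable integer column
df = df.withColumn('f', F.lit(1))


@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def shift_test(pdf):
    # shift(1) leaves a missing value in the first row of each group
    pdf['f'] = pdf['c'].shift(1)
    return pdf

df = df.groupby(['name']).apply(shift_test)
df.show()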

There's no such error if I set the column f equal to c. See the output below.

+---+---+----+---+
|  c| id|name|  f|
+---+---+----+---+
|  3|  1|   a|  1|
|  6|  2|   b|  1|
|  2|  3|   a|  1|
|  9|  4|   b|  1|
|  7|  5|   c|  1|
+---+---+----+---+


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-46-5b4a8c6e0258> in <module>
     18 
     19 df = df.groupby(['name']).apply(shift_test)
---> 20 df.show()

Py4JJavaError: An error occurred while calling o3378.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 97 in stage 426.0 failed 4 times, most recent failure: Lost task 97.3 in stage 426.0 (TID 6258, optoldevny1, executor 0): java.lang.IllegalStateException: Value at index is null
    at org.apache.arrow.vector.IntVector.get(IntVector.java:101)
    at org.apache.spark.sql.vectorized.ArrowColumnVector$IntAccessor.getInt(ArrowColumnVector.java:299)
    at org.apache.spark.sql.vectorized.ArrowColumnVector.getInt(ArrowColumnVector.java:84)
    at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getInt(MutableColumnarRow.java:117)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
    at sun.reflect.GeneratedMethodAccessor67.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Value at index is null
    at org.apache.arrow.vector.IntVector.get(IntVector.java:101)
    at org.apache.spark.sql.vectorized.ArrowColumnVector$IntAccessor.getInt(ArrowColumnVector.java:299)
    at org.apache.spark.sql.vectorized.ArrowColumnVector.getInt(ArrowColumnVector.java:84)
    at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getInt(MutableColumnarRow.java:117)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more


3
  • Were you able to solve this? Commented Apr 27, 2020 at 5:53
  • Check my answer below Commented Apr 28, 2020 at 19:03
  • I had figured it out already. Thanks though :) Commented May 5, 2020 at 9:03

2 Answers

6

You are getting the error for two reasons: the shift function introduces a null value, and the returned schema is not changed to accept null values.

When you give the return schema as df.schema, Spark takes nullable = False for column f from the original schema (F.lit(1) creates a non-nullable column). So you need to provide a new schema in which column f has nullable = True to avoid this error.

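from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Schema of output DataFrame; only column f is nullable
new_schema = StructType([
    StructField("c", IntegerType(), False),
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("f", IntegerType(), True)
])

@pandas_udf(new_schema, PandasUDFType.GROUPED_MAP)
def shift_test(pdf):
    pdf['f'] = pdf['c'].shift(1)
    return pdf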

2

It looks like pyspark can't handle missing values coming out of a pandas_udf when the schema doesn't allow them. It expects a certain data type for each column, as specified by the schema in @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP).

If there is any missing value (as can be generated by shift here), it throws, because the Java side can't read a null from a column that is declared non-nullable (the exception is a Java exception: java.lang.IllegalStateException).
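
The missing values can be seen in plain pandas, without Spark. This is a minimal sketch that assumes the same data as the question and uses groupby(...).shift(1) to mimic what the grouped-map UDF does to each group; the variable names are illustrative only.

```python
import pandas as pd

# Same 'name' and 'c' values as in the question's DataFrame.
pdf = pd.DataFrame({"name": ["a", "b", "a", "b", "c"],
                    "c": [3, 6, 2, 9, 7]})

# Shifting within each group leaves the first row of every group empty,
# and pandas upcasts the integer column to float64 to hold NaN.
shifted = pdf.groupby("name")["c"].shift(1)
print(shifted.dtype)
print(shifted.isna().sum())

# Filling the gaps and casting back gives a column with no missing values.
filled = shifted.fillna(-1).astype("int32")
print(filled.tolist())
```

Each of the three groups contributes one NaN, which is what ends up as a null in the Arrow data returned to Spark.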

To fix this, one needs to replace such missing values with values of the proper type, here an integer.

This new pandas_udf function fixes the problem:

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def shift_test(pdf):
    pdf['f'] = pdf['c'].shift(1)
    # Replace missing values with -1; assign the result back instead of using inplace=True
    pdf['f'] = pdf['f'].fillna(-1)
    return pdf

Here's the output

+---+---+----+---+
|  c| id|name|  f|
+---+---+----+---+
|  7|  5|   c| -1|
|  6|  2|   b| -1|
|  9|  4|   b|  6|
|  2|  3|   a| -1|
|  3|  1|   a|  2|
+---+---+----+---+
