I have a valid PostgreSQL query: when I copy and paste it into psql, I get the desired result.
But when I run it with Spark SQL, it throws a NullPointerException.

Here is the snippet of code causing the error:

extractDataFrame().show()

private def extractDataFrame(): DataFrame = {
  val query =
    """(
      SELECT events.event_facebook_id, events.name, events.tariffrange,
        eventscounts.attending_count, eventscounts.declined_count, eventscounts.interested_count,
        eventscounts.noreply_count,
        artists.facebookid as artist_facebook_id, artists.likes as artistlikes,
        organizers.organizerid, organizers.likes as organizerlikes,
        places.placeid, places.capacity, places.likes as placelikes
      FROM events
        LEFT JOIN eventscounts on eventscounts.event_facebook_id = events.event_facebook_id
        LEFT JOIN eventsartists on eventsartists.event_id = events.event_facebook_id
          LEFT JOIN artists on eventsartists.artistid = artists.facebookid
        LEFT JOIN eventsorganizers on eventsorganizers.event_id = events.event_facebook_id
          LEFT JOIN organizers on eventsorganizers.organizerurl = organizers.facebookurl
        LEFT JOIN eventsplaces on eventsplaces.event_id = events.event_facebook_id
          LEFT JOIN places on eventsplaces.placefacebookurl = places.facebookurl
      ) df"""

  spark.sqlContext.read.jdbc(databaseURL, query, connectionProperties)
}

The SparkSession is defined as follows:

val databaseURL = "jdbc:postgresql://dbHost:5432/ticketapp" 
val spark = SparkSession
  .builder
  .master("local[*]")
  .appName("tariffPrediction")
  .getOrCreate()

val connectionProperties = new Properties
connectionProperties.put("user", "simon")
connectionProperties.put("password", "root")

And here is the complete stacktrace:

[SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 1 times, most recent failure: Lost task 0.0 in stage 27.0 (TID 27, localhost): java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:210)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:]

The most surprising part is that if I remove one (whichever one) of the LEFT JOIN clauses in the SQL query, I don't get any errors...

3 Answers

I had a very similar issue, but with a Teradata data source. It came down to the column nullability on the DataFrame not matching the underlying data: the column had nullable=false, but some rows had null values in that field. The cause in my case was the Teradata JDBC driver not returning the correct column metadata. I have yet to find a workaround.

To see the code that is being generated (within which the NPE is being thrown):

  • import org.apache.spark.sql.execution.debug._
  • call .debugCodegen() on the Dataset/DataFrame
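
The two steps above can be sketched as follows, together with a schema check for columns that are declared non-nullable. This is a minimal sketch rather than a definitive diagnosis: the reduced query (which reuses only the events, eventsplaces and places joins from the question), the object name and the app name are assumptions made for illustration, and the connection details are copied from the question.

```scala
import java.util.Properties

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug._ // brings debugCodegen() into scope

// Hypothetical helper object, not part of the original question.
object InspectJdbcSchema {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("inspectJdbcSchema") // assumed name
      .getOrCreate()

    // Connection details taken from the question.
    val databaseURL = "jdbc:postgresql://dbHost:5432/ticketapp"
    val connectionProperties = new Properties
    connectionProperties.put("user", "simon")
    connectionProperties.put("password", "root")

    // Reduced version of the question's query (assumption: one join chain
    // is enough to inspect the schema that the driver reports).
    val query =
      """(
        SELECT events.event_facebook_id, events.name, places.placeid, places.capacity
        FROM events
          LEFT JOIN eventsplaces ON eventsplaces.event_id = events.event_facebook_id
          LEFT JOIN places ON eventsplaces.placefacebookurl = places.facebookurl
        ) df"""

    val df = spark.read.jdbc(databaseURL, query, connectionProperties)

    // Shows each column with its "nullable" flag as reported by the JDBC driver.
    df.printSchema()

    // Lists the columns Spark believes can never be null. Any of these that
    // comes from the right side of a LEFT JOIN is a candidate for the mismatch.
    df.schema.fields
      .filterNot(_.nullable)
      .foreach(f => println(s"declared non-nullable: ${f.name}"))

    // Prints the Java code generated for the query plan.
    df.debugCodegen()

    spark.stop()
  }
}
```

Reading the schema only touches metadata, so these calls should work even when show() fails on the actual rows.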

Hope this helps.

This issue is related to the Teradata JDBC driver. The problem is discussed at https://community.teradata.com/t5/Connectivity/Teradata-JDBC-Driver-returns-the-wrong-schema-column-nullability/m-p/76667/highlight/true#M3798.

The root cause is discussed on the first page of that thread, and the solution is on the third page.

People from Teradata said they fixed the issue in the 16.10.* driver with a MAYBENULL parameter, but I am still seeing nondeterministic behaviour.

Here is a similar discussion: https://issues.apache.org/jira/browse/SPARK-17195

If anyone else is still looking for a solution: you can use NULLIF on the column that is causing the problem. The problem arises when a JOIN produces a null value in a column that is declared NOT NULL in the source schema.

Related JIRA: https://issues.apache.org/jira/browse/SPARK-18859
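
The NULLIF workaround described above might look like the sketch below. Several things here are assumptions, not facts from the question: that places.capacity is the offending column, that it is numeric, that -1 is a sentinel value that never occurs in the data (NULLIF turns any matching value into NULL), and that the driver reports the resulting expression as nullable. The object and method names are hypothetical.

```scala
import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper object illustrating the NULLIF workaround.
object NullIfWorkaround {

  // Reduced version of the question's query. Wrapping the column in NULLIF
  // turns it into a computed expression instead of a plain NOT NULL column.
  // Assumption: -1 never appears in places.capacity, so no real value is lost.
  val query: String =
    """(
      SELECT events.event_facebook_id, events.name,
        NULLIF(places.capacity, -1) AS capacity
      FROM events
        LEFT JOIN eventsplaces ON eventsplaces.event_id = events.event_facebook_id
        LEFT JOIN places ON eventsplaces.placefacebookurl = places.facebookurl
      ) df"""

  // Reads the wrapped query through the same JDBC call used in the question.
  def extractDataFrame(
      spark: SparkSession,
      databaseURL: String,
      connectionProperties: Properties): DataFrame =
    spark.read.jdbc(databaseURL, query, connectionProperties)
}
```

Calling printSchema() on the result is a quick way to check whether capacity is now reported with nullable = true before running show() again.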
