
I currently have a DataFrame containing the results of a SQL query. The DataFrame has the columns patientID, date, and code.

val res1 = sqlContext.sql("select encounter.Member_ID AS patientID, encounter.Encounter_DateTime AS date, diag.code from encounter join diag on encounter.Encounter_ID = diag.Encounter_ID")

I am attempting to convert this DataFrame into an RDD[Diagnostic], where Diagnostic is a case class of the form:

case class Diagnostic(patientID:String, date: Date, code: String)

Is this possible? My current attempt throws a scala.MatchError from the line below.

val diagnostic: RDD[Diagnostic] = res1.map {
  case Row(patientID:String, date:java.util.Date, code:String) => Diagnostic(patientID=patientID, date=date, code=code)
}

Schema:

root
 |-- patientID: string (nullable = true)
 |-- date: string (nullable = true)
 |-- code: string (nullable = true)

Error message from res1.as[Diagnostic]:

Main.scala:170: overloaded method value as with alternatives:
[error]   (alias: Symbol)org.apache.spark.sql.DataFrame <and>
[error]   (alias: String)org.apache.spark.sql.DataFrame
[error]  does not take type parameters
[error]     val testlol: RDD[Diagnostic] = res1.as[Diagnostic]
[error]                                         ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Oct 9, 2016 3:16:38 PM

Entire error message:

[Stage 4:=======================================>                   (2 + 1) / 3]
16/10/09 14:23:32 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 8)
scala.MatchError: [000961291-01,2005-06-21T19:45:00Z,584.9] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
        at edu.gatech.cse8803.main.Main$$anonfun$11.apply(Main.scala:168)
        at edu.gatech.cse8803.main.Main$$anonfun$11.apply(Main.scala:168)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
        at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
16/10/09 14:23:32 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 8, localhost): scala.MatchError: [000961291-01,2005-06-21T19:45:00Z,584.9] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
        ... (same stack trace as above)

16/10/09 14:23:32 ERROR TaskSetManager: Task 0 in stage 6.0 failed 1 times; aborting job
[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 8, localhost): scala.MatchError: [000961291-01,2005-06-21T19:45:00Z,584.9] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
[error]         ... (same stack trace as above)
[error]
[error] Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
[trace] Stack trace suppressed: run last compile:run for the full output.
16/10/09 14:23:32 ERROR ContextCleaner: Error in cleaning thread
java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:146)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:144)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:144)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
        at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:143)
        at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65)
16/10/09 14:23:32 ERROR Utils: Uncaught exception in thread SparkListenerBus
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:996)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
        at java.util.concurrent.Semaphore.acquire(Semaphore.java:317)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:62)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
java.lang.RuntimeException: Nonzero exit code: 1
        at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1
[error] Total time: 13 s, completed Oct 9, 2016 2:23:32 PM
  • can you show the entire error message? It should show the actual types that probably mismatch the ones you expect. Commented Oct 9, 2016 at 14:19
  • Added above the error message. Commented Oct 9, 2016 at 14:48

1 Answer


java.util.Date is not a data type that can be stored in a DataFrame. From the looks of it, date is a timestamp string. If I am right, the case class should be defined as:

case class Diagnostic(patientID: String, date: java.sql.Timestamp, code: String)

You should replace the pattern:

case Row(patientID: String, date: java.util.Date, code: String)

with:

case Row(patientID: String, date: java.sql.Timestamp, code: String)

and cast date to a timestamp:

res1.select($"patientID", $"date".cast("timestamp"), $"code")

Finally, you should use the rdd method before mapping, for forward compatibility (in Spark 2.0+, map on a DataFrame returns a Dataset rather than an RDD):

res1.select($"patientID", $"date".cast("timestamp"), $"code").rdd.map {
  ...
}
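
One caveat, as an aside: the schema shows all three columns as nullable, so a null in any of them would still fall through the pattern and raise a scala.MatchError. A defensive sketch could use flatMap and drop such rows:

res1.select($"patientID", $"date".cast("timestamp"), $"code").rdd.flatMap {
  case Row(patientID: String, date: java.sql.Timestamp, code: String) =>
    Some(Diagnostic(patientID = patientID, date = date, code = code))
  case _ => None // skip rows with nulls or an unexpected shape
}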

In general, I would recommend using the as method instead (available with the Dataset API in Spark 1.6+, with import sqlContext.implicits._ in scope):

res1.as[Diagnostic].rdd
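
For completeness, a minimal end-to-end sketch of how the pieces fit together, assuming Spark 1.6+ (where df.as[T] exists) and the sqlContext, encounter, and diag tables from the question; note the cast still has to happen before as[Diagnostic], because the raw date column is a string:

import java.sql.Timestamp
import org.apache.spark.rdd.RDD

case class Diagnostic(patientID: String, date: Timestamp, code: String)

// Encoders for case classes come from the SQLContext implicits.
import sqlContext.implicits._

val res1 = sqlContext.sql(
  "select encounter.Member_ID AS patientID, " +
  "encounter.Encounter_DateTime AS date, diag.code " +
  "from encounter join diag on encounter.Encounter_ID = diag.Encounter_ID")

// Cast the string date to a timestamp so the row shape matches the case class,
// then let the encoder build Diagnostic instances instead of matching on Row.
val diagnostic: RDD[Diagnostic] = res1
  .select($"patientID", $"date".cast("timestamp"), $"code")
  .as[Diagnostic]
  .rdd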
