
I'm trying to load some CSV data into a Spark cluster and run some queries on it, but I'm running into problems getting the data loaded.

See the code sample below. I've generated a header and am trying to parse the columns, but the process fails when run against the (large, column-rich) data set with a cryptic error message: '[Ljava.lang.String; is not a valid external type for schema of string'

This doesn't seem to be solved anywhere else on the internet. Does anyone know what the problem might be?

(I originally thought this might be related to null or empty fields being loaded, but the process only fails after running for some time, and the source data is very sparse.)


var headers = StructType(header_clean.split(",").map(fieldName ⇒ StructField(fieldName, StringType, true)))
var contentRdd = contentNoHeader.map(k => k.split(",")).map(
    p => Row(p.map( x => x.replace("\"", "").trim)))

contentRdd.createOrReplaceTempView("someView")

val domains = spark.sql("SELECT DISTINCT domain FROM someView")

For reference, the bottom of the error log (very spammy, lots of columns):

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl), StringType), true) AS pageUrl#377
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 87
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279)
  at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
  at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
  at org.apache.spark.scheduler.Task.run(Task.scala:85)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  ... 3 more
Caused by: java.lang.RuntimeException: [Ljava.lang.String; is not a valid external type for schema of string
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:276)
  ... 17 more
  • I have a similar problem (trying to create a dataframe with a Double as one of the columns) and am getting a very similar error. I don't have a full answer yet, but looking at some code on GitHub that appears to generate this exception, it seems to be related to boxing/unboxing and the decision to validate schemas sent in from the driver program. Just thought I'd add some info since the question hasn't been answered yet. Commented Nov 7, 2016 at 17:46
  • I had a similar issue (java.util.HashMap is not a valid external type for schema of map) that I resolved by explicitly converting to Scala's Map with import collection.JavaConverters._ and .toMap. Maybe converting to scala.String will help here as well. Commented Nov 8, 2016 at 13:28
  • @blueberryfields can you attach more logs to debug? Thanks. Commented Apr 16, 2018 at 18:55
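The Java-to-Scala conversion described in the comment above can be sketched like this (a minimal standalone example, not tied to the Spark job in the question):

```scala
import scala.collection.JavaConverters._

// A java.util.HashMap of the kind the commenter says Spark's encoder
// rejected for a map-typed column.
val jmap = new java.util.HashMap[String, Int]()
jmap.put("a", 1)
jmap.put("b", 2)

// asScala wraps it as a mutable Scala Map view; .toMap copies it into
// an immutable scala.collection.Map, which the commenter reports the
// schema then accepted.
val smap: Map[String, Int] = jmap.asScala.toMap

println(smap("a")) // prints 1
```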

1 Answer


I solved this problem by splitting out the elements of the Row. You can do this:

StructType(header_clean.split(",").map(fieldName => StructField(fieldName, StringType, true)))
var contentRdd = contentNoHeader.map(k => k.split(",")).map(
  p => {
    val ppp = p.map(x => x.replace("\"", "").trim)
    Row(ppp(0), ppp(1), ppp(2))
  })
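The snippet above enumerates three columns by hand, which doesn't scale to the wide CSV in the question. A sketch of the same fix generalized with Row.fromSeq, assuming the header_clean, contentNoHeader, and spark values from the question are in scope (this needs a running SparkSession, so it is untested here):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Same schema as in the question: every column is a nullable string.
val headers = StructType(
  header_clean.split(",").map(f => StructField(f.trim, StringType, true)))

// Row.fromSeq produces one Row field per array element. The original
// Row(p.map(...)) instead built a single-field Row holding the whole
// Array[String], which is exactly what the
// "[Ljava.lang.String; is not a valid external type" error complains about.
val contentRdd = contentNoHeader.map(_.split(",")).map { p =>
  Row.fromSeq(p.map(_.replace("\"", "").trim).toSeq)
}

// An RDD[Row] has no createOrReplaceTempView; attach the schema and
// build a DataFrame first.
val df = spark.createDataFrame(contentRdd, headers)
df.createOrReplaceTempView("someView")
```

Row(ppp: _*) is an equivalent way to splat the array into the Row's varargs constructor.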

2 Comments

I don't think you should mention personal information like your email address or mobile number in your answer. Also, if you're looking for recommendations, SO is not the platform for it.
I'm so sorry about that; this is the first time I've posted an answer.
