1

I'm trying to store a Spark DataFrame as a CSV on Azure Blob Storage from a local Spark cluster

First, I set the config with the Azure Account/Account Key (I'm not sure what is the proper config so I've set all those)

sparkContext.getConf.set(s"fs.azure.account.key.${account}.blob.core.windows.net", accountKey)

sparkContext.hadoopConfiguration.set(s"fs.azure.account.key.${account}.dfs.core.windows.net", accountKey)
    sparkContext.hadoopConfiguration.set(s"fs.azure.account.key.${account}.blob.core.windows.net", accountKey)

Then I try to store the CSV with the following

filePath = s"wasbs://${container}@${account}.blob.core.windows.net/${prefix}/${filename}"
dataFrame.coalesce(1)
  .write.format("csv")
  .options(Map(
    "header" -> (if (hasHeader) "true" else "false"),
    "sep" -> delimiter,
    "quote" -> quote
  ))
  .save(filePath)

But then this fails with Job aborted and the following stack trace

org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:196)
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)

But when I look in the blob container, I can see my file however I cannot read it back in a Spark DataFrame, I get this error Unable to infer schema for CSV. It must be specified manually.; and following stack trace

org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:185)
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:185)
scala.Option.getOrElse(Option.scala:121)
org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:184)
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:373)
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)

It seems that the problem was already reported on Databricks forum!!

What is the proper way to store a DataFrame on Azure Blob?

1 Answer 1

0

It turns out way before the job fails there was an internal error

Caused by: java.lang.NoSuchMethodError: com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(Ljava/net/URI;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/blob/BlobRequestOptions;Lcom/microsoft/azure/storage/OperationContext;)Ljava/lang/String;
    at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2372)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.restoreKey(NativeAzureFileSystem.java:918)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:819)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
    at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
    at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
    at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:876)
    ... 18 more

What's happening is after creating a temp file with the actual data, it's trying to move the file to the location given by the user using CloudBlob.startCopyFromBlob. Like always, microsft people broke this by renaming this method to CloudBlob.startCopy.

I'm using "org.apache.hadoop" % "hadoop-azure" % "3.2.1" which is most recent for "hadoop-azure" and it seems to have stuck with the older startCopyFromBlob, so I need to use an old azure-storage version that has this method, probably 2.x.x.

See https://github.com/Azure/azure-storage-java/issues/113

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.