1

My data is located in azure cosmos DB, I have mounted the dataset on azure databricks.

I can read a csv file using pandas and load it to a spark dataframe.

df = pd.read_csv('/dbfs/mnt/ajviswan/forest_efficiency/2020-04-26_2020-05-26.csv')
sdf = spark.createDataFrame(df)
sdf.head()

This works with the following output to console, I can do further processing with this data frame.

(1) Spark Jobs
sdf:pyspark.sql.dataframe.DataFrame = [Forest: string, LoadBalanceMoveReason: string ... 4 more fields]
Out[34]: Row(Forest='AUSP282', LoadBalanceMoveReason='DefaultEncryption', CompletionDate='5/26/2020 12:00:00 AM', efficiencyRopCount=None, efficiencySize=0.9966470723725392, efficiencyIOPS=None)

But when I try to read the file directly using spark dataframe, it fails with a read error.

df = spark.read.csv('/dbfs/mnt/ajviswan/forest_efficiency/2020-04-26_2020-05-26.csv')
df

Returns

Py4JJavaError                             Traceback (most recent call last)
<command-4117735793908621> in <module>
----> 1 df = spark.read.csv('/dbfs/mnt/ajviswan/forest_efficiency/2020-04-26_2020-05-26.csv')
      2 df

/databricks/spark/python/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup)
    533             path = [path]
    534         if type(path) == list:
--> 535             return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    536         elif isinstance(path, RDD):
    537             def func(iterator):

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     96     def deco(*a, **kw):
     97         try:
---> 98             return f(*a, **kw)
     99         except py4j.protocol.Py4JJavaError as e:
    100             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o3781.csv.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/ReadSupport
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at com.databricks.backend.daemon.driver.ClassLoaders$ReplWrappingClassLoader.loadClass(ClassLoaders.scala:65)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:405)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
    at java.util.ServiceLoader$LazyIterator.access$700(ServiceLoader.java:323)
    at java.util.ServiceLoader$LazyIterator$2.run(ServiceLoader.java:407)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:409)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
    at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
    at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:696)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:780)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:317)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:807)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.ReadSupport
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

2 Answers 2

1

To read via spark methods from mounted storage you shouldn't include the /dbfs prefix:

df = spark.read.csv('/mnt/ajviswan/forest_efficiency/2020-04-26_2020-05-26.csv')
Sign up to request clarification or add additional context in comments.

1 Comment

That does not work. I get the same error, df = spark.read.csv('/mnt/ajviswan/forest_efficiency/2020-04-26_2020-05-26.csv') returns java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/ReadSupport
1

Try the below one,

df=spark.read.format("com.databricks.spark.csv").option("header", "true").option("inferschema", "true").option("mode", "DROPMALFORMED").load("/dbfs/mnt/ajviswan/forest_efficiency/2020-04-26_2020-05-26.csv")
df.show()

Edited #1 Honestly I believe there is some configuration issue in your cluster creation. If your sole intention is to read COSMOS db Data. Then you can try the following,

readConfig = {
  "Endpoint" : "https://<cosmos_end_point_name>.documents.azure.com:443/",
  "Masterkey" : "<master_key_value>",
  "Database" : "<database_name>",
  "preferredRegions" : "East US",
  "Collection": "<collection_name>",
  "schema_samplesize" : "1000",
  "query_pagesize" : "200000",
  "query_custom" : "SELECT * FROM c"
}
df = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**readConfig).load()

For doing this Add the Maven library for Spark CosmosDB under Maven Packages in your cluster configuration. Assume your environment is compatible then try using 'com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:3.0.2'

2 Comments

That does not work too, I still have the same error.
Try my edited second approach. Probably it can help.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.