I'm trying to load a MongoDB comments dataset (about 2 GB) into Azure Databricks. It ran the first time and I was able to get the data (so I'm assuming my connection string and code are correct), but since then I keep getting the same error every time I run it. I haven't been able to resolve it, and I couldn't find many resources about it.

from pyspark.sql import SparkSession

database = database_name
collection = collection_name
connectionString = connection_string

spark = SparkSession.builder \
    .config('spark.mongodb.input.uri', connectionString) \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .getOrCreate()

# Reading from MongoDB
df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", connectionString) \
    .option("database", database) \
    .option("collection", collection) \
    .load()

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 23) (10.139.64.6 executor 0): java.lang.NoSuchMethodError: com.mongodb.MongoNamespace.checkDatabaseNameValidity(Ljava/lang/String;)V

Py4JJavaError                             Traceback (most recent call last)
<command-2293330354451765> in <module>
      1 # Reading from MongoDB
----> 2 df=spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri",connectionString).option("database", database).option("collection", collection).load()

/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    208             return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    209         else:
--> 210             return self._df(self._jreader.load())
    211 
    212     def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    115     def deco(*a, **kw):
    116         try:
--> 117             return f(*a, **kw)
    118         except py4j.protocol.Py4JJavaError as e:
    119             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o459.load.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 23) (10.139.64.6 executor 0): java.lang.NoSuchMethodError: com.mongodb.MongoNamespace.checkDatabaseNameValidity(Ljava/lang/String;)V
    at com.mongodb.ConnectionString.<init>(ConnectionString.java:371)
    at com.mongodb.spark.connection.DefaultMongoClientFactory.create(DefaultMongoClientFactory.scala:50)
    at com.mongodb.spark.connection.MongoClientCache.acquire(MongoClientCache.scala:55)
    at com.mongodb.spark.MongoConnector.acquireClient(MongoConnector.scala:239)
    at com.mongodb.spark.rdd.MongoRDD.compute(MongoRDD.scala:160)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:91)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1620)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2828)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2775)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2769)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2769)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1305)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1305)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1305)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3036)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2977)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2965)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1067)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2477)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2460)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2572)
    at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1193)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:419)
    at org.apache.spark.rdd.RDD.fold(RDD.scala:1187)
    at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1256)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:419)
    at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1232)
    at com.mongodb.spark.sql.MongoInferSchema$.apply(MongoInferSchema.scala:88)
    at com.mongodb.spark.sql.DefaultSource.constructRelation(DefaultSource.scala:97)
    at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:50)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:390)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:444)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:400)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:400)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:273)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: com.mongodb.MongoNamespace.checkDatabaseNameValidity(Ljava/lang/String;)V
    at com.mongodb.ConnectionString.<init>(ConnectionString.java:371)
    at com.mongodb.spark.connection.DefaultMongoClientFactory.create(DefaultMongoClientFactory.scala:50)
    at com.mongodb.spark.connection.MongoClientCache.acquire(MongoClientCache.scala:55)
    at com.mongodb.spark.MongoConnector.acquireClient(MongoConnector.scala:239)
    at com.mongodb.spark.rdd.MongoRDD.compute(MongoRDD.scala:160)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:91)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1620)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

1 Answer

In my case I had this scenario.

The connection:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("myApp") \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.3.2') \
    .getOrCreate()

mongo_df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("database", mongo_DB) \
    .option("collection", mongo_collection) \
    .load()
mongo_df.createOrReplaceTempView('collection')
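
Once the view is registered you can query it with Spark SQL, for example (a minimal usage sketch, assuming the view name 'collection' from the line above):

# Query the temp view registered above; adjust the SQL to your schema
spark.sql("SELECT COUNT(*) AS cnt FROM collection").show()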

But you need to register spark.mongodb.input.uri and spark.mongodb.output.uri, like this:

spark.mongodb.input.uri <your_connection_string>
spark.mongodb.output.uri <your_connection_string>

in your cluster's Spark config.

[screenshot of the cluster Spark configuration page]
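
For example, with a MongoDB Atlas cluster the two Spark config entries would look roughly like this (a sketch; user, password, host, database and collection are placeholders for your own values):

spark.mongodb.input.uri mongodb+srv://<user>:<password>@<cluster-host>/<database>.<collection>
spark.mongodb.output.uri mongodb+srv://<user>:<password>@<cluster-host>/<database>.<collection>

With the URIs set at the cluster level, every notebook attached to that cluster picks them up, so the read only needs the database and collection options.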
