
I'm trying to connect Spark (PySpark) to MongoDB as follows:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, SparkSession

conf = SparkConf()
conf.set('spark.mongodb.input.uri', default_mongo_uri)
conf.set('spark.mongodb.output.uri', default_mongo_uri)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
spark = SparkSession \
    .builder \
    .appName("my-app") \
    .config("spark.mongodb.input.uri", default_mongo_uri) \
    .config("spark.mongodb.output.uri", default_mongo_uri) \
    .getOrCreate()

But when I do the following:

users = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("uri", '{uri}.{col}'.format(uri=mongo_uri, col='users')).load()

I get this error:

java.lang.ClassNotFoundException: Failed to find data source: com.mongodb.spark.sql.DefaultSource

I did the same thing from the pyspark shell and was able to retrieve data. This is the command I ran:

pyspark --conf "spark.mongodb.input.uri=mongodb_uri" --conf "spark.mongodb.output.uri=mongodburi" --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.2

But there I have the option to specify the package I need. What about standalone apps and scripts? How can I configure the mongo-spark-connector there?

Any ideas?

7 Answers


If you are using SparkContext & SparkSession, mention the connector jar packages in SparkConf and check the following code:

    from pyspark import SparkContext,SparkConf
    conf = SparkConf().set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.3.2")
    sc = SparkContext(conf=conf)

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://xxx.xxx.xxx.xxx:27017/sample1.zips") \
    .config("spark.mongodb.output.uri", "mongodb://xxx.xxx.xxx.xxx:27017/sample1.zips") \
    .getOrCreate()

    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
    df.printSchema()

If you are using only SparkSession, then use the following code:

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://xxx.xxx.xxx.xxx:27017/sample1.zips") \
    .config("spark.mongodb.output.uri", "mongodb://xxx.xxx.xxx.xxx:27017/sample1.zips") \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.3.2') \
    .getOrCreate()

    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
    df.printSchema()
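
You can also point a single read at a specific database and collection through per-read options rather than the default URI. A minimal sketch, assuming the 2.x connector and placeholder database/collection names:

    users = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("database", "sample1") \
        .option("collection", "users") \
        .load()
    users.printSchema()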



Here is how I did it in a Jupyter notebook:
1. Download the jars from Maven Central or any other repository and put them in a directory called "jars":
mongo-spark-connector_2.11-2.4.0
mongo-java-driver-3.9.0
2. Create the session and write/read any data

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

working_directory = 'jars/*'

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection") \
    .config("spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection") \
    .config('spark.driver.extraClassPath', working_directory) \
    .getOrCreate()

people = my_spark.createDataFrame([("JULIA", 50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77),
                            ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", 22)], ["name", "age"])

people.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()

df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.select('*').where(col("name") == "JULIA").show()

As a result you will see the matching "JULIA" document in the output.
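
If you prefer not to rely on spark.driver.extraClassPath, the same local jars can be attached through spark.jars instead. A minimal sketch, assuming the two jars from step 1 are the files sitting in the jars/ folder:

from pyspark.sql import SparkSession

# Comma-separated list of local jars to add to the driver and executor classpaths
jars = "jars/mongo-spark-connector_2.11-2.4.0.jar,jars/mongo-java-driver-3.9.0.jar"

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection") \
    .config("spark.jars", jars) \
    .getOrCreate()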



If you're using the newest version of the mongo-spark-connector, i.e. v10.0.1 at the time of writing, you need to use a SparkConf object, as stated in the MongoDB documentation (https://www.mongodb.com/docs/spark-connector/current/configuration/).

Besides, you don't need to download anything manually; Spark will do it for you.

Below is the solution I came up with, for:

  • mongo-spark-connector: 10.0.1
  • mongo server : 5.0.8
  • spark : 3.2.0

import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean

def init_spark():
    password = os.environ["MONGODB_PASSWORD"]
    user = os.environ["MONGODB_USER"]
    host = os.environ["MONGODB_HOST"]
    db_auth = os.environ["MONGODB_DB_AUTH"]
    mongo_conn = f"mongodb://{user}:{password}@{host}:27017/{db_auth}"

    conf = SparkConf()

    # Download mongo-spark-connector and its dependencies.
    # This will download all the necessary jars and put them in your $HOME/.ivy2/jars, no need to manually download them :
    conf.set("spark.jars.packages",
             "org.mongodb.spark:mongo-spark-connector:10.0.1")

    # Set up read connection :
    conf.set("spark.mongodb.read.connection.uri", mongo_conn)
    conf.set("spark.mongodb.read.database", "<my-read-database>")
    conf.set("spark.mongodb.read.collection", "<my-read-collection>")

    # Set up write connection
    conf.set("spark.mongodb.write.connection.uri", mongo_conn)
    conf.set("spark.mongodb.write.database", "<my-write-database>")
    conf.set("spark.mongodb.write.collection", "<my-write-collection>")
    # If you need to update instead of inserting :
    conf.set("spark.mongodb.write.operationType", "update")

    SparkContext(conf=conf)

    return SparkSession \
        .builder \
        .appName('<my-app-name>') \
        .getOrCreate()

spark = init_spark()
df = spark.read.format("mongodb").load()

df_grouped = df.groupBy("<some-column>").agg(mean("<some-other-column>"))

df_grouped.write.format("mongodb").mode("append").save()
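
If a later read needs a collection other than the one configured above, the database and collection can also be passed as per-read options with the 10.x connector. A minimal sketch (the names are placeholders):

other_df = spark.read.format("mongodb") \
    .option("database", "<my-other-database>") \
    .option("collection", "<my-other-collection>") \
    .load()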



I was also facing the same error "java.lang.ClassNotFoundException: Failed to find data source: com.mongodb.spark.sql.DefaultSource" while trying to connect to MongoDB from Spark (2.3).

I had to download and copy the mongo-spark-connector_2.11 JAR file(s) into the jars directory of the Spark installation.

That resolved my issue and I was successfully able to call my spark code via spark-submit.

Hope it helps.

1 Comment

This worked for me, but now I get an error Caused by: java.lang.ClassNotFoundException: org.bson.conversions.Bson instead.

Here is how this error got resolved for me, by downloading the jar files below. (I used the solution from this question.)

1. Download the jar files below:

mongo-spark-connector_2.11-2.4.1 from here

mongo-java-driver-3.9.0 from here

2. Copy and paste both of these jar files into the 'jars' location in the Spark directory.

3. PySpark code in a Jupyter notebook:

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("mongo").\
config("spark.mongodb.input.uri","mongodb://127.0.0.1:27017/$database.$table_name").\
config("spark.mongodb.output.uri","mongodb://127.0.0.1:27017/$database.$table_name").\
getOrCreate()

df=spark.read.format('com.mongodb.spark.sql.DefaultSource')\
    .option( "uri", "mongodb://127.0.0.1:27017/$database.$table_name") \
    .load()

df.printSchema()

#create Temp view of df to view the data 
df.createOrReplaceTempView("df")

#to read table present in mongodb
query1 = spark.sql("SELECT * FROM df ")
query1.show(10)
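
To write a query result back to MongoDB with the same source, here is a minimal sketch reusing the same placeholder URI as above:

# Write the query result back to MongoDB
query1.write.format('com.mongodb.spark.sql.DefaultSource') \
    .mode('append') \
    .option("uri", "mongodb://127.0.0.1:27017/$database.$table_name") \
    .save()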




I faced a similar issue; the problem was version compatibility between Spark and the mongo-spark-connector. My tech stack versions are as below:

Spark version - 2.4.8, Java - 8, MongoDB - 7.0
I tried multiple versions of the mongo-spark-connector jars; finally the mongo-spark-connector_2.11-2.4.4 jar worked for me, and I also used mongo-java-driver-3.12.5.jar.

df = spark.read\
      .format("com.mongodb.spark.sql.DefaultSource")\
      .option("spark.mongodb.input.uri","mongo_uri") \
      .option("collection", "mongo_collection_name")\
      .load()
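
For reference, a minimal sketch of pointing a standalone script at those two jars (the local paths below are assumptions; adjust them to wherever the jars were downloaded):

from pyspark.sql import SparkSession

# Hypothetical local paths to the jars mentioned above
jars = "/path/to/mongo-spark-connector_2.11-2.4.4.jar,/path/to/mongo-java-driver-3.12.5.jar"

spark = SparkSession.builder \
    .appName("myApp") \
    .config("spark.jars", jars) \
    .getOrCreate()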



You are not using sc to create the SparkSession. Maybe this code can help you:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set('spark.mongodb.input.uri', mongodb_input_uri)
conf.set('spark.mongodb.input.collection', 'collection_name')
conf.set('spark.mongodb.output.uri', mongodb_output_uri)
sc = SparkContext(conf=conf)
spark = SparkSession(sc)  # Using the context (conf) to create the session

