
I'm stuck with a weird issue. I'm trying to connect Spark to MongoDB locally using the MongoDB Spark Connector.

Apart from setting up Spark, I'm using the following code:

val readConfig = ReadConfig(Map("uri" -> "mongodb://localhost:27017/movie_db.movie_ratings", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val writeConfig = WriteConfig(Map("uri" -> "mongodb://127.0.0.1/movie_db.movie_ratings"))

// Load the movie rating data from Mongo DB
val movieRatings = MongoSpark.load(sc, readConfig).toDF()

movieRatings.show(100)

However, I get the following error, thrown on the line where I set up readConfig:

java.lang.IllegalArgumentException: Missing database name. Set via the 'spark.mongodb.input.uri' or 'spark.mongodb.input.database' property.

I don't understand why it complains about an unset uri when I clearly have a uri property in the Map. What am I missing?

3 Answers


You can do it from the SparkSession, as mentioned here:

val spark = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnectorIntro")
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/movie_db.movie_ratings")
    .config("spark.mongodb.input.readPreference.name", "secondaryPreferred")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
    .getOrCreate()

Create a DataFrame using the config:

val readConfig = ReadConfig(Map("uri" -> "mongodb://localhost:27017/movie_db.movie_ratings", "readPreference.name" -> "secondaryPreferred"))
// Load using the explicit ReadConfig
val df = MongoSpark.load(spark, readConfig)
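
Before writing, you can sanity-check what was loaded with the usual Dataset API, for example:

// Inspect the inferred schema and a sample of rows
df.printSchema()
df.show(100)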

Write df back to MongoDB:

MongoSpark.save(
  df.write
    .option("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
    .mode("overwrite"))

In your code, the spark.mongodb.input. and spark.mongodb.output. prefixes are missing from the config keys:

val readConfig = ReadConfig(Map(
    "spark.mongodb.input.uri" -> "mongodb://localhost:27017/movie_db.movie_ratings", 
    "spark.mongodb.input.readPreference.name" -> "secondaryPreferred"), 
    Some(ReadConfig(sc)))

val writeConfig = WriteConfig(Map(
    "spark.mongodb.output.uri" -> "mongodb://127.0.0.1/movie_db.movie_ratings"))

For Java, you can either set the configs while creating the SparkSession, or create the session first and then set them as runtime configs.

1.

SparkSession sparkSession = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnector")
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/movie_db.movie_ratings")
    .config("spark.mongodb.input.readPreference.name", "secondaryPreferred")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
    .getOrCreate();

OR

2.

SparkSession sparkSession = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnector")
    .getOrCreate();

Then,

String mongoUrl = "mongodb://localhost:27017/movie_db.movie_ratings";
String sourceCollection = "movie_ratings"; // the collection from the URI above

sparkSession.sparkContext().conf().set("spark.mongodb.input.uri", mongoUrl);
sparkSession.sparkContext().conf().set("spark.mongodb.output.uri", mongoUrl);

Map<String, String> readOverrides = new HashMap<String, String>();
readOverrides.put("collection", sourceCollection);
readOverrides.put("readPreference.name", "secondaryPreferred");

ReadConfig readConfig = ReadConfig.create(sparkSession).withOptions(readOverrides);
Dataset<Row> df = MongoSpark.loadAndInferSchema(sparkSession, readConfig);


If you are using the MongoDB Spark Connector version 10.1 or greater, the config keys for the SparkConf have changed.

SparkSession sparkSession = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnector")
    .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017/movie_db.movie_ratings")
    .config("spark.mongodb.read.readPreference.name", "secondaryPreferred")
    .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
    .getOrCreate();

The different ways of setting the configuration are still available.
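
With 10.x, the reads and writes themselves go through the standard DataFrame reader and writer using the mongodb data source; a minimal sketch (shown in Scala, assuming the connection URIs configured on the session above):

// Read from the namespace configured via spark.mongodb.read.connection.uri
val df = sparkSession.read.format("mongodb").load()

// Append the rows back to the namespace configured via spark.mongodb.write.connection.uri
df.write.format("mongodb").mode("append").save()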
