
I am using the MongoDB Spark Connector to read a collection. The aim is to return all of the documents in the collection as an array of JSON documents.

I am able to load the collection, but I am not sure how to convert the customRdd object, which holds the list of documents, to JSON. I can convert the first document, as you can see in the code, but how do I convert all the documents read from the collection and combine them into one JSON message to send?

Expected Output:

It should be an array of documents:

{
   "objects":[
      {
         ...
      },
      {
         ....
      }
   ]
} 

Existing Code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import com.mongodb.spark.config._
import com.mongodb.spark._
import org.json4s.native.JsonMethods._
import org.json4s.JsonDSL.WithDouble._

val conf = new SparkConf()
  .setAppName("MongoSparkConnectorIntro")
  .setMaster("local")
  .set("spark.hadoop.validateOutputSpecs", "false")
  .set("spark.mongodb.input.uri", "mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred")
  .set("spark.mongodb.output.uri", "mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred")

val sc = new SparkContext(conf)
// Note: getOrCreate() will reuse the SparkContext created above
val spark = SparkSession.builder()
  .master("spark://192.168.137.103:7077")
  .appName("MongoSparkConnectorIntro")
  .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred")
  .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred")
  .getOrCreate()

//val readConfig = ReadConfig(Map("collection" -> "metadata_collection", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val readConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred"))
val customRdd = MongoSpark.load(sc, readConfig)

//println("Before Printing the value" + customRdd.toString())
println("The Count: " + customRdd.count)
println("The First Document: " + customRdd.first.toString())

// This only builds a (String, List[Document]) tuple, not JSON
val resultJSOn = "MetaDataFinalResponse" -> customRdd.collect().toList

// This converts only the first document, not the whole collection
val stringResponse = customRdd.first().toJson()
println("Final Response: " + stringResponse)
return stringResponse

Note:

I don't want to map the JSON documents into another model; I want them to stay as they are. I just want to aggregate them into one JSON message.
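For reference, the aggregation itself can be sketched as plain string concatenation, assuming customRdd is an RDD[org.bson.Document] (which is what MongoSpark.load returns): serialize each document on the executors, then join the JSON strings into one array on the driver. A minimal sketch, not the connector's own API:

// Every org.bson.Document can render itself as JSON via toJson()
val jsonArray = customRdd
  .map(_.toJson)                          // per-document JSON strings
  .collect()                              // bring them to the driver
  .mkString("{\"objects\":[", ",", "]}")  // wrap as {"objects":[...]}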

Spark Version: 2.4.0

SBT File:

name := "Test"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.7.0"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"

1 Answer


This approach generates the JSON string without escape characters and is much more efficient, but you need to collect the RDD to the driver to do it (you can remove the code from my previous answer):

// We will create a new Document containing the documents fetched from MongoDB
import scala.collection.JavaConverters._
import org.bson.Document
// Collect customRdd and convert it to a java.util.List
// (a Document field can only hold Java collections, not Scala ones)
val documents = customRdd.collect().toSeq.asJava
// Create a new document with the field name you want
val stringResponse = new Document().append("objects", documents).toJson()
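As a follow-up, toJson can also take a JsonWriterSettings if you want to control how BSON types (ObjectId, dates, longs) are rendered; a minimal sketch, assuming mongo-java-driver 3.5+ is on the classpath:

import org.bson.json.{JsonMode, JsonWriterSettings}

// Relaxed mode renders dates and numbers as plain JSON values instead of
// extended-JSON wrappers such as {"$numberLong": "..."}
val settings = JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build()
val relaxedResponse = new Document().append("objects", documents).toJson(settings)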

Comments

Thanks for posting the answer. I tried this and it works, but I am getting escape characters (" \" ") around every key and value in the message. I think it is because of serialization. Can you tell me how to avoid that? I don't want escape characters in the final message.
This is how I am getting the JSON: ({ \"date\" : \"27-04-2019\", \"sourceAddress\" : \"65.32.25.10\"...). How can I remove the escape characters?
Hi @OldWolfs, thanks for updating the answer again. On the last line, where you create the new Document, Scala is not able to recognize Document. Do I need to include any other library in the project for that?
Hi @omerkhalid, I created your project using Maven, so it may have some dependency differences. The Document class comes from the mongo-java-driver package, so you can add the following dependency to sbt and try: "org.mongodb" % "mongo-java-driver" % "3.9.0"
Brother, are you sure there is no other dependency? I added the one you mentioned, but it is still not loading the Document package.
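For what it's worth, org.bson.Document ships with the MongoDB Java driver, which mongo-spark-connector should already pull in transitively, so the import alone may be what is missing. A sketch of the explicit sbt line, assuming driver 3.9.0 (version is illustrative):

// build.sbt -- explicit pin; normally the connector brings this in transitively
libraryDependencies += "org.mongodb" % "mongo-java-driver" % "3.9.0"

// and in the source file:
import org.bson.Document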
