
I wrote the following code to fetch data from MongoDB:

import com.typesafe.config.ConfigFactory
import org.mongodb.scala.{ Document, MongoClient, MongoCollection, MongoDatabase }

import scala.concurrent.ExecutionContext

object MongoService extends Service {
  val conf = ConfigFactory.load()
  implicit val mongoService: MongoClient = MongoClient(conf.getString("mongo.url"))
  implicit val mongoDB: MongoDatabase = mongoService.getDatabase(conf.getString("mongo.db"))
  implicit val ec: ExecutionContext = ExecutionContext.global

  def getAllDocumentsFromCollection(collection: String) = {
    mongoDB.getCollection(collection).find()
  }
}

But when I call getAllDocumentsFromCollection, I don't get the individual documents for further manipulation. Instead I get:

FindObservable(com.mongodb.async.client.FindIterableImpl@23555cf5)

UPDATED:

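import reactivemongo.api.{ DefaultDB, MongoConnection, MongoDriver }
import reactivemongo.bson.{ BSONDocumentWriter, Macros }
import scala.concurrent.{ ExecutionContext, Future }

object MongoService {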
  // My settings (see available connection options)
  val mongoUri = "mongodb://localhost:27017/smsto?authMode=scram-sha1"

  import ExecutionContext.Implicits.global // use any appropriate context

  // Connect to the database: Must be done only once per application
  val driver = MongoDriver()
  val parsedUri = MongoConnection.parseURI(mongoUri)
  val connection = parsedUri.map(driver.connection(_))

  // Database and collections: Get references
  val futureConnection = Future.fromTry(connection)
  def db1: Future[DefaultDB] = futureConnection.flatMap(_.database("smsto"))
  def personCollection = db1.map(_.collection("person"))

  // Write Documents: insert or update

  implicit def personWriter: BSONDocumentWriter[Person] = Macros.writer[Person]
  // or provide a custom one

  def createPerson(person: Person): Future[Unit] =
    personCollection.flatMap(_.insert(person).map(_ => ())) // use personWriter
  def getAll(collection: String) =
    db1.map(_.collection(collection))

  // Custom persistent types
  case class Person(firstName: String, lastName: String, age: Int)
}

I also tried ReactiveMongo with the code above, but I couldn't make getAll work, and I get an error in createPerson. Please suggest how I can get all the data from a collection.

  • Did you read the documentation? tl;dr: the Mongo Scala driver is built on top of the Java async driver, which means that every operation on the database is executed asynchronously (you don't get the results themselves, but an abstraction for working on them with callbacks). You can write your own observer, or just call the toFuture method to get a Future[Iterable[Document]] and operate on it, or, if you really want your documents "right now" and are willing to make your code completely synchronous, await the future. Commented Dec 23, 2018 at 6:39
  • Or have a look at reactivemongo.org (I'm maintainer of) Commented Dec 23, 2018 at 10:47
  • @cchantep I've tried ReactiveMongo and updated my question above, but I couldn't make it work :( Commented Dec 23, 2018 at 11:10
  • "I couldn't make it work for getAll" is not specific enough to get help. What is the error? BTW, getAll does not run any query, so it cannot return any result: reactivemongo.org/releases/0.1x/documentation/tutorial/… Commented Dec 23, 2018 at 11:26
  • @cchantep I've updated the question with the error from createPerson. Commented Dec 23, 2018 at 11:43
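The first comment's "call toFuture and operate over it" suggestion might look like the following. This is a sketch assuming mongo-scala-driver 2.x or later, where `toFuture()` on an `Observable[T]` yields a `Future[Seq[T]]` (the comment's Future[Iterable[Document]]); the object and method names are hypothetical, not part of the driver.

```scala
import org.mongodb.scala._
import scala.concurrent.{ExecutionContext, Future}

object ToFutureSketch {
  // Hypothetical helper: gathers every element emitted by the observable
  // (for example mongoDB.getCollection("person").find()) into a single Future.
  def allDocuments[T](obs: Observable[T]): Future[Seq[T]] =
    obs.toFuture()

  // "Operate over it": the transformation runs once the Future completes,
  // without blocking the calling thread.
  def countAll[T](obs: Observable[T])(implicit ec: ExecutionContext): Future[Int] =
    allDocuments(obs).map(_.size)
}
```

With the question's code, `ToFutureSketch.allDocuments(mongoDB.getCollection("person").find())` would produce a `Future[Seq[Document]]` instead of the bare `FindObservable`.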

1 Answer


This is likely too late for the OP, but hopefully the following ways of retrieving and iterating over collections with the mongo-scala-driver will prove useful to others.

The Asynchronous Way - Iterating over documents asynchronously means you don't have to hold an entire collection in memory, which can become impractical for large collections. However, the documents are only available inside the subscribe callbacks, so you can't reuse them outside that block afterwards. I'd recommend working asynchronously when you can, since this is how the mongo-scala driver is intended to be used.

db.getCollection(collectionName).find().subscribe(
    (doc: org.mongodb.scala.bson.Document) => {
        // operate on an individual document here
    },
    (e: Throwable) => {
        // do something with errors here, if desired
    },
    () => {
        // this signifies that you've reached the end of your collection
    }
)
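When only an aggregate of the results (a count, a sum, a running total) is needed outside the subscribe block, a Promise can carry it out without buffering every document. This is a minimal sketch assuming mongo-scala-driver 2.x or later and the same three-callback `subscribe` used above; `foldAsync` is a hypothetical helper, not a driver method.

```scala
import org.mongodb.scala._
import scala.concurrent.{Future, Promise}

object StreamingSketch {
  // Hypothetical helper: folds over an Observable one element at a time, so only
  // the running accumulator is kept in memory, never the whole result set.
  def foldAsync[T, A](obs: Observable[T], zero: A)(step: (A, T) => A): Future[A] = {
    val result = Promise[A]()
    var acc = zero // safe: the Reactive Streams contract serializes onNext/onComplete calls
    obs.subscribe(
      (t: T) => acc = step(acc, t),         // fold each document into the accumulator
      (e: Throwable) => result.failure(e),  // surface errors through the Future
      () => result.success(acc)             // end of collection: publish the final value
    )
    result.future
  }
}
```

For example, `StreamingSketch.foldAsync(db.getCollection(collectionName).find(), 0L)((n, _) => n + 1)` would count documents as they stream in.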

The "Synchronous" Way - This is a pattern I use when my use case calls for a synchronous solution and I'm working with smaller collections or result sets. It still uses the asynchronous mongo-scala driver, but it returns a list of documents and blocks the calling thread until all documents have been returned. How you handle errors and timeouts may depend on your use case.

import org.mongodb.scala._
import org.mongodb.scala.bson.Document
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.Filters
import scala.collection.mutable.ListBuffer

/* This function optionally takes filters if you do not wish to return the entire collection.
 * You could extend it to take other optional query params, such as org.mongodb.scala.model.{Sorts, Projections, Aggregates}
 */
def getDocsSync(db: MongoDatabase, collectionName: String, filters: Option[Bson]): ListBuffer[Document] = {
    val docs = ListBuffer[Document]()
    // @volatile so this thread sees the updates made on the driver's callback thread
    @volatile var processing = true
    @volatile var failure: Option[Throwable] = None
    val collection = db.getCollection(collectionName)
    val query = filters match {
        case Some(f) => collection.find(f)
        case None    => collection.find()
    }
    query.subscribe(
        (doc: Document) => docs.append(doc), // add doc to mutable list
        (e: Throwable) => { failure = Some(e); processing = false }, // record the error and stop waiting
        () => processing = false // all documents have been returned
    )
    while (processing) {
        Thread.sleep(100) // wait here until all docs have been returned (or an error occurs)
    }
    failure.foreach(e => throw e) // rethrow on the calling thread, not the driver's thread
    docs
}
// sample usage of 'synchronous' method
val client: MongoClient = MongoClient(uriString)
val db: MongoDatabase = client.getDatabase(dbName)
val allDocs = getDocsSync(db, "myCollection", Option.empty)
val someDocs = getDocsSync(db, "myCollection", Option(Filters.eq("fieldName", "foo")))
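If blocking is acceptable, `toFuture()` plus `Await.result` can replace the sleep-and-poll loop and gives an explicit timeout for free. This is a sketch assuming mongo-scala-driver 2.x or later; `AwaitSketch`, `awaitAll`, `getDocsAwait` and the 30-second default timeout are hypothetical choices, not driver API.

```scala
import org.mongodb.scala._
import org.mongodb.scala.bson.conversions.Bson
import scala.concurrent.Await
import scala.concurrent.duration._

object AwaitSketch {
  // Blocks until the observable completes. Rethrows the driver's error if the query
  // fails, or throws java.util.concurrent.TimeoutException once `timeout` elapses.
  def awaitAll[T](obs: Observable[T], timeout: FiniteDuration): Seq[T] =
    Await.result(obs.toFuture(), timeout)

  // Same shape as getDocsSync, but without the mutable buffer or polling flag.
  def getDocsAwait(db: MongoDatabase, collectionName: String, filters: Option[Bson],
                   timeout: FiniteDuration = 30.seconds): Seq[Document] = {
    val collection = db.getCollection(collectionName)
    val query = filters match {
      case Some(f) => collection.find(f)
      case None    => collection.find()
    }
    awaitAll(query, timeout)
  }
}
```

Usage mirrors the sample above, e.g. `AwaitSketch.getDocsAwait(db, "myCollection", Some(Filters.eq("fieldName", "foo")))`.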