I'm attempting to improve the below code that creates a MongoDB connection and inserts a document using the insertDocument method:

import com.typesafe.scalalogging.LazyLogging
import org.mongodb.scala.result.InsertOneResult
import org.mongodb.scala.{Document, MongoClient, MongoCollection, MongoDatabase, Observer, SingleObservable}
import play.api.libs.json.JsResult.Exception

object MongoFactory extends LazyLogging {

  val uri: String = "mongodb+srv://*********"
  val client: MongoClient = MongoClient(uri)
  val db: MongoDatabase = client.getDatabase("db")
  val collection: MongoCollection[Document] = db.getCollection("col")

  def insertDocument(document: Document) = {
    val singleObservable: SingleObservable[InsertOneResult] = collection.insertOne(document)

    singleObservable.subscribe(new Observer[InsertOneResult] {
      override def onNext(result: InsertOneResult): Unit = println(s"onNext: $result")

      override def onError(e: Throwable): Unit = println(s"onError: $e")

      override def onComplete(): Unit = println("onComplete")


    })

  }

}

The primary issue I see with the above code is that if the connection becomes stale, because the MongoDB server goes offline or for some other reason, the connection is not re-established.

An improvement to cater for this scenario is:

object MongoFactory extends LazyLogging {

  val uri: String = "mongodb+srv://*********"
  var client: MongoClient = MongoClient(uri)
  var db: MongoDatabase = client.getDatabase("db")
  var collection: MongoCollection[Document] = db.getCollection("col")

  def isDbDown() : Boolean = {
    try {
      client.getDatabase("db")
      false
    }
    catch {
      case e: Exception =>
        true
    }
  }

  def insertDocument(document: Document) = {

    if(isDbDown()) {
      client = MongoClient(uri)
      db = client.getDatabase("db")
      collection = db.getCollection("col")
    }

    val singleObservable: SingleObservable[InsertOneResult] = collection.insertOne(document)

    singleObservable.subscribe(new Observer[InsertOneResult] {
      override def onNext(result: InsertOneResult): Unit = println(s"onNext: $result")

      override def onError(e: Throwable): Unit = println(s"onError: $e")

      override def onComplete(): Unit = println("onComplete")


    })

  }

}

I expect this to handle the case where the DB connection becomes unavailable, but is there a more idiomatic Scala method of determining

  • Is it necessary to explicitly test whether the connection is fine? You could reconnect and try again in onError instead of explicitly checking the connection every time. Consider also a Scala library for retrying on failure, e.g. github.com/softwaremill/retry Commented Jan 9, 2021 at 19:35
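A minimal sketch of the "try again on failure" idea from the comment above, using only the Scala standard library rather than softwaremill/retry. The `Retry` object and its `retry` helper are hypothetical names introduced for illustration. The sketch assumes the insert is exposed as a `Future` (the Scala driver's observables offer `toFuture()`), and it retries immediately with no backoff, which real code would probably want to add.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

// Hypothetical helper, not part of the MongoDB driver or any library.
object Retry {
  // Evaluates `op` up to `attempts` times in total and returns the first
  // success, or the failure of the last attempt. `op` is by-name, so every
  // attempt re-runs the underlying operation.
  def retry[T](attempts: Int)(op: => Future[T])(implicit ec: ExecutionContext): Future[T] =
    Future.unit
      .flatMap(_ => op) // also turns a synchronous throw inside `op` into a failed Future
      .recoverWith {
        case NonFatal(_) if attempts > 1 => retry(attempts - 1)(op)
      }
}
```

With the question's code this might be used as `Retry.retry(3)(collection.insertOne(document).toFuture())`, reusing the same `MongoClient` for every attempt.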

1 Answer

Your code does not create connections. It creates MongoClient instances.

As such you cannot "create a new connection". MongoDB drivers do not provide an API for applications to manage connections.

Connections are managed internally by the driver and are created and destroyed automatically as needed in response to application requests/commands. You can configure connection pool size and when stale connections are removed from the pool.

Furthermore, execution of a single application command may involve multiple connections (up to 3 easily, possibly over 5 if encryption is involved), and the connection(s) used depend on the command/query. Checking the health of any one connection, even if it were possible, wouldn't be very useful.
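To illustrate the point about configuring the pool rather than managing connections by hand, here is a rough sketch that builds a single long-lived `MongoClient` from `MongoClientSettings`. The `MongoClientConfig` object, the `build` method, and the values 20 and 60 seconds are assumptions chosen for illustration, not recommendations; the `uri` argument stands in for the question's connection string.

```scala
import java.util.concurrent.TimeUnit

import com.mongodb.connection.ConnectionPoolSettings
import org.mongodb.scala.{ConnectionString, MongoClient, MongoClientSettings}

// Hypothetical wrapper; the name and the numbers below are illustrative only.
object MongoClientConfig {

  def build(uri: String): MongoClient = {
    val settings: MongoClientSettings = MongoClientSettings.builder()
      .applyConnectionString(ConnectionString(uri))
      .applyToConnectionPoolSettings((b: ConnectionPoolSettings.Builder) =>
        b.maxSize(20)                                // upper bound on pooled connections
          .maxConnectionIdleTime(60, TimeUnit.SECONDS) // idle connections are closed after this
      )
      .build()

    // One client for the whole application; the driver opens and
    // closes the underlying connections on its own as needed.
    MongoClient(settings)
  }
}
```

The client would be created once and shared; after a server outage the driver is expected to establish fresh connections itself, so the application does not need to replace the `MongoClient`.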
