2

I'm trying to get a better understanding of the Scala MongoDB src

Using the scala mongodb driver (api doc : http://mongodb.github.io/mongo-scala-driver/)

when I use

  val collection: MongoCollection[Document] = database.getCollection("mycollection");

      val observable: Observable[Completed] = collection.insertOne(doc)


      observable.subscribe(new Observer[Completed] {
        override def onNext(result: Completed): Unit = println("Inserted")
        override def onError(e: Throwable): Unit = println("Failed")
        override def onComplete(): Unit = println("Completed")
      })

Is this implicit method

/**
     * Subscribes to the [[Observable]] and requests `Long.MaxValue`.
     *
     * Uses the default or overridden `onNext`, `onError`, `onComplete` partial functions.
     *
     * @param doOnNext anonymous function to apply to each emitted element.
     * @param doOnError anonymous function to apply if there is an error.
     * @param doOnComplete anonymous function to apply on completion.
     */
    def subscribe(doOnNext: T => Any, doOnError: Throwable => Any, doOnComplete: () => Any): Unit = {
      observable.subscribe(new Observer[T] {
        override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue)

        override def onNext(tResult: T): Unit = doOnNext(tResult)

        override def onError(throwable: Throwable): Unit = doOnError(throwable)

        override def onComplete(): Unit = doOnComplete()

      })
    }

src : https://github.com/mongodb/mongo-scala-driver/blob/master/driver/src/main/scala/org/mongodb/scala/ObservableImplicits.scala

called from :

 /**
   * Request `Observable` to start streaming data.
   *
   * This is a "factory method" and can be called multiple times, each time starting a new [[Subscription]].
   * Each `Subscription` will work for only a single [[Observer]].
   *
   * If the `Observable` rejects the subscription attempt or otherwise fails it will signal the error via [[Observer.onError]].
   *
   * @param observer the `Observer` that will consume signals from this `Observable`
   */
  def subscribe(observer: Observer[_ >: T]): Unit

src : https://github.com/mongodb/mongo-scala-driver/blob/master/driver/src/main/scala/org/mongodb/scala/Observable.scala

It seems that calling subscribe invoked a new thread (as it's called subscribe) but I don't see where this new thread is called from the src?

Implicits are used to achieve this 'wiring' that invokes the implicit subscribe method when I use observable.subscribe(new Observer[Completed] {.... ?

Update :

Using this code :

import org.mongodb.scala.MongoClient;
import org.mongodb.scala.bson.collection.immutable.Document;
import org.mongodb.scala._
import org.scalatest._
import Matchers._
import org.mongodb.scala._

class MongoSpec extends FlatSpec with Matchers {

  "Test MongoDb" should "insert" in {
    {
      val mongoClient: MongoClient = MongoClient()
      val database: MongoDatabase = mongoClient.getDatabase("scala-poc");

      val doc: Document = Document("_id" -> 6, "name" -> "MongoDB", "type" -> "database",
        "count" -> 1, "info" -> Document("x" -> 203, "y" -> 100))

      val collection: MongoCollection[Document] = database.getCollection("documents");

      val observable: Observable[Completed] = collection.insertOne(doc)

      observable.subscribe(new Observer[Completed] {
        override def onNext(result: Completed): Unit = println("Inserted")
        override def onError(e: Throwable): Unit = println(" \n\nFailed " + e + "\n\n")
        override def onComplete(): Unit = println("Completed")
      })

      mongoClient.close();

    }

  }
}

causes below exception :

Failed com.mongodb.MongoClientException: Shutdown in progress

The mongoClient.close(); is being invoked before insertOne method is completed.

So insertOne or subscribe method is asynchronous ?

1
  • 1
    For this you should look at the Java driver source code, not at Scala which is a simple wrapper around it. But of course insertOne is asynchronous, because Scala driver is using the async Java driver. Commented Apr 25, 2016 at 10:02

1 Answer 1

1
  1. No, subscribe(doOnNext, doOnError, doOnComplete) calls subscribe(observer) (as you can see from the implementation quoted in your question). So if it was also called from there, you'd get an infinite loop. The "wiring" is used when you write something like observer.subscribe(x => println(s"next = $x"), error => error.printStackTrace(), () => {}).

  2. No, subscribe doesn't create a new thread. Classes implementing Observable mostly wrap classes from the Java MongoDB driver and call their own subscribe methods, e.g. override def subscribe(observer: Observer[_ >: TResult]): Unit = observe(wrapped).subscribe(observer). These subscribe methods also don't start new threads: see https://mongodb.github.io/mongo-java-driver/3.1/driver-async/reference/observables/ for some explanation.

Sign up to request clarification or add additional context in comments.

1 Comment

please see update, there appears to be an asynchronous operation occurring , im not sure where.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.