So far, I am able to retrieve data from MongoDB using mongo-hadoop-core 1.4.2. The values I want to manipulate are inside arrays inside an embedded document inside each document of the collection I am querying, and I need those values as Doubles. The data retrieved from a collection has type RDD[(Object, org.bson.BSONObject)], which means each document is a tuple of type (Object, org.bson.BSONObject).

Whenever I want to get an embedded document, I do the following (working in spark-shell 1.5.1):

import com.mongodb.{BasicDBObject, BasicDBList} // classes I am using here.    

// 'documents' already taken from collection.
scala> documents
res4: org.apache.spark.rdd.RDD[(Object, org.bson.BSONObject)] = NewHadoopRDD[0] at newAPIHadoopRDD at <console>:32

// getting one document.
scala> val doc = documents.take(1)(0) 
doc: (Object, org.bson.BSONObject) = ( ... _id fields ... , ... lots of fields ...)

// getting an embedded document from the tuple's second element.
scala> val samples = doc._2.get("samp") match {case x: BasicDBObject => x}
samples: com.mongodb.BasicDBObject = (... some fields ...)

// getting a nested embedded document.
scala> val latency = samples.get("latency") match {case x: BasicDBObject => x}
latency: com.mongodb.BasicDBObject = { "raw" : [ 9.71 , 8.77 , 10.16 , 9.49 , 8.54 , 10.29 , 9.55 , 9.16 , 10.78 , 10.31 , 9.54 , 10.69 , 10.33 , 9.58 , 9.07 , 9.72 , 9.48 , 8.72 , 10.59 , 9.81 , 9.31 , 10.64 , 9.87 , 9.29 , 10.38 , 9.64 , 8.86 , 10.84 , 10.06 , 9.29 , 8.45 , 9.08 , 7.55 , 9.75 , 9.05 , 10.38 , 9.64 , 8.25 , 10.27 , 9.54 , 8.52 , 10.26 , 9.53 , 7.87 , 9.76 , 9.02 , 10.27 , 7.93 , 9.73 , 9 , 10.07 , 9.35 , 7.66 , 13.68 , 11.92 , 14.72 , 14 , 12.55 , 11.77 , 11.02 , 11.59 , 10.87 , 10.4 , 9.13 , 10.28 , 9.55 , 10.43 , 8.33 , 9.66 , 8.93 , 8.05 , 11.26 , 10.53 , 9.81 , 10.2 , 9.42 , 7.73 , 9.76 , 9.04 , 8.29 , 9.34 , 7.21 , 10.05 , 9.32 , 10.28 , 8.59 , 10.15 , 9.53 , 7.88 , 9.9 , 9.15 , 13.96 , 13.19 , 11 , 13.6 , 13.01 , 12.17 , 11.39 , 10.64 , 9.9] , "xtrf" : { "...

// getting a BSON array.
scala> val array = latency.get("raw") match {case x: BasicDBList => x}
array: com.mongodb.BasicDBList =  [ 9.71 , 8.77 , 10.16 , 9.49 , 8.54 , 10.29 , 9.55 , 9.16 , 10.78 , 10.31 , 9.54 , 10.69 , 10.33 , 9.58 , 9.07 , 9.72 , 9.48 , 8.72 , 10.59 , 9.81 , 9.31 , 10.64 , 9.87 , 9.29 , 10.38 , 9.64 , 8.86 , 10.84 , 10.06 , 9.29 , 8.45 , 9.08 , 7.55 , 9.75 , 9.05 , 10.38 , 9.64 , 8.25 , 10.27 , 9.54 , 8.52 , 10.26 , 9.53 , 7.87 , 9.76 , 9.02 , 10.27 , 7.93 , 9.73 , 9 , 10.07 , 9.35 , 7.66 , 13.68 , 11.92 , 14.72 , 14 , 12.55 , 11.77 , 11.02 , 11.59 , 10.87 , 10.4 , 9.13 , 10.28 , 9.55 , 10.43 , 8.33 , 9.66 , 8.93 , 8.05 , 11.26 , 10.53 , 9.81 , 10.2 , 9.42 , 7.73 , 9.76 , 9.04 , 8.29 , 9.34 , 7.21 , 10.05 , 9.32 , 10.28 , 8.59 , 10.15 , 9.53 , 7.88 , 9.9 , 9.15 , 13.96 , 13.19 , 11 , 13.6 , 13.01 , 12.17 , 11.39 , 10.64 , 9.9]

Converting from Object to BasicDBObject is quite inconvenient, but I need to do it in order to use get(key: String). I could also use .asInstanceOf[BasicDBObject] instead of match {case x: BasicDBObject => x}, but is there any better way? Getting specific types, like Double, Int, String and Date, is straightforward using methods inherited from the BasicBSONObject class.
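
To at least avoid repeating the match everywhere, I can wrap it in a small helper. This is just a sketch (the name asDBObject is mine, not something from the driver), assuming the looked-up field is always a sub-document:

import org.bson.BSONObject

// wraps the repetitive pattern match and fails with a descriptive
// message, instead of a bare MatchError, when the field is not a sub-document.
def asDBObject(parent: BSONObject, key: String): BasicDBObject =
  parent.get(key) match {
    case x: BasicDBObject => x
    case other => sys.error(s"field '$key' is not a BasicDBObject: $other")
  }

// the steps above then become:
// val samples = asDBObject(doc._2, "samp")
// val latency = asDBObject(samples, "latency")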

As for BasicDBList, there's a get(key: String) method, inherited from BasicBSONList, that returns an Object which can be cast to Double, but only via an .asInstanceOf[Double] call. There's also a toArray() method, inherited from java.util.ArrayList, that returns an array of Objects that I can't cast to Double, even with .map(_.asInstanceOf[Double]), as I'm doing here:

scala> val arrayOfDoubles = array.toArray.map(_.asInstanceOf[Double])
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Double
at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:37)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:37)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
at $iwC$$iwC$$iwC.<init>(<console>:50)
at $iwC$$iwC.<init>(<console>:52)
at $iwC.<init>(<console>:54)
at <init>(<console>:56)
at .<init>(<console>:60)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

But sometimes it works: the cast succeeds for some documents and fails for others with the error message above. Could this be a problem in the data structure that MongoDB hands to Spark, but only in these documents? Smaller arrays, with around 30 values, seem to always work.
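
The exception itself is easy to reproduce in plain Scala, with no MongoDB involved, if I assume the array mixes boxed Doubles with boxed Integers, which is what makes me suspect the data:

// hypothetical reproduction: an Array[AnyRef] holding a java.lang.Double
// and a java.lang.Integer, like a BSON array mixing doubles and int32s.
val mixed: Array[AnyRef] = Array(Double.box(9.71), Int.box(9))
mixed.map(_.asInstanceOf[Double]) // ClassCastException: java.lang.Integer cannot be cast to java.lang.Double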

My solution so far is this inefficient conversion:

scala> val arrayOfDoubles = array.toArray.map(_.toString.toDouble)
arrayOfDoubles: Array[Double] = Array(9.71, 8.77, 10.16, 9.49, 8.54, 10.29, 9.55, 9.16, 10.78, 10.31, 9.54, 10.69, 10.33, 9.58, 9.07, 9.72, 9.48, 8.72, 10.59, 9.81, 9.31, 10.64, 9.87, 9.29, 10.38, 9.64, 8.86, 10.84, 10.06, 9.29, 8.45, 9.08, 7.55, 9.75, 9.05, 10.38, 9.64, 8.25, 10.27, 9.54, 8.52, 10.26, 9.53, 7.87, 9.76, 9.02, 10.27, 7.93, 9.73, 9.0, 10.07, 9.35, 7.66, 13.68, 11.92, 14.72, 14.0, 12.55, 11.77, 11.02, 11.59, 10.87, 10.4, 9.13, 10.28, 9.55, 10.43, 8.33, 9.66, 8.93, 8.05, 11.26, 10.53, 9.81, 10.2, 9.42, 7.73, 9.76, 9.04, 8.29, 9.34, 7.21, 10.05, 9.32, 10.28, 8.59, 10.15, 9.53, 7.88, 9.9, 9.15, 13.96, 13.19, 11.0, 13.6, 13.01, 12.17, 11.39, 10.64, 9.9)

Am I missing something here, or are things really this inconvenient? Why do all these methods have to return Object or BSONObject? Is there any way to overcome this problem? And where does this java.lang.Integer come from, if there are no integers in the array being cast to Double?

1 Answer

First of all, I'd advise you to have a look at Casbah if you haven't yet.

To answer your question: first import the Java conversions:

import scala.collection.JavaConversions._

You should then be able to map directly over the collection, without the toArray call. If your array contains either Doubles or Integers, you can cast each element to Number and take its double value, like so:

array.map(_.asInstanceOf[Number].doubleValue)

I don't know what your data source looks like, but given that you occasionally get an Integer where you expect a Double, it's reasonable to assume that round numbers (e.g. 11.0) are stored as integers (e.g. 11).
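
Putting it together for your RDD (just a sketch: the field names "samp", "latency" and "raw" are taken from your post, and I'm keeping your asInstanceOf casts for brevity):

import scala.collection.JavaConversions._
import com.mongodb.{BasicDBObject, BasicDBList}
import org.apache.spark.rdd.RDD

// one Array[Double] per document; casting to Number covers both the
// Double and the Integer elements that can occur in the same BSON array.
val rawLatencies: RDD[Array[Double]] = documents.map { case (_, bson) =>
  val samples = bson.get("samp").asInstanceOf[BasicDBObject]
  val latency = samples.get("latency").asInstanceOf[BasicDBObject]
  val raw     = latency.get("raw").asInstanceOf[BasicDBList]
  raw.map(_.asInstanceOf[Number].doubleValue).toArray
}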

1 Comment

I've read about Casbah before and figured it doesn't fit the purpose of a Spark cluster. I'm trying to use it now, but only as a library that adds functionality to the Java driver classes; mongo-hadoop is still querying the database and producing the RDD. But I still need to add the mongo-java-driver dependency, even though the Casbah documentation says it ships those classes. Am I supposed to keep the Java driver dependency? The JavaConversions worked just fine, thanks. I'm going to rework my code to use Casbah now.
