
I get "java.lang.IllegalStateException: not ready" in org.bson.BasicBSONDecoder._decode while trying to use MongoDB as input RDD:

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.bson.BSONObject;
import com.mongodb.hadoop.MongoInputFormat;

// sc is an existing JavaSparkContext
Configuration conf = new Configuration();
conf.set("mongo.input.uri", "mongodb://127.0.0.1:27017/test.input");

JavaPairRDD<Object, BSONObject> rdd = sc.newAPIHadoopRDD(conf, MongoInputFormat.class, Object.class, BSONObject.class);

System.out.println(rdd.count());

The exception I get is:

14/08/06 09:49:57 INFO rdd.NewHadoopRDD: Input split: MongoInputSplit{URI=mongodb://127.0.0.1:27017/test.input, authURI=null, min={ "_id" : { "$oid" : "53df98d7e4b0a67992b31f8d"}}, max={ "_id" : { "$oid" : "53df98d7e4b0a67992b331b8"}}, query={ }, sort={ }, fields={ }, notimeout=false}
14/08/06 09:49:57 WARN scheduler.TaskSetManager: Loss was due to java.lang.IllegalStateException
java.lang.IllegalStateException: not ready
            at org.bson.BasicBSONDecoder._decode(BasicBSONDecoder.java:139)
            at org.bson.BasicBSONDecoder.decode(BasicBSONDecoder.java:123)
            at com.mongodb.hadoop.input.MongoInputSplit.readFields(MongoInputSplit.java:185)
            at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
            at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
            at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:42)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:88)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
            at java.lang.reflect.Method.invoke(Method.java:618)
            at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1089)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1962)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1867)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2059)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1984)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1867)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:420)
            at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:147)
            at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1906)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1865)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:420)
            at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
            at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1156)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:626)
            at java.lang.Thread.run(Thread.java:804)

All the program output is here

Environment:

  • Red Hat
  • Spark 1.0.1
  • Hadoop 2.4.1
  • MongoDB 2.4.10
  • mongo-hadoop-1.3
2 Comments
  • It seems like the first exception was actually: java.lang.ArrayIndexOutOfBoundsException: Array index out of range: -1. I'm not sure if anyone will be able to help without having some example data/code to reproduce the problem. Commented Aug 11, 2014 at 5:15
  • The first exception is sometimes "Array index out of range: -1" and sometimes "not ready", but I think the "not ready" exception is the real one and the other is a consequence of it. Commented Aug 11, 2014 at 6:39

3 Answers


I think I've found the issue: mongo-hadoop has a "static" modifier on its BSON encoder/decoder instances in core/src/main/java/com/mongodb/hadoop/input/MongoInputSplit.java. When Spark runs in multithreaded mode, all the threads try to deserialise using the same encoder/decoder instances, which predictably has bad results.
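
For illustration, here is a minimal sketch of the failure mode. The class and method names are made up and this is not the actual MongoInputSplit code, but it shows why a shared static BasicBSONDecoder is unsafe: the decoder keeps internal parsing state and refuses to start a new decode while one is already in progress.

import org.bson.BSONDecoder;
import org.bson.BSONObject;
import org.bson.BasicBSONDecoder;
import org.bson.BasicBSONEncoder;
import org.bson.BasicBSONObject;

public class DecoderSharingSketch {

    // Problematic pattern: a single decoder shared by every task thread in the JVM
    private static final BSONDecoder SHARED_DECODER = new BasicBSONDecoder();

    static BSONObject decodeShared(byte[] bytes) {
        return SHARED_DECODER.readObject(bytes); // concurrent calls can hit "not ready"
    }

    // Safe pattern (the idea behind dropping the static modifier): one decoder per instance or per call
    static BSONObject decodeLocal(byte[] bytes) {
        return new BasicBSONDecoder().readObject(bytes);
    }

    public static void main(String[] args) {
        byte[] doc = new BasicBSONEncoder().encode(new BasicBSONObject("x", 1));
        System.out.println(decodeLocal(doc)); // prints the document back
    }
}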

Patch on my GitHub here (I have submitted a pull request upstream).

I'm now able to run an 8-core multithreaded Spark->mongo collection count() from Python!


1 Comment

Looks like your PR was accepted (github.com/mongodb/mongo-hadoop/releases); version 1.3.1 of the mongo-hadoop driver has the change. I was confused for a bit, though, as it looks like the Maven group ID for mongo-hadoop has changed. The new dependency should be:

<dependency>
    <groupId>org.mongodb.mongo-hadoop</groupId>
    <artifactId>mongo-hadoop-core</artifactId>
    <version>1.3.1</version>
</dependency>

Using this version has stopped this error for me.

I found the same problem. As a workaround I abandoned the newAPIHadoopRDD approach and implemented a parallel load mechanism based on defining intervals on the document _id, and then loading each partition in parallel. The idea is to implement the following mongo shell code using the MongoDB Java driver:

// Compute min and max id of the collection
db.coll.find({},{_id:1}).sort({_id: 1}).limit(1)
   .forEach(function(doc) {min_id = doc._id})
db.coll.find({},{_id:1}).sort({_id: -1}).limit(1)
   .forEach(function(doc) {max_id = doc._id})

// Compute id ranges
curr_id = min_id
ranges = []
page_size = 1000
// equals() is used instead of an ordering comparison, to avoid the use of Comparable in the Java translation
while(! curr_id.equals(max_id)) {
    prev_id = curr_id    
    db.coll.find({_id : {$gte : curr_id}}, {_id : 1})
           .sort({_id: 1})
           .limit(page_size + 1)
           .forEach(function(doc) {
                       curr_id = doc._id
                   })
    ranges.push([prev_id, curr_id])
}

Now we can use the ranges to perform fast queries for collection fragments. Note that the last fragment needs to be treated differently, with just a min constraint, to avoid losing the last document of the collection.

db.coll.find({_id : {$gte : ranges[1][0], $lt : ranges[1][1]}})
db.coll.find({_id : {$gte : ranges[2][0]}})

I implement this as a Java method 'LinkedList<Range> computeIdRanges(DBCollection coll, int rangeSize)' for a simple Range POJO, and then I parallelize the resulting list of ranges and transform it with flatMapToPair to generate an RDD similar to that returned by newAPIHadoopRDD.
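
For reference, here is a rough sketch of what that computeIdRanges method could look like with the 2.x MongoDB Java driver. The Range POJO below is my own guess (an inclusive min plus a Guava Optional for the exclusive max, to match the isPresent()/get() calls further down); its actual definition is not shown in this answer.

import java.util.LinkedList;

import com.google.common.base.Optional;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;

class MongoDBLoader {

    // Assumed Range POJO (referenced as MongoDBLoader.Range below): min is inclusive,
    // max is exclusive and absent for the last fragment
    static class Range implements java.io.Serializable {
        final Object min;
        final Optional<Object> max;
        Range(Object min, Optional<Object> max) { this.min = min; this.max = max; }
    }

    static LinkedList<Range> computeIdRanges(DBCollection coll, int rangeSize) {
        BasicDBObject idOnly = new BasicDBObject("_id", 1);
        BasicDBObject ascending = new BasicDBObject("_id", 1);

        // min and max _id of the collection, as in the shell version above (assumes a non-empty collection)
        Object minId = coll.find(new BasicDBObject(), idOnly).sort(ascending).limit(1).next().get("_id");
        Object maxId = coll.find(new BasicDBObject(), idOnly).sort(new BasicDBObject("_id", -1)).limit(1).next().get("_id");

        LinkedList<Range> ranges = new LinkedList<Range>();
        Object currId = minId;
        while (!currId.equals(maxId)) {
            Object prevId = currId;
            // advance currId by up to rangeSize documents
            DBCursor cursor = coll.find(new BasicDBObject("_id", new BasicDBObject("$gte", currId)), idOnly)
                                  .sort(ascending)
                                  .limit(rangeSize + 1);
            for (DBObject doc : cursor) {
                currId = doc.get("_id");
            }
            // the last fragment gets an open upper bound so that max_id itself is not lost
            ranges.add(new Range(prevId, currId.equals(maxId) ? Optional.<Object>absent() : Optional.of(currId)));
        }
        return ranges;
    }
}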

List<Range> ranges = computeIdRanges(coll, DEFAULT_RANGE_SIZE);
JavaRDD<Range> parallelRanges = sparkContext.parallelize(ranges, ranges.size());
JavaPairRDD<Object, BSONObject> mongoRDD = 
   parallelRanges.flatMapToPair(
     new PairFlatMapFunction<MongoDBLoader.Range, Object, BSONObject>() {
       ...
       BasicDBObject query = range.max.isPresent() ?
           new BasicDBObject("_id", new BasicDBObject("$gte", range.min)
                            .append("$lt", range.max.get()))
         : new BasicDBObject("_id", new BasicDBObject("$gte", range.min));
       ...
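
The elided body of the PairFlatMapFunction could look roughly like the named class below; the RangeLoader name, the connection settings and the database/collection names are placeholders of mine, and the Range fields follow the assumed POJO sketched above. Each call opens its own client for one fragment and closes it when done, so connections stay short-lived.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.bson.BSONObject;

import scala.Tuple2;

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;

// Sketch of the elided function body as a named class (Spark 1.x PairFlatMapFunction: call() returns an Iterable)
public class RangeLoader implements PairFlatMapFunction<MongoDBLoader.Range, Object, BSONObject> {

    @Override
    public Iterable<Tuple2<Object, BSONObject>> call(MongoDBLoader.Range range) throws Exception {
        MongoClient client = new MongoClient("127.0.0.1", 27017);   // placeholder connection settings
        try {
            DBCollection coll = client.getDB("test").getCollection("input");

            // [min, max) for intermediate fragments, [min, +inf) for the last one
            BasicDBObject idConstraint = new BasicDBObject("$gte", range.min);
            if (range.max.isPresent()) {
                idConstraint.append("$lt", range.max.get());
            }

            List<Tuple2<Object, BSONObject>> docs = new ArrayList<Tuple2<Object, BSONObject>>();
            for (DBObject doc : coll.find(new BasicDBObject("_id", idConstraint))) {
                docs.add(new Tuple2<Object, BSONObject>(doc.get("_id"), doc));
            }
            return docs;
        } finally {
            client.close();
        }
    }
}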

You can play with the size of the ranges and the number of slices used to parallelize, to control the granularity of parallelism.

I hope that helps,

Greetings!

Juan Rodríguez Hortalá

2 Comments

BTW there is an unresolved JIRA that seems to be related to this problem at jira.mongodb.org/browse/HADOOP-154, which is why I decided to try a different way to solve this problem, at least for now.
This is a very nice solution! I was originally using newAPIHadoopRDD() with the mongo-hadoop driver and was getting upwards of 2,000 leaked Mongo connections. The same task with this implementation now has a maximum of 32 connections at a time, each of which is cleaned up when complete.

I had the same combination of exceptions after importing a BSON file using mongorestore. Calling db.collection.reIndex() solved the problem for me.

