I get "java.lang.IllegalStateException: not ready" in org.bson.BasicBSONDecoder._decode while trying to use MongoDB as input RDD:
Configuration conf = new Configuration();
conf.set("mongo.input.uri", "mongodb://127.0.0.1:27017/test.input");
JavaPairRDD<Object, BSONObject> rdd = sc.newAPIHadoopRDD(conf, MongoInputFormat.class, Object.class, BSONObject.class);
System.out.println(rdd.count());
The exception I get is: 14/08/06 09:49:57 INFO rdd.NewHadoopRDD: Input split:
MongoInputSplit{URI=mongodb://127.0.0.1:27017/test.input, authURI=null, min={ "_id" : { "$oid" : "53df98d7e4b0a67992b31f8d"}}, max={ "_id" : { "$oid" : "53df98d7e4b0a67992b331b8"}}, query={ }, sort={ }, fields={ }, notimeout=false} 14/08/06 09:49:57
WARN scheduler.TaskSetManager: Loss was due to java.lang.IllegalStateException
java.lang.IllegalStateException: not ready
at org.bson.BasicBSONDecoder._decode(BasicBSONDecoder.java:139)
at org.bson.BasicBSONDecoder.decode(BasicBSONDecoder.java:123)
at com.mongodb.hadoop.input.MongoInputSplit.readFields(MongoInputSplit.java:185)
at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:42)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:88)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:618)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1089)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1962)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1867)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2059)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1984)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1867)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:420)
at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:147)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1906)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1865)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:420)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1156)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:626)
at java.lang.Thread.run(Thread.java:804)
All the program output is here
Environment:
- Red Hat
- Spark 1.0.1
- Hadoop 2.4.1
- MongoDB 2.4.10
- mongo-hadoop-1.3
I ran into the same problem. As a workaround I abandoned the newAPIHadoopRDD approach and implemented a parallel load mechanism based on defining intervals on the document _id and then loading each partition in parallel. The idea is to compute the id intervals the way you would in the mongo shell, but using the MongoDB Java driver.
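A minimal sketch of the range computation, assuming the legacy MongoDB Java driver (DBCollection API); the Range POJO and the computeIdRanges name are the ones mentioned below, while the helper class, field names and the single-cursor walk are only illustrative:

import java.io.Serializable;
import java.util.LinkedList;
import org.bson.types.ObjectId;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;

public class IdRanges {

    // Simple POJO for an _id interval; max == null marks the open-ended last fragment.
    public static class Range implements Serializable {
        public final ObjectId min;
        public final ObjectId max;
        public Range(ObjectId min, ObjectId max) { this.min = min; this.max = max; }
    }

    // Walk the collection sorted by _id, projecting only _id, and record a boundary
    // every rangeSize documents.
    public static LinkedList<Range> computeIdRanges(DBCollection coll, int rangeSize) {
        LinkedList<ObjectId> bounds = new LinkedList<ObjectId>();
        DBCursor cursor = coll.find(new BasicDBObject(), new BasicDBObject("_id", 1))
                              .sort(new BasicDBObject("_id", 1));
        long count = 0;
        for (DBObject doc : cursor) {
            if (count % rangeSize == 0) {
                bounds.add((ObjectId) doc.get("_id"));
            }
            count++;
        }
        LinkedList<Range> ranges = new LinkedList<Range>();
        for (int i = 0; i < bounds.size(); i++) {
            ObjectId min = bounds.get(i);
            ObjectId max = (i + 1 < bounds.size()) ? bounds.get(i + 1) : null; // last range stays open
            ranges.add(new Range(min, max));
        }
        return ranges;
    }
}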
Now we can use the ranges to perform fast queries for collection fragments. Note that the last fragment needs to be treated differently, using only a min constraint, to avoid losing the last documents of the collection.
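As an illustration, the query for one fragment could be built like this; rangeQuery is a hypothetical helper, not part of any driver API:

// Build the _id filter for one fragment; the upper bound is skipped for the
// open-ended last range so the final documents are kept.
static DBObject rangeQuery(IdRanges.Range r) {
    BasicDBObject idConstraint = new BasicDBObject("$gte", r.min);
    if (r.max != null) {
        idConstraint.append("$lt", r.max);
    }
    return new BasicDBObject("_id", idConstraint);
}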
I implement this as a Java method 'LinkedList computeIdRanges(DBCollection coll, int rangeSize)' together with a simple Range POJO, and then I parallelize the list of ranges and transform it with flatMapToPair to generate an RDD similar to the one returned by newAPIHadoopRDD.
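A rough sketch of that last step, assuming the Spark 1.x Java API and the sketches above; the host, database and collection names and the rangeSize/numSlices values are placeholders, and the anonymous function should live in a static context so it serialises cleanly:

// needs: java.util.*, scala.Tuple2, org.bson.BSONObject, org.apache.spark.api.java.*,
// org.apache.spark.api.java.function.PairFlatMapFunction, com.mongodb.*
int rangeSize = 100000;   // documents per fragment (tune as needed)
int numSlices = 8;        // Spark partitions for the list of ranges (tune as needed)
List<IdRanges.Range> ranges = IdRanges.computeIdRanges(coll, rangeSize);

JavaPairRDD<Object, BSONObject> rdd = sc.parallelize(ranges, numSlices)
    .flatMapToPair(new PairFlatMapFunction<IdRanges.Range, Object, BSONObject>() {
        @Override
        public Iterable<Tuple2<Object, BSONObject>> call(IdRanges.Range r) throws Exception {
            // Each task opens its own connection and loads only its fragment.
            MongoClient client = new MongoClient("127.0.0.1", 27017);
            try {
                DBCollection c = client.getDB("test").getCollection("input");
                List<Tuple2<Object, BSONObject>> docs = new ArrayList<Tuple2<Object, BSONObject>>();
                for (DBObject doc : c.find(rangeQuery(r))) {
                    docs.add(new Tuple2<Object, BSONObject>(doc.get("_id"), doc));
                }
                return docs;
            } finally {
                client.close();
            }
        }
    });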
You can play with the size of the ranges and the number of slices used to parallelize, to control the granularity of parallelism.
I hope that helps,
Greetings!
Juan Rodríguez Hortalá
I think I've found the issue: mongo-hadoop has a "static" modifier on its BSON encoder/decoder instances in core/src/main/java/com/mongodb/hadoop/input/MongoInputSplit.java. When Spark runs in multithreaded mode, all the threads try to deserialise using the same encoder/decoder instances, which predictably has bad results.
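The gist can be illustrated with a toy example (this is not the mongo-hadoop source, just a hypothetical sketch): BasicBSONDecoder keeps internal parsing state, so one instance shared through a static field across task threads can be caught mid-parse by another thread, whereas a decoder created per call is safe.

import org.bson.BSONDecoder;
import org.bson.BSONObject;
import org.bson.BasicBSONDecoder;

public class DecoderSharingExample {

    // Problematic pattern: BasicBSONDecoder is stateful, so a single static
    // instance shared by all executor threads is not safe under concurrency.
    // private static final BSONDecoder SHARED_DECODER = new BasicBSONDecoder();

    // Safe alternative: create the decoder where it is needed, one per call
    // (or one per object that is never shared between threads).
    public BSONObject deserialise(byte[] bsonBytes) {
        BSONDecoder decoder = new BasicBSONDecoder();
        return decoder.readObject(bsonBytes);
    }
}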
Patch on my github here (have submitted a pull request upstream)
I'm now able to run an 8-core multithreaded Spark->mongo collection count() from Python!
I had the same combination of exceptions after importing a BSON file using mongorestore. Calling db.collection.reIndex() solved the problem for me.