I would like to update a specific collection in MongoDB from Spark in Java. I am using the MongoDB Connector for Hadoop to retrieve information from and save it back to MongoDB from Apache Spark.
After following Sampo Niskanen's excellent post on retrieving and saving collections to MongoDB via Spark, I got stuck updating collections.
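For context, the Hadoop Configuration I pass to the save calls below is set up roughly like this (the URI is a placeholder, not my real database or collection):

Configuration config = new Configuration();
// Placeholder output URI; my real database and collection names differ.
config.set("mongo.output.uri", "mongodb://localhost:27017/mydb.mycollection");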
MongoOutputFormat.java includes a constructor taking String[] updateKeys, which I am guessing refers to a list of keys to compare against existing documents so that an update is performed instead of an insert. However, Spark's saveAsNewAPIHadoopFile() method takes the class MongoOutputFormat.class rather than an instance, so I don't see how to make use of that update constructor.
save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);
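The only way I can see to reach that constructor is to subclass MongoOutputFormat and hard-code the update keys in a no-arg constructor, so that Hadoop's reflective instantiation still works. Something like the sketch below, where the key name is just an example and the second constructor argument is my guess at a multi-update flag, which I haven't verified:

public class UpdatingMongoOutputFormat<K, V> extends MongoOutputFormat<K, V> {
    public UpdatingMongoOutputFormat() {
        // "customerId" is a hypothetical update key; the boolean is assumed
        // to be a multi-update flag, which I haven't confirmed.
        super(new String[]{"customerId"}, false);
    }
}

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, UpdatingMongoOutputFormat.class, config);

That feels like a workaround rather than the intended usage, though.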
Prior to this, MongoUpdateWritable.java was used to perform collection updates. From the Hadoop examples I've seen, it is normally set as the mongo.job.output.value class, which in Spark might look like this:
save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, MongoUpdateWritable.class, MongoOutputFormat.class, config);
However, I'm still wondering how to specify the update keys in MongoUpdateWritable.java.
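If I were to construct one myself, my best guess for a single record would be something like the following, though I haven't confirmed the constructor signature (I'm assuming it takes a query, a modifier document, and upsert/multi flags), and the field names and values are hypothetical:

// Match the document to update by the keys I care about (hypothetical field name).
BasicDBObject query = new BasicDBObject("customerId", "12345");
// Apply the computed result to the matched document.
BasicDBObject modifiers = new BasicDBObject("$set", new BasicDBObject("result", 42));
// Assumed argument order: query, modifiers, upsert, multiUpdate.
MongoUpdateWritable update = new MongoUpdateWritable(query, modifiers, true, false);

I would then presumably emit that as the value of each pair in the RDD passed to saveAsNewAPIHadoopFile(), as in the call above, but I'm not sure whether MongoOutputFormat will honour the update semantics without further configuration.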
Admittedly, as a hacky workaround, I've set the "_id" of each object to a concatenation of its key values, so that when a save is performed the collection overwrites any existing document whose _id matches:
JavaPairRDD<BSONObject, ?> analyticsResult; // JavaPairRDD of (mongoObject, result)

JavaPairRDD<Object, BSONObject> save = analyticsResult.mapToPair(s -> {
    BSONObject o = s._1;
    // For all keys, build a synthetic _id of the form "key1:value1_key2:value2_..."
    StringBuilder id = new StringBuilder();
    for (String key : o.keySet()) {
        id.append(key).append(":").append(o.get(key)).append("_");
    }
    o.put("_id", id.toString());
    o.put("result", s._2);
    // The key is left null; the _id set on the value determines which document gets overwritten.
    return new Tuple2<>(null, o);
});

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);
I would like to perform the MongoDB collection update from Spark using MongoOutputFormat, MongoUpdateWritable, or the Configuration object, ideally through the saveAsNewAPIHadoopFile() method. Is that possible? If not, is there any other way to do it that does not involve setting the _id to the key values I want to update on?