
I am trying to read a MongoDB collection with 234 million records in Spark. I want only one field.

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.functions.{explode, udf}
import spark.implicits._

case class Linkedin_Profile(experience: Array[Experience])
case class Experience(company: String)

val rdd = MongoSpark.load(sc, ReadConfig(Map("uri" -> mongo_uri_linkedin)))
val company_DS = rdd.toDS[Linkedin_Profile]()
// Count occurrences of each company name, ignoring nulls.
val count_udf = udf((x: scala.collection.mutable.WrappedArray[String]) =>
  x.filter(_ != null).groupBy(identity).mapValues(_.size))
val company_ColCount = company_DS.select(explode(count_udf($"experience.company")))
company_ColCount.rdd.saveAsTextFile("/dbfs/FileStore/chandan/intermediate_count_results.csv")

The job runs for about an hour and gets roughly halfway through its tasks, but then it fails with the following error:

com.mongodb.MongoCursorNotFoundException: 
Query failed with error code -5 and error message 
'Cursor 8962537864706894243 not found on server cluster0-shard-01-00-i7t2t.mongodb.net:37017' 
on server cluster0-shard-01-00-i7t2t.mongodb.net:37017

I tried changing the configuration as shown below, but to no avail.

System.setProperty("spark.mongodb.keep_alive_ms", "7200000")

Please suggest how to read this large collection.

2 Answers


The config property spark.mongodb.keep_alive_ms is meant to control the lifetime of the MongoDB client. See the docs here.

The issue you're experiencing seems to be related to server-side configuration. According to what's documented on this issue:

By specifying the cursorTimeoutMillis option, administrators can configure mongod or mongos to automatically remove idle client cursors after a specified interval. The timeout applies to all cursors maintained on a mongod or mongos, may be specified when starting the mongod or mongos and may be modified at any time using the setParameter command.

So, try starting your mongod daemon with cursorTimeoutMillis specified, for example:

mongod --setParameter cursorTimeoutMillis=10800000

This instructs the server to keep idle cursors valid for 3 hours (10,800,000 ms).
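The docs quoted above also say the timeout can be changed at runtime with the setParameter command. If you cannot restart mongod, a sketch like the following should apply the same setting (Scala with the MongoDB Java driver; the connection string and credentials are placeholders, and hosted services such as Atlas may not permit setParameter):

import com.mongodb.{MongoClient, MongoClientURI}
import org.bson.Document

// Placeholder admin connection string; replace with one that has the required privileges.
val adminClient = new MongoClient(new MongoClientURI("mongodb://admin:secret@your-mongod-host:27017"))
try {
  // Equivalent of db.adminCommand({ setParameter: 1, cursorTimeoutMillis: 10800000 })
  val result = adminClient.getDatabase("admin")
    .runCommand(new Document("setParameter", 1).append("cursorTimeoutMillis", 10800000L))
  println(result.toJson)
} finally {
  adminClient.close()
}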

Although this may in theory get rid of the annoyance, it is still a good idea to make the reads complete faster. Consider limiting the data you load into Spark to what you actually need, and look into the connector's options for tuning read speed; a sketch of a pushed-down projection follows.
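For example, the connector lets you attach an aggregation pipeline to the read, so a $project stage can make MongoDB send only the one field you want. A minimal sketch, reusing sc and mongo_uri_linkedin from the question (the exact projection is an assumption about what you need):

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.bson.Document

// Push a $project stage down to MongoDB so only experience.company is transferred.
val readConfig = ReadConfig(Map("uri" -> mongo_uri_linkedin))
val companyOnlyRdd = MongoSpark
  .load(sc, readConfig)
  .withPipeline(Seq(Document.parse("""{ "$project": { "experience.company": 1, "_id": 0 } }""")))

// The rest of the job (toDS, the counting UDF, the save) can run on this smaller RDD.

Transferring less data per document also means each cursor batch is consumed faster, which makes cursor timeouts less likely in the first place.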



Yes, by specifying the cursorTimeoutMillis option you can avoid this. But if you are not the administrator, you can cache the MongoRDD by running an action on it first, and then do the rest of the work in the Spark environment, as sketched below.
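A minimal sketch of that idea, reusing the question's Linkedin_Profile case class, sc, and mongo_uri_linkedin (the storage level and the choice of count() as the materializing action are assumptions):

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.storage.StorageLevel

// Materialize the Mongo read once, while the server-side cursors are still fresh.
val mongoRdd = MongoSpark.load(sc, ReadConfig(Map("uri" -> mongo_uri_linkedin)))
  .persist(StorageLevel.MEMORY_AND_DISK)
mongoRdd.count()  // the action forces the full read and populates the cache

// Later stages read from the cache instead of re-opening MongoDB cursors.
val company_DS = mongoRdd.toDS[Linkedin_Profile]()

MEMORY_AND_DISK is used so partitions that do not fit in memory spill to local disk instead of being recomputed from MongoDB, which would reopen cursors.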
