
I am trying to fetch documents from MongoDB using the Java MongoDB client.

cities.parallelStream().forEach(name -> {
    FindIterable<BsonDocument> result = collection.find(
            new BsonDocument()
                    .append("_source", new BsonString(name))
                    .append("addressid", new BsonNull()),
            BsonDocument.class);
    // ... stream the above docs using a Kafka producer ...
});

In the above snippet, cities is an ArrayList of approximately 500 city names. As per the business logic, I need to process each city and get all the documents whose addressid value is null. Each city yields around 10k-40k documents.

Although this code works, it takes a very long time. parallelStream() helped to a small extent, but I could not see much difference. I see only 2 threads in the CloudWatch logs with parallelStream().

The same query, executed from Robo3T, takes barely 3 seconds per city. Below is the query:

db.getCollection('adresses').find({"_source":"Chicago", "addressid":null})

I am new to these concepts. Please help me understand the best approach to fetch this data in parallel.
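
For reference on the 2-thread observation: parallelStream() runs its tasks on the common ForkJoinPool, which defaults to availableProcessors() - 1 workers, so on a 2-vCPU host only about 2 threads are active. Below is a minimal sketch, not taken from the post itself, of forcing the stream onto a larger custom pool; the pool size of 8 is an arbitrary assumption, and the trick relies on the implementation detail that parallel-stream tasks submitted from inside a ForkJoinPool run in that pool:

import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class PoolDemo {
    public static void main(String[] args) throws Exception {
        List<String> cities = List.of("Chicago", "Boston");  // hypothetical stand-in data

        // The common pool defaults to availableProcessors() - 1 workers,
        // which would explain seeing only ~2 threads on a 2-vCPU Lambda.
        System.out.println("common pool size: " + ForkJoinPool.commonPool().getParallelism());

        ForkJoinPool pool = new ForkJoinPool(8);  // assumed size; tune to the host
        try {
            pool.submit(() ->
                cities.parallelStream().forEach(name -> {
                    // the per-city collection.find(...) + Kafka produce would go here
                    System.out.println(Thread.currentThread().getName() + " -> " + name);
                })
            ).get();
        } finally {
            pool.shutdown();
        }
    }
}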

  • 500 cities is a different ask than a single city: 500 * 25000 (avg?) docs to be examined is 12.5 million docs. When you say "long time", how long? Commented Feb 15, 2022 at 14:17
  • @BuzzMoschetti it's taking around 5-6 hours; total docs = 8344826 Commented Feb 15, 2022 at 15:15
  • 8344826 docs / 5.5 hours ~ 421 docs/sec; that's way too slow. I just inserted 12,500,000 docs into a collection (500 cities X 25000 addrs) and did a find() to get all of them, and that took 46 seconds on a MacBookPro. A $group by city returning the count (25000, obviously) took 24.3 seconds. What is the overall size of your docs, and what is your infra setup? Commented Feb 15, 2022 at 16:27
  • @BuzzMoschetti 46 seconds through the MongoDB Java client? I see slowness only through the Java code; through the shell I don't see any issue. Commented Feb 15, 2022 at 19:07
  • I am modifying my multithreaded Java testbed. Any indexes on city or address? What percent of address is null, and how big is each doc? You are doing find() with no projection, so the whole document is coming back over the wire. Commented Feb 15, 2022 at 22:30

1 Answer


Consider 500 cities X 25000 addresses for a total of 12,500,000 docs with this simple shape:

{ city: "Cn", addr: "Am"}

where 0 <= n <= 499 and 0 <= m <= 9 (yes, addresses repeat). No indexes.
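
The load script is not part of the original answer; for reproducibility, a minimal Java sketch that would generate this shape (the local connection string, collection name foo, and batch size are assumptions) might look like this:

import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;
import java.util.ArrayList;
import java.util.List;

public class LoadFoo {
    public static void main(String[] args) {
        MongoCollection<Document> coll = MongoClients.create("mongodb://localhost:27017")
                .getDatabase("test").getCollection("foo");

        List<Document> batch = new ArrayList<>();
        for (int n = 0; n < 500; n++) {            // 500 cities
            for (int k = 0; k < 25000; k++) {      // 25000 addresses per city
                batch.add(new Document("city", "C" + n).append("addr", "A" + (k % 10)));
                if (batch.size() == 1000) {        // insert in batches to keep memory flat
                    coll.insertMany(batch);
                    batch = new ArrayList<>();
                }
            }
        }
        if (!batch.isEmpty()) {
            coll.insertMany(batch);
        }
    }
}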

A "no effort agg" running on the mongo CLI the same machine as the server (MacBookPro 2.7GHz quadcore 16G RAM c2013) runs fast:

var nn = 0;
db.foo.aggregate().forEach(function(d) {
    nn++;
});
print("found " + nn);

found 12500000
50170 millis to fetch  // timing logic not included for clarity. 

The moment we add even one $match, however, things slow down significantly on a "time-per-city" basis:

var nn = 0;
db.foo.aggregate([
    {$match: {"city":"C0"}}
]).forEach(function(d) {
    nn++;
});
print("found " + nn);

found 25000
9489 millis to fetch

That is 5.3 times faster, but it accounts for only 1/500th of the cities! So if we were to put this in a loop and ask for cities C0 through C499, it stands to reason it would take roughly 500 * 9.4 seconds, or about 4700 seconds: 1 hour, 18 minutes. It turns out that adding additional fields to the $match, or (slightly) more complex pipeline logic in general, does not slow down performance nearly as much as the initial $match on a single field.

A Java program on a single thread yields roughly the same performance:

Document pquery = new Document("city", "C" + i);

MongoCursor<Document> cc = coll.find(pquery).iterator();
int j = 0;
while (cc.hasNext()) {
    Document src = cc.next();
    j++;
}

// Takes about 9.6 seconds for 1 city.
// 16 cities takes 164 seconds, so roughly linear scaling.

A multithreaded version, where each thread operates on a different range of cities, scales nicely with the number of threads (a sketch follows below). 16 cities with 4 threads takes 45 seconds; 16 cities with 8 threads also takes nearly 45 seconds, which is no surprise given the co-located client and server and the thread capacity of the MacBookPro. 100 cities with 4 threads takes 275 seconds, so 275 * 5 = 1375 seconds for 500 cities, or about 23 minutes. The MacBookPro is surprisingly capable, and it would take a medium-sized (CPU-wise) AWS instance to bring this under 10 minutes.
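
The testbed code itself is not shown above; a minimal sketch of the range-partitioning approach, assuming the same foo collection, Cn city names, and a local server, might look like this:

import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import org.bson.Document;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RangeFetch {
    public static void main(String[] args) throws Exception {
        MongoCollection<Document> coll = MongoClients.create("mongodb://localhost:27017")
                .getDatabase("test").getCollection("foo");

        int nCities = 500, nThreads = 4;           // assumed; match threads to available CPUs
        int chunk = nCities / nThreads;
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);

        for (int t = 0; t < nThreads; t++) {
            final int start = t * chunk, end = start + chunk;
            pool.submit(() -> {
                // Each thread owns a disjoint range of cities and its own cursor,
                // so one single-city query runs per thread at any moment.
                for (int i = start; i < end; i++) {
                    try (MongoCursor<Document> cc = coll.find(new Document("city", "C" + i)).iterator()) {
                        while (cc.hasNext()) {
                            cc.next();             // drag each doc over the wire, as in the test
                        }
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
    }
}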

In general, therefore, the rule of thumb becomes: you can go no faster than a single $match per thread if you need to drag the data over to the client. Larger payloads will obviously reduce performance further.
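
One corollary on payload size: if the client only needs a couple of fields, a projection trims what crosses the wire. A sketch using the Java driver's Projections helper (the field names here are illustrative):

import static com.mongodb.client.model.Projections.excludeId;
import static com.mongodb.client.model.Projections.fields;
import static com.mongodb.client.model.Projections.include;

import com.mongodb.client.FindIterable;
import org.bson.Document;

// Ship only the fields the consumer actually needs over the wire.
FindIterable<Document> docs = coll.find(new Document("city", "C0"))
        .projection(fields(include("city", "addr"), excludeId()));
for (Document d : docs) {
    // forward d to the downstream producer ...
}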


1 Comment

Thanks for the detailed analysis. I am currently running in AWS Lambda, and it has only 2 threads: main and worker. I will try running a local setup.
