MongoDB Java API slow reading peformance
Asked Answered
P

5

22

We are reading from a local MongoDB all documents from a collections and performance is not very brillant.

We need to dump all the data, don't be concerned why, just trust it's really needed and there is no workaround possible.

We've 4mio documents that look like :

{
    "_id":"4d094f58c96767d7a0099d49",
    "exchange":"NASDAQ",
    "stock_symbol":"AACC",
    "date":"2008-03-07",
    "open":8.4,
    "high":8.75,
    "low":8.08,
    "close":8.55,
    "volume":275800,
    "adj close":8.55
}

And we're using this for now trivial code to read:

MongoClient mongoClient = MongoClients.create();
MongoDatabase database = mongoClient.getDatabase("localhost");
MongoCollection<Document> collection = database.getCollection("test");

MutableInt count = new MutableInt();
long start = System.currentTimeMillis();
collection.find().forEach((Block<Document>) document -> count.increment() /* actually something more complicated */ );
long start = System.currentTimeMillis();

We're reading the whole collection at 16 seconds ( 250k row/sec), that is really not impressive at all with small documents. Bear in mind we want to load 800mio rows. No aggregate, map reduce or similar are possible.

Is this as fast as MongoDB gets or are there other ways to load documents faster (other techniques, moving Linux, more RAM, settings...)?

Philip answered 30/8, 2018 at 16:9 Comment(7)
I think this article about MongoDB bulk operations can be helpful to understand one how could improve the performance. For getting how many objects can be found, you should use the db.collection.stats(). This is how to call it from java. I would choose the less network- and data intensive way to do any operation on MongoDB.Berke
Do you really need to churn through 800mio documents or can you perhaps just look at a subset? That'd be the first thing I would suggest to change if possible. Also, projection would help to limit the amount of data that needs to get deserialized. Without understanding what exactly you want to do in your loop this is a very broad question...Qoph
What operation do you want to do with your documents? You should describe your requirement so someone can help you. For example, if just want count like your example then use built in count function is better. I think you want to do more than thatSotted
I've seen similar situation in other question. For some reason even if you want ALL documents from a collection, Mongo uses index and random disk operations. Do you use compression? Encryption?Cheeseburger
that is really not impressive at all.. Well, that depends also on your hardware. Please, provide more info about that. Which hardware do you use? OS? 800 million rows in 16 secs would be brilliant for a Raspberry Pi, I think!Anasarca
@all, actually the whole dataset it's 4 billions, 800 millions is already a subset. 16 seconds is for 4 mio rows (8sec on a better linux bases server). We want to load all of them ;-), trust mePhilip
note, that 250k rows/s with utf-8 text of length 227 (like in example) is 432mB/s. This is quite performant if the objects are not compressed and taken from the disk. Are your server and client on the same machine? What do you see in atop output? Which resource is depleted?Cheeseburger
D
12

You didn't specify your use case so it's very hard to tell you how to tune your query. (I.e: Who would want to load 800mil row at a time just for count?).

Given your schema, I think your data is almost read-only and your task is related to data aggregation.

Your current work is just read the data, (most likely your driver will read in batch), then stop, then perform some calculation (hell yeah, an int wrapper is used to increase the processing time more), then repeat. That's not a good approach. The DB does not magically fast if you do not access it the correct way.

If the computation is not too complex, I suggest you to use the aggregation framework instead of loading all into your RAM.

Something just you should consider to improve your aggregation:

  1. Divide your dataset to smaller set. (Eg: Partition by date, partition by exchange...). Add index to support that partition and operate aggregation on partition then combine the result (Typical divide-n-conquer approach)
  2. Project only needed fields
  3. Filter out unnecessary document (if possible)
  4. Allow diskusage if you can't perform your aggregation on memory (if you hit the 100MB limit per pipiline).
  5. Use builtin pipeline to speedup your computation (eg: $count for your example)

If your computation is too complex that you cannot express with aggregation framework, then use mapReduce. It operates on the mongod process and data does not need to transfer over network to your memory.

Updated

So look like you want to do an OLAP processing, and you stuck at ETL step.

You do not need to and have to avoid load the whole OLTP data to OLAP every time. Only need to load new changes to your data warehouse. Then first data loading/dumping takes more time is normal and acceptable.

For first time loading, you should consider following points:

  1. Divide-N-Conquer, again, breaks your data to smaller dataset (with predicate like date / exchange / stock label...)
  2. Do parallel computation, then combine your result (You have to partition your dataset properly)
  3. Do computation on batch instead of processing in forEach: Load the data partition then compute instead of compute one by one.
Diverticulitis answered 4/9, 2018 at 1:41 Comment(6)
thanks, we really need to get the 800mio rows and send them to another real time calculation engine. So, aggregations, map reduce and similar are not an option and we need the info.Philip
So it looks like you're doing an OLAP operations. Why do you have to send it 800mill rows every time? I think you should send the new documents onlySotted
yes :-), we need to load them at least once when startingPhilip
I wonder why you have do computation on your app server (like your count example). Use the divide-n-conquer pattern i described above. Divide your dataset into smaller ones, then do the parallel loading (Since your those smaller one does not overlap). Do not compute per document when loading (as foreach in your question), compute in batch instead (IE, after read a partitions, do blah blah, then continue)Sotted
Count is just an amazing oversimplification. We need to load all data and aftewards do some advanced maths in real time (no way to do this in mongoDB), but it's another 'story'. For now we need to 'dump' . Thanks for your hintsPhilip
I've summarized our discussion in my updated answer. Good luck~Sotted
N
4

collection.find().forEach((Block<Document>) document -> count.increment());

This line may be adding up a lot of time since you are iterating over 250k records in memory.

To quickly check if thats the case, you can try this -

long start1 = System.currentTimeMillis();
List<Document> documents = collection.find();
System.out.println(System.currentTimeMillis() - start1);

long start2 = System.currentTimeMillis();
documents.forEach((Block<Document>) document -> count.increment());
System.out.println(System.currentTimeMillis() - start2);

This will help you understand how much time it actually takes to get the documents from database and how much time the iteration is taking.

Nerval answered 30/8, 2018 at 16:16 Comment(3)
Hi, eventually I'm interested in the sum of both, it's not going to change as my code only calls once find()Philip
Why do you want to use count? Can't you get that by documents.size()?Nerval
it's just a test without converting the answer, but you can imagine any code there that needs to be done on the server sidePhilip
B
4

First, as @xtreme-biker commented, the performance greatly depend on your hardware. Specifically, my first advise would be checking whether you are running on a virtual machine or a native host. In my case with a CentOS VM on an i7 with an SDD drive I can read 123,000 docs per second but exactly the same code running on the Windows Host on the same drive reads up to 387,000 docs per second.

Next, let´s assume that you really need to read the full collection. This is to say that you must perform a full-scan. And let´s assume that you cannot change the configuration of your MongoDB server but only optimize your code.

Then everything comes down to what

collection.find().forEach((Block<Document>) document -> count.increment());

actually does.

A quick unrolling of MongoCollection.find() shows that it actually does this:

ReadPreference readPref = ReadPreference.primary();
ReadConcern concern = ReadConcern.DEFAULT;
MongoNamespace ns = new MongoNamespace(databaseName,collectionName);
Decoder<Document> codec = new DocumentCodec();
FindOperation<Document> fop = new FindOperation<Document>(ns,codec);
ReadWriteBinding readBinding = new ClusterBinding(getCluster(), readPref, concern);
QueryBatchCursor<Document> cursor = (QueryBatchCursor<Document>) fop.execute(readBinding);
AtomicInteger count = new AtomicInteger(0);
try (MongoBatchCursorAdapter<Document> cursorAdapter = new MongoBatchCursorAdapter<Document>(cursor)) {
    while (cursorAdapter.hasNext()) {
        Document doc = cursorAdapter.next();
        count.incrementAndGet();
    }
}

Here the FindOperation.execute() is rather fast (under 10ms) and most of the time is spent inside the while loop, and specifically inside the private method QueryBatchCursor.getMore()

getMore() calls DefaultServerConnection.command() and it´s time is consumed basically in two operations: 1) fetching string data from the server and 2) converting string data into BsonDocument.

It turns out that Mongo is quite smart with regard of howw many network round trips it will do to fetch a large result set. It will first fetch 100 results with a firstBatch command and then fetch larger batches with nextBatch being the batch size depending on the collection size up to a limit.

So, under the wood something like this will happen to fetch the first batch.

ReadPreference readPref = ReadPreference.primary();
ReadConcern concern = ReadConcern.DEFAULT;
MongoNamespace ns = new MongoNamespace(databaseName,collectionName);
FieldNameValidator noOpValidator = new NoOpFieldNameValidator();
DocumentCodec payloadDecoder = new DocumentCodec();
Constructor<CodecProvider> providerConstructor = (Constructor<CodecProvider>) Class.forName("com.mongodb.operation.CommandResultCodecProvider").getDeclaredConstructor(Decoder.class, List.class);
providerConstructor.setAccessible(true);
CodecProvider firstBatchProvider = providerConstructor.newInstance(payloadDecoder, Collections.singletonList("firstBatch"));
CodecProvider nextBatchProvider = providerConstructor.newInstance(payloadDecoder, Collections.singletonList("nextBatch"));
Codec<BsonDocument> firstBatchCodec = fromProviders(Collections.singletonList(firstBatchProvider)).get(BsonDocument.class);
Codec<BsonDocument> nextBatchCodec = fromProviders(Collections.singletonList(nextBatchProvider)).get(BsonDocument.class);
ReadWriteBinding readBinding = new ClusterBinding(getCluster(), readPref, concern);
BsonDocument find = new BsonDocument("find", new BsonString(collectionName));
Connection conn = readBinding.getReadConnectionSource().getConnection();

BsonDocument results = conn.command(databaseName,find,noOpValidator,readPref,firstBatchCodec,readBinding.getReadConnectionSource().getSessionContext(), true, null, null);
BsonDocument cursor = results.getDocument("cursor");
long cursorId = cursor.getInt64("id").longValue();

BsonArray firstBatch = cursor.getArray("firstBatch");

Then the cursorId is used to fetch each next batch.

In my opinion, the "problem" with the implementation of the driver is that the String to JSON decoder is injected but the JsonReader —in which the decode() method relies— is not. This is this way even down to com.mongodb.internal.connection.InternalStreamConnection where you are already near the socket communication.

Therefore, I think that there is hardly anything that you could do to improve on MongoCollection.find() unless you go as deep as InternalStreamConnection.sendAndReceiveAsync()

You can´t reduce the number of round trips and you can´t change the way the response is converted into BsonDocument. Not without bypassing the driver and writing your own client which I doubt is a good idea.

P.D. If you want to try some of the code above, you´ll need the getCluster() method which requires a dirty hack into mongo-java-driver.

private Cluster getCluster() {
    Field cluster, delegate;
    Cluster mongoCluster = null;
    try {
        delegate = mongoClient.getClass().getDeclaredField("delegate");
        delegate.setAccessible(true);
        Object clientDelegate = delegate.get(mongoClient);
        cluster = clientDelegate.getClass().getDeclaredField("cluster");
        cluster.setAccessible(true);
        mongoCluster = (Cluster) cluster.get(clientDelegate);
    } catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) {
        System.err.println(e.getClass().getName()+" "+e.getMessage());
    }
    return mongoCluster;
}
Bottomry answered 10/9, 2018 at 6:16 Comment(3)
We've been playing around and we've similar conclusion, decoding is taking a big part of the time. ThanksPhilip
@Philip I think this still supports the idea that you should rewrite this particular operation in Python or C++, at least as a test.Lodestar
@OldPro we worked on this idea and decoupling listening the port and decoding int the same thread, and more. Once done and tuned I'll write our findings. A pitty bson doesn't put fieldNames in maps so the same ones are not send over, and over again.. some hint on what we're improve ;-)Philip
M
2

What i think should i did in your case was a simple solution and simultaneously an efficient way is to maximize the overall throughput by using parallelCollectionScan

Allows applications to use multiple parallel cursors when reading all the documents from a collection, thereby increasing throughput. The parallelCollectionScan command returns a document that contains an array of cursor information.

Each cursor provides access to the return of a partial set of documents from a collection. Iterating each cursor returns every document in the collection. Cursors do not contain the results of the database command. The result of the database command identifies the cursors, but does not contain or constitute the cursors.

A simple example with parallelCollectionScan should be somethink like this one

 MongoClient mongoClient = MongoClients.create();
 MongoDatabase database = mongoClient.getDatabase("localhost");
 Document commandResult = database.runCommand(new Document("parallelCollectionScan", "collectionName").append("numCursors", 3));
Merrymaker answered 4/9, 2018 at 8:17 Comment(3)
This is good if the bottleneck is backend CPU. But often it's Mongo's read speed.Cheeseburger
Bad luck, we just get one cursor in return -> 'This command will not return more than one cursor for the WiredTiger storage engine.' docs.mongodb.com/manual/reference/command/…Philip
And even, more, it is completely removed in the version 4.2 and later.Isochronal
L
0

By my count, you are processing about 50 MiB/s (250k row/sec * 0.2 KiB/row). That's getting into both disk drive and network bottleneck territory. What kind of storage is MongoDB using? What kind of bandwidth do you have between the client and MongoDB server? Have you tried co-locating the server and client on a high-speed (>= 10 Gib/s) network with minimal (< 1.0 ms) latency? Keep in mind that if you are using a cloud computing provider like AWS or GCP, they are going to have virtualization bottlenecks that are on top of physical ones.

You asked about settings that might help. You can try changing the compression settings on the connection and on the collection (options are "none", snappy, and zlib). Even if neither improve on snappy, seeing the difference that setting makes (or doesn't make) might help figure out what part of the system is under the most stress.

Java does not have good performance for number crunching compared to C++ or Python, so you might consider rewriting this particular operation in one of those languages and then integrating that with your Java code. I suggest you do a test run of just looping over the data in Python and comparing that to the same in Java.

Lodestar answered 9/9, 2018 at 22:49 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.