Best practice to query large number of ndb entities from datastore

I have run into an interesting limit with the App Engine datastore. I am creating a handler to help us analyze some usage data on one of our production servers. To perform the analysis I need to query and summarize 10,000+ entities pulled from the datastore. The calculation isn't hard; it is just a histogram of the usage samples that pass a specific filter. The problem I hit is that I can't get the data back from the datastore fast enough to do any processing before hitting the query deadline.

I have tried everything I can think of to chunk the query into parallel RPC calls to improve performance, but according to appstats I can't seem to get the queries to actually execute in parallel. No matter what method I try (see below), it always seems that the RPCs fall back to a waterfall of sequential next queries.

Note: the query and analysis code does work; it just runs too slowly because I can't get data out of the datastore quickly enough.

Background

I don't have a live version I can share, but here is the basic model for the part of the system I am talking about:

class Session(ndb.Model):
   """ A tracked user session. (customer account (company), version, OS, etc) """
   data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
   name      = ndb.StringProperty  (required = True,  indexed = True)
   session   = ndb.KeyProperty     (required = True,  kind = Session)
   timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
   tags      = ndb.StringProperty  (repeated = True,  indexed = True)

You can think of the samples as times when a user makes use of a capability of a given name (ex: 'systemA.feature_x'). The tags are based upon customer details, system information, and the feature (ex: ['winxp', '2.5.1', 'systemA', 'feature_x', 'premium_account']). So the tags form a denormalized set of tokens that could be used to find samples of interest.

The analysis I am trying to do consists of taking a date range and asking how many times a feature or set of features (perhaps all features) was used per day (or per hour) per customer account (company, not per user).

So the input to the handler would be something like:

  • Start Date
  • End Date
  • Tag(s)

Output would be:

[{
   'company_account': <string>,
   'counts': [
      {'timeperiod': <iso8601 date>, 'count': <int>}, ...
   ]
 }, ...
]

Common Code for Queries

Here is some code common to all of the queries. The general structure of the handler is a simple webapp2 get handler that sets up the query parameters, runs the query, processes the results, and creates the data to return.

# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500   # Bring in large groups of entities

q = Sample.query()
q = q.order(Sample.timestamp)

# Tags ('tags' is the list of tag strings requested by the caller)
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))

def handle_sample(sample):
   session_obj = sample.session.get()    # Usually found in local or memcache thanks to ndb
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
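
addCountForPeriod and getPeriodTimestamp just bucket counts per customer per time period so the output above can be assembled at the end of the request. A minimal stand-in (not my real implementation, purely so the snippets are self-contained) would be something like:

from collections import defaultdict

# Hypothetical stand-ins for the aggregation helpers used in the snippets:
# bucket counts per (customer, period).
counts = defaultdict(lambda: defaultdict(int))

def getPeriodTimestamp(ts):
   # Collapse a datetime to the start of its day (the "per day" period).
   return ts.date().isoformat()

def addCountForPeriod(count_key, ts):
   counts[count_key][getPeriodTimestamp(ts)] += 1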

Methods Tried

I have tried a variety of methods to try to pull data from the datastore as quickly as possible and in parallel. The methods I have tried so far include:

A. Single Iteration

This is more of a simple base case to compare against the other methods. I just build the query and iterate over all the items letting ndb do what it does to pull them one after the other.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)

for sample in q_iter:
   handle_sample(sample)

B. Large Fetch

The idea here was to see if I could do a single very large fetch.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)

for sample in samples:
   handle_sample(sample)

C. Async fetches across time range

The idea here is to recognize that the samples are fairly well spaced across time so I can create a set of independent queries that split the overall time region into chunks and try to run each of these in parallel using async:

# split up the timestamp space into 20 equal parts and async query each of them
ts_intervals   = 20
ts_delta       = (end_time - start_time) / ts_intervals
cur_start_time = start_time
q_futures = []

for x in range(ts_intervals):
   cur_end_time = (cur_start_time + ts_delta)
   if x == (ts_intervals-1):    # Last one has to cover full range
      cur_end_time = end_time

   f = q.filter(Sample.timestamp >= cur_start_time,
                Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
   q_futures.append(f)
   cur_start_time = cur_end_time

# Now loop through and collect results
for f in q_futures:
   samples = f.get_result()
   for sample in samples:
      handle_sample(sample)

D. Async mapping

I tried this method because the documentation made it sound like ndb may exploit some parallelism automatically when using the Query.map_async method.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)

@ndb.tasklet
def process_sample(sample):
   period_ts   = getPeriodTimestamp(sample.timestamp)
   session_obj = yield sample.session.get_async()    # Lookup the session object from cache
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
   raise ndb.Return(None)

q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()

Outcome

I tested out one example query to collect overall response time and appstats traces. The results are:

A. Single Iteration

real: 15.645s

This one goes sequentially through fetching batches one after the other and then retrieves every session from memcache.

Method A appstats

B. Large Fetch

real: 12.12s

Effectively the same as option A but a bit faster for some reason.

Method B appstats

C. Async fetches across time range

real: 15.251s

Appears to provide more parallelism at the start, but seems to get slowed down by a sequence of calls to next during iteration of the results. It also doesn't seem able to overlap the session memcache lookups with the pending queries.

Method C appstats

D. Async mapping

real: 13.752s

This one is the hardest for me to understand. It looks like it has a good deal of overlapping, but everything seems to stretch out into a waterfall instead of running in parallel.

Method D appstats

Recommendations

Based upon all of this, what am I missing? Am I just hitting a limit on App Engine, or is there a better way to pull down large numbers of entities in parallel?

I am at a loss as to what to try next. I thought about rewriting the client to make multiple requests to App Engine in parallel, but this seems pretty brute force. I would really expect that App Engine should be able to handle this use case, so I am guessing there is something I am missing.

Update

In the end I found that option C was the best for my case. I was able to optimize it to complete in 6.1 seconds. Still not perfect, but much better.

After getting advice from several people, I found that the following items were key to understand and keep in mind:

  • Multiple queries can run in parallel
  • Only 10 RPCs can be in flight at once
  • Try to denormalize to the point that there are no secondary queries
  • This type of task is better left to map reduce and task queues, not real-time queries

So what I did to make it faster:

  • I partitioned the query space from the beginning based upon time. (note: the more equal the partitions are in terms of entities returned, the better)
  • I denormalized the data further to remove the need for the secondary session query
  • I made use of ndb async operations and wait_any() to overlap the queries with the processing (a minimal sketch follows this list)
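
A minimal sketch of that structure (not my exact production code; it assumes the Sample model and handle_sample from above, and the partition count is illustrative):

from google.appengine.ext import ndb

def run_partitioned(base_query, start_time, end_time, intervals=20, batch_size=500):
   # Fire off one async fetch per time slice so the query RPCs can run in parallel
   # (keeping the ~10 in-flight RPC limit noted above in mind).
   delta   = (end_time - start_time) / intervals
   futures = []
   for i in range(intervals):
      lo = start_time + delta * i
      hi = end_time if i == intervals - 1 else lo + delta
      qi = base_query.filter(Sample.timestamp >= lo, Sample.timestamp < hi)
      futures.append(qi.fetch_async(limit=None, batch_size=batch_size))

   # Process whichever slice finishes first while the rest are still in flight,
   # so the remaining query RPCs overlap with the in-memory aggregation.
   while futures:
      f = ndb.Future.wait_any(futures)
      futures.remove(f)
      for sample in f.get_result():
         handle_sample(sample)   # in the final version this no longer needs the session lookup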

I am still not getting the performance I would expect or like, but it is workable for now. I just wish there was a better way to pull large numbers of sequential entities into memory quickly in handlers.

Harmonic answered 16/7, 2012 at 17:24 Comment(5)
I have made some progress and gotten option C to work in a little under 9 seconds. I think I can optimize it further. What I found is that if I break the initial query up into 40 pieces and if I send off a query for all the session entities at the same time, then most of the RPC time can be overlapped. My current best effort is doing an RPC total time of 245 seconds in a real time of 9 seconds. I will try some more options and post back about what works best. In the meantime let me know if anyone has more ideas. – Harmonic
Hi, I realise this question is old, but regarding D. Async Mapping, is your addCountForPeriod method writing to the datastore? If yes, then I think that may be causing the cascading, because of the mix of async datastore operations and synchronous datastore operations. – Surprint
Thanks for an excellent post. I came across this after posting with a similar problem here: #25796642. Like you, I am frustrated that I cannot improve the performance of async queries. I would at the very least like to understand why they are so slow. – Aileneaileron
I'm having the same performance problems, trying to find a more general solution here: #26759950 – Crud
This question should be in the general Q&A section of StackOverflow as an example of a proper question: stackoverflow.com/help/how-to-ask – Tooley

Large processing like this should not be done in a user request, which has a 60s time limit. Instead, it should be done in a context that supports long-running requests. The task queue supports requests of up to 10 minutes, with (I believe) the normal memory constraints (F1 instances, the default, have 128MB of memory). For even higher limits (no request timeout, 1GB+ of memory), use backends.

Here's something to try: set up a URL that, when accessed, fires off a task queue task. It returns a web page that polls another URL every ~5s, which responds with true/false depending on whether the task queue task has completed yet. The task queue processes the data, which can take some tens of seconds, and saves the result to the datastore either as the computed data or a rendered web page. Once the initial page detects that the task has completed, the user is redirected to the results page, which fetches the now-computed results from the datastore.
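
A rough sketch of that flow (the handler names, the AnalysisResult model, and run_analysis() are placeholders of mine, not anything from the question):

import json
import uuid
import webapp2
from google.appengine.api import taskqueue
from google.appengine.ext import ndb

class AnalysisResult(ndb.Model):
   # Placeholder model the task writes its output into.
   done    = ndb.BooleanProperty(default=False)
   payload = ndb.JsonProperty()

class StartHandler(webapp2.RequestHandler):
   def get(self):
      job_id = uuid.uuid4().hex
      AnalysisResult(id=job_id).put()
      taskqueue.add(url='/analyze/work', params={'job_id': job_id})
      # A real response would be a page whose JS polls /analyze/status every few seconds.
      self.response.write(json.dumps({'job_id': job_id}))

class WorkHandler(webapp2.RequestHandler):
   def post(self):
      job_id = self.request.get('job_id')
      result = run_analysis()   # placeholder for the long-running query/summarize step
      AnalysisResult(id=job_id, done=True, payload=result).put()

class StatusHandler(webapp2.RequestHandler):
   def get(self):
      rec = AnalysisResult.get_by_id(self.request.get('job_id'))
      self.response.write(json.dumps({'done': rec.done,
                                      'result': rec.payload if rec.done else None}))

app = webapp2.WSGIApplication([('/analyze/start',  StartHandler),
                               ('/analyze/work',   WorkHandler),
                               ('/analyze/status', StatusHandler)])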

Porker answered 17/7, 2012 at 7:50 Comment(3)
I had been thinking of using a backend as well. I am still hoping to get the query to work inside a normal deadline, but if that doesn't work I will fall back to using a backend to run it as you are describing. Since one of my bottlenecks is loading all the session objects into local cache, there may also be a way to get a performance boost using backends if I can keep all the sessions in memory at all times. – Harmonic
That answers nothing. The question was specific to how the datastore is supposed to work, and it doesn't. The same problem applies to task queues and backends when one has to fetch 100,000 or 1M entities. Dog slow, expensive datastore. – Rollie
Take a look at the MapReduce answer by Martin Berends below. Backends have been deprecated. There is a nice guide describing the migration process: cloud.google.com/appengine/docs/python/modules/converting – Morganite

The new experimental Data Processing feature (an AppEngine API for MapReduce) looks very suitable for solving this problem. It does automatic sharding to execute multiple parallel worker processes.

Trimurti answered 22/2, 2014 at 5:12 Comment(0)

I have a similar problem, and after working with Google support for a few weeks I can confirm there is no magic solution, at least as of December 2017.

tl;dr: One can expect throughput ranging from ~220 entities/second for the standard SDK running on a B1 instance up to ~900 entities/second for a patched SDK running on a B8 instance.

The limitation is CPU-related, and changing the instance type directly impacts performance. This is confirmed by the similar results obtained on B4 and B4_1G instances.

The best throughput I got for an Expando entity with about 30 fields is:

Standard GAE SDK

  • B1 instance: ~220 entities/second
  • B2 instance: ~250 entities/second
  • B4 instance: ~560 entities/second
  • B4_1G instance: ~560 entities/second
  • B8 instance: ~650 entities/second

Patched GAE SDK

  • B1 instance: ~420 entities/second
  • B8 instance: ~900 entities/second

For the standard GAE SDK I tried various approaches, including multi-threading, but the best proved to be fetch_async with wait_any. The current NDB library already does a great job of using async and futures under the hood, so any attempt to push it further using threads only makes it worse.

I found two interesting approaches to optimize this:

Matt Faus explains the problem very well:

GAE SDK provides an API for reading and writing objects derived from your classes to the datastore. This saves you the boring work of validating raw data returned from the datastore and repackaging it into an easy-to-use object. In particular, GAE uses protocol buffers to transmit raw data from the store to the frontend machine that needs it. The SDK is then responsible for decoding this format and returning a clean object to your code. This utility is great, but sometimes it does a bit more work than you would like. [...] Using our profiling tool, I discovered that fully 50% of the time spent fetching these entities was during the protobuf-to-python-object decoding phase. This means that the CPU on the frontend server was a bottleneck in these datastore reads!

GAE-data-access-web-request

Both approaches try to reduce the time spent doing protobuf to Python decoding by reducing the number of fields decoded.

I tried both approaches but I only succeeded with Matt's. SDK internals have changed since Evan published his solution. I had to change the code published by Matt here a bit, but it was pretty easy; if there is interest I can publish the final code.

For a regular Expando entity with about 30 fields, I used Matt's solution to decode only a couple of fields and obtained a significant improvement.
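
For comparison, stock NDB projection queries also reduce the per-entity decoding cost by returning only selected, indexed properties. A minimal sketch (this is the built-in feature, not Matt's patch, shown here against the question's Sample model):

# Only the projected properties come back from the index and get decoded,
# so the per-entity CPU cost on the frontend drops. Non-projected properties
# are unavailable on the returned entities, and they cannot be written back.
q = Sample.query(Sample.tags == 'feature_x')
for s in q.iter(projection=[Sample.timestamp, Sample.session], batch_size=500):
   handle_sample(s)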

In conclusion, one needs to plan accordingly and not expect to be able to process much more than a few hundred entities in a "real-time" GAE request.

Hypothyroidism answered 7/12, 2017 at 12:35 Comment(0)

Large data operations on App Engine are best implemented using some sort of mapreduce operation.

Here's a video describing the process, though it also covers BigQuery: https://developers.google.com/events/io/sessions/gooio2012/307/

It doesn't sound like you need BigQuery, but you probably want to use both the Map and Reduce portions of the pipeline.

The main difference between what you're doing and the mapreduce situation is that you're launching one instance and iterating through the queries, whereas with mapreduce you would have a separate instance running in parallel for each query. You will need a reduce operation to "sum up" all the data and write the result somewhere, though.

The other problem you have is that you should use cursors to iterate. https://developers.google.com/appengine/docs/java/datastore/queries#Query_Cursors

If the iterator is using a query offset, it'll be inefficient, since an offset issues the same query, skips past a number of results, and gives you the next set, while the cursor jumps straight to the next set.
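
If you do end up batching by hand, a minimal sketch of cursor-based paging with NDB (assuming q is the filtered query from the question):

cursor = None
while True:
   # fetch_page resumes from the cursor instead of re-skipping earlier results,
   # which is what makes it cheaper than an offset.
   page, cursor, more = q.fetch_page(500, start_cursor=cursor)
   for sample in page:
      handle_sample(sample)
   if not more:
      break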

Cheops answered 16/7, 2012 at 17:43 Comment(6)
Could you show a simple example of how to use your approach to get entities in parallel? I thought a tasklet would take care of this, but it does not seem like it. – Ezaria
I am not using cursors because none of the queries restart in the middle later. They all grab all the entities immediately with no offset. As far as map reduce, I thought about that, but this isn't an offline analysis; it is meant to be a live query that internal users will dynamically change as they explore the data. My understanding of map reduce is that it doesn't fit this real-time interactive use case. – Harmonic
I might have made a bad assumption; I was thinking the datastore_v3.Next calls in C were due to the use of some offset-based iterator. Mapreduce isn't ideal for the interactive use case in my experience because a) you can't predict how long the operation will take, and b) you typically have to write your results to the datastore rather than receive an easy result that you can put on a template. It gets a bit ugly on the client side; I think you need a way to poll to see if the result is ready. However, because of the parallel nature, it does tend to be faster than serializing queries. – Cheops
Agreed that map reduce can parallelize. I was just hoping that ndb and async operations could also parallelize enough for my use case. I don't need to parallelize the computation, simply the data retrieval. I had also considered using urlfetch to write a multi-level handler that would spawn off requests to subhandlers to get the data and then collect and process it in the parent handler. It just seems like there has to be an easier way. – Harmonic
I don't think you're going to be able to reliably do this as a live query, especially if your dataset (the returned results) gets a lot larger. – Maitland
Agree MapReduce is the way to go; in my limited experience I've seen it performing much better than my own queries, not sure why. Shame Google is not maintaining their own MR package. I wonder whether there's any work at all going into improving the datastore and its abysmal performance and costs; all work seems to go to GCE and the cloud store. – Rollie
