How to use mapreduce to bulk update datastore entities that satisfy a query?
Asked Answered
H

2

6

I want to use the mapreduce library to update all entities that satisfy a query. There are a couple of complications:

  1. The query that finds the entities to update checks if the value of a particular property "property1" is contained in a long list of values (~10000 entries) from a csv file
  2. For each entity satisfying the query, another property "property2" needs to be updated to be equal to the value in the second column and same row of the csv file

I know how to upload the csv file to Blobstore and read each row using a Blobstore input reader. I am also aware of the Datastore input reader that gets entities using a query.

My question is how can I create a Mapper class that reads input data from the Blobstore, fetches the datastore entities and updates them as efficiently as possible?

Heaps answered 15/1, 2015 at 7:22 Comment(6)
I doubt map-reduce gives you the performance you want. Usually map-reduce operates on data that can be naturally taken into parts; but in your situation since your are working with a CSV file, the majority of time and memory used would be on parsing that CSV file into multiple lines (multiple strings); alternatively, if you treat that CSV string as a single stream then you are blocked by that serial operation of readLine(). Since your processing isn't time-consuming (correct me if not) compared to the cost of task division, I don't see map reduce benefit you in any aspect.Grazier
I was hoping to benefit at least from the batching of gets and puts to the datastore since the alternative will be to get each complete entity, change a property and put back into the datastore.Heaps
While map-reduce, MR, is great for work on many entities the fact it would also need to process the csv might slow things down. One option is to load the CSV into datastore as well, do a MR on all the entities then in the mapper do a .get on the CSV kind to see if the passed entity is there. If it is update otherwise skip. Not the best way but the only I can think of. FYI you can use put_multi to do batches cloud.google.com/appengine/docs/python/ndb/functionsMasseuse
this should help you out. ikaisays.com/2010/08/11/…Duumvir
@Sridhar, that tutorial is just creating new entities based on the information in the CSV. In my case, I need to find the existing datastore entity corresponding to each row of the CSV and update it. So, am I supposed to query the datastore ~10,000 times to get the entity for each row or is there a better way?Heaps
@Heaps yes in that there is a place where it reads from the blob as you cann the input type is blob during the reduce part. so i think that should help you out since you have a blob copy of datastore.Duumvir
O
3

Given that the list of possible values for property1 is long, using a query to filter doesn't seem like a good option (because you would need to use a IN filter, which actually runs one query per value)

An alternative using MR would be to load your CSV into memory using a Map (from property1 to property2), and then fire a MR job that iterates all entities, and if their property1 is part of the Keys on the Map, modify it using the mapped value.

As @Ryan B says, you don't need to use MR for this if you just want to take advantage of batch puts, as you can use an Iterable to put using the DatastoreService.

Opportunity answered 19/1, 2015 at 17:11 Comment(2)
Thanks! I agree that using a query with IN filter for such a long list would be very inefficient. I have a a few quick questions: 1. Would you know of an optimal way to load the CSV into a Map? 2. Wouldn't it be better to iterate over the keys of the map and get the corresponding entity from the datastore rather than iterating over all the datastore entities to see if they are in the map? 3. If I just use batch puts, how will I know which operations completed successfully?Heaps
1. Depends on the job: if it's a one-time-only thing, you might even want to load programmatically (i.e. use a script to load the CSV and output java code populating the map). If not, use Blobstore, get the file and iterate on a background task (and maybe store it in Memcache) 2. Well, it depends on how sparse your map is in relation with the entities, and if you have and index over property1 or not... but yes, the other option is that. 3. Read put's documentation on transaction management: you can decide to commit/rollback or let the default config work.Opportunity
C
2

You can use a DatastoreInputReader, and in the map function, find out if the property1 is actually in the csv: Reading from a csv each time would be very slow, what you can do is use memcache to provide that info after it is read just once from it's own Datastore model. To populate the datastore model, I would recommend using property1 as the custom Id of each row, that way, querying it is straight forward. You would only update the Datastore for those values that actually change and use mutation pool to make it performant (op.db.Put()). I leave you pseudo code (sorry... I only have it in python) of how the different pieces would look like, I further recommend you reading this article on Mapreduce on Google App Engine: http://sookocheff.com/posts/2014-04-15-app-engine-mapreduce-api-part-1-the-basics/

#to get the to_dict method
from google.appengine.ext import ndb
from mapreduce import operation as op 
from mapreduce.lib import pipeline
from mapreduce import mapreduce_pipeline

class TouchPipeline(pipeline.Pipeline):
    """
    Pipeline to update the field of entities that have certain condition
    """

    def run(self, *args, **kwargs):
        """ run """
        mapper_params = {
            "entity_kind": "yourDatastoreKind",
        }
        yield mapreduce_pipeline.MapperPipeline(
            "Update entities that have certain condition",
            handler_spec="datastore_map",
            input_reader_spec="mapreduce.input_readers.DatastoreInputReader",
            params=mapper_params,
            shards=64)


class csvrow(ndb.Model):
  #you dont store property 1 because you are going to use its value as key
  substitutefield=ndb.StringProperty()

def create_csv_datastore():
  # instead of running this, make a 10,000 row function with each csv value, 
  # or read it from the blobstore, iterate and update the values accordingly
  for i in range(10000):
    #here we are using our own key as id of this row and just storing the other column that
    #eventually will be subtitute if it matches
    csvrow.get_or_insert('property%s' % i, substitutefield = 'substitute%s').put()


def queryfromcsv(property1):
  csvrow=ndb.Key('csvrow', property1).get()
  if csvrow:
    return csvrow.substitutefield
  else:
    return property1

def property1InCSV(property1):
  data = memcache.get(property1)
  if data is not None:
      return data
  else:
      data = self.queryfromcsv(property1)
      memcache.add(property1, data, 60)
      return data

def datastore_map(entity_type):
  datastorepropertytocheck = entity_type.property1
  newvalue = property1InCSV(datastorepropertytocheck)
  if newvalue!=datastoreproperty:
    entity_type.property11 = newvalue
    #use the mutation pool
    yield op.db.Put(entity)
Countless answered 25/1, 2015 at 1:53 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.