Datastore Contention Errors
Error: too much contention on these datastore entities. please try again.
at /Users/wgosse/Documents/data-transfer-request/node_modules/grpc/src/node/src/client.js:554:15 code: 409, metadata: Metadata { _internal_repr: {} }

We’re attempting to set up a system where a Node event listener pulls messages from a Pub/Sub queue and uses those messages to update Datastore entities as they come in. Unfortunately, we’re running into a contention error when too many messages are pulled off at once. Normally we would batch these requests, but having this code in the event listener makes that difficult. Is there a way besides batching to eliminate these errors?

The entities we’re trying to update do have a shared ancestor if that’s relevant.

listenForMessages establishes the event listener and shows the callback with the update and acknowledgement logic.

// Start listener to wait for return messages
pubsub_model.listenForMessages((message) => {
  filepath_ctrl.updateFromSub(
    message.attributes,
    (err, data) => {
      if (err) {
        console.log('PUBSUB: Unable to update filepath entity. Error message: ', err);
        return false;
      }
      console.log('PUBSUB: Filepath entity updated.');

      // "Ack" (acknowledge receipt of) the message
      message.ack();
      return data;
    }
  );
});

/**
 * Establishes an event listener to receive return messages after processing
 * @param {Function} messageCallback
 */
function listenForMessages(messageCallback) {
  pubsubConnect(
    0,
    return_topic,
    config.get('PUBSUB_RECIEVE_TOPIC'),
    return_sub,
    config.get('PUBSUB_RECIEVE_SUB'),
    (err) => {
      if (err) {
        console.log('PUBSUB: ERROR: Error encountered while attempting to establish listening connection: ', err);
        return false;
      }
      console.log('PUBSUB: Listening for messages...');
      // Handler that wraps incoming messages with the caller's callback
      const msgHandlerConstruct = (message) => {
        messageHandler(messageCallback, message);
      };
      const errHandler = (puberr) => {
        console.log('PUBSUB: ERROR: Error encountered when listening for messages: ', puberr);
      };

      return_sub.on('message', msgHandlerConstruct);
      return_sub.on('error', errHandler);

      return true;
    }
  );
  return true;
}

/**
 * Business logic for processing return messages. Upserts the message into the datastore as a filepath.
 * @param {Function} callback
 * @param {object} message
 */
function messageHandler(callback, message) {
  console.log(`PUBSUB: Received message ${message.id}:`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${JSON.stringify(message.attributes)}`);
  // Datastore update logic

  //Callback MUST acknowledge after error detection
  callback(message);
}
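
As an aside on the "too many messages are pulled off at once" part of the question: the Node.js Pub/Sub client exposes flow-control settings that cap how many unacknowledged messages it will hold at a time. A minimal sketch, assuming a current @google-cloud/pubsub release (the subscription name here is made up):

const {PubSub} = require('@google-cloud/pubsub');

const pubsub = new PubSub();
// flowControl.maxMessages caps outstanding (unacked) messages, which
// throttles how quickly the 'message' handler fires.
const subscription = pubsub.subscription('return-subscription', {
  flowControl: {
    maxMessages: 5,             // hold at most 5 unacked messages at once
    allowExcessMessages: false, // don't buffer beyond the cap
  },
});

subscription.on('message', (message) => {
  // ...update the Datastore entity here...
  message.ack(); // acking frees a slot for the next message
});

Slowing delivery this way spaces out the writes, but it does not by itself guarantee staying under the per-entity-group write limit.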

updateFromSub takes a message and structures the attributes into an entity to be saved to datastore, then calls our update method.

/**
 * Gets the entity to be updated and updates anything that's changed in the message
 * @param {object} msg_attributes
 * @param {Function} cb
 */
module.exports.updateFromSub = function (msg_attributes, cb) {
  if (msg_attributes.id && msg_attributes.transfer_id) {
    filepath_model.read(msg_attributes.id, msg_attributes.transfer_id, (err, entity) => {
      if (err) {
        return cb(err);
      }
      writeUpdateToOject(entity, msg_attributes, (obj_err, updated_entity) => {
        if (obj_err) {
          return cb(obj_err);
        }
        filepath_model.update(msg_attributes.id, msg_attributes.transfer_id, updated_entity, cb);
        return true;
      });
      return true;
    });
  } else {
    cb(new Error(`Message missing id and/or transfer id. Message: ${JSON.stringify(msg_attributes)}`));
    return false;
  }
  return true;
};

The update method is from the GCP tutorial, but has been modified to accommodate a parent-child relation.

const Datastore = require('@google-cloud/datastore');
const ds = Datastore({
  projectId: config.get('GCLOUD_PROJECT')
});
function update (id, parentId, data, cb) {
  let key;
  if (id) {
    key = ds.key([parentKind,
      parseInt(parentId, 10),
      kind,
      parseInt(id, 10)]);
  } else {
    key = ds.key([parentKind,
      parseInt(parentId, 10),
      kind]);
  }

  const entity = {
    key: key,
    data: toDatastore(data, ['description'])
  };

  ds.save(
    entity,
    (err) => {
      // Datastore populates the key id on insert; copy it onto the returned data
      data.id = entity.key.id;
      cb(err, err ? null : data);
    }
  );
}
Lunitidal answered 13/12, 2017 at 15:44

You are reaching the writes-per-second limit on a single entity group. By default it is 1 write per second.

See the Datastore limits table:

https://cloud.google.com/datastore/docs/concepts/limits

It seems that Pub/Sub is generating messages too quickly, so Datastore can't write them one by one within this limit. What you can try is to use a Pub/Sub pull subscription, collect a set of updates, and write them with a single batch.
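
A minimal sketch of that pull-and-batch approach, assuming current @google-cloud/pubsub and @google-cloud/datastore releases; the kind names ('Transfer', 'Filepath'), the attribute fields, and the batch size are assumptions based on the question, not code from it:

const {v1} = require('@google-cloud/pubsub');
const {Datastore} = require('@google-cloud/datastore');

const subClient = new v1.SubscriberClient();
const ds = new Datastore();

async function pullAndBatchUpdate(subscriptionPath) {
  // Pull a batch synchronously instead of reacting to each message.
  const [res] = await subClient.pull({
    subscription: subscriptionPath,
    maxMessages: 50,
  });
  const received = res.receivedMessages || [];
  if (received.length === 0) return;

  // One entity per message. Because they share an ancestor, saving them
  // in one call commits them together: one write against the entity
  // group's 1-write/sec budget instead of 50 separate writes.
  const entities = received.map(({message}) => ({
    key: ds.key([
      'Transfer', parseInt(message.attributes.transfer_id, 10), // assumed parent kind
      'Filepath', parseInt(message.attributes.id, 10),          // assumed child kind
    ]),
    data: {description: message.attributes.description},        // assumed payload shape
  }));
  await ds.save(entities);

  // Ack only after the batch write has succeeded.
  await subClient.acknowledge({
    subscription: subscriptionPath,
    ackIds: received.map((m) => m.ackId),
  });
}

A single Datastore commit is limited to 500 entities, so keep maxMessages at or below that.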

Gause answered 13/12, 2017 at 18:44

Sounds like a case of hotspotting. When you need to perform a high rate of sustained writes to an entity, you can manually shard your entities into entities of different kinds that reuse the same key.

See here: https://cloud.google.com/datastore/docs/best-practices#high_readwrite_rates_to_a_narrow_key_range
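
For illustration, a minimal sketch of manual sharding, assuming a current @google-cloud/datastore release; the 'Counter' kind, the shard count, and the key naming are invented for this example:

const {Datastore} = require('@google-cloud/datastore');

const ds = new Datastore();
const NUM_SHARDS = 10; // sustained write throughput scales with the shard count

// Each write lands on a random shard, so no single entity is written
// faster than the per-entity write limit.
async function incrementSharded(counterName) {
  const shard = Math.floor(Math.random() * NUM_SHARDS);
  const key = ds.key(['Counter', `${counterName}-shard${shard}`]);
  const tx = ds.transaction();
  await tx.run();
  try {
    const [entity] = await tx.get(key);
    tx.save({key, data: {count: (entity ? entity.count : 0) + 1}});
    await tx.commit();
  } catch (err) {
    await tx.rollback();
    throw err;
  }
}

// Reads pay the cost instead: they aggregate across every shard.
async function readSharded(counterName) {
  const keys = [...Array(NUM_SHARDS).keys()].map((i) =>
    ds.key(['Counter', `${counterName}-shard${i}`]));
  const [entities] = await ds.get(keys);
  return entities.reduce((sum, e) => sum + (e.count || 0), 0);
}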

Not answered 4/3, 2022 at 16:47
