Error: too much contention on these datastore entities. please try again.
at /Users/wgosse/Documents/data-transfer-request/node_modules/grpc/src/node/src/client.js:554:15 code: 409, metadata: Metadata { _internal_repr: {} }
We’re attempting to set up a system where a Node event listener pulls messages from a Pub/Sub queue and uses them to update Datastore entities as they come in. Unfortunately, we’re running into a contention error when too many messages are pulled off the queue at once. Normally we would batch these writes, but doing that from inside the event listener is difficult. Is there a way besides batching to eliminate these errors?
The entities we’re trying to update do have a shared ancestor, if that’s relevant.
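(If it helps as context: we could throttle the listener on the Pub/Sub side with the subscription flowControl option, assuming our @google-cloud/pubsub version supports it, roughly as sketched below, but that only caps how fast messages come in rather than changing the write pattern.)

// Sketch only, not our actual setup: limit how many unacknowledged messages
// the client handles at once. Assumes a @google-cloud/pubsub version that
// supports the flowControl subscription option; names are illustrative.
const pubsub = require('@google-cloud/pubsub')();

const throttled_sub = pubsub.subscription(config.get('PUBSUB_RECIEVE_SUB'), {
  flowControl: {
    maxMessages: 5 // at most 5 messages in flight at a time
  }
});

throttled_sub.on('message', (message) => {
  // same per-message update/ack logic as in the listener below
});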
listenForMessages establishes the event listener; the snippet below shows it being wired up with the callback that holds the update and acknowledgement logic.
// Start listener to wait for return messages
pubsub_model.listenForMessages((message) => {
  filepath_ctrl.updateFromSub(
    message.attributes,
    (err, data) => {
      if (err) {
        console.log('PUBSUB: Unable to update filepath entity. Error message: ', err);
        return false;
      }
      console.log('PUBSUB: Filepath entity updated.');
      // "Ack" (acknowledge receipt of) the message
      message.ack();
      return data;
    }
  );
});
/**
 * Establishes an event listener to receive return messages post processing
 * @param {Function} messageCallback
 */
function listenForMessages(messageCallback) {
  pubsubConnect(
    0,
    return_topic,
    config.get('PUBSUB_RECIEVE_TOPIC'),
    return_sub,
    config.get('PUBSUB_RECIEVE_SUB'),
    (err) => {
      if (err) {
        console.log('PUBSUB: ERROR: Error encountered while attempting to establish listening connection: ', err);
        return false;
      }

      console.log('PUBSUB: Listening for messages...');

      // Wrap the caller's callback so messageHandler can log and parse the message first
      const msgHandlerConstruct = (message) => {
        messageHandler(messageCallback, message);
      };
      const errHandler = (puberr) => {
        console.log('PUBSUB: ERROR: Error encountered when listening for messages: ', puberr);
      };

      return_sub.on('message', msgHandlerConstruct);
      return_sub.on('error', errHandler);

      return true;
    }
  );

  return true;
}
/**
 * Business logic for processing return messages. Upserts the message into the datastore as a filepath.
 * @param {Function} callback
 * @param {object} message
 */
function messageHandler(callback, message) {
  console.log(`PUBSUB: Received message ${message.id}:`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${JSON.stringify(message.attributes)}`);

  // Datastore update logic; the callback MUST acknowledge the message after error detection
  callback(message);
}
updateFromSub takes a message, structures the attributes into an entity to be saved to Datastore, and then calls our update method.
/**
 * Gets the entity to be updated and updates anything that's changed in the message
 * @param {object} msg_attributes
 * @param {Function} cb
 */
module.exports.updateFromSub = function (msg_attributes, cb) {
  if (msg_attributes.id && msg_attributes.transfer_id) {
    filepath_model.read(msg_attributes.id, msg_attributes.transfer_id, (err, entity) => {
      if (err) {
        return cb(err);
      }
      writeUpdateToOject(entity, msg_attributes, (obj_err, updated_entity) => {
        if (obj_err) {
          return cb(obj_err);
        }
        filepath_model.update(msg_attributes.id, msg_attributes.transfer_id, updated_entity, cb);
        return true;
      });
      return true;
    });
  } else {
    cb('Message missing id and/or transfer id. Message: ' + JSON.stringify(msg_attributes));
    return false;
  }
  return true;
};
The update method is from the GCP tutorial, but has been modified to accommodate a parent-child relation.
const Datastore = require('@google-cloud/datastore');

const ds = Datastore({
  projectId: config.get('GCLOUD_PROJECT')
});

// config, kind, parentKind and toDatastore are defined elsewhere in this module
function update (id, parentId, data, cb) {
  let key;
  if (id) {
    key = ds.key([parentKind,
      parseInt(parentId, 10),
      kind,
      parseInt(id, 10)]);
  } else {
    key = ds.key([parentKind,
      parseInt(parentId, 10),
      kind]);
  }

  const entity = {
    key: key,
    data: toDatastore(data, ['description'])
  };

  ds.save(
    entity,
    (err) => {
      data.id = entity.key.id;
      cb(err, err ? null : data);
    }
  );
}
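For reference, the batching mentioned above would look roughly like the sketch below, since ds.save also accepts an array of entities and commits them together. buildEntityFromAttributes is a hypothetical helper standing in for the per-message read/merge logic in updateFromSub; the difficulty is accumulating a batch of messages inside the per-message listener callback shown at the top.

// Sketch only: a batched write, assuming we had somewhere to accumulate
// messages. buildEntityFromAttributes is hypothetical; it would produce the
// same { key, data } shape that update() builds above.
function saveBatch (messages, cb) {
  const entities = messages.map((message) => buildEntityFromAttributes(message.attributes));

  // ds.save accepts an array, so all updates go through in a single call
  ds.save(entities, (err) => {
    if (err) {
      return cb(err);
    }
    messages.forEach((message) => message.ack());
    return cb(null, entities);
  });
}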