How to write more than 25 items/rows into a DynamoDB table?

I am quite new to Amazon DynamoDB. I currently have 20000 rows that I need to add to a table. However, based on what I've read, it seems that I can only write up to 25 rows at a time using the BatchWriteItem class with 25 WriteRequests. Is it possible to increase this? How can I write more than 25 rows at a time? It is currently taking about 15 minutes to write all 20000 rows. Thank you.

Giefer answered 26/6, 2015 at 5:55 Comment(0)
13

You can only send up to 25 items in a single BatchWriteItem request, but you can send as many BatchWriteItem requests as you want at one time. Assuming you've provisioned enough write throughput, you should be able to speed things up significantly by splitting those 20k rows between multiple threads/processes/hosts and pushing them to the database in parallel.
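
For illustration, a minimal sketch of that parallel approach with the v2 JavaScript SDK (the table name, region and items array are placeholders, and unprocessed items are not retried here):

const AWS = require('aws-sdk');
const docClient = new AWS.DynamoDB.DocumentClient({ region: 'us-east-1' });

async function writeAll(tableName, items) {
  // Split the items into chunks of at most 25 (the BatchWriteItem limit)
  const chunks = [];
  for (let i = 0; i < items.length; i += 25) {
    chunks.push(items.slice(i, i + 25));
  }

  // Send every chunk as its own BatchWriteItem request, all in parallel
  await Promise.all(
    chunks.map((chunk) =>
      docClient.batchWrite({
        RequestItems: {
          [tableName]: chunk.map((item) => ({ PutRequest: { Item: item } })),
        },
      }).promise()
    )
  );
}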

It's maybe a bit heavyweight for that small of a dataset, but you can use AWS Data Pipeline to ingest data from S3. It basically automates the process of creating a Hadoop cluster to suck down your data from S3 and send it to DynamoDB in a bunch of parallel BatchWriteItem requests.

Chanticleer answered 26/6, 2015 at 6:24 Comment(4)
Thank you David. I'll try using some parallel threads. What's provisioned throughput?Giefer
If I use AWS Data Pipeline, does this mean I should output all of the data from my app into S3? Is the benefit of outputting to S3 ==> Data Pipeline ==> DynamoDB versus directly writing to DynamoDB the speed of writing files to S3?Giefer
I edited in a link with more info about provisioned throughput, but the short story is that you tell DynamoDB ahead of time when you create a table how many reads/writes you want to be able to do to it per second. If you send requests faster than this, the excess ones will be rejected.Chanticleer
Yes, you'd need to get all your data to S3 somehow to use data pipeline. If it's already all local to your app, uploading it to S3 first and then importing to DynamoDB will almost certainly be slower than doing a parallel upload directly to DynamoDB. If your app is getting it from somewhere else and that somewhere could be S3, data pipeline will save you from having to write the code to do the parallel transfer.Chanticleer
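
For reference, the provisioned throughput mentioned in the comments above can also be raised before a bulk load and lowered again afterwards; a minimal sketch with the v2 JavaScript SDK (the table name and capacity values are placeholders):

var AWS = require('aws-sdk');
var dynamodb = new AWS.DynamoDB({ region: 'us-east-1' });

// Request a higher write capacity so the parallel batch writes are not throttled
dynamodb.updateTable({
  TableName: 'my-table',
  ProvisionedThroughput: {
    ReadCapacityUnits: 5,
    WriteCapacityUnits: 200
  }
}, function (err, data) {
  if (err) console.error(err);
  else console.log('Table status:', data.TableDescription.TableStatus);
});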
8

I was looking for some code to do this with the JavaScript SDK. I couldn't find it, so I put it together myself. I hope this helps someone else!

function multiWrite(table, data, cb) {
    var AWS = require('aws-sdk');
    var db = new AWS.DynamoDB.DocumentClient({region: 'us-east-1'});

    // Build the batches
    var batches = [];
    var current_batch = [];
    var item_count = 0;
    for(var x in data) {
        // Add the item to the current batch
        item_count++;
        current_batch.push({
            PutRequest: {
                Item: data[x]
            }
        });
        // If we've added 25 items, add the current batch to the batches array
        // and reset it
        if(item_count%25 == 0) {
            batches.push(current_batch);
            current_batch = [];
        }
    }
    // Add the last batch if it has records and is not equal to 25
    if(current_batch.length > 0 && current_batch.length != 25) batches.push(current_batch);

    // Handler for the database operations
    var completed_requests = 0;
    var errors = false;
    function handler(request) {
        return function(err, data) {
            // Increment the completed requests
            completed_requests++;

            // Set the errors flag
            errors = (errors) ? true : err;

            // Log the error if we got one
            if(err) {
                console.error(JSON.stringify(err, null, 2));
                console.error("Request that caused database error:");
                console.error(JSON.stringify(request, null, 2));
            }

            // Make the callback if we've completed all the requests
            if(completed_requests == batches.length) {
                cb(errors);
            }
        }
    }

    // Make the requests
    var params;
    for(x in batches) {
        // Items go in params.RequestItems.id array
        // Format for the items is {PutRequest: {Item: ITEM_OBJECT}}
        params = '{"RequestItems": {"' + table + '": []}}';
        params = JSON.parse(params);
        params.RequestItems[table] = batches[x];

        // Perform the batchWrite operation
        db.batchWrite(params, handler(params));
    }
}
Elsyelton answered 20/6, 2016 at 20:2 Comment(1)
Note, this doesn't handle failures in each batchWrite. For example, you may need to handle the UnprocessedItems returned in the batchWrite result. See docs.aws.amazon.com/amazondynamodb/latest/APIReference/… for more info.Ulu
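
As that comment notes, batchWrite can return UnprocessedItems when writes are throttled. A minimal retry sketch in the same callback style (the delay and retry count are arbitrary):

function batchWriteWithRetry(db, params, retriesLeft, cb) {
  db.batchWrite(params, function(err, data) {
    if (err) return cb(err);

    var unprocessed = data.UnprocessedItems || {};
    if (Object.keys(unprocessed).length === 0) return cb(null, data);
    if (retriesLeft === 0) return cb(new Error('Unprocessed items remain after retries'));

    // Resubmit only the unprocessed items after a short delay
    setTimeout(function() {
      batchWriteWithRetry(db, { RequestItems: unprocessed }, retriesLeft - 1, cb);
    }, 1000);
  });
}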
3
// Assumes `docClient` (an AWS.DynamoDB.DocumentClient) and `TABLES.historyTable`
// are defined elsewhere in the module
var _ = require('lodash');
var async = require('async');

function putInHistory(data, cb) {
  var arrayOfArray25 = _.chunk(data, 25);
  async.every(arrayOfArray25, function(arrayOf25, callback) {
    var params = {
      RequestItems: {
        [TABLES.historyTable]: []
      }
    };
    arrayOf25.forEach(function(item) {
      params.RequestItems[TABLES.historyTable].push({
        PutRequest: {
          Item: item
        }
      });
    });
    docClient.batchWrite(params, function(err, data) {
      if (err) {
        console.log(err);
        callback(err);
      } else {
        console.log(data);
        callback(null, true);
      }
    });
  }, function(err, result) {
    if (err) {
      cb(err);
    } else {
      if (result) {
        cb(null, { allWritten: true });
      } else {
        cb(null, { allWritten: false });
      }
    }
  });
}

You can use lodash to make chunks of data from the array and then use the async library's each/every method to do a batchWrite on chunks of 25 elements.
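
A hypothetical invocation, assuming docClient, TABLES, lodash and async are set up as the snippet expects:

// itemsToWrite is an array of plain objects matching the table's schema
putInHistory(itemsToWrite, function(err, result) {
  if (err) return console.error(err);
  console.log('All chunks written:', result.allWritten);
});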

Delogu answered 9/2, 2018 at 9:10 Comment(0)
1

I wrote an npm package that should work as a simple drop-in replacement for the batchWrite method. You just need to pass the DynamoDB instance as the first parameter and things should work: https://www.npmjs.com/package/batch-write-all

Check the example in the project readme file:

// Use the line below instead of: dynamodb.batchWrite(params).promise();
batchWriteAll(dynamodb, params).promise();
Absorption answered 28/6, 2021 at 17:20 Comment(0)
1

Using the AWS CLI and aws-vault, this is what I do.

Let's imagine you have the following file (data.json) with 1000 rows

{ "PutRequest": { "Item": { "PKey": { "S": "1" }, "SKey": { "S": "A" }}}},
{ "PutRequest": { "Item": { "PKey": { "S": "2" }, "SKey": { "S": "B" }}}},
{ "PutRequest": { "Item": { "PKey": { "S": "3" }, "SKey": { "S": "C" }}}},
... to 1000

and you need to split it into chunk files with 25 rows in each!

I use the following C# code in LINQPad to generate the .sh file and the JSON chunks, so they can be inserted into DynamoDB using the AWS CLI:

void Main()
{
    var sourcePath = @"D:\data\whereYourMainJsonFileIsLocated\";
    var sourceFilePath = @"data.json";

    var awsVaultProfileName = "dev";
    var env = "dev";
    var tableName = "dynamodb-table-name";

    var lines = System.IO.File.ReadAllLines(sourcePath + sourceFilePath);

    var destinationPath = Path.Combine(sourcePath, env);
    var destinationChunkPath = Path.Combine(sourcePath, env, "chunks");

    if (!System.IO.Directory.Exists(destinationChunkPath))
        System.IO.Directory.CreateDirectory(destinationChunkPath);

    System.Text.StringBuilder shString = new System.Text.StringBuilder();

    for (int i = 0; i < lines.Count(); i = i + 25)
    {
        var pagedLines = lines.Skip(i).Take(25).ToList().Distinct().ToList();

        System.Text.StringBuilder sb = new System.Text.StringBuilder();
        sb.AppendLine("{");
        sb.AppendLine($"  \"{tableName}\": [");

        foreach (var element in pagedLines)
        {
            // The source rows end with a trailing comma; drop it for the last row of the chunk
            if (element == pagedLines.Last())
                sb.AppendLine(element.Substring(0, element.Length - 1));
            else
                sb.AppendLine(element);
        }

        sb.AppendLine("]");
        sb.AppendLine("}");

        var fileName = $"chunk{i / 25}.json";
        System.IO.File.WriteAllText(Path.Combine(destinationChunkPath, fileName), sb.ToString(), Encoding.Default);

        shString.AppendLine($@"aws-vault.exe exec {awsVaultProfileName} -- aws dynamodb batch-write-item --request-items file://chunks/{fileName}");
    }

    System.IO.File.WriteAllText(Path.Combine(destinationPath, $"{tableName}-{env}.sh"), shString.ToString(), Encoding.Default);
}

The result will be chunk files such as chunk0.json, chunk1.json, etc.:

{
  "dynamodb-table-name": [
    { "PutRequest": { "Item": { "PKey": { "S": "1" }, "SKey": { "S": "A" }}}},
    { "PutRequest": { "Item": { "PKey": { "S": "2" }, "SKey": { "S": "B" }}}},
    { "PutRequest": { "Item": { "PKey": { "S": "3" }, "SKey": { "S": "C" }}}}
  ]
}

and a .sh file:

aws-vault.exe exec dev -- aws dynamodb batch-write-item --request-items file://chunks/chunk0.json
aws-vault.exe exec dev -- aws dynamodb batch-write-item --request-items file://chunks/chunk1.json
aws-vault.exe exec dev -- aws dynamodb batch-write-item --request-items file://chunks/chunk2.json

Finally, just run the .sh file and you have all the data in your table!

Missal answered 28/9, 2021 at 12:5 Comment(0)
0

Based on the answer from @Geerek, here is the solution as a Lambda function:

exports.handler = (event, context, callback) => {
  console.log(`EVENT: ${JSON.stringify(event)}`);

  var AWS = require('aws-sdk');

  AWS.config.update({ region: process.env.REGION })

  var docClient = new AWS.DynamoDB.DocumentClient();

  const { data, table } = event

  // Build the batches
  var batches = [];
  var current_batch = [];
  var item_count = 0;

  for (var i = 0; i < data.length; i++) {
    // Add the item to the current batch
    item_count++
    current_batch.push({
      PutRequest: {
        Item: data[i],
      },
    })
    // If we've added 25 items, add the current batch to the batches array
    // and reset it
    if (item_count % 25 === 0) {
      batches.push(current_batch)
      current_batch = []
    }
  }

  // Add the last batch if it has records and is not equal to 25
  if (current_batch.length > 0 && current_batch.length !== 25) {
    batches.push(current_batch)
  }

  // Handler for the database operations
  var completed_requests = 0
  var errors = false

  function handler (request) {

    console.log('in the handler: ', request)

    return function (err, data) {
      // Increment the completed requests
      completed_requests++;

      // Remember whether any batch failed
      errors = errors || err;

      // Log the error if we got one
      if (err) {
        console.error(JSON.stringify(err, null, 2));
        console.error("Request that caused database error:");
        console.error(JSON.stringify(request, null, 2));
      }

      // Invoke the Lambda callback only once, after every batch has finished,
      // so the function does not return before all of the writes complete
      if (completed_requests === batches.length) {
        callback(errors || null, { batches: batches.length });
      }
    }
  }

  // Make the requests
  var params;
  for (var j = 0; j < batches.length; j++) {
    // Items go in params.RequestItems.id array
    // Format for the items is {PutRequest: {Item: ITEM_OBJECT}}
    params = '{"RequestItems": {"' + table + '": []}}'
    params = JSON.parse(params)
    params.RequestItems[table] = batches[j]

    console.log('before db.batchWrite: ', params)

    // Perform the batchWrite operation
    docClient.batchWrite(params, handler(params))
  }
};
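
For reference, a test event for the handler above could look like this (the table name and item attributes are placeholders that must match your table's key schema):

{
  "table": "my-table",
  "data": [
    { "id": "1", "name": "first item" },
    { "id": "2", "name": "second item" }
  ]
}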
Closure answered 30/6, 2020 at 21:20 Comment(0)
0
const { dynamoClient } = require("./resources/db");
const { v4: uuid } = require("uuid");

const batchWriteLooper = async () => {
  // Build 2000 sample items
  let array = [];
  for (let i = 0; i < 2000; i++) {
    array.push({
      PutRequest: {
        Item: {
          personId: uuid(),
          name: `Person ${i}`,
          age: Math.floor(Math.random() * 100),
          gender: "Male",
          // The DocumentClient does not marshall Date objects, so store ISO strings
          createdAt: new Date().toISOString(),
          updatedAt: new Date().toISOString(),
        },
      },
    });
  }

  var perChunk = 20; // items per chunk (must not exceed the 25-item BatchWriteItem limit)
  var result = array.reduce((resultArray, item, index) => {
    const chunkIndex = Math.floor(index / perChunk);

    if (!resultArray[chunkIndex]) {
      resultArray[chunkIndex] = []; // start a new chunk
    }
    resultArray[chunkIndex].push(item);
    return resultArray;
  }, []);

  // Send every chunk as its own batchWrite request, in parallel
  await Promise.all(
    result.map(async (chunk) => {
      const params = {
        RequestItems: {
          "persons": chunk,
        },
      };
      return await dynamoClient.batchWrite(params).promise();
    })
  ).then(() => {
    console.log("done");
  });
};

batchWriteLooper();
Paramedical answered 11/3, 2022 at 12:36 Comment(1)
Welcome to Stack Overflow. Code is a lot more helpful when it is accompanied by an explanation. Stack Overflow is about learning, not providing snippets to blindly copy and paste. Please edit your question and explain how it answers the specific question being asked. See How to Answer.Pitanga
