What is partition key in AWS Kinesis all about?

I was reading about AWS Kinesis. In the following program, I write data into the stream named TestStream. I ran this piece of code 10 times, inserting 10 records into the stream.

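var AWS = require('aws-sdk');    // AWS SDK for JavaScript v2
var kinesis = new AWS.Kinesis(); // region and credentials are taken from the environment/config

var params = {
    Data: 'More Sample data into the test stream ...',
    PartitionKey: 'TestKey_1',
    StreamName: 'TestStream'
};

kinesis.putRecord(params, function(err, data) {
   if (err) console.log(err, err.stack); // an error occurred
   else     console.log(data);           // successful response
});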

All the records were inserted successfully. What does partition key really mean here? What is it doing in the background? I read its documentation but did not understand what it meant.

Auric answered 23/1, 2018 at 10:52 Comment(0)
P
105

Partition keys only matter when you have multiple shards in a stream (though they're always required). Kinesis computes the MD5 hash of a partition key to decide which shard to store the record on (if you describe the stream you'll see the hash range as part of the shard description).
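To make the hash-range lookup concrete, here is a minimal sketch of it. It assumes a hypothetical two-shard stream whose ranges split the 128-bit hash space evenly. The `shards` table and the helper names `hashKey` and `shardFor` are illustrative only; the real ranges are whatever the shard description reports.

```javascript
const crypto = require('crypto');

// Hypothetical shard table: two shards splitting the 128-bit hash space evenly.
// Real values come from the hash range shown in each shard's description.
const MAX_HASH = (1n << 128n) - 1n;
const shards = [
  { ShardId: 'shardId-000000000000', start: 0n, end: MAX_HASH / 2n },
  { ShardId: 'shardId-000000000001', start: MAX_HASH / 2n + 1n, end: MAX_HASH },
];

// Interpret the MD5 digest of the key as an unsigned 128-bit integer.
function hashKey(partitionKey) {
  const hex = crypto.createHash('md5').update(partitionKey, 'utf8').digest('hex');
  return BigInt('0x' + hex);
}

// Find the shard whose hash range contains the key's hash.
function shardFor(partitionKey, shardTable) {
  const h = hashKey(partitionKey);
  return shardTable.find((s) => h >= s.start && h <= s.end).ShardId;
}

console.log(shardFor('TestKey_1', shards));
```

The same key always hashes to the same value, so it always maps to the same shard for as long as the shard ranges stay unchanged.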

So why does this matter?

Each shard can only accept 1,000 records and/or 1 MB per second (see PutRecord doc). If you write to a single shard faster than this rate you'll get a ProvisionedThroughputExceededException.

With multiple shards, you scale this limit: 4 shards gives you 4,000 records and/or 4 MB per second. Of course, there are caveats.

The biggest is that you must use different partition keys. If all of your records use the same partition key then you're still writing to a single shard, because they'll all have the same hash value. How you solve this depends on your application: if you're writing from multiple processes then it might be sufficient to use the process ID, server's IP address, or hostname. If you're writing from a single process then you can either use information that's in the record (for example, a unique record ID) or generate a random string.
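The effect of the key choice can be simulated locally. This is a rough sketch, assuming a hypothetical stream with 4 equal-sized shards; `shardIndex` and `distribution` are made-up helpers, not part of any SDK. It compares a single constant key with a random key per record.

```javascript
const crypto = require('crypto');

const SHARD_COUNT = 4n; // hypothetical stream with 4 equal-sized shards
const RANGE = (1n << 128n) / SHARD_COUNT;

// Index of the shard that would receive this key, assuming equal hash ranges.
function shardIndex(partitionKey) {
  const hex = crypto.createHash('md5').update(partitionKey, 'utf8').digest('hex');
  return Number(BigInt('0x' + hex) / RANGE);
}

// Count how many records land on each shard for a given key strategy.
function distribution(keyFn, recordCount) {
  const counts = new Array(Number(SHARD_COUNT)).fill(0);
  for (let i = 0; i < recordCount; i++) {
    counts[shardIndex(keyFn(i))] += 1;
  }
  return counts;
}

// One constant key: every record lands on the same shard.
const sameKey = distribution(() => 'TestKey_1', 1000);
// A random key per record: records spread across all shards.
const randomKeys = distribution(() => crypto.randomBytes(16).toString('hex'), 1000);
console.log(sameKey, randomKeys);
```

With the constant key, one entry of the array holds all 1,000 records and the rest are zero. With random keys the counts come out roughly even.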

The second caveat is that the partition key counts against the total write size, and is stored in the stream. So while you could probably get good randomness by using some textual component in the record, you'd be wasting space. On the other hand, if you have some random textual component, you could calculate your own hash from it and then stringify that for the partition key.
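One way to do that, as a sketch: hash the text field yourself and keep only a short prefix of the digest. The helper name `compactPartitionKey`, the choice of SHA-256, and the 16-character length are assumptions made for illustration. Any stable hash would serve, since Kinesis hashes the key again on its side.

```javascript
const crypto = require('crypto');

// Derive a short, fixed-length partition key from a longer text field.
// 16 hex characters (64 bits) is an arbitrary choice for this sketch.
function compactPartitionKey(text, hexChars = 16) {
  return crypto
    .createHash('sha256')
    .update(text, 'utf8')
    .digest('hex')
    .slice(0, hexChars);
}

const key = compactPartitionKey('some long textual component taken from the record');
console.log(key, key.length);
```

The key stays the same length no matter how long the source text is, so it adds a fixed, small overhead to every record.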

Lastly, if you're using PutRecords (which you should, if you're writing a lot of data), individual records in the request may be rejected while others are accepted. This happens because those records went to a shard that was already at its write limits, and you have to re-send them (after a delay).
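A resend loop for the rejected records might look like the sketch below. It assumes an AWS SDK for JavaScript v2 style client, where `putRecords(params).promise()` resolves to a response carrying `FailedRecordCount` and a `Records` array whose failed entries have an `ErrorCode`. The function name, attempt limit, and backoff values are illustrative choices.

```javascript
// `kinesis` is expected to be an AWS SDK for JavaScript v2 Kinesis client,
// or any object with a compatible putRecords(params).promise() method.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function putRecordsWithRetry(kinesis, streamName, records, maxAttempts = 5, baseDelayMs = 100) {
  let pending = records;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const result = await kinesis
      .putRecords({ StreamName: streamName, Records: pending })
      .promise();
    if (!result.FailedRecordCount) return [];

    // Response entries line up with request entries by index;
    // rejected ones carry an ErrorCode instead of a SequenceNumber.
    pending = pending.filter((_, i) => result.Records[i].ErrorCode);

    // Back off before resending (assumed exponential delay).
    if (attempt < maxAttempts) await sleep(baseDelayMs * 2 ** (attempt - 1));
  }
  return pending; // records still rejected after the last attempt
}
```

The function resolves to the records that could not be written, so the caller decides whether to drop, log, or queue them. Resent records arrive after records from later positions in the original batch, which is one reason batch order is not preserved.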


The other answer points out that records are ordered within a partition, and claims that this is the real reason for a partition key. However, this ordering reflects the order in which Kinesis accepted the records, which is not necessarily the order that the client intended.

  • If the client is single-threaded and uses the PutRecord API, then yes, ordering should be consistent between client and partition.
  • If the client is multi-threaded, then all of the standard distributed systems causes of disorder (internal thread scheduling, network routing, service scheduling) could cause inconsistent ordering.
  • If the client uses the PutRecords API, individual records from a batch can be rejected and must be resent. The doc is very clear that this API call does not preserve ordering. And in a high-volume environment, this is the API that you will use.

In addition to inconsistent ordering when writing, a reshard operation introduces the potential for inconsistencies when reading. You must follow the chain from parent to child(ren), recognizing that there may be more or fewer children and that the split may not be even. A naive "one thread per shard" approach (such as used by Lambda) won't work.
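As a sketch of following that chain, the function below orders shards so that every parent precedes its children. It relies on the `ParentShardId` and `AdjacentParentShardId` fields that Kinesis reports for each shard; the function name and the shortened shard IDs in the example are made up.

```javascript
// Order shards so that every parent comes before its children.
function parentFirstOrder(shards) {
  const byId = new Map(shards.map((s) => [s.ShardId, s]));
  const ordered = [];
  const seen = new Set();

  function visit(shard) {
    if (seen.has(shard.ShardId)) return;
    seen.add(shard.ShardId);
    // A split child has one parent; a merged child also has an adjacent parent.
    for (const parentId of [shard.ParentShardId, shard.AdjacentParentShardId]) {
      // Parents that no longer appear in the listing are skipped.
      if (parentId && byId.has(parentId)) visit(byId.get(parentId));
    }
    ordered.push(shard.ShardId);
  }

  shards.forEach(visit);
  return ordered;
}

// Hypothetical history: shard-0 was split into shard-1 and shard-2,
// which were later merged into shard-3.
const example = [
  { ShardId: 'shard-3', ParentShardId: 'shard-1', AdjacentParentShardId: 'shard-2' },
  { ShardId: 'shard-1', ParentShardId: 'shard-0' },
  { ShardId: 'shard-2', ParentShardId: 'shard-0' },
  { ShardId: 'shard-0' },
];
console.log(parentFirstOrder(example));
// prints [ 'shard-0', 'shard-1', 'shard-2', 'shard-3' ]
```

A reader that wants consistent ordering would drain each parent completely before starting on its children. This sketch only computes the order and leaves the reading to the caller.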

So, bottom line: yes, shards provide ordering. However, relying on that order may introduce hard-to-diagnose bugs into your application.

In most cases, it won't matter. But if you need guaranteed order (such as when processing a transaction log), then you must add your own ordering information into your record when it's written, and ensure that records are properly ordered when read.
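One possible shape for that, sketched under simplifying assumptions: each writer stamps its records with a counter that starts at 0, and the reader holds back early arrivals until the gap is filled. `makeSequencer` and `makeReorderer` are hypothetical helpers. A real system would also have to deal with writer restarts and records that never arrive.

```javascript
// Writer side: stamp each record with a per-writer sequence number.
function makeSequencer(writerId) {
  let next = 0;
  return (payload) => ({ writerId, seq: next++, payload });
}

// Reader side: buffer out-of-order arrivals and release records in sequence.
function makeReorderer(onRecord) {
  const expected = new Map(); // writerId -> next seq to release
  const held = new Map();     // writerId -> Map(seq -> record)

  return (record) => {
    const { writerId, seq } = record;
    let want = expected.get(writerId) || 0;
    if (seq < want) return; // duplicate of a record that was already released

    if (!held.has(writerId)) held.set(writerId, new Map());
    const buffer = held.get(writerId);
    buffer.set(seq, record);

    // Release every record that is now contiguous with what was already released.
    while (buffer.has(want)) {
      onRecord(buffer.get(want));
      buffer.delete(want);
      want += 1;
    }
    expected.set(writerId, want);
  };
}

// Usage: records arrive as c, a, b but are released as a, b, c.
const stamp = makeSequencer('writer-1');
const [a, b, c] = ['a', 'b', 'c'].map(stamp);
const released = [];
const accept = makeReorderer((r) => released.push(r.payload));
[c, a, b].forEach(accept);
console.log(released); // prints [ 'a', 'b', 'c' ]
```

Before sending, the stamped object would be serialized (for example with `JSON.stringify`) into the record's `Data` field.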

Pacien answered 23/1, 2018 at 11:24 Comment(12)
Is there a limit to data the shard can hold? What if I have only 1 partition key in my application but 100 shards?Auric
@SuhailGupta - As I said, "you must use different partition keys." If you use the same partition key for every record then every record will go to a single shard and you'll be capped at that shard's limit of 1,000 records or 1 MB per second.Pacien
What does it mean to use different partition key to insert each record? Why would I want to use same partition key for each record?Auric
Re "What does it mean to use different partition key to insert each record": PartitionKey: 'TestKey_1' versus PartitionKey: 'TestKey_2' versus PartitionKey: 'TestKey_3' and so on. Surely that doesn't need to be explained?Pacien
Re "Why would I want to use same partition key for each record?" - sometimes a single writer doesn't need the throughput of multiple shards but there are multiple writers. In that case using the same partition key for each writer simplifies the code. For example, in this Log4J appender that I wrote the default partition key is used for every message logged from a single application instance. If you have lots of instances, that spreads the load across shards.Pacien
@SuhailGupta - so did this answer your question?Pacien
If I may ask as well, what would happen if I had 2 shards but more than 2 possible partition keys? Does Kinesis just make sure that all data for a specific partition key uses the same shard? For example if I had 4 possible partition keys it will just split 2 keys over one shard and the other 2 keys over the other shard?Fein
@GarethMcCumskey - in general, yes, the partition keys will be spread roughly evenly across shards. It's based on a hash function, so if you just have 4 keys it's possible that 3 will go to one shard and 1 to the other, or even 4 and 0 (less likely, but not rare: about a 1-in-8 chance with two equal-sized shards).Pacien
How do I decrypt the value in the data key?Adit
As per the AWS docs, the number of partition keys should generally be much larger than the number of shards available. So assume a scenario where I have 2 shards available and 500 sensors are each pushing 100 bytes of data per second to the data stream. If I were to use the sensor ID as the partition key, won't the MD5 hash generate 500 different values? How do these 500 different values map to either of the two created shards? I know that there is a start and end hash key for each shard, but that is allotted as soon as the shard is created.Boiardo
Also what is the difference between partition key and sequence number? In case of the sensor example, what would the sequence number that i should be using?Boiardo
It is similar to Redis hash slots (partition key) defining data stored on what node (what kinesis shard). Ordering is an add-on feature, as we can see it doesn't appear on Redis.Swane
P
50

The accepted answer explains what partition keys are and what they're used for in Kinesis (to decide which shard to send the data to). Unfortunately, it does not explain why partition keys are needed in the first place.

In theory, AWS could create a random partition key for each record, which would result in a near-perfect spread.

The real reason partitions are used is for "ordering/streaming". Kinesis maintains ordering (sequence number) for each shard.

In other words, by streaming X and afterwards Y to shard Z, it is guaranteed that X will be pulled from the stream before Y (when pulling records from all shards). On the other hand, by streaming X to shard Z1 and afterwards Y to shard Z2, there is no guarantee on the ordering (when pulling records from all shards). Y may well be pulled before X.

The shard "streaming" capability is useful in many cases.

(E.g. a video service streaming a movie to a user using the username and the movie name as the partition key).

(E.g. working on a stream of common events, and applying aggregation).

In cases where ordering (streaming) or grouping (e.g. aggregation) is not required, generating a random partition key will suffice.
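The two cases can be put side by side in a small sketch. The `partitionKeyFor` helper and the `username` and `movieName` field names are hypothetical, chosen to mirror the video example above.

```javascript
const crypto = require('crypto');

// Pick a partition key depending on whether related records must stay together.
function partitionKeyFor(event, keepOrdered) {
  if (keepOrdered) {
    // Same user + movie always yields the same key, hence the same shard.
    return `${event.username}:${event.movieName}`;
  }
  // No ordering or grouping needed: spread records with a random key.
  return crypto.randomBytes(16).toString('hex');
}

const chunk = { username: 'alice', movieName: 'some-movie' };
console.log(partitionKeyFor(chunk, true));  // prints alice:some-movie
console.log(partitionKeyFor(chunk, false)); // a different random string on each call
```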

Professed answered 6/9, 2019 at 17:50 Comment(1)
Does that mean the shard limit of 500 is the limit that a single stream can be used for partition ordering? or do the shards live and die if there's nothing in them?Stheno
A
2

If you have trouble understanding shards, think of them as different queues handled by the single Kinesis resource (the stream) that you have created. As previously explained, you use multiple queues because one queue can only handle 1 MB/sec of incoming data, so more queues give you more throughput. Mostly you will be using an AWS Lambda to process the queue. Having multiple queues also gets you multiple Lambdas, meaning you can handle much more outgoing data as well. But this is not related to your question about what a partition key means.

If you don't define a meaningful partition key of your own, your data will land on any of the queues/shards at random, so related records end up spread across different Lambdas. The order of your data then isn't maintained across shards: if a Lambda rejects a batch of records, Kinesis releases that same batch again before pushing the next one, but only for that one shard.

But if you use a proper partition key, say a customerID or a movieID, you obviously want the purchase events or the movie content to arrive in order. Because every record with the same customerID/movieID goes to the same queue/shard, all of that data lands sequentially in one queue/shard and is retrieved by the Lambda in that order. And if the Lambda rejects a batch, Kinesis won't hand over the next batch until that one has been processed successfully.

This, my friend is the answer to your question. Hope it helps.

Albertina answered 5/1, 2023 at 12:38 Comment(0)
