Ensuring that all messages have been read from a Kafka topic using the REST Proxy

I'm new to Kafka, and our team is investigating patterns for inter-service communication.

The goal

We have two services, P (Producer) and C (Consumer). P is the source of truth for a set of data that C needs. When C starts up it needs to load all of the current data from P into its cache, and then subscribe to change notifications. (In other words, we want to synchronize data between the services.)

The total amount of data is relatively low, and changes are infrequent. A brief delay in synchronization is acceptable (eventual consistency).

We want to decouple the services so that P and C do not need to know about each other.

The proposal

When P starts up, it publishes all of its data to a Kafka topic that has log compaction enabled. Each message is an aggregate, keyed by its ID.

When C starts up, it reads all of the messages from the beginning of the topic and populates its cache. It then keeps reading from its offset to be notified of updates.

When P updates its data, it publishes a message for the aggregate that changed. (This message has the same schema as the original messages.)

When C receives a new message, it updates the corresponding data in its cache.
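To make the flow concrete, here is a rough sketch of C's startup read against the Confluent REST Proxy v2 API. The proxy address, topic name, and group/instance names are made up for illustration:

    import requests

    BASE = "http://rest-proxy:8082"   # hypothetical proxy address
    V2 = {"Content-Type": "application/vnd.kafka.v2+json"}
    JSON_V2 = {"Accept": "application/vnd.kafka.json.v2+json"}

    # Create a consumer instance that starts from the earliest offset.
    r = requests.post(f"{BASE}/consumers/c-cache-group", headers=V2,
                      json={"name": "c-instance", "format": "json",
                            "auto.offset.reset": "earliest"})
    base_uri = r.json()["base_uri"]

    # Subscribe to the compacted topic that P publishes to.
    requests.post(f"{base_uri}/subscription", headers=V2,
                  json={"topics": ["p-aggregates"]})

    # Poll in a loop; each record inserts or replaces a cache entry by key.
    cache = {}
    while True:
        for rec in requests.get(f"{base_uri}/records", headers=JSON_V2).json():
            cache[rec["key"]] = rec["value"]
        # ...but nothing here tells C when the cache is fully loaded,
        # which is exactly the issue described below.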

Constraints

We are using the Confluent REST Proxy to communicate with Kafka.

The issue

When C starts up, how does it know when it's read all of the messages from the topic so that it can safely start processing?

It's acceptable if C does not immediately notice a message that P sent a second ago. It's not acceptable if C starts processing before consuming a message that P sent an hour ago. Note that we don't know when updates to P's data will occur.

We do not want C to have to wait for the REST Proxy's poll interval after consuming each message.

Southwards answered 26/7, 2019 at 14:43 Comment(7)
I don't quite understand your first question "When C starts up, how does it know when it's read all of the messages from the topic so that it can safely start processing?". Do you mean how the Consumer knows from what offset to start processing, in case it has previously stopped?Nourishment
@GiorgosMyrianthous I think the question is more about ensuring that C has a fully populated cache. C needs its internal cache of data to be fully populated and up to date before it can finish its boot process. When reading from the log, there is no indication to C that it has consumed everything that it needs. C doesn't necessarily know how many records are in the log, nor does it necessarily know how many records should make up its cache. So it has no way of knowing when its cache is ready for action.Jarita
@Jarita is correct. From the CLI, I can check if my consumer group has a lag of 0, meaning it's up-to-date with writes. From the REST Proxy, I haven't seen any way to check this.Southwards
Doesn't the REST Proxy expose the group information as well? Or at least topic offset informationRaid
@cricket_007 I'm not seeing it; just the consumer offsets: docs.confluent.io/current/kafka-rest/api.html#consumers (thanks for the tag correction BTW!)Southwards
Yeah. You could make a fake group that's not reading any data, then seek it to the end, then find offsets from that... At least that's one way without using a separate service that does expose lagRaid
@cricket_007 You don't even need a fake group; you can use the same one. This is only on service startup, and you're going to be seeking to the beginning after this anyway. I did a proof of concept and this looks like it will work. If you want to write up an answer I'll accept it.Southwards
R
4

If you would like to find the end offsets of the partitions assigned to a consumer group, in order to know when you've gotten all the data as of a point in time, you can use

POST /consumers/(string: group_name)/instances/(string: instance)/positions/end

Note that you must do a poll (GET /consumers/.../records) before that seek, but you don't need to commit.

If you don't want to affect the offsets of your existing consumer group, you would have to create a separate consumer instance in a different group.

You can then query the committed offsets with

GET /consumers/(string: group_name)/instances/(string: instance)/offsets

Note that data might be written to the topic between calculating the end offsets and actually reaching them, so you may want to allow a few extra polls after reaching the recorded end to pick up anything written in the meantime.
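Putting it together, a rough sketch of the whole sequence in Python against the v2 API. The proxy address, topic, and group names are made up, a single partition 0 is assumed, and the ordering (poll, seek to end, poll, commit, read offsets) follows what the comments below worked out:

    import requests

    BASE = "http://rest-proxy:8082"   # hypothetical proxy address
    V2 = {"Content-Type": "application/vnd.kafka.v2+json"}
    JSON_V2 = {"Accept": "application/vnd.kafka.json.v2+json"}
    TOPIC = "p-aggregates"            # hypothetical topic name
    PARTS = {"partitions": [{"topic": TOPIC, "partition": 0}]}

    # Create a consumer instance and subscribe to the topic.
    r = requests.post(f"{BASE}/consumers/end-check-group", headers=V2,
                      json={"name": "end-check", "format": "json"})
    uri = r.json()["base_uri"]
    requests.post(f"{uri}/subscription", headers=V2, json={"topics": [TOPIC]})

    # Poll once so the proxy actually assigns partitions, then seek to the
    # end, poll again, and commit.
    requests.get(f"{uri}/records", headers=JSON_V2)
    requests.post(f"{uri}/positions/end", headers=V2, json=PARTS)
    requests.get(f"{uri}/records", headers=JSON_V2)
    requests.post(f"{uri}/offsets", headers=V2)   # empty body: commit what was fetched

    # Read back the committed offsets; these become the "end of topic" targets.
    # (This GET carries a JSON body, as the v2 API specifies; per the comments
    # below, it can come back empty in practice, so check before indexing.)
    end = requests.get(f"{uri}/offsets", headers=V2, json=PARTS).json()
    target = end["offsets"][0]["offset"]   # next offset to read, i.e. records 0..target-1 exist

    # Now seek back to the beginning and consume until the target is reached.
    requests.post(f"{uri}/positions/beginning", headers=V2, json=PARTS)
    caught_up = target == 0               # an empty topic is trivially caught up
    while not caught_up:
        for rec in requests.get(f"{uri}/records", headers=JSON_V2).json():
            # apply rec["key"] / rec["value"] to the cache here
            caught_up = rec["offset"] >= target - 1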

Raid answered 29/7, 2019 at 18:15 Comment(3)
POSTing to /positions/end changes the fetch (consumed) offset, but does not commit offsets. GETting /offsets gets the last committed offsets. Even committing offsets after seeking to end doesn't appear to change anything. Any thoughts?Southwards
It appears that you have to seek to end, poll (GET /records), then commit offsets. After that you can query to get the offsets.Southwards
We did implement this solution, and had nondeterministic results during testing. Sometimes we'd get an empty array for offsets when it should return values. It's dependent on everything from timeouts on the fetch requests to max_bytes query parameters, and I don't recommend this approach.Southwards
S
0

Alternative solution (not tested):

What if the consumer also acts as a producer?

  1. When C starts up, it publishes a message to the compacted topic (the same one it’s going to read from) with a key that won’t overlap with the keys from P. The value is a GUID or a random number; basically a nonce.
  2. C subscribes to the compacted topic and starts consuming.
  3. When C receives its unique key with a nonce matching what it sent (it could get the key multiple times if the cleaner thread hasn’t compacted the log yet), it knows that it can safely start processing.

This does assume a single partition; with multiple partitions, the nonce only proves that C has caught up on the partition it happened to be written to.
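A rough sketch of the idea, again through the REST Proxy v2 API; the sentinel key __c-startup, the topic, and the group/instance names are made up for illustration:

    import requests
    import uuid

    BASE = "http://rest-proxy:8082"   # hypothetical proxy address
    V2 = {"Content-Type": "application/vnd.kafka.v2+json"}
    JSON_V2 = {"Accept": "application/vnd.kafka.json.v2+json"}
    TOPIC = "p-aggregates"            # hypothetical topic name
    SENTINEL_KEY = "__c-startup"      # must never collide with P's keys
    nonce = str(uuid.uuid4())

    # 1. Publish the sentinel message to the same compacted topic.
    requests.post(f"{BASE}/topics/{TOPIC}",
                  headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
                  json={"records": [{"key": SENTINEL_KEY, "value": nonce}]})

    # 2. Create a consumer that reads from the beginning, and subscribe.
    r = requests.post(f"{BASE}/consumers/c-cache-group", headers=V2,
                      json={"name": "c-instance", "format": "json",
                            "auto.offset.reset": "earliest"})
    uri = r.json()["base_uri"]
    requests.post(f"{uri}/subscription", headers=V2, json={"topics": [TOPIC]})

    # 3. Consume until the sentinel comes back with *our* nonce; an older
    #    sentinel (a stale nonce not yet compacted away) is just skipped.
    cache = {}
    caught_up = False
    while not caught_up:
        for rec in requests.get(f"{uri}/records", headers=JSON_V2).json():
            if rec["key"] == SENTINEL_KEY:
                caught_up = caught_up or rec["value"] == nonce
            else:
                cache[rec["key"]] = rec["value"]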

Southwards answered 31/7, 2019 at 21:47 Comment(0)
