Is it ok to use Apache Kafka "infinite retention policy" as a base for an Event sourced system with CQRS?

I'm currently evaluating options for designing/implementing an Event Sourcing + CQRS architectural approach to system design. Since we want to use Apache Kafka for other aspects (normal pub-sub messaging + stream processing), the next logical question is: "Can we use Apache Kafka's store as the event store for CQRS?", or more importantly, would that be a smart decision?

Right now I'm unsure about this. This source seems to support it: https://www.confluent.io/blog/okay-store-data-apache-kafka/

This other source recommends against that: https://medium.com/serialized-io/apache-kafka-is-not-for-event-sourcing-81735c3cf5c

In my current tests/experiments, I'm running into problems similar to those described by the second source, namely:

  1. Recomposing an entity: Kafka doesn't seem to support fast retrieval/search of specific events within a topic. For example, fetching all the commands related to an order's history, which is necessary to reconstruct that entity instance, seems to require scanning all the topic's events and filtering only those matching the entity's identifier, which is a no-go. (This other person seems to have arrived at a similar conclusion: Query Kafka topic for specific record. That is, it is just not possible without relying on some hacky trick. A sketch of this full-scan approach follows this list.)
  2. Write consistency: Kafka doesn't support transactional atomicity on its store, so it seems common practice to put a database with some locking approach (usually optimistic locking) in front of it and export the events to the Kafka topic asynchronously (I can live with this, though; the first problem is much more crucial to me).
  3. The partition problem: the Kafka documentation says that the ordering guarantee exists only within a topic's partition. At the same time it also says that the partition is the basic unit of parallelism; in other words, if you want to parallelize work, spread the messages across partitions (and brokers, of course). But this is a problem, because an event store in an event-sourced system needs an ordering guarantee, which means I'm forced to use only one partition for this use case if I absolutely need that guarantee. Is this correct?
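
For illustration, a naive rebuild of a single entity with the plain consumer API looks roughly like the sketch below (this is only a sketch, not my actual code; the topic name, entity id and the way events are applied to state are placeholders):

```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

public class EntityRebuildByScan {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        String entityId = "order-42";              // the aggregate instance we want to rebuild
        List<String> history = new ArrayList<>();  // stand-in for applying each event to the entity state

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Read every partition of the topic from the very beginning.
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo p : consumer.partitionsFor("order-events")) {
                partitions.add(new TopicPartition(p.topic(), p.partition()));
            }
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

            // Keep polling until every partition has been consumed up to its end offset.
            Set<TopicPartition> done = new HashSet<>();
            while (done.size() < partitions.size()) {
                for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofMillis(500))) {
                    if (entityId.equals(r.key())) {
                        history.add(r.value()); // only this entity's events are kept
                    }
                }
                for (TopicPartition tp : partitions) {
                    if (consumer.position(tp) >= endOffsets.get(tp)) {
                        done.add(tp);
                    }
                }
            }
        }
        System.out.println("Replayed " + history.size() + " events for " + entityId);
    }
}
```

With millions of events in the topic, this full scan has to run for every single rebuild, which is exactly why point 1 feels like a no-go to me.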

Even though this question is a bit open-ended, it really comes down to this: have you used Kafka as the main event store in an event-sourced system? How did you deal with rebuilding entity instances from their command history (given that a topic with millions of entries makes scanning the whole set impractical)? Did you use only one partition, sacrificing potential concurrent consumers (given that the ordering guarantee is restricted to a single topic partition)?

Any specific or general feedback would be greatly appreciated, as this is a complex topic with several considerations.

Thanks in advance.

EDIT: There was a similar discussion 6 years ago here: Using Kafka as a (CQRS) Eventstore. Good idea? Consensus back then was also divided, and many of the people who consider this approach convenient point out how Kafka natively handles huge amounts of real-time data. Nevertheless, the problem (for me at least) isn't related to that, but to how poorly Kafka's capabilities fit rebuilding an entity's state: either by modelling topics as entity instances (where the explosion in the number of topics is undesirable), or by modelling topics as entity types (where the number of events within the topic makes reconstruction very slow/impractical).

Hammering answered 8/11, 2019 at 9:24 Comment(0)

Your understanding is mostly correct:

  1. Kafka has no search, and definitely not by key. There is a seek-to-timestamp, but it's imperfect and not good for what you're trying to do.
  2. Kafka actually supports a limited form of transactions these days (see exactly-once semantics), although they are of no use if you interact with any system outside of Kafka.
  3. The unit of everything in Kafka (event ordering, availability, replication) is a partition. There are no guarantees across partitions of the same topic.

All of this doesn't stop applications from using Kafka as the source of truth for their state, so long as:

  1. your problem can be "sharded" into topic partitions, so you don't care about the order of events across partitions;
  2. you're willing to "replay" an entire partition to bootstrap if/when you lose your local state;
  3. you use log-compacted topics to try to keep a bound on their size (because you will need to replay them to bootstrap, see the previous point). A sketch of creating such a topic follows this list.
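
As a rough sketch of point 3 (illustrative only; the topic name, partition count and settings are placeholder values), a log-compacted topic can be created with the Kafka admin client like this:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CreateCompactedSnapshotTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // "compact" keeps only the latest record per key, bounding the topic's size
            // while still allowing a full replay to bootstrap local state.
            NewTopic snapshots = new NewTopic("order-snapshots", 12, (short) 3)
                    .configs(Map.of(
                            TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT,
                            TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.1"));
            admin.createTopics(List.of(snapshots)).all().get();
        }
    }
}
```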

Both Samza and (IIUC) Kafka Streams back their state stores with log-compacted Kafka topics. Internally to Kafka, offset and consumer-group management is stored as a log-compacted topic, with brokers holding a "materialized view" in memory: when ownership of a partition of __consumer_offsets moves between brokers, the new leader replays the partition to rebuild this view.
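
To illustrate the pattern (this is a sketch of the general idea, not Samza's or Kafka's internal code; all topic and store names are made up), a Kafka Streams topology that folds a per-key event stream into a KTable backed by a compacted changelog topic looks roughly like this:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Properties;

public class SnapshotTableSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-snapshot-builder");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> events =
                builder.stream("order-events", Consumed.with(Serdes.String(), Serdes.String()));

        // Fold each key's events into a current-state KTable. The table's local
        // RocksDB store is backed by a compacted changelog topic that Streams
        // replays to rebuild the store after a crash or rebalance.
        KTable<String, String> snapshots = events
                .groupByKey()
                .aggregate(
                        () -> "",                                        // empty initial state
                        (orderId, event, state) -> state + "|" + event,  // toy "apply event" step
                        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("order-snapshot-store")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.String()));

        snapshots.toStream().to("order-snapshots", Produced.with(Serdes.String(), Serdes.String()));

        new KafkaStreams(builder.build(), props).start();
    }
}
```

If the Streams instance loses its local store, it replays the compacted changelog topic to rebuild it, which is the same bootstrap-by-replay idea as in the list above.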

Syncom answered 9/11, 2019 at 16:4 Comment(9)
Hi radai, great input, I find it very helpful! What you suggest regarding "sharding" a problem into topic partitions is something I hadn't thought about before, and it can solve the "concurrent consumer problem". What I understand about log-compacted topics (towardsdatascience.com/…) is that they keep only one event (the latest) per key, which wouldn't help me a lot, since an instance's history is described by all its events and not only the last one. In general I believe I could go for a mixed approach. Thanks for your detailed answer.Hammering
A log-compacted topic seems very useful when the latest event per key somehow aggregates its historic data (similar to an aggregate instance's current state), or when I'm only interested in the "latest value", like "current price" or "current conversion rate". For event sourcing, where a single event/command only describes a small "state mutation" and at the same time we want all those historic mutations, it seems a log-compacted topic wouldn't help (unless there is something important I overlooked, that is).Hammering
Regarding what you mention about being "willing to 'replay' an entire partition if/when you lose your local state": I believe we could do that for projecting new view models/query models, and as for the "main view model" we'll have to figure that out on a case-by-case basis. Again, great feedback, thanks.Hammering
Log-compacted topics are usually used when people only care about the last value per key, for example people maintaining a journal of changes to some KV store.Syncom
@tony_008 Log compacted topics are very useful for storing aggregate snapshots. Due to Kafka's transaction support, in response to a command, you atomically publish your events to the events topic and your updated aggregate snapshot to a compacted snapshot topic (then storing it in a KV store) - Kafka Streams makes this automatic with KTables. You can join the command topic with the snapshots KTable and then output events + updated snapshots with a few lines of wiring code.Brutal
@Brutal thx for the suggestion, just to double check, are you suggesting using 1 topic for the events and a 2nd topic (compacted) for the aggregates current snapshots?Hammering
@TomW, regarding your comment about Kafka Streams and KTables: it sounds really interesting, could you please point me to some relevant documentation describing this approach in a little more detail? I have tested publishing to 2 topics (one for atomic events, the other for aggregate snapshots) within a Kafka transactional boundary and it works great (a rough sketch of that pattern follows this comment thread); it'd be great if I could integrate Kafka Streams + KTables in the way you mention there. ThanksHammering
See infoq.com/news/2018/07/event-sourcing-kafka-streams and the linked talk (I haven’t watched it, but it sounds like what I’m talking about). To your previous question: yes, exactly.Brutal
Ok, thanks, I'll give it a look. I also found this: confluent.io/blog/… - sounds like what you meant.Hammering
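
For reference, the pattern discussed in the comments above (one events topic plus one compacted snapshot topic, written atomically) looks roughly like the following sketch; topic names, keys and payloads are placeholders, and the atomicity only covers writes to Kafka itself:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class EventPlusSnapshotTransaction {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-command-handler-1");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            // Both writes become visible to read_committed consumers together, or not at all.
            producer.send(new ProducerRecord<>("order-events", "order-42", "OrderLineAdded"));
            producer.send(new ProducerRecord<>("order-snapshots", "order-42", "{\"orderId\":\"order-42\",\"lines\":3}"));
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // Fatal: another producer with the same transactional.id took over; stop using this producer.
            producer.close();
        } catch (KafkaException e) {
            // Abort and let the command be retried.
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }
}
```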

I have been on several projects that use Kafka as long-term storage, and Kafka has no problem with it, especially in the latest versions: they introduced something called tiered storage, which in a cloud environment gives you the possibility to transfer older data to slower/cheaper storage.
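
As an illustrative sketch only (the exact property names and values depend on your Kafka version and remote-storage plugin, see KIP-405, and the broker itself must have remote log storage enabled), a topic that keeps its full history while offloading older segments to the remote tier might be created roughly like this:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CreateTieredEventTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            NewTopic events = new NewTopic("order-events", 12, (short) 3)
                    .configs(Map.of(
                            "retention.ms", "-1",               // keep the full event history overall
                            "remote.storage.enable", "true",    // offload closed segments to the remote tier
                            "local.retention.ms", "86400000")); // keep only ~1 day on the broker's local disks
            admin.createTopics(List.of(events)).all().get();
        }
    }
}
```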

And you should not worry that much about transactions; in today's IT there are other concepts to deal with it, like Event Sourcing and Bounded Contexts. Yes, you have to design your applications differently; how to do that is explained in this video.

But you are right that your options for querying this data will be limited. The easiest way is to use Kafka Streams and a KTable, but this is effectively a key/value database, so you can only ask questions about your data by primary key (see the sketch below).
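
To illustrate that limitation (a sketch only; the store name is whatever was passed to Materialized.as(...) when the table was built, and is a placeholder here), a lookup against the KTable's state store with Kafka Streams interactive queries is a plain get-by-key:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class SnapshotLookup {
    // 'streams' is an already-running KafkaStreams instance that materialized the KTable.
    static String currentSnapshot(KafkaStreams streams, String orderId) {
        ReadOnlyKeyValueStore<String, String> store = streams.store(
                StoreQueryParameters.fromNameAndType(
                        "order-snapshot-store", QueryableStoreTypes.keyValueStore()));
        return store.get(orderId); // lookup by key only; no secondary indexes or ad-hoc queries
    }
}
```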

Your next best choice is to implement the query part of CQRS with the help of frameworks like Akka Projection. I wrote a blog about how you can use Akka Projection with Elasticsearch, which you can find here and here.

Bifoliolate answered 2/6, 2022 at 5:35 Comment(0)
