Best way to join two (or more) Kafka topics in KSQL, emitting changes from all topics?

We have a "microservices" platform and use Debezium for change data capture from the databases behind these services, which is working nicely.

Now we'd like to make it easy to join these topics and stream the results into a new topic that multiple services could consume.

Disclaimer: this assumes ksqlDB v0.11 and its CLI (much of this might not work in older versions).

Example of two tables from two database instances that stream into Kafka topics:

-- source identity microservice (postgres)
CREATE TABLE public.user_entity (
    id varchar(36) NOT NULL,
    first_name varchar(255) NULL,
    PRIMARY KEY (id)
);
-- ksql stream 
CREATE STREAM stream_user_entity WITH (KAFKA_TOPIC='cdc.identity.public.user_entity', value_format='avro');

-- source organization microservice (postgres)
CREATE TABLE public.user_info (
    id varchar(36) NOT NULL,
    user_entity_id varchar(36) NOT NULL,
    business_unit varchar(255) NOT NULL,
    cost_center varchar(255) NOT NULL,
    PRIMARY KEY (id)
);
-- ksql stream 
CREATE STREAM stream_user_info WITH (KAFKA_TOPIC='cdc.application.public.user_info', value_format='avro');

Option 1 : Streams

CREATE STREAM stream_user_info_by_user_entity_id
AS SELECT * FROM stream_user_info
PARTITION BY user_entity_id
EMIT CHANGES;

SELECT 
    user_entity_id,
    first_name,
    business_unit,
    cost_center
FROM stream_user_entity ue
LEFT JOIN stream_user_info_by_user_entity_id ui WITHIN 365 DAYS ON ue.id = ui.user_entity_id 
EMIT CHANGES;

Notice the WITHIN 365 DAYS clause. Conceptually, these tables could go a very long time without being changed, so the window would technically need to be infinitely large. This looks fishy and seems to hint that this is not a good way to do it.

Option 2 : Tables

-- first_name belongs to user_entity, so it is not a column of this table
CREATE TABLE ktable_user_info_by_user_entity_id (
    user_entity_id VARCHAR PRIMARY KEY,
    business_unit VARCHAR,
    cost_center VARCHAR
)
WITH (KAFKA_TOPIC='stream_user_info_by_user_entity_id', value_format='avro');

SELECT 
    user_entity_id,
    first_name,
    business_unit,
    cost_center
FROM stream_user_entity ue
LEFT JOIN ktable_user_info_by_user_entity_id ui ON ue.id = ui.user_entity_id 
EMIT CHANGES;

We no longer need the WITHIN 365 DAYS window, so this feels more correct. However, this only emits a change when a message arrives on the stream side of the join, not when the table is updated.

In this example:

User updates first_name -> change is emitted
User updates business_unit -> no change emitted

Perhaps there might be a way to create a merged stream partitioned by the user_entity_id and join to child tables which would hold the current state, which leads me to....

Option 3 : Merged stream and tables

-- "master" change stream with merged stream output
CREATE STREAM stream_user_changes (user_entity_id VARCHAR) 
WITH (KAFKA_TOPIC='stream_user_changes', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes SELECT id as user_entity_id FROM stream_user_entity;
INSERT INTO stream_user_changes SELECT user_entity_id FROM stream_user_info;

CREATE STREAM stream_user_entity_by_id
AS SELECT * FROM stream_user_entity
PARTITION BY id
EMIT CHANGES;

CREATE TABLE ktable_user_entity_by_id (
    id VARCHAR PRIMARY KEY,
    first_name VARCHAR
) with (KAFKA_TOPIC='stream_user_entity_by_id', value_format='avro');

SELECT 
    uec.user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_changes uec
LEFT JOIN ktable_user_entity_by_id ue ON uec.user_entity_id = ue.id
LEFT JOIN ktable_user_info_by_user_entity_id ui ON uec.user_entity_id = ui.user_entity_id 
EMIT CHANGES;

This one looks the best, but it appears to have a lot of moving components: for each table we have 2 streams, 1 insert query, and 1 ktable. Another potential issue might be a hidden race condition, where the stream emits the change before the table is updated under the covers.

Option 4 : More merged tables and streams

CREATE STREAM stream_user_entity_changes_enriched
AS SELECT 
    ue.id AS user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_entity_by_id ue
LEFT JOIN ktable_user_info_by_user_entity_id ui ON ue.id = ui.user_entity_id
EMIT CHANGES;

CREATE STREAM stream_user_info_changes_enriched
AS SELECT 
    ui.user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_info_by_user_entity_id ui
LEFT JOIN ktable_user_entity_by_id ue ON ui.user_entity_id = ue.id
EMIT CHANGES;


CREATE STREAM stream_user_changes_enriched (user_entity_id VARCHAR, first_name VARCHAR, business_unit VARCHAR, cost_center VARCHAR) 
WITH (KAFKA_TOPIC='stream_user_changes_enriched', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes_enriched SELECT * FROM stream_user_entity_changes_enriched;
INSERT INTO stream_user_changes_enriched SELECT * FROM stream_user_info_changes_enriched;

This is conceptually the same as the earlier one, but the "merging" happens after the joins. Conceivably, this might eliminate any potential race condition because we're selecting primarily from the streams instead of the tables.

The downside is that the complexity is even worse than option 3 and writing and tracking all these streams for any joins with more than two tables would be kind of mind numbing...

Question: Which method is best for this use case, and/or are we attempting to do something that ksql shouldn't be used for? Would we be better off just offloading this to a traditional RDBMS or Spark alternatives?

Vaas answered 23/9, 2020 at 14:10 Comment(5)
I think option 2 is the expected behavior. Although a change in the table does not emit an event immediately, any subsequent change on the stream will emit events on the output stream with the new information from the table. I'm not sure what the expected behavior should be if a change in the table emitted an event immediately: if the change is an update to an existing row, we might need to go back and update old events in the output stream, but since a stream is immutable, this should not happen. - Sclaff
Yeah, I agree that this is the expected behavior. I'm looking for a solution that essentially works around it. - Vaas
Do you think a table-table (ksql table) join is more suitable in this case? What you want as the output here does not sound like a stream. - Sclaff
I would like a stream. I want other applications to be able to consume the stream and be notified of any updates to either of these two combined streams. I could join them in the consumer, for example, but if multiple services need to be aware of these changes, it would be preferable to have one joined stream on the Kafka side so the consumer doesn't need to be aware of the logic involved. - Vaas
The joined table is not a stream in the sense that historical records are kept. If I'm correct, applications consuming the joined table are still notified of any updates to the two streams through the latest state of the joined records. This sounds like what you want. - Sclaff
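
The table-table join suggested in the comments above could be sketched roughly as follows. This is only a sketch under assumptions: it reuses the two keyed tables defined in the question (ktable_user_entity_by_id from option 3 and ktable_user_info_by_user_entity_id from option 2), it assumes each user has at most one user_info row, and it assumes both underlying topics have the same number of partitions. The names ktable_user_enriched and user_enriched_table are made up for illustration. Because both sides are tables, no window is needed, and an update on either side should produce an updated row (for the right side, only once a matching left row exists).

```sql
-- Hypothetical output table; both inputs are keyed by the user id,
-- so this is a primary-key table-table join.
CREATE TABLE ktable_user_enriched
WITH (KAFKA_TOPIC='user_enriched_table', VALUE_FORMAT='avro') AS
SELECT
    ue.id AS user_entity_id,
    ue.first_name AS first_name,
    ui.business_unit AS business_unit,
    ui.cost_center AS cost_center
FROM ktable_user_entity_by_id ue
LEFT JOIN ktable_user_info_by_user_entity_id ui ON ue.id = ui.user_entity_id
EMIT CHANGES;
```

Consumers of the resulting topic would see the latest joined state per user_entity_id rather than a full history of events, which is the trade-off discussed in the comments.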

I'm going to attempt to answer my own question; I'll only accept it if it gets upvoted.

The answer is: Option 3

Here are the reasons why this would be the best option for this use case, though this is perhaps subjective:

  • Streams partitioned by primary and foreign keys are common and simple.
  • Tables based on these streams are common and simple.
  • Tables used in this way will not introduce a race condition.

All options have merits, e.g. if you don't care about emitting all the changes or if the data behaves like streams (logs or events) instead of slowly changing dimensions (SQL tables).

As for "race conditions", the word "table" tricks your mind into thinking that you are actually processing and persisting data. In reality, they are not physical tables; they behave more like sub-queries on streams. Note: aggregation tables, which actually produce topics, might be an exception (I would suggest that is a different topic, but would love to see comments).

In the end (syntax may have some slight bugs):

---------------------------------------------------------
-- shared objects (likely to be used by multiple queries)
---------------------------------------------------------

-- shared streams wrapping topics
CREATE STREAM stream_user_entity WITH (KAFKA_TOPIC='cdc.identity.public.user_entity', value_format='avro');
CREATE STREAM stream_user_info WITH (KAFKA_TOPIC='cdc.application.public.user_info', value_format='avro');

-- shared keyed streams (i like to think of them as "indexes")
CREATE STREAM stream_user_entity_by_id AS 
SELECT * FROM stream_user_entity PARTITION BY id
EMIT CHANGES;
CREATE STREAM stream_user_info_by_user_entity_id AS 
SELECT * FROM stream_user_info PARTITION BY user_entity_id
EMIT CHANGES;

-- shared keyed tables (inferring columns with schema registry)
CREATE TABLE ktable_user_entity_by_id (id VARCHAR PRIMARY KEY) 
WITH (KAFKA_TOPIC='stream_user_entity_by_id', value_format='avro');
CREATE TABLE ktable_user_info_by_user_entity_id (user_entity_id VARCHAR PRIMARY KEY) 
WITH (KAFKA_TOPIC='stream_user_info_by_user_entity_id', value_format='avro');


---------------------------------------------------------
-- query objects (specific to the produced data)
---------------------------------------------------------
-- "master" change stream (include all tables in join)
CREATE STREAM stream_user_changes (user_entity_id VARCHAR) 
WITH (KAFKA_TOPIC='stream_user_changes', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes SELECT id as user_entity_id FROM stream_user_entity;
INSERT INTO stream_user_changes SELECT user_entity_id FROM stream_user_info;

-- pretty simple looking query
SELECT 
    uec.user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_changes uec
LEFT JOIN ktable_user_entity_by_id ue ON uec.user_entity_id = ue.id
LEFT JOIN ktable_user_info_by_user_entity_id ui ON uec.user_entity_id = ui.user_entity_id 
EMIT CHANGES;

The "shared" objects are basically the streaming schema (the temptation is to create these for all our topics, but that's another question), and the second portion is like the query schema. Ultimately, it is a functional, clean, and repeatable pattern.
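
The final SELECT above is a transient (push) query, so its results only reach the client that runs it. To get the joined changes into a topic that other services can consume, which was the original goal, the same join could be wrapped in a persistent query. A minimal sketch, assuming the merged stream is the stream_user_changes created above; the stream name stream_user_enriched and the topic name user_enriched are hypothetical. It also assumes the topics behind the two keyed tables have the same partition count as stream_user_changes (created with PARTITIONS=1 above), since ksqlDB requires joined sources to be co-partitioned.

```sql
-- Persistent version of the join: results are written to a Kafka topic.
-- stream_user_enriched and user_enriched are hypothetical names.
CREATE STREAM stream_user_enriched
WITH (KAFKA_TOPIC='user_enriched', VALUE_FORMAT='avro') AS
SELECT
    uc.user_entity_id AS user_entity_id,
    ue.first_name AS first_name,
    ui.business_unit AS business_unit,
    ui.cost_center AS cost_center
FROM stream_user_changes uc
LEFT JOIN ktable_user_entity_by_id ue ON uc.user_entity_id = ue.id
LEFT JOIN ktable_user_info_by_user_entity_id ui ON uc.user_entity_id = ui.user_entity_id
EMIT CHANGES;

-- Inspect a few records from the output topic in the CLI.
PRINT 'user_enriched' FROM BEGINNING LIMIT 5;
```

Aliasing every column keeps the output schema free of the source prefixes that ksqlDB would otherwise add to column names in a join.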

Vaas answered 27/9, 2020 at 3:25 Comment(2)
Why do you create tables if you can join the streams directly? - Clairclairaudience
Small mistake in the last select; it should be like this: ``` SELECT uc.user_entity_id, ue.first_name, ui.business_unit, ui.cost_center FROM stream_user_changes uc LEFT JOIN ktable_user_entity_by_id ue ON uc.user_entity_id = ue.id LEFT JOIN ktable_user_info_by_user_entity_id ui ON uc.user_entity_id = ui.user_entity_id EMIT CHANGES; ``` Also, the reason to use the tables is to make sure the join uses the latest data from both sides; you also don't need any window, since a table only keeps the last record for each key. - Malchus

I like your approach number 3. Indeed, I have tried to use it to merge streams with different primary keys into one master stream and then group them in a materialized view.

The join seems to work, but I ended up in the same situation as with a regular stream-table join. I do see changes in the master stream, but somehow those changes are only propagated downstream (to the GROUP BY table) when they affect the first table in the table-stream join and not the others.

So basically what i have achieved is the following:

  1. Debezium --> create 3 streams: A, B, AB (AB is a matching table between ids in A and ids in B, used in Postgres to make an n-to-n join)
  2. Streams A, B, and AB are repartitioned by one id (A_id) and merged into one stream. In this step all elements of B get assigned a virtual A_id, since it is not relevant to them
  3. The 3 KTables are created (I still keep wondering, why? Is this a sort of self-join?)
  4. A materialized view (table) is created by grouping the master stream after joining it to the 3 KTables

When a change in A, B, or AB happens, I see changes in the master stream too, but the materialized view is only updated when changes on stream B occur. Of course, destroying the table and recreating it makes it "up-to-date".

Are you facing the same problem?

Sabec answered 20/4, 2022 at 10:1 Comment(0)
