How to create an output stream (changelog) based on a table in KSQL correctly?

Step 1: Create table

I currently have a table in KSQL, which I created with:

CREATE TABLE cdc_window_table
    WITH (KAFKA_TOPIC='cdc_stream',
          VALUE_FORMAT='JSON') AS
SELECT after->application_id AS application_id,
       COUNT(*) AS application_id_count
FROM cdc_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY after->application_id
EMIT CHANGES;

This created a new table, which I can query with:

SELECT *
FROM cdc_window_table
EMIT CHANGES;

which returns data like

+---------------+---------------+---------------+---------------------+
|APPLICATION_ID |WINDOWSTART    |WINDOWEND      |APPLICATION_ID_COUNT |
+---------------+---------------+---------------+---------------------+
|a1             |1648767460000  |1648767480000  |1                    |
|a1             |1648767460000  |1648767480000  |2                    |
|a1             |1648767460000  |1648767480000  |3                    |
|a1             |1648767480000  |1648767500000  |1                    |
|a1             |1648767740000  |1648767760000  |1                    |
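
For reference, the rows above are consistent with epoch-aligned 20-second tumbling windows in which every update to a window's count is emitted as a new row. The following is a minimal Python sketch of that behaviour, not ksqlDB code: the event timestamps are made up for illustration, and it assumes no caching or suppression of intermediate updates.

```python
from collections import defaultdict

WINDOW_SIZE_MS = 20_000  # mirrors WINDOW TUMBLING (SIZE 20 SECONDS)


def window_start(ts_ms: int, size_ms: int = WINDOW_SIZE_MS) -> int:
    """Start of the epoch-aligned tumbling window that contains ts_ms."""
    return ts_ms - (ts_ms % size_ms)


def changelog(events):
    """Yield one (key, window_start, window_end, count) row per input event."""
    counts = defaultdict(int)
    for key, ts_ms in events:
        start = window_start(ts_ms)
        counts[(key, start)] += 1
        yield (key, start, start + WINDOW_SIZE_MS, counts[(key, start)])


# Hypothetical event timestamps, chosen to fall into the windows shown above.
events = [
    ("a1", 1648767461000),
    ("a1", 1648767465000),
    ("a1", 1648767479000),
    ("a1", 1648767481000),
    ("a1", 1648767745000),
]
rows = list(changelog(events))
for row in rows:
    print(row)
```

Each printed row corresponds to one changelog record: the key is effectively the pair (application_id, window start), and the count is the latest value for that pair.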

Step 2: Create output stream (changelog) - FAILED

I am trying to create an output stream (changelog) based on this table like this image:

(Image source: https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/)

After reading this, I tried these four approaches:

CREATE STREAM cdc_window_table_changelog_stream (application_id STRING,
                                                 application_id_count INT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON');

CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
                                                 application_id_count BIGINT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON',
         WINDOW_TYPE='TUMBLING',
         WINDOW_SIZE='20 SECONDS');

CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
                                                 application_id_count BIGINT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON',
         WINDOW_TYPE='SESSION');

CREATE STREAM cdc_window_table_changelog_stream (ROWKEY STRING KEY,
                                                 application_id_count BIGINT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON',
         WINDOW_TYPE='SESSION');

When I query the stream with

SELECT *
FROM cdc_window_table_changelog_stream
EMIT CHANGES;

it shows only the table header, and no changelog rows ever arrive:

+------------------+-----------------------+
|APPLICATION_ID    |APPLICATION_ID_COUNT   |
+------------------+-----------------------+

What is the correct way to create an output stream (changelog) based on a table?

Lindesnes answered 1/4, 2022 at 19:41 Comment(0)