KSQL Windowed Aggregation Stream

I am trying to group events by one of their properties and over time using KSQL windowed aggregation, specifically a session window.

I have a STREAM created from a Kafka topic, with the TIMESTAMP property correctly specified.

When I try to create a STREAM with session windowing, using a query like:

CREATE STREAM SESSION_STREAM AS
SELECT ...
  FROM EVENT_STREAM
WINDOW SESSION (5 MINUTES)
   GROUP BY ...;

I always get the error:

Your SELECT query produces a TABLE. Please use CREATE TABLE AS SELECT statement instead.

Is it possible to create a STREAM with a Windowed Aggregation?


When I try, as suggested, to create a TABLE and then a STREAM that contains all the session-starting events, with a query like:

CREATE STREAM SESSION_START_STREAM AS
SELECT *
  FROM SESSION_TABLE
 WHERE WINDOWSTART=WINDOWEND;

KSQL informs me that:

KSQL does not support persistent queries on windowed tables

How can I create a STREAM of the events that start a session window in KSQL?

Vision answered 22/5, 2020 at 18:6 Comment(1)
I am using the cloud version of KSQL offered by confluent.cloud. – Vision

Your CREATE STREAM statement, if switched to a CREATE TABLE statement, will create a table that is constantly being updated. The sink topic SESSION_STREAM will contain the stream of changes to the table, i.e. its changelog.

ksqlDB models this as a TABLE, because it has TABLE semantics, i.e. only a single row can exist in the table with any specific key. However, the changelog will contain the STREAM of changes that have been applied to the table.

If what you want is a topic containing all the sessions then something like this will create that:

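-- create a stream with a new 'data' topic
-- (partitions is required when the topic does not exist yet; 1 is an arbitrary choice):
CREATE STREAM DATA (USER_ID INT)
    WITH (kafka_topic='data', value_format='json', partitions=1);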

-- create a table that tracks user interactions per session:
CREATE TABLE SESSIONS AS
SELECT USER_ID, COUNT(USER_ID) AS COUNT
  FROM DATA
WINDOW SESSION (5 SECONDS)
   GROUP BY USER_ID;

This will create a SESSIONS topic that contains the changes to the SESSIONS table: i.e. its changelog.
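
As a rough sketch, the following statements can help confirm which topic actually backs the table and what its changelog contains. The topic name 'SESSIONS' is an assumption here; on managed deployments the real name may carry a prefix. Newer ksqlDB versions may expect DESCRIBE SESSIONS EXTENDED instead.

-- List the Kafka topics visible to the server:
SHOW TOPICS;

-- Show details for the table, including the Kafka topic that backs it:
DESCRIBE EXTENDED SESSIONS;

-- Print the raw changelog records (assumes the topic is named 'SESSIONS'):
PRINT 'SESSIONS' FROM BEGINNING;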

If you want to convert this into a stream of session start events, then unfortunately ksqlDB doesn't yet allow you to create a stream directly from the table, but you can create a stream over the table's changelog:

-- Create a stream over the existing `SESSIONS` topic.
-- Note it states the window_type is 'Session'.
CREATE STREAM SESSION_STREAM (ROWKEY INT KEY, COUNT BIGINT) 
   WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');

-- Create a stream of window start events:
CREATE STREAM SESSION_STARTS AS 
    SELECT * FROM SESSION_STREAM 
    WHERE WINDOWSTART = WINDOWEND;

Note, with the upcoming 0.10 release you'll be able to name the key column in the SESSION_STREAM correctly:

CREATE STREAM SESSION_STREAM (USER_ID INT KEY, COUNT BIGINT) 
   WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');
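
To try this out, a minimal sketch, assuming the DATA stream and SESSION_STARTS stream defined above exist. The inserted values are arbitrary test data, not from the original setup.

-- Produce a few test events (arbitrary user ids):
INSERT INTO DATA (USER_ID) VALUES (1);
INSERT INTO DATA (USER_ID) VALUES (2);

-- Push query: watch session start events as they arrive:
SELECT * FROM SESSION_STARTS EMIT CHANGES;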
Ferbam answered 22/5, 2020 at 20:38 Comment(2)
Thanks for your answer, Andrew. I found two problems when trying this solution on confluent.cloud: both replicas and partitions need to be specified, and the topic name for the table's changelog is not the same as the table name but something like pksxxx-yyyv1SESSION. I accept your answer, as it works! – Vision
@FilippoVitale's comment about that changelog topic is important. I ran into a similar issue and posted my answer at https://mcmap.net/q/1625717/-how-to-create-an-output-stream-changelog-based-on-a-table-in-ksql-correctly – Backhouse
