Is there a way to consume a Kafka Ksql Push query from .NET

I'm processing a large volume of Kafka messages in .NET with a Kafka consumer at the moment.

Step 1 in my processing is to parse the JSON and discard many of the messages based on the value of a specific field in the JSON.
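
Roughly, the current consumer looks like this (a simplified sketch; the topic, group, and field names below are placeholders, not my real ones):

using System;
using System.Text.Json;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "my-consumer-group",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("source-topic"); // placeholder topic name

while (true)
{
    var result = consumer.Consume();

    // Every message is downloaded and parsed, even the ones that get discarded
    using var doc = JsonDocument.Parse(result.Message.Value);
    if (doc.RootElement.GetProperty("someField").GetString() != "wanted-value") // placeholder field/value
        continue; // roughly two thirds of the volume ends here

    // ... process the remaining messages ...
}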

I'd like to avoid processing (and, specifically, downloading) those unwanted messages in the first place.

It looks like a KSQL query, written as a push query, can effectively filter the stream down to just the messages I need to process.

How can I consume these via .NET, though? I saw some documents mentioning a REST API, but I doubt that this is a good idea: I need to process in excess of 100,000 records per minute at peak times of day. (If I can selectively download and process messages, I will only be processing about one third of the current volume.)

Unfortunately I don't have control over the publisher, so I can't change what/how the messages are published.

Axel answered 18/5, 2020 at 22:11 Comment(0)

Yes, you can use ksqlDB to do this:

-- Declare a stream on the source topic
-- Because it's JSON you'll need to specify the schema
CREATE STREAM my_source (COL1 VARCHAR, COL2 INT) 
  WITH (KAFKA_TOPIC='my_source_topic', VALUE_FORMAT='JSON');

-- Apply the filter to the stream, with the results written
-- to a new stream (backed by a new topic)
CREATE STREAM target WITH (KAFKA_TOPIC='my_target_topic') AS
  SELECT * FROM my_source WHERE COL1='FOO';

Then, using the REST API from within your application, run a push query, which will consume just the filtered messages:

SELECT * FROM target EMIT CHANGES;
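
A minimal sketch of what that can look like from .NET, using a plain HttpClient against ksqlDB's /query REST endpoint (the endpoint path, content type, and localhost URL here are assumptions based on the ksqlDB documentation; the response is a long-lived chunked stream of JSON rows):

using System;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

class PushQueryConsumer
{
    static async Task Main()
    {
        using var client = new HttpClient { BaseAddress = new Uri("http://localhost:8088") };

        var payload = "{\"ksql\":\"SELECT * FROM target EMIT CHANGES;\",\"streamsProperties\":{}}";
        using var request = new HttpRequestMessage(HttpMethod.Post, "/query")
        {
            Content = new StringContent(payload, Encoding.UTF8, "application/vnd.ksql.v1+json")
        };

        // Push queries never complete, so read the response as a stream rather than buffering it
        using var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
        response.EnsureSuccessStatusCode();

        using var reader = new StreamReader(await response.Content.ReadAsStreamAsync());
        string line;
        while ((line = await reader.ReadLineAsync()) != null)
        {
            if (string.IsNullOrWhiteSpace(line)) continue;
            Console.WriteLine(line); // each non-empty line is a JSON-encoded row (plus an initial header entry)
        }
    }
}

Note too that because the CREATE STREAM ... AS SELECT above writes its results to a real Kafka topic (my_target_topic), another option is to point your existing .NET Kafka consumer at that topic directly and skip the REST hop entirely.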

Aside from ksqlDB, you might also want to have a look at this recently released project from the community: https://github.com/LGouellec/kafka-streams-dotnet

Exultant answered 19/5, 2020 at 8:19 Comment(1)
@Robin Moffatt Thanks. I saw the streams project you mentioned, but it looks like very early days. Still, worth a shot. Do you think processing huge volumes at high speed is really feasible via a REST API? It seems a little dodgy to me, but I'm no expert at this. (Mostly I'm worried about the speed if I have to make loads of REST calls.) – Axel

You can use the ksqlDB LINQ provider in the following manner.

Install the package with the NuGet package manager:

Install-Package ksqlDB.RestApi.Client

Create the query in C# (.NET):

using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query.Context;

var ksqlDbUrl = @"http://localhost:8088";
var contextOptions = new KSqlDBContextOptions(ksqlDbUrl);

await using var context = new KSqlDBContext(contextOptions);

// Subscribes to a push query over the ksqlDB REST API
using var subscription = context.CreateQueryStream<Message>() // stream name is pluralized from the type name
  .Where(m => m.RowTime >= 1510923225000) // add your own conditions
  .Select(m => new { m.Id, m.Message, m.RowTime })
  .Subscribe(
    onNext: message => { /* handle each filtered record as it arrives */ },
    onError: error => { /* handle transport or server errors */ },
    onCompleted: () => { });

The above C# code is equivalent to the following KSQL:

SELECT Id, Message, RowTime FROM Messages WHERE RowTime >= 1510923225000 EMIT CHANGES;
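
For completeness, here is a hypothetical entity the query could bind to. Note that C# does not allow a property to share the name of its enclosing class, so a class named Message could not itself declare a Message property; this sketch therefore names the class differently and passes the stream name explicitly (assuming the provider's optional fromItemName parameter and its Record base class, which supplies RowTime):

using ksqlDB.RestApi.Client.KSql.Query;

// Hypothetical entity; the Record base class contributes the RowTime pseudo column
public class Msg : Record
{
    public int Id { get; set; }
    public string Message { get; set; }
}

// Pass the stream name explicitly, since pluralizing "Msg" would not yield "Messages"
using var subscription2 = context.CreateQueryStream<Msg>("Messages")
  .Where(m => m.RowTime >= 1510923225000)
  .Select(m => new { m.Id, m.Message, m.RowTime })
  .Subscribe(onNext: m => { }, onError: e => { }, onCompleted: () => { });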

See the project's Wiki for more operators.

Lifeblood answered 5/2, 2021 at 15:19 Comment(1)
What's the reason that all the examples seem to have "http:\\" instead of the usual "http://"? – Goaltender