Creating a KSQL Stream: How to extract value from complex json

I am trying to create a stream in Apache Kafka KSQL. The topic contains somewhat complex JSON:

{
  "agreement_id": "dd8afdbe-59cf-4272-b640-b14a24d8234c",
  "created_at": "2018-02-17 16:00:00.000Z",
  "id": "6db276a8-2efe-4495-9908-4d3fc4cc16fa",
  "event_type": "data",
  "total_charged_amount": {
    "tax_free_amount": null,
    "tax_amounts": [],
    "tax_included_amount": {
      "amount": 0.0241,
      "currency": "EUR"
    }
  },
  "used_service_units": [
    {
      "amount": 2412739,
      "currency": null,
      "unit_of_measure": "bytes"
    }
  ]
}

Creating a stream is easy for simple top-level fields like event_type and created_at. That would look like this:

CREATE STREAM tstream (event_type varchar, created_at varchar) WITH (kafka_topic='usage_events', value_format='json');

But now I need to access used_service_units, and I would like to extract the "amount" field from the JSON above.

How would I do this? The following attempt:

CREATE STREAM usage (event_type varchar,create_at varchar, used_service_units[0].amount int) WITH (kafka_topic='usage_events', value_format='json');

results in

line 1:78: mismatched input '[' expecting {'ADD', 'APPROXIMATE', ...

And if I instead create a stream like so

CREATE STREAM usage (event_type varchar,create_at varchar, used_service_units varchar) WITH (kafka_topic='usage_events', value_format='json');

and then run a SQL SELECT on the stream like one of these

SELECT EXTRACTJSONFIELD(used_service_units,'$.amount') FROM usage;
SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage;
SELECT EXTRACTJSONFIELD(used_service_units,'$[0].amount') FROM usage;

none of these alternatives works.

This one:

SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage;

gave me

Code generation failed for SelectValueMapper
Banksia answered 18/5, 2018 at 10:32 Comment(0)

It seems that one solution to this problem is to declare the column as an array type, i.e.:

CREATE STREAM usage (event_type varchar,created_at varchar, total_charged_amount varchar, used_service_units array<varchar> ) WITH (kafka_topic='usage_events', value_format='json');

Now I am able to do the following:

SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage;
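
For more deeply nested documents, a possible alternative is to declare the nested objects as STRUCT columns and navigate them with the -> operator. This is only a sketch and rests on assumptions: it needs a KSQL version with STRUCT support (Confluent Platform 5.0 or later), the stream name usage_nested is made up for illustration, and the column types are guessed from the sample message above. Older CLIs may also require each statement on a single line.

```sql
-- Sketch only: requires STRUCT support; types are inferred from the sample JSON.
-- usage_nested is a hypothetical stream name.
CREATE STREAM usage_nested (
    event_type VARCHAR,
    created_at VARCHAR,
    total_charged_amount STRUCT<
        tax_included_amount STRUCT<amount DOUBLE, currency VARCHAR>>,
    used_service_units ARRAY<STRUCT<
        amount BIGINT, currency VARCHAR, unit_of_measure VARCHAR>>
) WITH (kafka_topic='usage_events', value_format='json');

-- Index 0 matches the answer above; newer ksqlDB releases may index arrays from 1.
SELECT used_service_units[0]->amount,
       total_charged_amount->tax_included_amount->amount
FROM usage_nested;
```

With typed STRUCT columns the extracted values keep their declared types, whereas EXTRACTJSONFIELD returns them as strings.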
Banksia answered 18/5, 2018 at 12:9 Comment(4)
@RobinMoffatt, is this approach documented somewhere? I couldn't find it in the examples / recipes at confluent.io/product/ksql, but this was my exact question as well, so perhaps it's a more common requirement. - Byars
@JochemSchulenklopper Happy to write it up, but I could do with understanding the specific question in more detail first. Can you contact me? [email protected]. Thx. - Oxfordshire
@RobinMoffatt, what would need to happen to that part if the JSON document contained more layers, more dicts, more items in an array, or dicts in arrays in dicts? In my case, I'm parsing the event data that's being sent from a webhook by a public SaaS provider, and the JSON document is deeply nested. I'd guess (hope, perhaps) that there's some query language for expressing 'multilevel paths', like a variant of XPath for JSON, or CSS-like selectors against a JSON document in KSQL, but I couldn't find the documentation in KSQL for such a thing. (A pointer to a document is equally valuable.) - Byars
@JochemSchulenklopper I saw an article on Confluent's site about mildly nested JSON: confluent.io/stream-processing-cookbook/ksql-recipes/…. It is too simplistic for my use, but it might help you. - Pickmeup
