Get nested fields from Kafka message using Apache Flink SQL

I'm trying to create a source table using Apache Flink 1.11 where I can get access to nested properties in a JSON message. I can pluck values off root properties but I'm unsure how to access nested objects.

The documentation suggests that it should be a MAP type, but when I set that, I get the following error:

java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP

Here is my SQL:

        CREATE TABLE input(
            id VARCHAR,
            title VARCHAR,
            properties MAP
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        )

And my JSON looks something like this:

{
  "id": "message-1",
  "title": "Some Title",
  "properties": {
    "foo": "bar"
  }
}
Ogburn asked 23/9, 2020 at 9:17

You can use ROW to extract nested fields in your JSON messages. Your DDL statement would look something like:

CREATE TABLE input(
             id VARCHAR,
             title VARCHAR,
             properties ROW(`foo` VARCHAR)
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        );
Accession answered 24/9, 2020 at 8:27
Excellent, this works! What about deeply nested JSON? It seems like this could get unwieldy. Are there any better approaches for dealing with JSON data using PyFlink and SQL? (Ogburn)
There isn't a better way to do this for now, AFAIK. There is a plan to support SQL JSON functions in Flink SQL soon, though: cwiki.apache.org/confluence/pages/… This should make things considerably easier! (Accession)

[2022 Update]

Apache Flink 1.13 has no built-in JSON functions. They were introduced in version 1.14. Check this

If you are using a version older than 1.14, use the solution below.

How can I create a table with nested JSON input?

JSON input example:

{
    "id": "message-1",
    "title": "Some Title",
    "properties": {
        "foo": "bar",
        "nested_foo":{
            "prop1" : "value1",
            "prop2" : "value2"
        }
    }
}

CREATE statement:

CREATE TABLE input(
                id VARCHAR,
                title VARCHAR,
                properties ROW(`foo` VARCHAR, `nested_foo` ROW(`prop1` VARCHAR, `prop2` VARCHAR))
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        );
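For deeply nested messages, writing these ROW types by hand gets tedious. The following is a minimal sketch of a hypothetical helper (`flink_type` and `columns_ddl` are illustrative names, not part of PyFlink) that derives the column list from one sample message. It assumes the sample is representative: JSON strings and nulls become STRING, integers BIGINT, floats DOUBLE, arrays are homogeneous (the first element decides the element type), and objects become nested ROWs.

```python
import json
from typing import Any


def flink_type(value: Any) -> str:
    """Map one sample JSON value to a Flink SQL type string (hypothetical helper)."""
    if isinstance(value, bool):  # test bool before int: bool is a subclass of int
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    if isinstance(value, str) or value is None:
        return "STRING"  # assumption: nulls in the sample default to STRING
    if isinstance(value, list):
        # assumption: arrays are homogeneous; an empty array defaults to STRING elements
        return f"ARRAY<{flink_type(value[0]) if value else 'STRING'}>"
    if isinstance(value, dict):
        fields = ", ".join(f"`{k}` {flink_type(v)}" for k, v in value.items())
        return f"ROW<{fields}>"
    raise TypeError(f"unsupported JSON value: {value!r}")


def columns_ddl(sample_json: str) -> str:
    """Build the column list of a CREATE TABLE statement from one sample message."""
    record = json.loads(sample_json)
    return ",\n".join(f"  `{k}` {flink_type(v)}" for k, v in record.items())


if __name__ == "__main__":
    sample = ('{"id": "message-1", "title": "Some Title", "properties": '
              '{"foo": "bar", "nested_foo": {"prop1": "value1", "prop2": "value2"}}}')
    print(columns_ddl(sample))
```

For the example message this produces `properties` ROW<`foo` STRING, `nested_foo` ROW<`prop1` STRING, `prop2` STRING>>. That is the angle-bracket spelling of the ROW(...) type above, with STRING in place of VARCHAR. From PyFlink you could splice the result into the DDL string you pass to `TableEnvironment.execute_sql`.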

How can I select nested columns?

SELECT properties.foo, properties.nested_foo.prop1 FROM input;

Note that if you output the results with

SELECT properties FROM input

you see them in row format. The content of the properties column will be:

+I[bar, +I[value1, value2]]
Behr answered 19/4, 2022 at 8:47

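If you use 'format' = 'raw', the whole message is read into a single STRING column. You can then use the JSON_VALUE function (Flink 1.14+) in a computed column to extract the field of interest from the payload:

 CREATE TABLE input(
        payload STRING,
        foo AS JSON_VALUE(payload, '$.properties.foo' RETURNING STRING)
) WITH (
  'connector' = 'kafka',
  'topic' = 'my-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'python-test',
  'format' = 'raw'
)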
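To check which field a path expression selects in a sample message before putting it into the DDL, here is a rough plain-Python model of JSON_VALUE(payload, path RETURNING STRING). It is not Flink's implementation. It handles only dotted member access such as '$.properties.foo', with no array subscripts, wildcards or quoted keys. It assumes the default NULL ON EMPTY / NULL ON ERROR behaviour, with NULL modelled as None.

```python
import json
from typing import Any, Optional


def json_value(payload: str, path: str) -> Optional[str]:
    """Approximate JSON_VALUE(payload, path RETURNING STRING) for simple '$.a.b' paths."""
    if not path.startswith("$"):
        raise ValueError("path must start with '$'")
    try:
        node: Any = json.loads(payload)
    except json.JSONDecodeError:
        return None  # assumed: malformed payload -> NULL (NULL ON ERROR)
    # '$.properties.foo' -> ['properties', 'foo']; empty segments are skipped
    for key in filter(None, path[1:].split(".")):
        if not isinstance(node, dict) or key not in node:
            return None  # path not found -> NULL (NULL ON EMPTY)
        node = node[key]
    if node is None or isinstance(node, (dict, list)):
        return None  # JSON_VALUE returns scalars only; objects/arrays -> NULL
    if isinstance(node, bool):
        return "true" if node else "false"
    return node if isinstance(node, str) else json.dumps(node)
```

With the question's message, json_value(message, '$.properties.foo') gives 'bar'. A path that does not exist gives None. Flink's exact rendering of numeric values may differ from Python's.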
Bois answered 21/2, 2023 at 9:35

You may also try:

CREATE TABLE input(
            id VARCHAR,
            title VARCHAR,
            properties MAP<STRING, STRING>
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        )

The only difference from the question's DDL is MAP<STRING, STRING> instead of a bare MAP. Flink's MAP type requires explicit key and value types.

Untrimmed answered 26/2, 2021 at 7:19
