How can we define nested json properties (including arrays) using Flink SQL API?

We have the following problem while using Flink SQL: we have configured a Kafka Twitter connector to write tweets to Kafka, and we want to read those tweets from Kafka into a table using Flink SQL.

How can we define nested JSON properties (including arrays) using the Flink SQL API?

We have tried the following, but it does not work (the values returned are empty):

CREATE TABLE kafka_tweets(
  payload ROW(`HashtagEntities` ARRAY[VARCHAR])
) WITH (
  'connector' = 'kafka',
  'topic' = 'twitter_status',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)

In the Twitter response, HashtagEntities is an array of objects.
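A sketch of how this table could be declared so that the array of objects is typed. It assumes, as the question's DDL already does, that each Kafka message wraps the tweet in a payload field, and it assumes each HashtagEntities element has Text, Start and End fields; those field names and types are an assumption, so check them against the actual records. Flink DDL writes collection types with angle brackets, so an array of objects becomes ARRAY<ROW<...>>; in Flink SQL, ARRAY[...] is the array value constructor, not a type.

```sql
-- Sketch: a JSON array of objects is declared as ARRAY<ROW<...>>.
-- Assumption: each HashtagEntities element looks like {"Text": "...", "Start": 0, "End": 5};
-- adjust field names and types to the messages actually in the topic.
CREATE TABLE kafka_tweets (
  payload ROW<
    `HashtagEntities` ARRAY<ROW<`Text` STRING, `Start` INT, `End` INT>>
  >
) WITH (
  'connector' = 'kafka',
  'topic' = 'twitter_status',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- One output row per hashtag: UNNEST expands each array element,
-- and the fields of its ROW become the columns of alias h.
SELECT h.`Text`
FROM kafka_tweets
CROSS JOIN UNNEST(kafka_tweets.payload.`HashtagEntities`) AS h (`Text`, `Start`, `End`);
```

The nested names are quoted with backticks because they have to match the JSON keys exactly, and END is a reserved word in Flink SQL.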

Catholicize answered 27/4, 2021 at 17:27
Welcome to SO. Please read stackoverflow.com/editing-help#code - Secretin
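CREATE TABLE `table` (
  `userid` BIGINT,
  -- epoch seconds (assumed type); used by the computed columns below
  `timestamp` BIGINT,
  -- raw JSON kept as a string; JSON_VALUE extracts scalar fields from it
  `json_data` VARCHAR(2147483647),
  `request_id` AS JSON_VALUE(`json_data`, '$.request_id'),
  -- a JSON array of objects is declared as ARRAY<ROW<...>>
  `items` ARRAY<ROW<`itemid` BIGINT, `shopid` BIGINT>>,
  `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`, 'yyyy-MM-dd HH:mm:ss')),
  -- TO_TIMESTAMP defaults to 'yyyy-MM-dd HH:mm:ss', so pass the matching pattern here
  `version` AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`, 'yyyy-MM-dd'), 'yyyy-MM-dd'),
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '1' MINUTE
)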
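To illustrate how a table declared like this is read: items is an ARRAY<ROW<...>>, so CROSS JOIN UNNEST should yield one row per array element, with the ROW fields as columns, while request_id is computed from the raw json_data string. The sample record in the comments is hypothetical, chosen only to show the shape the DDL expects.

```sql
-- Hypothetical record matching the DDL above:
--   {"userid": 1, "timestamp": 1644907200,
--    "json_data": "{\"request_id\": \"r1\"}",
--    "items": [{"itemid": 10, "shopid": 20}, {"itemid": 11, "shopid": 21}]}
-- For that record, this query returns two rows: (1, r1, 10, 20) and (1, r1, 11, 21).
SELECT
  t.`userid`,
  t.`request_id`,   -- computed column: JSON_VALUE over json_data
  i.`itemid`,
  i.`shopid`
FROM `table` AS t
CROSS JOIN UNNEST(t.`items`) AS i (`itemid`, `shopid`);
```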
Hashimoto answered 15/2, 2022 at 6:40
As it's currently written, your answer is unclear. Please edit to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers in the help center. - Lytic
