Consuming nested JSON message from Kafka with ClickHouse

ClickHouse can read JSON messages from Kafka as long as they are flat JSON documents.

We specify this with kafka_format = 'JSONEachRow' in ClickHouse.

This is how we currently use it:

CREATE TABLE topic1_kafka
(
    ts Int64,
    event String,
    title String,
    msg String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1test.intra:9092,kafka2test.intra:9092,kafka3test.intra:9092',
kafka_topic_list = 'topic1', kafka_num_consumers = 1, kafka_group_name = 'ch1', 
kafka_format = 'JSONEachRow'
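
To check how JSONEachRow maps a flat message onto these columns without going through Kafka, a literal can be parsed with the format() table function. This is a minimal sketch: it assumes a ClickHouse version whose format() accepts an explicit structure argument, and the event, title and msg values are made up.

```sql
-- Parse one flat message the way JSONEachRow does: one JSON object per row,
-- keys matched to columns by name. Sample values are made up for illustration.
-- Assumes a ClickHouse version where format() accepts a structure argument.
SELECT *
FROM format(
    JSONEachRow,
    'ts Int64, event String, title String, msg String',
    '{"ts": 1598033988, "event": "login", "title": "hello", "msg": "a, b"}'
);
```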

This works as long as producers send flat JSON to topic1. However, not all producers do: most of our applications generate nested JSON documents like this:

{
  "ts": 1598033988,
  "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663",
  "location": [39.920515, 32.853708],
  "stats": {
    "temp": 71.2,
    "total_memory": 32,
    "used_memory": 21.2
  }
}

Unfortunately, the JSON document above does not fit JSONEachRow, so ClickHouse cannot map its nested fields to the columns of the table.

Is there any way to do this mapping?

EDIT: We want to map the nested JSON to a flat table like this:

CREATE TABLE topic1
(
    ts Int64,
    deviceId String,
    location_1 Float64,
    location_2 Float64,
    stats_temp Float64,
    stats_total_memory Float64,
    stats_used_memory Float64
) ENGINE = MergeTree()
ORDER BY ts

Urtication asked 21/8, 2020 at 18:30

I had the same problem and solved it with the JSONAsString format. In this format, each JSON object is read as a single value, so it can only be parsed into a table with a single column of type String.

CREATE TABLE topic1_kafka (
  data String
) 
ENGINE = Kafka
SETTINGS 
    kafka_format = 'JSONAsString',
    ...


CREATE MATERIALIZED VIEW topic1_mv TO topic1 AS
SELECT 
    JSONExtractInt(data, 'ts') AS ts, 
    JSONExtractString(data, 'deviceId') AS deviceId, 
    JSONExtractFloat(data, 'location', 1) AS location_1,  -- arrays are indexed from 1
    JSONExtractFloat(data, 'location', 2) AS location_2,
    JSONExtractFloat(data, 'stats', 'temp') AS stats_temp, 
    JSONExtractFloat(data, 'stats', 'total_memory') AS stats_total_memory, 
    JSONExtractFloat(data, 'stats', 'used_memory') AS stats_used_memory 
FROM topic1_kafka
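
Before attaching the view to Kafka, the same expressions can be run against a literal copy of the message from the question. This is a sketch; the last two columns are only there to show that JSONExtractFloat returns 0 for a key that does not exist (such as 'memory' under stats) instead of failing, and that JSONHas can tell such a missing key apart from a genuine 0.

```sql
-- Run the view's extraction expressions against one literal message
-- (copied from the question) to check every path before wiring up Kafka.
WITH '{"ts": 1598033988, "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663", "location": [39.920515, 32.853708], "stats": {"temp": 71.2, "total_memory": 32, "used_memory": 21.2}}' AS data
SELECT
    JSONExtractInt(data, 'ts') AS ts,
    JSONExtractString(data, 'deviceId') AS deviceId,
    JSONExtractFloat(data, 'location', 1) AS location_1,  -- arrays are 1-based
    JSONExtractFloat(data, 'location', 2) AS location_2,
    JSONExtractFloat(data, 'stats', 'temp') AS stats_temp,
    JSONExtractFloat(data, 'stats', 'total_memory') AS stats_total_memory,
    JSONExtractFloat(data, 'stats', 'used_memory') AS stats_used_memory,
    -- A path that does not exist is not an error: the result is silently 0.
    JSONExtractFloat(data, 'stats', 'memory') AS missing_key,
    -- JSONHas returns 0 here, distinguishing "absent" from "present and 0".
    JSONHas(data, 'stats', 'memory') AS has_missing_key;
```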

See also:

Fusiform answered 6/6, 2023 at 21:4

It looks like the only way is to read the 'raw' data as a String and then process each row with JSON functions in a consumer materialized view.

WITH '{"ts": 1598033988, "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663", "location": [39.920515, 32.853708], "stats": { "temp": 71.2, "total_memory": 32, "used_memory": 21.2 }}' AS raw
SELECT 
  JSONExtractUInt(raw, 'ts') AS ts,
  JSONExtractString(raw, 'deviceId') AS deviceId,
  arrayMap(x -> toFloat32(x), JSONExtractArrayRaw(raw, 'location')) AS location,
  JSONExtract(raw, 'stats', 'Tuple(temp Float64, total_memory Float64, used_memory Float64)') AS stats,
  stats.1 AS temp,
  stats.2 AS total_memory,
  stats.3 AS used_memory;

/*
┌─────────ts─┬─deviceId─────────────────────────────┬─location──────────────┬─stats────────────────────────┬─temp─┬─total_memory─┬────────used_memory─┐
│ 1598033988 │ cf060111-dbe6-4aa8-a2d0-d5aa17f45663 │ [39.920513,32.853706] │ (71.2,32,21.200000000000003) │ 71.2 │           32 │ 21.200000000000003 │
└────────────┴──────────────────────────────────────┴───────────────────────┴──────────────────────────────┴──────┴──────────────┴────────────────────┘
*/

Remark: floating-point numbers should be stored as Float64, not Float32 (see the related ClickHouse issue 13962).
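
Following that remark, the location array can be read straight into Array(Float64) with JSONExtract instead of mapping toFloat32 over the raw strings. A small sketch comparing the two side by side; Float64 keeps far more precision than the Float32 values shown in the output above, though exact last-digit output may still depend on the server version's float parser.

```sql
WITH '{"location": [39.920515, 32.853708]}' AS raw
SELECT
    -- Parse the array elements directly as Float64.
    JSONExtract(raw, 'location', 'Array(Float64)') AS location_f64,
    -- The conversion used in the query above: raw strings cast to Float32.
    arrayMap(x -> toFloat32(x), JSONExtractArrayRaw(raw, 'location')) AS location_f32,
    toTypeName(location_f64) AS f64_type,
    toTypeName(location_f32) AS f32_type;
```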


Using the standard data types requires changing the JSON schema:

  1. Represent stats as a Tuple:
CREATE TABLE test_tuple_field
(
    ts Int64,
    deviceId String,
    location Array(Float32),
    stats Tuple(Float32, Float32, Float32)
) ENGINE = MergeTree()
ORDER BY ts;


INSERT INTO test_tuple_field FORMAT JSONEachRow 
{ "ts": 1598033988, "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663", "location": [39.920515, 32.853708], "stats": [71.2, 32, 21.2]};

  2. Represent stats as a Nested structure:
CREATE TABLE test_nested_field
(
    ts Int64,
    deviceId String,
    location Array(Float32),
    stats Nested (temp Float32, total_memory Float32, used_memory Float32)
) ENGINE = MergeTree()
ORDER BY ts;


SET input_format_import_nested_json=1;
INSERT INTO test_nested_field FORMAT JSONEachRow 
{ "ts": 1598033988, "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663", "location": [39.920515, 32.853708], "stats": { "temp": [71.2], "total_memory": [32], "used_memory": [21.2] }};
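
Once either table is filled, the stats fields can be read back in flat form. A sketch against the two example tables above (test_tuple_field and test_nested_field): tuple elements are addressed by 1-based position, while a Nested column is stored as parallel arrays that ARRAY JOIN unfolds into one row per element.

```sql
-- Tuple column: pick elements by position (1-based).
SELECT
    ts,
    tupleElement(stats, 1) AS temp,
    tupleElement(stats, 2) AS total_memory,
    tupleElement(stats, 3) AS used_memory
FROM test_tuple_field;

-- Nested column: ARRAY JOIN turns the parallel arrays into one row per element.
SELECT
    ts,
    stats.temp AS temp,
    stats.total_memory AS total_memory,
    stats.used_memory AS used_memory
FROM test_nested_field
ARRAY JOIN stats;
```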

See the related answer ClickHouse JSON parse exception: Cannot parse input: expected ',' before.

Curarize answered 22/8, 2020 at 1:44
What format should I specify in kafka_format when creating a ClickHouse table with the Kafka engine? – Urtication
I would try testing it with TSVRaw, TSV, and CSV. – Curarize
The problem is that with CSV, if the message contains a comma (which it definitely does), ClickHouse treats it as a column separator and cannot parse the message. The same applies to TSV: the message cannot contain a tab character. – Urtication
Probably it is impossible ;( Try checking each format (the full list: select * from system.formats). – Curarize
Look at the virtual columns; the Altinity Kafka FAQ mentions _raw_message, which may help (if it exists; in the source code I don't see a reference to it: github.com/ClickHouse/ClickHouse/blob/…). – Curarize
_raw_message is not implemented yet :( – Urtication
Try the TSVRaw format: select '{"a": 2, "b": "\t aa "}' format TSVRaw – Curarize
Interestingly, while TSV and CSV work as kafka_format, TSVRaw doesn't seem to work. Investigating... – Urtication
Opened an issue for this. – Urtication
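
Following the comment about system.formats, the list can be narrowed to the JSON input formats the server can parse. A small sketch; the exact list depends on the server version, and JSONAsString only appears in versions that support it.

```sql
-- Input formats whose names start with JSON, as known to this server.
SELECT name
FROM system.formats
WHERE is_input AND name LIKE 'JSON%'
ORDER BY name;
```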

© 2022 - 2024 — McMap. All rights reserved.