how to efficiently move data from Kafka to an Impala table?

Here are the steps in the current process (a sketch of the corresponding SQL follows the list):

  1. Flafka writes logs to a 'landing zone' on HDFS.
  2. A job, scheduled by Oozie, copies complete files from the landing zone to a staging area.
  3. The staging data is 'schema-ified' by a Hive table that uses the staging area as its location.
  4. Records from the staging table are added to a permanent Hive table (e.g. insert into permanent_table select * from staging_table).
  5. The data in the Hive table is made visible to Impala by executing refresh permanent_table in Impala.
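
For concreteness, here is a sketch of what steps 3–5 look like in SQL (the table names, columns, and staging path are hypothetical placeholders):

-- Step 3: schema-ify the staging area with an external Hive table
CREATE EXTERNAL TABLE staging_table (
  ts      STRING,
  message STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/data/staging';

-- Step 4 (run in Hive): append the staged records to the permanent table
INSERT INTO TABLE permanent_table SELECT * FROM staging_table;

-- Step 5 (run in Impala): make the newly written data visible
REFRESH permanent_table;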

(diagram: existing data flow)

I look at the process I've built and it "smells" bad: there are too many intermediate steps that impair the flow of data.

About 20 months ago, I saw a demo where data was being streamed from an Amazon Kinesis pipe and was queryable, in near real-time, by Impala. I don't suppose they did something quite so ugly/convoluted. Is there a more efficient way to stream data from Kafka to Impala (possibly a Kafka consumer that can serialize to Parquet)?

I imagine that "streaming data to low-latency SQL" must be a fairly common use case, and so I'm interested to know how other people have solved this problem.

Experienced answered 25/1, 2016 at 23:54 Comment(2)
Here is an alternative architecture that seems to have fewer hops and might be relatively faster. – Funke
Does this help? blog.cloudera.com/blog/2015/11/… – Munsey

If you need to dump your Kafka data as-is to HDFS, the best option is Kafka Connect with the Confluent HDFS connector.

You can dump the data to Parquet files on HDFS and load those into Impala. You'll want to use the TimeBasedPartitioner to roll a new Parquet file every X milliseconds (tuned via the partition.duration.ms configuration parameter).

Adding something like this to your Kafka Connect configuration might do the trick:

# Don't flush fewer than 1000 messages to HDFS
flush.size=1000

# Dump to Parquet files
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner

# One file every hour. If you change this, remember to change the filename format to match
partition.duration.ms=3600000
# Filename format
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=mm
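
To make the connector's output queryable from Impala, one option is an external Parquet table over the connector's output directory, partitioned the same way as path.format. This is a sketch; the table name, columns, and HDFS location are hypothetical:

-- External table over the Parquet files written by the connector
CREATE EXTERNAL TABLE kafka_events (
  event_id STRING,
  payload  STRING
)
PARTITIONED BY (`year` INT, `month` INT, `day` INT, `hour` INT, `minute` INT)
STORED AS PARQUET
LOCATION '/topics/my_topic';

-- Pick up directories and files written since the last load
ALTER TABLE kafka_events RECOVER PARTITIONS;
REFRESH kafka_events;
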
Hambley answered 31/10, 2017 at 10:5 Comment(0)

Answering this question in 2022, I would say the solution is to stream messages from Kafka to Kudu and query them through Impala, since Impala and Kudu already have tight integration.

Here is example of Impala schema for Kudu:

CREATE EXTERNAL TABLE my_table
STORED AS KUDU
TBLPROPERTIES (
  'kudu.table_name' = 'my_kudu_table'
);

Apache Kudu supports SQL inserts and uses its own storage format under the hood. Alternatively, you could use Apache Phoenix, which supports inserts and upserts (if you need exactly-once semantics) and uses HBase under the hood.
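
For illustration, here is a sketch of creating a Kudu-backed table directly from Impala and writing to it with UPSERT (the table name, columns, and partitioning are hypothetical):

-- Kudu-backed table managed through Impala
CREATE TABLE events (
  event_id BIGINT,
  ts       TIMESTAMP,
  payload  STRING,
  PRIMARY KEY (event_id)
)
PARTITION BY HASH (event_id) PARTITIONS 16
STORED AS KUDU;

-- UPSERT makes replays from Kafka idempotent on the primary key
UPSERT INTO events VALUES (1, now(), 'hello');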

As long as Impala is your final way of accessing the data, you shouldn't care about the underlying formats.

Aluminate answered 25/11, 2022 at 8:53 Comment(0)
