How to efficiently update Impala tables whose files are modified very frequently

We have a Hadoop-based solution (CDH 5.15) in which new files keep arriving in certain HDFS directories. On top of those directories we have 4-5 Impala (2.1) tables. The process writing those files to HDFS is Spark Structured Streaming (2.3.1).

Right now, we are running some DDL queries as soon as we get the files written to HDFS:

  • ALTER TABLE table1 RECOVER PARTITIONS to detect new partitions (and their HDFS directories and files) added to the table.

  • REFRESH table1 PARTITION (partition1=X, partition2=Y), using all the keys for each partition.

Right now, these DDL statements take too long and queue up in our system, which hurts data availability.

So, my question is: Is there a way to do this data incorporation more efficiently?

We have considered:

  • Using ALTER TABLE .. RECOVER PARTITIONS alone, but as per the documentation, it only picks up new partitions.

  • Using REFRESH .. PARTITION ... with multiple partitions at once, but the statement syntax does not allow that.

  • Batching the queries, but the Hive JDBC driver does not support batching queries.

  • Should we run those updates in parallel, even though the system is already busy?

  • Any other way you are aware of?

Thanks!

Victor

Note: We find out which partitions need to be refreshed from HDFS events, because with Spark Structured Streaming we don't know exactly when the files are written.

Note #2: The files written to HDFS are sometimes small, so it would be great if we could merge them as part of the same process.

Orangery answered 6/2, 2020 at 8:24 Comment(3)
Don't have an answer to your problem, sorry, but just wanted to mention that the new Impala version has added a "hands-off" metadata management feature. See impala.apache.org/docs/build/html/topics/impala_metadata.html.Societal
Thanks @mazaneicha! That seems very promising! We are planning to upgrade our stack so maybe that is yet another reason to do it.Orangery
Please check if msck repair command from hive will be useful.Intuitivism

Since nobody seems to have an answer to my problem, I would like to share the approach we took to make this processing more efficient. Comments are very welcome.

We discovered (the documentation is not very clear on this) that the information Spark stores in its "checkpoints" in HDFS includes a number of metadata files describing when each Parquet file was written and how big it was:

$ hdfs dfs -ls -h hdfs://...../my_spark_job/_spark_metadata

-rw-r--r--   3 hdfs 68K   2020-02-26 20:49 hdfs://...../my_spark_job/_spark_metadata/3248
-rw-r--r--   3 hdfs 33.3M 2020-02-26 20:53 hdfs://...../my_spark_job/_spark_metadata/3249.compact
-rw-r--r--   3 hdfs 68K   2020-02-26 20:54 hdfs://...../my_spark_job/_spark_metadata/3250
...

$ hdfs dfs -cat hdfs://...../my_spark_job/_spark_metadata/3250
v1
{"path":"hdfs://.../my_spark_job/../part-00004.c000.snappy.parquet","size":9866555,"isDir":false,"modificationTime":1582750862638,"blockReplication":3,"blockSize":134217728,"action":"add"}
{"path":"hdfs://.../my_spark_job/../part-00004.c001.snappy.parquet","size":526513,"isDir":false,"modificationTime":1582750862834,"blockReplication":3,"blockSize":134217728,"action":"add"}
...
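
To make the format above concrete, here is a minimal Python sketch of parsing one of those batch files into (path, size) pairs. It assumes only what the sample shows: a non-JSON version line ("v1") followed by one JSON object per line. The function name parse_sink_log and the sample path are hypothetical and used for illustration only; this is not the code of the job described here.

```python
import json


def parse_sink_log(text):
    """Parse the text of one _spark_metadata batch file into (path, size) tuples."""
    entries = []
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines and the version header (e.g. "v1"), which is not JSON.
        if not line.startswith("{"):
            continue
        record = json.loads(line)
        # Keep only regular files that were added in this batch.
        if record.get("action") == "add" and not record.get("isDir", False):
            entries.append((record["path"], record["size"]))
    return entries


# Hypothetical sample in the same shape as the log shown above.
SAMPLE = "\n".join([
    "v1",
    json.dumps({"path": "hdfs://namenode/data/events/day=2020-02-26/part-00000.parquet",
                "size": 9866555, "isDir": False, "action": "add"}),
    json.dumps({"path": "hdfs://namenode/data/events/day=2020-02-26/part-00001.parquet",
                "size": 526513, "isDir": False, "action": "add"}),
])

if __name__ == "__main__":
    for path, size in parse_sink_log(SAMPLE):
        print(size, path)
```

Matching on the leading "{" rather than on the literal "v1" keeps the sketch tolerant of a different version header, although that tolerance is an assumption rather than documented behaviour.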

So, what we did was:

  • Build a Spark Streaming Job polling that _spark_metadata folder.
    • We use a fileStream since it allows us to define the file filter to use.
    • Each entry in that stream is one of those JSON lines, which is parsed to extract the file path and size.
  • Group the files by the parent folder (which maps to each Impala partition) they belong to.
  • For each folder:
    • Read a dataframe loading only the targeted Parquet files (to avoid race conditions with the other job writing the files)
    • Calculate how many blocks to write (using the size field in the JSON and a target block size)
    • Coalesce the dataframe to the desired number of partitions and write it back to HDFS
    • Execute the DDL REFRESH myTable PARTITION ([partition keys derived from the new folder])
  • Finally, delete the source files
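
The bookkeeping in the steps above (grouping files by folder, choosing how many files to write, building the refresh statement) can be sketched in plain Python as follows. This is an illustration under stated assumptions, not the job's actual code: the helper names, the 128 MB target and the sample paths are hypothetical, partition folders are assumed to follow the Hive-style key=value layout, and the statement uses Impala's REFRESH table PARTITION (...) form. The Spark read, coalesce and write steps are left out.

```python
import math
import posixpath
from collections import defaultdict

# Assumed target size per output file; 128 MB matches the blockSize in the log.
TARGET_FILE_BYTES = 128 * 1024 * 1024


def group_by_partition_dir(entries):
    """Group (path, size) tuples by their parent folder."""
    groups = defaultdict(list)
    for path, size in entries:
        groups[posixpath.dirname(path)].append((path, size))
    return dict(groups)


def output_file_count(files, target_bytes=TARGET_FILE_BYTES):
    """Number of files to coalesce to, based on the total input size."""
    total = sum(size for _, size in files)
    return max(1, math.ceil(total / target_bytes))


def refresh_statement(table, partition_dir):
    """Build a REFRESH statement from Hive-style key=value path components."""
    pairs = []
    for component in partition_dir.split("/"):
        if "=" in component:
            key, value = component.split("=", 1)
            # Assumption: every value is quoted as a string; numeric
            # partition keys may need to be emitted unquoted instead.
            pairs.append(f"{key}='{value}'")
    if not pairs:
        raise ValueError(f"no partition keys found in {partition_dir!r}")
    return f"REFRESH {table} PARTITION ({', '.join(pairs)})"


if __name__ == "__main__":
    mb = 1024 * 1024
    # Hypothetical input in the shape produced by parsing the metadata log.
    entries = [
        ("hdfs://namenode/data/events/day=2020-02-26/hour=20/part-0.parquet", 100 * mb),
        ("hdfs://namenode/data/events/day=2020-02-26/hour=20/part-1.parquet", 100 * mb),
        ("hdfs://namenode/data/events/day=2020-02-26/hour=21/part-0.parquet", 5 * mb),
    ]
    for folder, files in group_by_partition_dir(entries).items():
        print(output_file_count(files), refresh_statement("myTable", folder))
```

Because the grouping happens once per batch, each partition touched in a batch yields a single statement, however many small files landed in it.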

What we achieved is:

  • We limit the number of DDL statements by issuing one refresh per partition and batch.

  • By having batch time and block size configurable, we are able to adapt our product to different deployment scenarios with bigger or smaller datasets.

  • The solution is quite flexible, since we can assign more or less resources to the Spark Streaming job (executors, cores, memory, etc.) and also we can start/stop it (using its own checkpointing system).

  • We are also studying the possibility of repartitioning the data during this process, to keep partitions as close as possible to the optimal size.

Orangery answered 26/2, 2020 at 21:31 Comment(3)
Do you think the same can be achieved using KafkaConnect instead of Spark? In the last step, "Finally, delete the source files", how did you avoid removing new files coming in?Sheehan
The metadata filestream contains in each batch exactly the files to process, so you don't have to worry about new files that might have been written in the meantime.Orangery
As for KafkaConnect, I have never used it; I will take a look at it. Thank you!Orangery
