pyspark - getting Latest partition from Hive partitioned column logic

I am new to PySpark. I am trying to get the latest partition (date partition) of a Hive table using PySpark DataFrames, and have done it as shown below. I am sure there is a better way to do it using DataFrame functions (not by writing SQL). Could you please suggest a better approach?

This solution scans all the data in the Hive table to get the answer.

df_1 = sqlContext.table("dbname.tablename")

df_1_dates = df_1.select('partitioned_date_column').distinct().orderBy(df_1['partitioned_date_column'].desc())

lat_date_dict=df_1_dates.first().asDict()

lat_dt=lat_date_dict['partitioned_date_column']
Gallinule answered 7/3, 2019 at 21:40 Comment(2)
Use show partitions dbname.tablename and pick the last row of the returned DataFrame to get the latest partition. - Villanovan
Just FYI, there is an issue on the Spark tracker about this, SPARK-12890: "Spark SQL query related to only partition fields should not scan the whole data." - Assimilable

I agree with what @philantrovert mentioned in the comment. You can use the approach below, which relies on partition pruning to limit the number of partitions scanned for your Hive table.

>>> spark.sql("""show partitions test_dev_db.newpartitiontable""").show();
+--------------------+
|           partition|
+--------------------+
|tran_date=2009-01-01|
|tran_date=2009-02-01|
|tran_date=2009-03-01|
|tran_date=2009-04-01|
|tran_date=2009-05-01|
|tran_date=2009-06-01|
|tran_date=2009-07-01|
|tran_date=2009-08-01|
|tran_date=2009-09-01|
|tran_date=2009-10-01|
|tran_date=2009-11-01|
|tran_date=2009-12-01|
+--------------------+

>>> max_date=spark.sql("""show partitions test_dev_db.newpartitiontable""").rdd.flatMap(lambda x:x).map(lambda x : x.replace("tran_date=","")).max()
>>> print(max_date)
2009-12-01
>>> query = "select city,state,country from test_dev_db.newpartitiontable where tran_date ='{}'".format(max_date)

>>> spark.sql(query).show();
+--------------------+----------------+--------------+
|                city|           state|       country|
+--------------------+----------------+--------------+
|         Southampton|         England|United Kingdom|
|W Lebanon        ...|              NH| United States|
|               Comox|British Columbia|        Canada|
|           Gasperich|      Luxembourg|    Luxembourg|
+--------------------+----------------+--------------+

>>> spark.sql(query).explain(True)
== Parsed Logical Plan ==
'Project ['city, 'state, 'country]
+- 'Filter ('tran_date = 2009-12-01)
   +- 'UnresolvedRelation `test_dev_db`.`newpartitiontable`

== Analyzed Logical Plan ==
city: string, state: string, country: string
Project [city#9, state#10, country#11]
+- Filter (tran_date#12 = 2009-12-01)
   +- SubqueryAlias newpartitiontable
      +- Relation[city#9,state#10,country#11,tran_date#12] orc

== Optimized Logical Plan ==
Project [city#9, state#10, country#11]
+- Filter (isnotnull(tran_date#12) && (tran_date#12 = 2009-12-01))
   +- Relation[city#9,state#10,country#11,tran_date#12] orc

== Physical Plan ==
*(1) Project [city#9, state#10, country#11]
+- *(1) FileScan orc test_dev_db.newpartitiontable[city#9,state#10,country#11,tran_date#12] Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://xxx.host.com:8020/user/xxx/dev/hadoop/database/test_dev..., PartitionCount: 1, PartitionFilters: [isnotnull(tran_date#12), (tran_date#12 = 2009-12-01)], PushedFilters: [], ReadSchema: struct<city:string,state:string,country:string>

You can see in the plan above that PartitionCount is 1: Spark scanned only one of the 12 available partitions.
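As a rough sketch of the "pick the latest partition" step, the parsing can also be done in plain Python once the partition strings have been collected to the driver. The helper names parse_partition_spec and latest_partition_value are hypothetical, and the input list is made up to mirror the show partitions output above. It assumes the partition values are zero-padded ISO dates, so that string order matches date order.

```python
def parse_partition_spec(spec):
    """Turn 'year=2019/month=03' into {'year': '2019', 'month': '03'}."""
    parts = [p for p in spec.strip("/").split("/") if p]
    return dict(p.split("=", 1) for p in parts)


def latest_partition_value(specs, column):
    """Return the largest value of `column` across all partition specs."""
    values = [parse_partition_spec(s)[column] for s in specs]
    # String comparison matches date order only for zero-padded ISO dates.
    return max(values)


# Hypothetical input, shaped like the output of SHOW PARTITIONS above.
# With Spark it could be collected along these lines:
#   rows = spark.sql("show partitions test_dev_db.newpartitiontable").collect()
#   specs = [row["partition"] for row in rows]
specs = ["tran_date=2009-{:02d}-01".format(m) for m in range(1, 13)]
print(latest_partition_value(specs, "tran_date"))  # 2009-12-01
```

Splitting on "/" first also covers tables with more than one partition column, where a simple replace("tran_date=", "") would leave the other columns in the string.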

Dopp answered 10/8, 2019 at 8:59 Comment(0)

Building on Vikrant's answer, here is a more general way of extracting partition column values directly from the table metadata, which avoids Spark scanning through all the files in the table.

First, if your data isn't already registered in a catalog, you'll want to do that so Spark can see the partition details. Here, I'm registering a new table named data.

spark.catalog.createTable(
    'data',
    path='/path/to/the/data',
    source='parquet',
)
spark.catalog.recoverPartitions('data')
partitions = spark.sql('show partitions data')

To show a self-contained answer, however, I'll manually create the partitions DataFrame so you can see what it would look like, along with the solution for extracting a specific column value from it.

from pyspark.sql.functions import (
    col,
    regexp_extract,
)

partitions = (
    spark.createDataFrame(
        [
            ('/country=usa/region=ri/',),
            ('/country=usa/region=ma/',),
            ('/country=russia/region=siberia/',),
        ],
        schema=['partition'],
    )
)
partition_name = 'country'

(
    partitions
    .select(
        'partition',
        regexp_extract(
            col('partition'),
            pattern=r'(\/|^){}=(\S+?)(\/|$)'.format(partition_name),
            idx=2,
        ).alias(partition_name),
    )
    .show(truncate=False)
)

The output of this query is:

+-------------------------------+-------+
|partition                      |country|
+-------------------------------+-------+
|/country=usa/region=ri/        |usa    |
|/country=usa/region=ma/        |usa    |
|/country=russia/region=siberia/|russia |
+-------------------------------+-------+
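To see what the pattern is doing without a Spark session, here is a small sketch that applies the same regular expression with Python's re module. The function name extract_partition_value is hypothetical. It returns an empty string when nothing matches, which is how regexp_extract behaves, and it adds re.escape around the column name as an extra precaution that the query above does not have.

```python
import re


def extract_partition_value(partition, name):
    """Extract the value of partition column `name` from a partition string."""
    # Same pattern as in the query above: the name must follow a slash or the
    # start of the string, and the value runs up to the next slash or the end.
    pattern = r'(\/|^){}=(\S+?)(\/|$)'.format(re.escape(name))
    match = re.search(pattern, partition)
    # Mirror regexp_extract, which yields an empty string when there is no match.
    return match.group(2) if match else ''


print(extract_partition_value('/country=usa/region=ri/', 'country'))  # usa
print(extract_partition_value('/country=usa/region=ri/', 'region'))   # ri
```

The leading (\/|^) group is what stops a name such as date from matching inside tran_date=2009-12-01.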

The solution in Scala will look very similar to this, except the call to regexp_extract() will look slightly different:

    .select(
        regexp_extract(
            col("partition"),
            exp=s"(\\/|^)${partitionName}=(\\S+?)(\\/|$$)",
            groupIdx=2
        ).alias(partitionName).as[String]
    )

Again, the benefit of querying partition values in this way is that Spark will not scan all the files in the table to get you the answer. If you have a table with tens or hundreds of thousands of files in it, your time savings will be significant.

Assimilable answered 14/1, 2021 at 17:40 Comment(0)
