Filtering a Spark partitioned table is not working in PySpark

I am using Spark 2.3 and have written a DataFrame to a Hive partitioned table using the DataFrameWriter class method in PySpark.

newdf.coalesce(1).write.format('orc').partitionBy('veh_country').mode("overwrite").saveAsTable('emp.partition_Load_table')

Here is my table structure and partitions information.

hive> desc emp.partition_Load_table;
OK
veh_code                varchar(17)
veh_flag                varchar(1)
veh_model               smallint
veh_country             varchar(3)

# Partition Information
# col_name              data_type               comment

veh_country              varchar(3)

hive> show partitions partition_Load_table;
OK
veh_country=CHN
veh_country=USA
veh_country=RUS

Now I am reading this table back into a DataFrame in PySpark.

    df2_data = spark.sql("""
    SELECT * 
    from udb.partition_Load_table
    """);

df2_data.show() --> is working

But I am not able to filter it using the partition key column:

from pyspark.sql.functions import col
newdf = df2_data.where(col("veh_country")=='CHN')

I am getting the below error message:

: java.lang.RuntimeException: Caught Hive MetaException attempting to get partition metadata by filter from Hive. 
You can set the Spark configuration setting spark.sql.hive.manageFilesourcePartitions to false to work around this problem, 
however this will result in degraded performance. Please report a bug: https://issues.apache.org/jira/browse/SPARK
Caused by: MetaException(message:Filtering is supported only on partition keys of type string)

Whereas when I create the DataFrame by specifying the absolute HDFS path of the table, the filter and where clauses work as expected.

newdataframe = spark.read.format("orc").option("header","false").load("hdfs/path/emp.db/partition_load_table")

The below is working:

newdataframe.where(col("veh_country")=='CHN').show()

My question is: why was it not able to filter the DataFrame in the first place, and why does it throw the error message "Filtering is supported only on partition keys of type string" even though my veh_country column is defined as a string/varchar datatype?
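
For reference, one thing I am considering trying (not verified yet) is to recreate the table so that the partition column is stored in the metastore as string rather than varchar(3), since the error text says filtering is supported only on string partition keys. A rough sketch of what I have in mind, reusing newdf from above:

# untested sketch: recreate the table with an explicit string partition column
spark.sql("SET hive.exec.dynamic.partition=true")
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("DROP TABLE IF EXISTS emp.partition_Load_table")
spark.sql("""
    CREATE TABLE emp.partition_Load_table (
        veh_code  string,
        veh_flag  string,
        veh_model smallint
    )
    PARTITIONED BY (veh_country string)
    STORED AS ORC
""")

# insertInto matches columns by position, so the partition column
# (veh_country) has to be the last column of newdf
newdf.coalesce(1).write.mode("overwrite").insertInto("emp.partition_Load_table")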

Equimolecular answered 19/11, 2018 at 10:47 Comment(0)

I have stumbled on this issue as well. What helped for me was this line:

spark.sql("SET spark.sql.hive.manageFilesourcePartitions=False")

and then using spark.sql(query) instead of the DataFrame API.

I do not know what happens under the hood, but this solved my problem.
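
For illustration, a minimal sketch of that sequence, reusing the table name and filter from the question (adjust to your own database/table):

spark.sql("SET spark.sql.hive.manageFilesourcePartitions=False")

# push the filter into the SQL statement instead of using DataFrame .where()
newdf = spark.sql("""
    SELECT *
    FROM udb.partition_Load_table
    WHERE veh_country = 'CHN'
""")
newdf.show()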

Although it might be too late for you (since this question was asked 8 months ago), this might help other people.

Typhoeus answered 5/8, 2019 at 11:32 Comment(3)
Thanks. :-) I will check and let you know. - Equimolecular
Thanks for your effort. Actually, I am not able to recreate the issue any more, even if I turn off this property. I don't know why it happened at that time or how it got fixed. Thanks for your time. - Equimolecular
A vote up for your efforts, but I cannot accept it as an answer. It looks like this property has something to do with this issue, though the issue is not recurring anymore. issues.apache.org/jira/browse/SPARK-18680 - Equimolecular

I know the topic is quite old but:

  1. I've received the same error, but the actual source of the problem was hidden much deeper in the logs. If you are facing the same problem as me, go to the end of your stack trace and you might find the actual reason your job is failing. In my case:
    a. org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:865)\n\t... 142 more\nCaused by: MetaException(message:Rate exceeded (Service: AWSGlue; Status Code: 400; Error Code: ThrottlingException ... which basically means I exceeded the AWS Glue Data Catalog quota, OR:
    b. MetaException(message:1 validation error detected: Value '(<my filtering condition goes here>' at 'expression' failed to satisfy constraint: Member must have length less than or equal to 2048, which means the filtering condition I put in my dataframe definition was too long (see the sketch after this list).
    Long story short, dig deep into your logs, because the reason for your error might be really simple; the top message is just far from clear.
  2. If you are working with tables that have a huge number of partitions (in my case hundreds of thousands), I would strongly recommend against setting spark.sql.hive.manageFilesourcePartitions=False. Yes, it will resolve the issue, but the performance degradation is enormous.
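
To illustrate point 1b, here is a rough sketch of one way to keep the pushed-down partition predicate short; the mydb.events table and its load_date partition column are hypothetical stand-ins:

from pyspark.sql.functions import col

df = spark.table("mydb.events")  # hypothetical table partitioned by load_date

# A very long IN-list over the partition column can push a filter expression
# to the metastore that exceeds its length limit (2048 characters on AWS Glue,
# as in point 1b above):
# df.where(col("load_date").isin(huge_list_of_dates))

# An equivalent range predicate keeps the pushed-down expression short:
df_short = df.where((col("load_date") >= "2022-01-01") &
                    (col("load_date") <= "2022-03-31"))
df_short.show()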
Overmodest answered 7/11, 2022 at 10:23 Comment(1)
Just want to add how incredibly helpful this answer is and how it ought to be upvoted. 1. Setting spark.sql.hive.manageFilesourcePartitions=False is indeed a highly bad idea, especially if you work with large data. 2. My log was telling me I was using the wrong type to filter among partitions. Namely, I have a file partitioned by date, and since dates and strings are interchangeable in Spark, I wouldn't even have thought that line was causing a problem. - Nickels
