spark 2.4 Parquet column cannot be converted in file, Column: [Impressions], Expected: bigint, Found: BINARY

I'm facing a weird issue that I cannot understand.

I have source data with a column "Impressions" that is sometimes a bigint / sometimes a string (when I manually explore the data).

The Hive schema registered for this column is Long.

Thus, when loading the data:

spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW adwords_ads_agg_Yxz AS

SELECT
    a.customer_id
    , a.Campaign
    , ...
    , SUM(BIGINT(a.Impressions)) as Impressions
    , SUM(BIGINT(a.Cost))/1000000 as Cost
FROM adwords_ad a
LEFT JOIN ds_ad_mapping m ON BIGINT(a.Ad_ID) = BIGINT(m.adEngineId) AND a.customer_id = m.reportAccountId
WHERE a.customer_id in (...)
AND a.day >= DATE('2019-02-01')
GROUP BY
    a.customer_id
    , ...
""")

I'm making sure that everything gets converted to BIGINT. The error happens later, on this step:

spark.sql("CACHE TABLE adwords_ads_agg_Yxz")

After seeing this error, I ran the same code in a notebook and tried to debug further, first by making sure that the column is actually cast to BIGINT / long:

from pyspark.sql import functions as f
from pyspark.sql.types import LongType

df = df.withColumn("Impressions", f.col("Impressions").cast(LongType()))
df.createOrReplaceTempView('adwords_ads_agg_Yxz')

and then printing the schema from this freshly converted df:

root
 |-- customer_id: long (nullable = true)
 |-- Campaign: string (nullable = true)
 |-- MatchType: string (nullable = true)
 |-- League: string (nullable = false)
 |-- Ad_Group: string (nullable = true)
 |-- Impressions: long (nullable = true) <- Here!
 |-- Cost: double (nullable = true)

and then doing the caching, but the error remains:

Spark Job Progress An error occurred while calling o84.sql. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 47.0 failed 4 times, most recent failure: Lost task 9.3 in stage 47.0 (TID 2256, ip-172-31-00-00.eu-west-1.compute.internal, executor 10): org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file s3a://bucket/prod/reports/adwords_ad/customer_id=1111111/date=2019-11-21/theparquetfile.snappy.parquet. Column: [Impressions], Expected: bigint, Found: BINARY

Has anyone encountered this problem and / or has an idea what is causing it?

If I remove the caching, the error happens instead when trying to write the data to Parquet. I also don't know why it mentions the adwords_ad table at this point, when I'm trying to refresh / write a temporary table.

Marc answered 28/11, 2019 at 21:5 Comment(5)
Interesting to get a -1 without explanations. – Marc
You cannot cast binary directly to bigint; try converting it to StringType first: BIGINT(string(a.Impressions)). – Temporal
@Temporal Unfortunately that changed nothing. – Marc
Weirdly enough, I have 2 tables using the Impressions / Cost cols (that are signalled as BINARY) and this transformation only worked for one of the two. No matter what I do with the second one, it still says it's a binary column. – Marc
Spark transformations are lazily evaluated; errors only show when you take an action (i.e. cache or write). I guess that's why you got a downvote. I think you should focus on the SQL and/or issues in your source data. – Temporal
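To make the last comment concrete, here is a minimal sketch (reusing the table and column names from the question, and assuming an existing spark session) of why the cast alone never surfaces the error:

from pyspark.sql import functions as f
from pyspark.sql.types import LongType

df = spark.table("adwords_ad")                                             # no data read yet
df = df.withColumn("Impressions", f.col("Impressions").cast(LongType()))  # still lazy
df.printSchema()   # only reports the planned schema, still no file read
# df.count()       # an action like this (or CACHE TABLE / a write) triggers the actual
#                  # Parquet scan, which is where "Expected: bigint, Found: BINARY" appears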

When using a Hive table over Parquet and then reading it with Spark, Spark takes the schema from the Parquet files and not from the Hive table definition.

It makes sense that in your Parquet files' schema Impressions is BINARY, and it doesn't matter that in the Hive table it's Long, because Spark takes the schema from the Parquet files.
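One way to confirm this, as a rough sketch (the S3 path is copied from the error message and is only illustrative), is to read the Parquet files directly and compare the two schemas:

spark.table("adwords_ad").printSchema()   # Hive metastore view: Impressions as long

raw = spark.read.parquet(
    "s3a://bucket/prod/reports/adwords_ad/customer_id=1111111/date=2019-11-21/"
)
raw.printSchema()                         # file view: Impressions shows up as string/binary, not long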

Kerchief answered 30/11, 2019 at 22:13 Comment(2)
If casting to string doesn't resolve the problem, you could write a custom UDF that converts BinaryType (actually just an alias for bytearray in Python) to String. – Kerchief
Interesting, thank you for your help, I'll try that. – Marc
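A minimal sketch of the UDF idea from the comment above, assuming the BINARY values hold UTF-8 encoded digits and that the files are read directly as Parquet (the path and the view name are illustrative):

from pyspark.sql import functions as f
from pyspark.sql.types import StringType, LongType

@f.udf(returnType=StringType())
def binary_to_string(value):
    # value arrives as a Python bytearray/bytes for BinaryType columns
    return bytes(value).decode("utf-8") if value is not None else None

raw = spark.read.parquet("s3a://bucket/prod/reports/adwords_ad/")  # illustrative path
fixed = raw.withColumn("Impressions",
                       binary_to_string(f.col("Impressions")).cast(LongType()))
fixed.createOrReplaceTempView("adwords_ad_fixed")  # hypothetical view name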

This mostly happens when columns in the .parquet files are stored as double or float.

One-liner answer: set

spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")

TL;DR

When we read data using Spark, especially Parquet data

data = spark.read.parquet(source_path)

Spark tries to optimize and read the data in a vectorized format from the .parquet files. Even if we do explicit data type casting,

new_data = data.withColumn(col_name, col(col_name).cast(TimestampType()))

Spark will use the native Parquet data types (whatever the original data type was in the .parquet files) at runtime.

This causes an issue while writing the data, due to the mismatch between the data and the column type.

To resolve this issue, disable the vectorized reader, as sketched below.
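A rough sketch of this workaround, assuming the data is re-read and rewritten rather than cached; the paths are illustrative, and the cast goes through string first (echoing the comment on the question that BINARY cannot be cast straight to bigint):

from pyspark.sql.functions import col

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

data = spark.read.parquet("s3a://bucket/prod/reports/adwords_ad/")            # illustrative input path
data = data.withColumn("Impressions", col("Impressions").cast("string").cast("bigint"))
data.write.mode("overwrite").parquet("s3a://bucket/prod/adwords_ad_fixed/")   # hypothetical output path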

To learn more about the vectorized reader, see below:

The vectorized Parquet reader enables native record-level filtering using push-down filters, improving memory locality and cache utilization. If you disable the vectorized Parquet reader, there may be a minor performance impact. You should only disable it if you have decimal type columns in your source data.

https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

Gib answered 15/1 at 8:14 Comment(0)

Try disabling the vectorized Parquet reader, as mentioned in the troubleshooting guide below:

https://learn.microsoft.com/en-us/azure/databricks/kb/scala/spark-job-fail-parquet-column-convert

Echinoid answered 19/5, 2022 at 9:5 Comment(2)
This doesn't work; Spark still throws the exception when I try to cache the data. – Divisionism
It worked in my environment. – Thymol
