Overwrite parquet file with pyarrow in S3

I'm trying to overwrite my parquet files in S3 with pyarrow. I've looked through the documentation and haven't found anything.

Here is my code:

from s3fs.core import S3FileSystem
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

s3 = S3FileSystem(anon=False)
output_dir = "s3://mybucket/output/my_table"

my_csv = pd.read_csv("file.csv")
my_table = pa.Table.from_pandas(my_csv, preserve_index=False)

pq.write_to_dataset(my_table, 
                    output_dir,
                    filesystem=s3,
                    use_dictionary=True,
                    compression='snappy')

Is there something like a mode="overwrite" option for the write_to_dataset function?

Trotta answered 30/8, 2018 at 11:22 Comment(0)

Here's a solution using pyarrow.parquet (version 8+ is required; see the docs for the existing_data_behavior argument) and an S3FileSystem.

First decide whether you want to overwrite whole partitions or the individual parquet part files that typically make up those partitions.
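
For context, a minimal setup sketch for the s3 filesystem and my_table used below (this assumes pyarrow's built-in pyarrow.fs.S3FileSystem; the region and the columns are placeholders, and an s3fs filesystem works here as well):

import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import fs

# Assumed setup: credentials are picked up from the environment / instance profile.
s3 = fs.S3FileSystem(region="us-east-1")  # placeholder region

# Stand-in table; replace with your own data.
my_table = pa.table({"year": [2022, 2022], "value": [1.0, 2.0]})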

Overwrite single .parquet file

pq.write_to_dataset(
    my_table, 
    root_path='bucket/mydata/year=2022/data_part001.parquet',
    filesystem=s3,
    existing_data_behavior="overwrite_or_ignore"
)

Overwrite .parquet files with common basename within each partition

pq.write_to_dataset(
    my_table, 
    root_path='bucket/mydata',
    partition_cols=['year'],
    basename_template='data_part001.parquet',
    filesystem=s3,
    existing_data_behavior="overwrite_or_ignore"
)
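
Because the basename is fixed, the new part file gets the same name inside each partition it touches, so overwrite_or_ignore replaces that file instead of adding another part file next to it.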

Overwriting existing partitions that match new records

If some of your new records belong to a partition that already exists, that entire partition is overwritten; partitions that don't exist yet are simply added:

pq.write_to_dataset(
    my_table,
    root_path='bucket/mydata',
    partition_cols=['year'],
    filesystem=s3,
    existing_data_behavior="delete_matching"
)
Tubate answered 7/7, 2022 at 8:11 Comment(0)

I think the best way to do it is with AWS Data Wrangler (awswrangler), which offers three different write modes:

  1. append
  2. overwrite
  3. overwrite_partitions

Example:

import awswrangler as wr

wr.s3.to_parquet(
    df=df,
    path="s3://...",
    mode="overwrite",
    dataset=True,
    database="my_database",  # Optional, only if you want it available in the Athena/Glue Catalog
    table="my_table",
    partition_cols=["PARTITION_COL_NAME"])
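
If you only want to replace the partitions that appear in the new data rather than everything under the path, a sketch along the same lines (same placeholder path and partition column as above):

import awswrangler as wr

wr.s3.to_parquet(
    df=df,
    path="s3://...",
    mode="overwrite_partitions",  # only the partitions present in df are replaced
    dataset=True,
    partition_cols=["PARTITION_COL_NAME"])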
Annabelannabela answered 10/1, 2020 at 13:0 Comment(0)

Sorry, there's no such option yet, but the way I work around it is to use boto3 to delete the files before writing them.

import boto3

# Delete every object under the dataset prefix before re-writing it.
resource = boto3.resource('s3')
resource.Bucket('mybucket').objects.filter(Prefix='output/my_table').delete()
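
After the delete, the write from the question runs as before; a sketch reusing the s3fs filesystem, output_dir, and my_table from the question:

# Re-write the dataset now that the old objects under the prefix are gone.
pq.write_to_dataset(my_table,
                    output_dir,
                    filesystem=s3,
                    use_dictionary=True,
                    compression='snappy')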
Pampas answered 23/5, 2019 at 0:40 Comment(0)
