Performant writes to Apache Iceberg
I'm trying to achieve performant record writes from pandas (or ideally Polars, if possible) in a Python environment to our Apache Iceberg deployment (with a Hive metastore), either directly or via the Trino query engine, depending on which is more performant.

Given what I've already tried, my current record write throughput is still pretty rubbish. Could someone please point me in the right direction for getting performant write connections directly to Iceberg (or via Trino, if that's how it's expected to work through a query engine)? I'm also not finding any documentation that offers guidance.

My attempts so far, in order:

  • Originally used the Trino Python package to parse the contents of a pandas DataFrame into batches of SQL INSERT statements - clunky and the slowest.
  • Then dug around and found an async Trino Python package, which at least lets me issue the calls with asyncio and benefit from concurrency based on the number of Trino workers deployed.
  • Used Trino's JDBC connector (the underlying trino-jdbc.jar) together with Python's jaydebeapi package, to see whether the Java runtime would give a performance improvement - reading data worked, but writing did not.
  • Direct Iceberg connection via PySpark - connection issues, described in a separate post (a rough sketch of the intended setup follows this list).
  • PyIceberg, as of version 0.4.0, still does not provide record-writing functionality, as far as I can tell from the documentation and from digging through the package.
  • Stumbled onto an approach using the pandas to_sql() method - the best so far, but still nowhere near fast enough.
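
For context, the PySpark route I was attempting looks roughly like the sketch below. The runtime package version, metastore URI, catalog, schema and table names are placeholders rather than my actual configuration, so treat it as illustrative only.

from pyspark.sql import SparkSession
import pandas as pd

# Placeholder coordinates: the Iceberg runtime version must match the Spark/Scala build in use
spark = (
    SparkSession.builder
    .appName("iceberg-write-sketch")
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2")
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg.type", "hive")
    .config("spark.sql.catalog.iceberg.uri", "thrift://hive-metastore:9083")  # placeholder metastore URI
    .getOrCreate()
)

pdf = pd.DataFrame({"id": [1, 2], "payload": ["a", "b"]})  # stand-in for the real DataFrame
sdf = spark.createDataFrame(pdf)

# Appends to an existing Iceberg table; partitioning is resolved from the table spec
sdf.writeTo("iceberg.my_schema.my_table").append()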

In one discussion thread I also read that a DataFrame can be uploaded to S3 directly as a Parquet file and then linked to the relevant Iceberg catalog table via metadata calls (which makes sense compression- and transfer-wise)... but I haven't been able to get it working this way round either.
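
What I had in mind is roughly the sketch below. It assumes a newer PyIceberg than the 0.4.0 I have available (the add_files metadata API only appeared in later releases), plus placeholder bucket, metastore and table names, so it is illustrative rather than something I have working.

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.fs as pafs
from pyiceberg.catalog import load_catalog

# 1) Write the DataFrame straight to S3 as Parquet (bucket/path/region are placeholders)
s3 = pafs.S3FileSystem(region="eu-west-1")
arrow_table = pa.Table.from_pandas(df)  # df = the pandas DataFrame to be written
pq.write_table(arrow_table, "my-bucket/staging/batch_0001.parquet", filesystem=s3)

# 2) Attach the file to the Iceberg table via a metadata-only commit (no rewrite)
catalog = load_catalog("hive", **{"type": "hive", "uri": "thrift://hive-metastore:9083"})
table = catalog.load_table("my_schema.my_table")
table.add_files(["s3://my-bucket/staging/batch_0001.parquet"])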

The code snippet for pandas to_sql() via Trino is the only one worth sharing:

import warnings
# Ignore all warnings
warnings.filterwarnings("ignore")
import logging
logging.getLogger().setLevel(logging.DEBUG)

from sqlalchemy import create_engine
from trino.auth import BasicAuthentication
import pandas as pd 
from datetime import datetime

trino_target_host = "..." 
trino_target_port = 443
trino_target_catalog = 'iceberg'
trino_target_schema = '...'

def write_content(table: str, df, batch_size: int):
    print(f"size: {len(df)}")
    try:
        start = datetime.utcnow()
        engine = create_engine(
            f"trino://{trino_target_host}:{trino_target_port}/{trino_target_catalog}/{trino_target_schema}",
            connect_args={
                "auth": BasicAuthentication("...", "..."),
                "http_scheme": "https"
            }
        )
        # chunksize = writing df in batches of size - saving memory
        # method = instead of writing a single record at a time, 'multi' will insert multiple rows as 1 statement
        output = df.to_sql(table, engine, if_exists='append', index=False, chunksize=batch_size, method='multi')
        print(f"elapsed: {datetime.utcnow() - start}")
        return output
    except Exception as e:
        if 'Request Entity Too Large' in str(e):
            print(f"Batch size '{batch_size}' too large. Reduce size")
            return None
        print(f"failed: {e}")
        return None  

sample = df[:70000].copy(deep=True)
write_content('table_name', sample, 1000)

size: 70000

EXECUTE IMMEDIATE not available for trino.dp.iotnxt.io:443; defaulting to legacy prepared statements (TrinoUserError(type=USER_ERROR, name=SYNTAX_ERROR, message="line 1:19: mismatched input ''SELECT 1''. Expecting: 'USING', ", query_id=20230629_161633_03350_rptf8))

elapsed: 0:14:05.315631

This is very deployment-specific, but just to give a relative idea of read vs. write timings (values are seconds per run; slashes separate repeated runs):

count   | Read                       | Write (batch = 1000)       | Write (batch = 1500)
10      | 3.67 / 3.7 / 3.6 secs      |                            |
100     | 2.9 / 4.0 / 2.8 / 2.2 secs | 4.4 / 4.2 / 4.1 secs       |
1000    | 2.2 / 3.8 / 2.4 secs       | 11.2 / 11.2 / 11.0 secs    |
10 000  | 3.2 / 2.8 / 2.9 secs       | 118.4 / 105.7 / 108.8 secs | 98.2 / 104.6 / 108.5 secs
70 000  | 6.8 / 6.5 / 7.2 secs       | 845.31 secs                |
140 000 | 9.8 secs                   |                            |
Plutonian answered 6/7, 2023 at 14:34 Comment(4)
What do you think of writing everything to a Parquet file, and uploading that to a place where Trino can read it? Create a temp view on top of the Parquet file, and use that to insert it into the final table. – Woermer
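
For concreteness, that suggestion maps to roughly the following, issued from Python via the Trino client. It assumes a Hive connector catalog is configured alongside the Iceberg one; the hive catalog name, staging schema, column list and S3 path are placeholders.

import trino
from trino.auth import BasicAuthentication

conn = trino.dbapi.connect(
    host=trino_target_host,
    port=trino_target_port,
    http_scheme="https",
    auth=BasicAuthentication("...", "..."),
)
cur = conn.cursor()

# Expose the staged Parquet file(s) through the Hive connector (placeholder names/path)
cur.execute("""
    CREATE TABLE hive.staging.tmp_batch (
        id BIGINT,
        payload VARCHAR
    )
    WITH (
        external_location = 's3://my-bucket/staging/',
        format = 'PARQUET'
    )
""")
cur.fetchall()

# Copy from the external table into the Iceberg table in a single server-side pass
cur.execute("INSERT INTO iceberg.my_schema.my_table SELECT id, payload FROM hive.staging.tmp_batch")
cur.fetchall()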
I'm open to that idea. I'd like to minimise duplicate rewriting though, so I was hoping to copy the Parquet file to S3 and then have some PyIceberg metadata command available to just add that file to an existing table. Otherwise, I'm also looking at what you suggested in the meantime (trino.io/docs/current/connector/iceberg.html). – Plutonian
Starting from 0.6.0, PyIceberg has some write capabilities, even if it isn't fully featured yet. I haven't tested it, but maybe it will be useful. – Donegal
@Usernameless, thanks, I've been through it recently. You can write the data if you convert your pandas DataFrame to a PyArrow table and then write; it works great performance-wise. The current limitation, though, is that it can only write to unpartitioned tables (production data lake tables are typically partitioned by day/month). Otherwise, happy to note the progress on this interface. – Plutonian
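
The PyIceberg 0.6.0 route mentioned in the comments looks roughly like the sketch below; the catalog properties and table names are placeholders, and at the time of writing it only worked for unpartitioned tables.

import pyarrow as pa
from pyiceberg.catalog import load_catalog

# Placeholder Hive metastore URI; S3 credentials are assumed to come from the environment
catalog = load_catalog("hive", **{"type": "hive", "uri": "thrift://hive-metastore:9083"})
table = catalog.load_table("my_schema.my_table")

# Convert the pandas DataFrame to an Arrow table and append it;
# PyIceberg writes the Parquet data files and commits the metadata
arrow_table = pa.Table.from_pandas(df)  # df = the pandas DataFrame to be written
table.append(arrow_table)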
