I'm trying to achieve performant record writes from pandas (or ideally Polars, if possible) in a Python environment to our Apache Iceberg deployment (with a Hive metastore), either directly or via the Trino query engine, depending on which is more performant.
Given what I've already tried, my current record write throughput is still pretty rubbish. Could someone please point me in the right direction for getting performant write connections directly to Iceberg (or via Trino, if it's expected to go through a query engine)? I'm also not finding any documentation for guidance.
My attempts so far:
- Originally via the Trino Python package, parsing the contents of a pandas DataFrame into batches of SQL INSERT statements (a sketch of this pattern follows the list) - clunky and the slowest.
- Then dug around and found an async Trino Python package, which at least lets me issue the calls with asyncio and benefit from concurrency based on the number of Trino workers deployed.
- Used Trino's JDBC connector (the underlying trino-jdbc.jar together with Python's jaydebeapi package) to see whether the Java runtime would bring a performance improvement - reads succeeded, but writes did not.
- Direct Iceberg connection via PySpark - connection issues, described under this post.
- pyiceberg, as of version 0.4.0, still does not provide record-writing functionality, as far as I can tell from the documentation and from digging through the package.
- Stumbled onto this approach with the pandas.to_sql() method - the best so far, but still not nearly enough.
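For reference, the first (batched INSERT) attempt looked roughly like the sketch below. The host, credentials, catalog/schema and table names are placeholders, and the repr()-based value formatting is purely illustrative - it doesn't handle escaping, NULLs or non-primitive types:

```
import pandas as pd
import trino
from trino.auth import BasicAuthentication

def insert_in_batches(df: pd.DataFrame, table: str, batch_size: int = 1000):
    # Placeholder connection details - same kind of connection as the
    # SQLAlchemy engine further down.
    conn = trino.dbapi.connect(
        host="trino-host",
        port=443,
        user="user",
        http_scheme="https",
        auth=BasicAuthentication("user", "password"),
        catalog="iceberg",
        schema="my_schema",
    )
    cur = conn.cursor()
    columns = ", ".join(df.columns)
    for start in range(0, len(df), batch_size):
        chunk = df.iloc[start:start + batch_size]
        # One multi-row VALUES clause per batch instead of one INSERT per record
        values = ", ".join(
            "(" + ", ".join(repr(v) for v in row) + ")"
            for row in chunk.itertuples(index=False, name=None)
        )
        cur.execute(f"INSERT INTO {table} ({columns}) VALUES {values}")
        cur.fetchall()  # consume the result so the statement is fully executed
```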
Through one communication thread I also read that a DataFrame can be uploaded to S3 directly in Parquet format, and from there linked to the relevant Iceberg catalog table via metadata calls (makes sense compression/transfer-wise)... but I haven't been able to get it working this way round either (the sketch below covers the upload half).
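The staging half of that idea is straightforward at least - a minimal sketch, assuming pyarrow and s3fs are installed, with placeholder bucket/path/credentials. The part I haven't solved is attaching the staged files to the Iceberg table's metadata afterwards (e.g. via Iceberg's Spark add_files procedure):

```
import pandas as pd

def stage_parquet(df: pd.DataFrame, path: str = "s3://my-bucket/staging/batch.parquet"):
    # Write the DataFrame to object storage as a single Parquet file;
    # compression keeps the transfer small, as mentioned above.
    df.to_parquet(
        path,
        engine="pyarrow",
        compression="snappy",
        index=False,
        storage_options={"key": "...", "secret": "..."},  # s3fs credentials (placeholders)
    )
```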
The code snippet for the pandas.to_sql() route via Trino is the only one worth sharing:
import warnings
# Ignore all warnings
warnings.filterwarnings("ignore")
import logging
logging.getLogger().setLevel(logging.DEBUG)
from sqlalchemy import create_engine
from trino.auth import BasicAuthentication
import pandas as pd
from datetime import datetime
trino_target_host = "..."
trino_target_port = 443
trino_target_catalog = 'iceberg'
trino_target_schema = '...'
def write_content(table: str, df, batch_size: int):
    print(f"size: {len(df)}")
    try:
        start = datetime.utcnow()
        engine = create_engine(
            f"trino://{trino_target_host}:{trino_target_port}/{trino_target_catalog}/{trino_target_schema}",
            connect_args={
                "auth": BasicAuthentication("...", "..."),
                "http_scheme": "https"
            }
        )
        # chunksize = writing df in batches of size - saving memory
        # method = instead of writing a single record at a time, 'multi' will insert multiple rows as 1 statement
        output = df.to_sql(table, engine, if_exists='append', index=False, chunksize=batch_size, method='multi')
        print(f"elapsed: {datetime.utcnow() - start}")
        return output
    except Exception as e:
        if 'Request Entity Too Large' in str(e):
            print(f"Batch size '{batch_size}' too large. Reduce size")
            return None
        print(f"failed: {e}")
        return None

sample = df[:70000].copy(deep=True)
write_content('table_name', sample, 1000)
Output:
size: 70000
EXECUTE IMMEDIATE not available for trino.dp.iotnxt.io:443; defaulting to legacy prepared statements (TrinoUserError(type=USER_ERROR, name=SYNTAX_ERROR, message="line 1:19: mismatched input ''SELECT 1''. Expecting: 'USING', ", query_id=20230629_161633_03350_rptf8))
elapsed: 0:14:05.315631
This is very deployment-specific, but just to give a relative idea for comparison:
| count | Read | write (batch = 1000) | write (batch = 1500) |
| --- | --- | --- | --- |
| 10 | 3.67 / 3.7 / 3.6 secs | | |
| 100 | 2.9 / 4.0 / 2.8 / 2.2 secs | 4.4 / 4.2 / 4.1 secs | |
| 1000 | 2.2 / 3.8 / 2.4 secs | 11.2 / 11.2 / 11.0 secs | |
| 10 000 | 3.2 / 2.8 / 2.9 secs | 118.4 / 105.7 / 108.8 secs | 98.2 / 104.6 / 108.5 secs |
| 70 000 | 6.8 / 6.5 / 7.2 secs | 845.31 secs | |
| 140 000 | 9.8 secs | | |