Increase Connection Pool Size

We are running the following code to upload to GCP buckets in parallel. Judging by the warnings below, we are quickly exhausting the connection pool. Is there any way to configure the connection pool that the library uses?

import concurrent.futures

def upload_string_to_bucket(content: str):
    # bucket, cloud_path and content_list are defined elsewhere
    blob = bucket.blob(cloud_path)
    blob.upload_from_string(content)

with concurrent.futures.ThreadPoolExecutor() as executor:
    executor.map(upload_string_to_bucket, content_list)
WARNING:urllib3.connectionpool:Connection pool is full, discarding connection: www.googleapis.com
WARNING:urllib3.connectionpool:Connection pool is full, discarding connection: www.googleapis.com
WARNING:urllib3.connectionpool:Connection pool is full, discarding connection: www.googleapis.com
WARNING:urllib3.connectionpool:Connection pool is full, discarding connection: www.googleapis.com
WARNING:urllib3.connectionpool:Connection pool is full, discarding connection: www.googleapis.com
WARNING:urllib3.connectionpool:Connection pool is full, discarding connection: www.googleapis.com
Renie answered 4/10/2018 at 18:57

A bit late to the party, but the following works for me (although with no noticeable upload-speed improvement in my case):

from requests.adapters import HTTPAdapter
from google.cloud import storage

gcs_client = storage.Client()

# Mount a larger pool on both the client's transport and its auth
# session. Both are private attributes, so this may break in future
# versions of the library.
adapter = HTTPAdapter(pool_connections=30, pool_maxsize=30)
gcs_client._http.mount("https://", adapter)
gcs_client._http._auth_request.session.mount("https://", adapter)
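For context, requests' HTTPAdapter defaults to pool_connections=10 and pool_maxsize=10, which is why the warnings appear once the executor runs more than about ten threads. A minimal sketch of sizing the two together, reusing upload_string_to_bucket and content_list from the question (the value 30 is an assumption chosen to match the adapter above):

import concurrent.futures

# Keep the worker count at or below pool_maxsize so threads don't
# compete for pooled connections (30 matches the adapter above).
with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
    executor.map(upload_string_to_bucket, content_list)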
Responsiveness answered 31/12/2023 at 18:56

I have a similar issue with downloading blobs in parallel.

This article may be informative: https://laike9m.com/blog/requests-secret-pool_connections-and-pool_maxsize,89/

Personally, I don't think increasing the connection pool is the best solution; I prefer to chunk the downloads into groups of pool_maxsize:

import concurrent.futures
from typing import Iterable

def chunker(it: Iterable, chunk_size: int):
    # Yield successive lists of up to chunk_size items from any iterable.
    chunk = []
    for index, item in enumerate(it):
        chunk.append(item)
        if not (index + 1) % chunk_size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk

# Process 10 uploads at a time so the default pool is never exceeded.
for chunk in chunker(content_list, 10):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.map(upload_string_to_bucket, chunk)

Of course, we could instead submit the next download as soon as one finishes, rather than waiting for a whole chunk to complete; see the sketch below.
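A minimal sketch of that idea, assuming the upload_string_to_bucket and content_list names from the question; run_bounded and max_in_flight are hypothetical names, and the cap of 10 is an assumption matching requests' default pool size:

import concurrent.futures
import itertools

def run_bounded(fn, items, max_in_flight=10):
    it = iter(items)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_in_flight) as executor:
        # Prime the pool with at most max_in_flight tasks.
        in_flight = {executor.submit(fn, item)
                     for item in itertools.islice(it, max_in_flight)}
        while in_flight:
            done, in_flight = concurrent.futures.wait(
                in_flight, return_when=concurrent.futures.FIRST_COMPLETED)
            for future in done:
                future.result()  # surface any errors
            # Refill with one new task per completed one.
            for item in itertools.islice(it, len(done)):
                in_flight.add(executor.submit(fn, item))

run_bounded(upload_string_to_bucket, content_list, max_in_flight=10)

This keeps at most max_in_flight tasks in flight at once, so the pool is never oversubscribed and one slow transfer doesn't hold back an entire chunk.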

Declare answered 7/8/2019 at 10:42
The problem is that we are using the GCP library, and it does not expose a way to configure the pool. I do like your chunking approach, but the default pool is definitely too small. (Renie)
