Multiprocessing multiple boto3 files taking too long

I'm basically following the last code from this thread: Is there any faster way for downloading multiple files from s3 to local folder?

I adapted it to my problem, which is loading files from S3 into Python (in this case into a pandas DataFrame):

import io
from multiprocessing import Pool

import boto3
import pandas as pd

bucket_name = 'bucket'
prefix = ['prefix']
max_process = 20 # CAN BE CHANGED
debug_en = True

# pass your credentials and region name
s3 = boto3.client('s3',
            aws_access_key_id='key',
            aws_secret_access_key='accesskey', 
            region_name='us-east-1',
            verify=False
             )


def downfiles(bucket_name, obj_key):

    try:
        s3_response_object = s3.get_object(bucket_name, obj_key)
        object_content = s3_response_object['Body'].read()

        return pd.read_parquet(io.BytesIO(object_content))

    except:
        pass


def download_dir(bucket_name, sub_prefix):
    paginator = s3.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket_name, Prefix=sub_prefix)
    mp_data = []
    for page in pages:
        if 'Contents' in page:
            for obj in page['Contents']:
                if '15' in obj['Key']:
                    mp_data.append((bucket_name, obj['Key']))
    with Pool(processes=4) as pool:                
        return list(pool.starmap(downfiles, mp_data))


if __name__ == '__main__':
    print("starting script...")
    s3_dirs = prefix
    total_df_list = []
    for s3_dir in s3_dirs:
        print("[Information] %s directory is downloading" % s3_dir)
        df_list = download_dir(bucket_name, s3_dir)
        total_df_list = total_df_list + df_list
    final_df = pd.concat(total_df_list, ignore_index=True)

The code runs forever even though it only has to download 5 small parquet files (it has never finished executing). I'm assuming it has to do with the Pool step, since multiprocessing is new to me. Any tip on where I am failing?
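
For reference, this is the Pool.starmap pattern I am trying to follow, reduced to a toy function with no S3 involved (the numbers are just placeholders to show the call shape):

from multiprocessing import Pool

def add(a, b):
    # plain function with picklable arguments, nothing shared between processes
    return a + b

if __name__ == '__main__':
    pairs = [(1, 2), (3, 4), (5, 6)]
    with Pool(processes=4) as pool:
        print(pool.starmap(add, pairs))  # prints [3, 7, 11]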

EDIT:

In the end I switched to the concurrent.futures package to parallelize the downloads. Here's the code in case it's useful to someone:

import io
from concurrent import futures

import boto3
import pandas as pd

bucket_name = 'bucket'
prefix = 'prefix'
s3_object_keys = [] # list of S3 object keys to download
max_workers = 5
key_filter = '' # substring used to filter keys; leave empty to keep every key under the prefix

s3 = boto3.resource('s3',
            aws_access_key_id='',
            aws_secret_access_key='', 
            region_name='us-east-1',
            verify=False
             )

bucket = s3.Bucket(bucket_name)

for obj in bucket.objects.filter(Prefix=prefix):
    # keep every key when key_filter is empty, otherwise only matching keys
    if not key_filter or key_filter in obj.key:
        s3_object_keys.append(obj.key)

def fetch(key):
    s3_response_object = s3.Object(bucket_name, key)
    object_content = s3_response_object.get()['Body'].read()

    return pd.read_parquet(io.BytesIO(object_content))

def fetch_all(keys):

    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_key = {executor.submit(fetch, key): key for key in keys}

        print("All URLs submitted.")

        for future in futures.as_completed(future_to_key):

            key = future_to_key[future]
            exception = future.exception()

            if not exception:
                yield key, future.result()
            else:
                yield key, exception

df_list = []
for key, result in fetch_all(s3_object_keys):
    print(f'key: {key}  result: {type(result)}')
    df_list.append(result)

df = pd.concat(df_list, ignore_index=True)

This code is also adapted from an answer in the same thread I mentioned in the original question.

P.S. I would still like to know what was wrong with the first code I posted.
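
If the culprit is the module-level boto3 client being shared across worker processes, this is the kind of variant I would try next: each worker builds its own client through a Pool initializer, and get_object is called with the keyword arguments (Bucket=/Key=) that boto3 expects. This is only a sketch, not something I have verified: the key list is a placeholder and credentials are left to the default credential chain.

import io
from multiprocessing import Pool

import boto3
import pandas as pd

bucket_name = 'bucket'
s3 = None  # created per worker process in init_worker

def init_worker():
    # give every worker its own client instead of inheriting the parent's one
    global s3
    s3 = boto3.client('s3', region_name='us-east-1', verify=False)

def downfiles(bucket_name, obj_key):
    # get_object takes keyword arguments, not positional ones
    response = s3.get_object(Bucket=bucket_name, Key=obj_key)
    return pd.read_parquet(io.BytesIO(response['Body'].read()))

if __name__ == '__main__':
    # placeholder keys; in practice they come from the list_objects_v2 paginator
    mp_data = [(bucket_name, 'prefix/file_15_a.parquet'),
               (bucket_name, 'prefix/file_15_b.parquet')]
    with Pool(processes=4, initializer=init_worker) as pool:
        df_list = pool.starmap(downfiles, mp_data)
    final_df = pd.concat(df_list, ignore_index=True)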
