I'm basically following the last code snippet from this thread: Is there any faster way for downloading multiple files from s3 to local folder?.
I adapted it to my problem, which is to extract files from S3 into Python (in this case, into a pandas DataFrame):
import io
from multiprocessing import Pool

import boto3
import pandas as pd

bucket_name = 'bucket'
prefix = ['prefix']
max_process = 20  # can be changed
debug_en = True

# pass your credentials and region name
s3 = boto3.client('s3',
                  aws_access_key_id='key',
                  aws_secret_access_key='accesskey',
                  region_name='us-east-1',
                  verify=False
                  )

def downfiles(bucket_name, obj_key):
    try:
        s3_response_object = s3.get_object(bucket_name, obj_key)
        object_content = s3_response_object['Body'].read()
        return pd.read_parquet(io.BytesIO(object_content))
    except:
        pass

def download_dir(bucket_name, sub_prefix):
    paginator = s3.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket_name, Prefix=sub_prefix)
    mp_data = []
    for page in pages:
        if 'Contents' in page:
            for obj in page['Contents']:
                if '15' in obj['Key']:
                    mp_data.append((bucket_name, obj['Key']))
    with Pool(processes=4) as pool:
        return list(pool.starmap(downfiles, mp_data))

if __name__ == '__main__':
    print("starting script...")
    s3_dirs = prefix
    total_df_list = []
    for s3_dir in s3_dirs:
        print("[Information] %s directory is downloading" % s3_dir)
        df_list = download_dir(bucket_name, s3_dir)
        total_df_list = total_df_list + df_list
    final_df = pd.concat(total_df_list, ignore_index=True)
The code runs forever just to download 5 small Parquet files (it has never finished executing). I'm assuming it has to do with the Pool step, since multiprocessing is new to me. Any tip on where I am failing?
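In case it helps isolate the problem, this is how I understand Pool.starmap is supposed to fan the (bucket, key) tuples out to worker processes, reduced to a toy, self-contained sketch with a dummy function instead of the real S3 call (fake_download and the fake keys are just placeholders I made up, no boto3 involved):

# Toy sketch of the Pool.starmap pattern, with a dummy stand-in for the
# real S3 download (fake_download is just a placeholder name).
from multiprocessing import Pool

def fake_download(bucket_name, obj_key):
    # pretend to download and return something identifying the object
    return f"{bucket_name}/{obj_key}"

if __name__ == '__main__':
    mp_data = [('bucket', f'prefix/file_{i}.parquet') for i in range(5)]
    with Pool(processes=4) as pool:
        results = pool.starmap(fake_download, mp_data)
    print(results)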
EDIT:
Finally, I switched to the concurrent.futures package and parallelized the fetches with a thread pool, which suits this I/O-bound work well. Here's the code in case it is useful to someone:
import io
from concurrent import futures

import boto3
import pandas as pd

bucket_name = 'bucket'
prefix = 'prefix'
s3_object_keys = []  # list of S3 object keys to fetch
max_workers = 5
key_filter = ''  # substring used to filter keys; leave empty to take everything

s3 = boto3.resource('s3',
                    aws_access_key_id='',
                    aws_secret_access_key='',
                    region_name='us-east-1',
                    verify=False
                    )

bucket = s3.Bucket(bucket_name)
for obj in bucket.objects.filter(Prefix=prefix):
    if key_filter:
        if key_filter in obj.key:
            s3_object_keys.append(obj.key)
    else:
        # no filter set: take every key under the prefix
        s3_object_keys.append(obj.key)
def fetch(key):
    s3_response_object = s3.Object(bucket_name, key)
    object_content = s3_response_object.get()['Body'].read()
    return pd.read_parquet(io.BytesIO(object_content))

def fetch_all(keys):
    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_key = {executor.submit(fetch, key): key for key in keys}
        print("All URLs submitted.")
        for future in futures.as_completed(future_to_key):
            key = future_to_key[future]
            exception = future.exception()
            if not exception:
                yield key, future.result()
            else:
                yield key, exception

df_list = []
for key, result in fetch_all(s3_object_keys):
    print(f'key: {key} result: {type(result)}')
    df_list.append(result)

df = pd.concat(df_list, ignore_index=True)
This code is also adapted from a post in the same thread I mentioned in the original question.
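For anyone comparing, a plain sequential baseline over the same keys is a useful sanity check of how much the threading actually buys you; a quick sketch reusing the fetch function defined above:

import time

# Sequential baseline: fetch the same keys one by one and time it,
# to compare against the threaded fetch_all version above.
start = time.perf_counter()
baseline_dfs = [fetch(key) for key in s3_object_keys]
baseline_df = pd.concat(baseline_dfs, ignore_index=True)
print(f"sequential fetch of {len(s3_object_keys)} objects took "
      f"{time.perf_counter() - start:.1f}s")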
P.S. I would still like to know what was wrong with the first code I posted.