Is there a way to concurrently download S3 files using boto3 in Python3? I am aware of the aiobotocore library, but I would like to know if there is a way to do it using the standard boto3 library.
If you want to download lots of smaller files directly to disk in parallel using boto3, you can do so using the multiprocessing module. Here's a little snippet that will do just that. You run it like: ./download.py bucket_name s3_key_0 s3_key_1 ... s3_key_n
#!/usr/bin/env python3
import multiprocessing
import boto3
import sys

# make a per process s3_client
s3_client = None

def initialize():
    global s3_client
    s3_client = boto3.client('s3')

# the work function of each process which will fetch something from s3
def download(job):
    bucket, key, filename = job
    s3_client.download_file(bucket, key, filename)

if __name__ == '__main__':
    # make the jobs, arguments to program are: bucket s3_key_0 s3_key_1 ... s3_key_n
    bucket = sys.argv[1]
    jobs = [(bucket, key, key.replace('/', '_')) for key in sys.argv[2:]]

    # make a process pool to do the work
    pool = multiprocessing.Pool(multiprocessing.cpu_count(), initialize)
    pool.map(download, jobs)
    pool.close()
    pool.join()
One important piece of this is that we create an S3 client instance for every process, and each process reuses its own client. This matters for two reasons. First, creating a client is slow, so we want to do it as infrequently as possible. Second, clients should not be shared across processes, since calls to download_file may mutate the client's internal state.
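If you prefer the concurrent.futures API (Python 3.7+), the same per-process-client pattern can be expressed with a ProcessPoolExecutor initializer. Here is a minimal sketch of that idea; the bucket name and job list are hypothetical placeholders, not values from the snippet above:

# Sketch: per-process client via a ProcessPoolExecutor initializer (Python >= 3.7).
# 'my-bucket', 'some/key.bin', and 'key.bin' are placeholder values.
from concurrent.futures import ProcessPoolExecutor
import boto3

s3_client = None  # one client per worker process

def init_worker():
    global s3_client
    s3_client = boto3.client('s3')

def download(job):
    bucket, key, filename = job
    s3_client.download_file(bucket, key, filename)
    return filename

if __name__ == '__main__':
    jobs = [('my-bucket', 'some/key.bin', 'key.bin')]  # hypothetical job list
    with ProcessPoolExecutor(max_workers=4, initializer=init_worker) as pool:
        for name in pool.map(download, jobs):
            print('done:', name)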
The snippet below will allow you to download multiple objects from S3 using the multiprocessing module:
import boto3
import multiprocessing as mp
import os

s3 = boto3.resource('s3')
my_bucket = s3.Bucket('My_bucket')

def s3download(object_key_file):
    my_bucket.download_file(object_key_file[0], object_key_file[1])
    print('downloaded file with object name... {}'.format(object_key_file[0]))
    print('downloaded file with file name... {}'.format(object_key_file[1]))

def parallel_s3_download():
    object_key_file = []
    for s3_object in my_bucket.objects.filter(Prefix="directory_name/"):
        # split s3_object.key into path and file name; otherwise download_file
        # fails with a "file not found" error because the local path does not exist
        path, filename = os.path.split(s3_object.key)
        object_key_file.append((s3_object.key, filename))
    # drop the first entry, which is the prefix "folder" object itself
    object_key_file.pop(0)
    pool = mp.Pool(min(mp.cpu_count(), len(object_key_file)))  # number of workers
    pool.map(s3download, object_key_file, chunksize=1)
    pool.close()

if __name__ == "__main__":
    parallel_s3_download()
    print('downloading zip file')
Given the unknown thread-safety status of boto3.Client, here is one approach to using multiprocessing in Python >= 3.7:
import os
from multiprocessing import Pool
from typing import Generator, Iterable, List
from urllib.parse import urlparse

import boto3
from jsonargparse import CLI


def batcher(iterable: Iterable, batch_size: int) -> Generator[List, None, None]:
    """Batch an iterator. The last item might be of smaller len than batch_size.

    Args:
        iterable (Iterable): Any iterable that should be batched
        batch_size (int): Len of the generated lists

    Yields:
        Generator[List, None, None]: List of items in iterable
    """
    batch = []
    counter = 0
    for i in iterable:
        batch.append(i)
        counter += 1
        if counter % batch_size == 0:
            yield batch
            batch = []
    if len(batch) > 0:
        yield batch


def download_batch(batch):
    s3 = boto3.client("s3")
    n = 0
    for line in batch:
        dst, line = line
        url = urlparse(line)
        url_path = url.path.lstrip("/")
        folder, basename = os.path.split(url_path)
        dir = os.path.join(dst, folder)
        os.makedirs(dir, exist_ok=True)
        filepath = os.path.join(dir, basename)
        print(f"{filepath}")
        s3.download_file(url.netloc, url_path, filepath)
        n += 1
    return n


def file_reader(fp, dst):
    with open(fp) as f:
        for line in f:
            line = line.rstrip("\n")
            yield dst, line


def copy_cli(txt_path: str, dst: str = os.getcwd(), n_cpus: int = os.cpu_count()):
    """Copy files from s3 based on a list of urls. The output folder structure follows
    the s3 path.

    Args:
        txt_path (str): path to your list of files. One url per line.
        dst (str): path to store the files.
        n_cpus (int): number of simultaneous batches. Defaults to the number of cpus in
            the computer.
    """
    total_files = sum([1 for _ in file_reader(txt_path, dst)])
    print(n_cpus)
    n_cpus = min(total_files, n_cpus)
    batch_size = total_files // n_cpus
    with Pool(processes=n_cpus) as pool:
        for n in pool.imap_unordered(
            download_batch, batcher(file_reader(txt_path, dst), batch_size)
        ):
            pass


if __name__ == "__main__":
    CLI(copy_cli)
Usage:
pip install jsonargparse boto3

Contents of my_list.txt:
s3://path/to/file1.abc
s3://path/to/file2.cdf

python s3cp.py my_list.txt --dst ../my_dst_path/ --n_cpus=5
I hope it helps 😊. You can find the same code in this repo https://github.com/fcossio/s3-selective-copy
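As a side note, boto3 also ships a built-in transfer manager that can parallelize the download of a single large object across threads via TransferConfig's max_concurrency setting. A minimal sketch, with the bucket, key, and filename as hypothetical placeholders:

# Sketch: per-object concurrency using boto3's built-in transfer manager.
# 'my-bucket', 'big/object.bin', and 'object.bin' are placeholder values.
import boto3
from boto3.s3.transfer import TransferConfig

s3 = boto3.client('s3')
config = TransferConfig(max_concurrency=10)  # threads used for one multipart download
s3.download_file('my-bucket', 'big/object.bin', 'object.bin', Config=config)

This parallelizes chunks of one object rather than multiple objects, so it complements rather than replaces the multiprocessing approaches above.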