Downloading multiple S3 objects in parallel in Python

Is there a way to concurrently download S3 files using boto3 in Python3? I am aware of the aiobotocore library, but I would like to know if there is a way to do it using the standard boto3 library.

Hover answered 4/1, 2018 at 9:6 Comment(2)
By looking at the code, I'd say it already does it by itself (look for max_concurrency in the link I pasted). - Glucosuria
Are there common reasons why aiobotocore should not be used? - Bier
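For context on the max_concurrency comment: it is a field of boto3.s3.transfer.TransferConfig, and it controls how many threads work on a single transfer (the parts of one large object), not how many separate objects are fetched at once. Objects below the multipart threshold are fetched in one request, so it does little for many small files. A minimal sketch, where the function name and the bucket, key and file names are hypothetical:

```python
def download_large_object(bucket, key, filename, max_concurrency=10):
    """Download one object, letting boto3 fetch its parts with several threads."""
    # Imported inside the function so the module itself loads without boto3.
    import boto3
    from boto3.s3.transfer import TransferConfig

    # max_concurrency caps the threads used for this one transfer.
    config = TransferConfig(max_concurrency=max_concurrency)
    s3_client = boto3.client("s3")
    s3_client.download_file(bucket, key, filename, Config=config)


# Hypothetical usage (needs AWS credentials and a real bucket):
# download_large_object("my-bucket", "path/to/large.bin", "large.bin")
```

To download many objects concurrently you still need your own pool of threads or processes, as the answers below do.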

If you want to download lots of smaller files directly to disk in parallel using boto3 you can do so using the multiprocessing module. Here's a little snippet that will do just that. You run it like: ./download.py bucket_name s3_key_0 s3_key_1 ... s3_key_n

#!/usr/bin/env python3
import multiprocessing
import boto3
import sys

# make a per process s3_client
s3_client = None
def initialize():
  global s3_client
  s3_client = boto3.client('s3')

# the work function of each process which will fetch something from s3
def download(job):
  bucket, key, filename = job
  s3_client.download_file(bucket, key, filename)

if __name__ == '__main__':
  # make the jobs, arguments to program are: bucket s3_key_0 s3_key_1 ... s3_key_n
  bucket = sys.argv[1]
  jobs = [(bucket, key, key.replace('/', '_')) for key in sys.argv[2:] ]

  # make a process pool to do the work
  pool = multiprocessing.Pool(multiprocessing.cpu_count(), initialize)
  pool.map(download, jobs)
  pool.close()
  pool.join()

The key detail is that each worker process creates its own S3 client once, in the pool initializer, and reuses it for every job it handles. This matters for two reasons. First, creating a client is slow, so we want to do it as rarely as possible. Second, clients should not be shared across processes, because calls to download_file may mutate the client's internal state.

Tamasha answered 10/8, 2019 at 19:34 Comment(6)
Is the issue with sharing internal S3 client state the reason you need multiprocessing rather than multithreading? I have a similar need to download many smaller files, and was wondering about the hit from the longer startup time of multiprocessing pools versus multithreading pools. - Gliadin
Multithreading with a pool of threads should work so long as you make a client for each thread in the pool. Multiprocessing seemed like it would take less code to illustrate this, using the global trick as opposed to threads with thread-local storage. - Tamasha
I tried both with a single client for all threads, and a client per thread - both worked fine. ...It looks like there is significant (understatement!) uncertainty and lack of clarity from the boto team about what is necessary. - Gliadin
@Gliadin It is possible that they deliberately keep this information obfuscated so that it's easier for them to change the core assumptions about what is and isn't a good idea. Maybe recent versions became thread-safe but old versions weren't. As you stated, it's a mystery at the moment! - Tamasha
Not sure. I definitely know there is a GitHub thread where they have been asked repeatedly by many people for months, with no statement at all. I don't view that as great accountability by them, unfortunately. - Gliadin
Is there some way to do it without the global keyword? - Formication
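The thread-based variant mentioned in the comments, which also avoids the global keyword, could look roughly like this. It is only a sketch: download_all, client_factory and max_workers are names made up for illustration, and whether a client per thread is strictly required is exactly the open question discussed above.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def _default_client_factory():
    # Imported lazily so the sketch can also be exercised with a fake client.
    import boto3
    return boto3.client("s3")


def download_all(jobs, client_factory=_default_client_factory, max_workers=8):
    """Download (bucket, key, filename) jobs using one client per worker thread."""
    # Attributes set on a threading.local are visible only to the setting thread.
    local = threading.local()

    def download(job):
        bucket, key, filename = job
        client = getattr(local, "client", None)
        if client is None:
            # First job on this thread: create its client and keep it for reuse.
            client = local.client = client_factory()
        client.download_file(bucket, key, filename)
        return filename

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() yields results in the same order as the jobs.
        return list(executor.map(download, jobs))
```

Passing the factory as an argument keeps the client creation in one place and makes it possible to try the pool logic without touching S3.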

The snippet below downloads multiple objects from S3 using multiprocessing.

import boto3
import multiprocessing as mp
import os

s3 = boto3.resource('s3')
my_bucket = s3.Bucket('My_bucket')
        
def s3download(object_key_file):
    my_bucket.download_file(object_key_file[0], object_key_file[1])
    print('downloaded file with object name... {}'.format(object_key_file[0]))
    print('downloaded file with file name... {}'.format(object_key_file[1]))
        
def parallel_s3_download():
    object_key_file = []
    for s3_object in my_bucket.objects.filter(Prefix="directory_name/"):
        # Save each object under its base name; using the full key as the local
        # path fails with "file not found" unless that directory already exists.
        path, filename = os.path.split(s3_object.key)
        # Skip "folder" placeholder keys ending in "/", which have no file name.
        if not filename:
            continue
        object_key_file.append((s3_object.key, filename))
    if not object_key_file:
        return  # nothing to download; mp.Pool(0) would raise ValueError
    pool = mp.Pool(min(mp.cpu_count(), len(object_key_file)))  # number of workers
    pool.map(s3download, object_key_file, chunksize=1)
    pool.close()
    pool.join()  # wait for the worker processes to exit

if __name__ == "__main__":
    parallel_s3_download()
    print('downloading zip file')
Handyman answered 20/8, 2020 at 12:36 Comment(1)
What is object_key_file.pop(0) for? Also, I am getting this error: TypeError: s3download() missing 1 required positional argument: 'object_key_file' - Tropaeolin

Given the unclear thread-safety status of the boto3 client, here is one approach using multiprocessing in Python >= 3.7.

import os
from multiprocessing import Pool
from typing import Generator, Iterable, List
from urllib.parse import urlparse

import boto3
from jsonargparse import CLI


def batcher(iterable: Iterable, batch_size: int) -> Generator[List, None, None]:
    """Batch an iterator. The last item might be of smaller len than batch_size.

    Args:
        iterable (Iterable): Any iterable that should be batched
        batch_size (int): Len of the generated lists

    Yields:
        Generator[List, None, None]: List of items in iterable
    """
    batch = []
    counter = 0
    for i in iterable:
        batch.append(i)
        counter += 1
        if counter % batch_size == 0:
            yield batch
            batch = []
    if len(batch) > 0:
        yield batch


def download_batch(batch):
    s3 = boto3.client("s3")
    n = 0
    for line in batch:
        dst, line = line
        url = urlparse(line)
        url_path = url.path.lstrip("/")
        folder, basename = os.path.split(url_path)
        out_dir = os.path.join(dst, folder)  # avoid shadowing the built-in dir()
        os.makedirs(out_dir, exist_ok=True)
        filepath = os.path.join(out_dir, basename)
        print(f"{filepath}")
        s3.download_file(url.netloc, url_path, filepath)
        n += 1
    return n


def file_reader(fp, dst):
    with open(fp) as f:
        for line in f:
            line = line.rstrip("\n")
            yield dst, line


def copy_cli(txt_path: str, dst: str = os.getcwd(), n_cpus: int = os.cpu_count()):
    """Copy files from s3 based on a list of urls. The output folder structure follows
    the s3 path.

    Args:
        txt_path (str): path to your list of files. One url per line.
        dst (str): path to store the files.
        n_cpus (int): number of simultaneous batches. Defaults to the number of cpus in
         the computer.
    """
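    total_files = sum(1 for _ in file_reader(txt_path, dst))
    if total_files == 0:
        return  # empty list: nothing to download, and avoids dividing by zero
    # os.cpu_count() can return None, so fall back to a single process
    n_cpus = min(total_files, n_cpus or 1)
    batch_size = total_files // n_cpus
    with Pool(processes=n_cpus) as pool:
        for n in pool.imap_unordered(
            download_batch, batcher(file_reader(txt_path, dst), batch_size)
        ):
            pass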


if __name__ == "__main__":
    CLI(copy_cli)

Usage

pip install jsonargparse boto3

my_list.txt

s3://path/to/file1.abc
s3://path/to/file2.cdf

python s3cp.py my_list.txt --dst ../my_dst_path/ --n_cpus=5

I hope it helps 😊. You can find the same code in this repo https://github.com/fcossio/s3-selective-copy
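To make the URL handling in download_batch easier to follow, here is that step in isolation. It is a sketch: split_s3_url is a hypothetical helper, not part of the answer's code or the linked repo, but it performs the same urlparse and os.path steps.

```python
import os
from urllib.parse import urlparse


def split_s3_url(url, dst):
    """Return (bucket, key, local_path) for an s3:// URL, mirroring the key under dst."""
    parsed = urlparse(url)
    bucket = parsed.netloc          # text between "s3://" and the next "/"
    key = parsed.path.lstrip("/")   # rest of the URL without the leading slash
    folder, basename = os.path.split(key)
    local_path = os.path.join(dst, folder, basename)
    return bucket, key, local_path
```

So for the sample list above, s3://path/to/file1.abc is read as bucket "path" and key "to/file1.abc", and the file lands under the destination folder in a "to" subdirectory.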

Uxorious answered 10/3, 2023 at 17:59 Comment(0)
