
Is there a way to concurrently download S3 files using boto3 in Python3? I am aware of the aiobotocore library, but I would like to know if there is a way to do it using the standard boto3 library.

  • By looking at the code, I'd say it already does it by itself (look for max_concurrency in the link I pasted; a sketch of this follows below the comments). Commented Jan 4, 2018 at 9:09
  • Are there common reasons why aiobotocore should not be used? Commented Apr 29, 2020 at 17:00
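
As the first comment notes, a single download_file call already parallelizes the transfer of one object internally via boto3's managed transfers. A minimal sketch of tuning that behaviour with TransferConfig (the bucket, key, and file names below are hypothetical placeholders, not from the question):

import boto3
from boto3.s3.transfer import TransferConfig

s3 = boto3.client('s3')

# max_concurrency controls how many threads boto3 uses to fetch the parts of
# this one object; multipart_threshold is the size at which it switches to
# a multipart download.
config = TransferConfig(multipart_threshold=8 * 1024 * 1024, max_concurrency=10)

# 'my-bucket', 'big/file.bin' and 'file.bin' are placeholders
s3.download_file('my-bucket', 'big/file.bin', 'file.bin', Config=config)

Note that this parallelizes the chunks of a single large object; it does not by itself fetch many separate keys at once, which is what the answers below address.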

3 Answers


If you want to download lots of smaller files directly to disk in parallel using boto3, you can do so with the multiprocessing module. Here's a little snippet that will do just that. You run it like: ./download.py bucket_name s3_key_0 s3_key_1 ... s3_key_n

#!/usr/bin/env python3
import multiprocessing
import boto3
import sys

# make a per process s3_client
s3_client = None
def initialize():
  global s3_client
  s3_client = boto3.client('s3')

# the work function of each process which will fetch something from s3
def download(job):
  bucket, key, filename = job
  s3_client.download_file(bucket, key, filename)

if __name__ == '__main__':
  # make the jobs, arguments to program are: bucket s3_key_0 s3_key_1 ... s3_key_n
  bucket = sys.argv[1]
  jobs = [(bucket, key, key.replace('/', '_')) for key in sys.argv[2:] ]

  # make a process pool to do the work
  pool = multiprocessing.Pool(multiprocessing.cpu_count(), initialize)
  pool.map(download, jobs)
  pool.close()
  pool.join()

One important piece of this is that we make an instance of an S3 client for every process, which that process then reuses. This matters for two reasons. First, creating a client is slow, so we want to do it as infrequently as possible. Second, clients should not be shared across processes, as calls to download_file may mutate the internal state of the client.


6 Comments

Is the issue with trying to share internal S3 client state why you would need to use multiprocessing rather than multithreading? I've got a similar need to download many smaller files, and was wondering about the hit due to the longer startup time of multiprocessing vs multithreading pools.
Multithreading with a pool of threads should work so long as you make a client for each thread in the pool (see the thread-local sketch after these comments). Multiprocessing seemed like it would be less code to illustrate this using the global trick, as opposed to threads with thread-local storage.
I tried both with a single client for all threads, and a client per thread - both worked fine. ...It looks like there is significant (understatement!) uncertainty and lack of clarity from the boto team about what is necessary.
@Richard it is possible that they deliberately keep this information obfuscated so that it's easier for them to change the core assumptions about what is and isn't a good idea. Like maybe recent versions became thread-safe but old versions weren't. As you stated, it's a mystery at the moment!
Not sure. I definitely know there is a GitHub thread where they were repeatedly asked by many people over months, with no statement at all. I don't view that as great accountability by them, unfortunately.
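
A minimal sketch of the thread-based variant discussed in these comments, using threading.local() so each worker thread builds and reuses its own client, which is the conservative choice given the uncertainty about sharing one client (the bucket and keys are hypothetical placeholders):

import threading
from concurrent.futures import ThreadPoolExecutor

import boto3

_local = threading.local()

def client():
    # lazily create one client per thread and reuse it afterwards
    if not hasattr(_local, 's3'):
        _local.s3 = boto3.client('s3')
    return _local.s3

def download(job):
    bucket, key, filename = job
    client().download_file(bucket, key, filename)

if __name__ == '__main__':
    # placeholder bucket and keys for illustration
    jobs = [('my-bucket', key, key.replace('/', '_'))
            for key in ['prefix/a.txt', 'prefix/b.txt']]
    with ThreadPoolExecutor(max_workers=8) as pool:
        list(pool.map(download, jobs))  # list() forces completion and surfaces errors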

The snippet below will allow you to download multiple objects from S3 using multiprocessing.

import boto3
import multiprocessing as mp
import os

s3 = boto3.resource('s3')
my_bucket = s3.Bucket('My_bucket')

def s3download(object_key_file):
    my_bucket.download_file(object_key_file[0], object_key_file[1])
    print('downloaded file with object name... {}'.format(object_key_file[0]))
    print('downloaded file with file name... {}'.format(object_key_file[1]))

def parallel_s3_download():
    object_key_file = []
    for s3_object in my_bucket.objects.filter(Prefix="directory_name/"):
        # Split s3_object.key into path and file name; using the full key as the
        # local file name gives a file-not-found error for the missing directories.
        path, filename = os.path.split(s3_object.key)
        object_key_file.append((s3_object.key, filename))
    # Drop the first listed key, which is typically the prefix "folder" object
    # itself and has an empty file name.
    object_key_file.pop(0)
    pool = mp.Pool(min(mp.cpu_count(), len(object_key_file)))  # number of workers
    pool.map(s3download, object_key_file, chunksize=1)
    pool.close()
    pool.join()

if __name__ == "__main__":
    parallel_s3_download()
    print('finished downloading')

1 Comment

what is object_key_file.pop(0) for? Also, I am getting this error: TypeError: s3download() missing 1 required positional argument: 'object_key_file'

In the face of the unknown thread-safety status of boto3.Client, here is one approach to using multiprocessing in Python >= 3.7.

import os
from multiprocessing import Pool
from typing import Generator, Iterable, List
from urllib.parse import urlparse

import boto3
from jsonargparse import CLI


def batcher(iterable: Iterable, batch_size: int) -> Generator[List, None, None]:
    """Batch an iterator. The last item might be of smaller len than batch_size.

    Args:
        iterable (Iterable): Any iterable that should be batched
        batch_size (int): Len of the generated lists

    Yields:
        Generator[List, None, None]: List of items in iterable
    """
    batch = []
    counter = 0
    for i in iterable:
        batch.append(i)
        counter += 1
        if counter % batch_size == 0:
            yield batch
            batch = []
    if len(batch) > 0:
        yield batch


def download_batch(batch):
    s3 = boto3.client("s3")
    n = 0
    for line in batch:
        dst, line = line
        url = urlparse(line)
        url_path = url.path.lstrip("/")
        folder, basename = os.path.split(url_path)
        dir = os.path.join(dst, folder)
        os.makedirs(dir, exist_ok=True)
        filepath = os.path.join(dir, basename)
        print(f"{filepath}")
        s3.download_file(url.netloc, url_path, filepath)
        n += 1
    return n


def file_reader(fp, dst):
    with open(fp) as f:
        for line in f:
            line = line.rstrip("\n")
            yield dst, line


def copy_cli(txt_path: str, dst: str = os.getcwd(), n_cpus: int = os.cpu_count()):
    """Copy files from s3 based on a list of urls. The output folder structure follows
    the s3 path.

    Args:
        txt_path (str): path to your list of files. One url per line.
        dst (str): path to store the files.
        n_cpus (int): number of simultaneous batches. Defaults to the number of cpus in
         the computer.
    """
    total_files = sum([1 for _ in file_reader(txt_path, dst)])
    print(n_cpus)
    n_cpus = min(total_files, n_cpus)
    batch_size = total_files // n_cpus
    with Pool(processes=n_cpus) as pool:
        for n in pool.imap_unordered(
            download_batch, batcher(file_reader(txt_path, dst), batch_size)
        ):
            pass


if __name__ == "__main__":
    CLI(copy_cli)

Usage

pip install jsonargparse boto3

my_list.txt

s3://path/to/file1.abc
s3://path/to/file2.cdf

Run it with:

python s3cp.py my_list.txt --dst ../my_dst_path/ --n_cpus=5

I hope it helps 😊. You can find the same code in this repo https://github.com/fcossio/s3-selective-copy
