Is there a way to concurrently download S3 files using boto3 in Python 3? I am aware of the aiobotocore library, but I would like to know if there is a way to do it using the standard boto3 library.
3 Answers
If you want to download lots of smaller files directly to disk in parallel using boto3, you can do so with the multiprocessing module. Here's a little snippet that does just that. You run it like: ./download.py bucket_name s3_key_0 s3_key_1 ... s3_key_n
#!/usr/bin/env python3
import multiprocessing
import boto3
import sys

# make a per process s3_client
s3_client = None

def initialize():
    global s3_client
    s3_client = boto3.client('s3')

# the work function of each process which will fetch something from s3
def download(job):
    bucket, key, filename = job
    s3_client.download_file(bucket, key, filename)

if __name__ == '__main__':
    # make the jobs, arguments to program are: bucket s3_key_0 s3_key_1 ... s3_key_n
    bucket = sys.argv[1]
    jobs = [(bucket, key, key.replace('/', '_')) for key in sys.argv[2:]]

    # make a process pool to do the work
    pool = multiprocessing.Pool(multiprocessing.cpu_count(), initialize)
    pool.map(download, jobs)
    pool.close()
    pool.join()
One important piece of this is that we make an instance of an S3 client for every process, which each process then reuses. This matters for two reasons. First, creating a client is slow, so we want to do it as infrequently as possible. Second, clients should not be shared across processes, because calls to download_file may mutate the client's internal state.
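Since S3 downloads are I/O-bound, a thread pool can serve the same purpose as a process pool. Below is a minimal sketch of that variant, assuming the same command-line arguments and (bucket, key, filename) job tuples as the snippet above; it keeps one client per worker thread via threading.local instead of one per process, and the max_workers value is just a placeholder to tune for your workload.

#!/usr/bin/env python3
import sys
import threading
from concurrent.futures import ThreadPoolExecutor

import boto3

# one client per worker thread, created lazily and reused for every download
thread_local = threading.local()

def get_client():
    if not hasattr(thread_local, 's3_client'):
        thread_local.s3_client = boto3.client('s3')
    return thread_local.s3_client

def download(job):
    bucket, key, filename = job
    get_client().download_file(bucket, key, filename)

if __name__ == '__main__':
    bucket = sys.argv[1]
    jobs = [(bucket, key, key.replace('/', '_')) for key in sys.argv[2:]]

    # downloads are network-bound, so more workers than CPU cores is usually fine;
    # max_workers=8 is an assumption, not a recommendation
    with ThreadPoolExecutor(max_workers=8) as executor:
        list(executor.map(download, jobs))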
6 Comments
The below snippet will allow you to download multiple objects from S3 using multiprocessing:
import boto3
import multiprocessing as mp
import os

s3 = boto3.resource('s3')
my_bucket = s3.Bucket('My_bucket')

def s3download(object_key_file):
    my_bucket.download_file(object_key_file[0], object_key_file[1])
    print('downloaded file with object name... {}'.format(object_key_file[0]))
    print('downloaded file with file name... {}'.format(object_key_file[1]))

def parallel_s3_download():
    object_key_file = []
    for s3_object in my_bucket.objects.filter(Prefix="directory_name/"):
        # Split s3_object.key into path and file name; otherwise download_file
        # fails with a "file not found" error for the local target path.
        path, filename = os.path.split(s3_object.key)
        object_key_file.append((s3_object.key, filename))
    # Drop the first entry (the prefix/"folder" placeholder object), which has no file name.
    object_key_file.pop(0)
    pool = mp.Pool(min(mp.cpu_count(), len(object_key_file)))  # number of workers
    pool.map(s3download, object_key_file, chunksize=1)
    pool.close()

if __name__ == "__main__":
    parallel_s3_download()
    print('downloading zip file')
1 Comment
Given the unknown thread-safety status of boto3.Client, here is one approach to using multiprocessing in Python >= 3.7:
import os
from multiprocessing import Pool
from typing import Generator, Iterable, List
from urllib.parse import urlparse

import boto3
from jsonargparse import CLI

def batcher(iterable: Iterable, batch_size: int) -> Generator[List, None, None]:
    """Batch an iterator. The last item might be of smaller len than batch_size.

    Args:
        iterable (Iterable): Any iterable that should be batched
        batch_size (int): Len of the generated lists

    Yields:
        Generator[List, None, None]: List of items in iterable
    """
    batch = []
    counter = 0
    for i in iterable:
        batch.append(i)
        counter += 1
        if counter % batch_size == 0:
            yield batch
            batch = []
    if len(batch) > 0:
        yield batch

def download_batch(batch):
    s3 = boto3.client("s3")
    n = 0
    for line in batch:
        dst, line = line
        url = urlparse(line)
        url_path = url.path.lstrip("/")
        folder, basename = os.path.split(url_path)
        dir = os.path.join(dst, folder)
        os.makedirs(dir, exist_ok=True)
        filepath = os.path.join(dir, basename)
        print(f"{filepath}")
        s3.download_file(url.netloc, url_path, filepath)
        n += 1
    return n

def file_reader(fp, dst):
    with open(fp) as f:
        for line in f:
            line = line.rstrip("\n")
            yield dst, line

def copy_cli(txt_path: str, dst: str = os.getcwd(), n_cpus: int = os.cpu_count()):
    """Copy files from s3 based on a list of urls. The output folder structure follows
    the s3 path.

    Args:
        txt_path (str): path to your list of files. One url per line.
        dst (str): path to store the files.
        n_cpus (int): number of simultaneous batches. Defaults to the number of cpus in
            the computer.
    """
    total_files = sum([1 for _ in file_reader(txt_path, dst)])
    print(n_cpus)
    n_cpus = min(total_files, n_cpus)
    batch_size = total_files // n_cpus
    with Pool(processes=n_cpus) as pool:
        for n in pool.imap_unordered(
            download_batch, batcher(file_reader(txt_path, dst), batch_size)
        ):
            pass

if __name__ == "__main__":
    CLI(copy_cli)
Usage

pip install jsonargparse boto3

my_list.txt:

s3://path/to/file1.abc
s3://path/to/file2.cdf

python s3cp.py my_list.txt --dst ../my_dst_path/ --n_cpus=5
I hope it helps 😊. You can find the same code in this repo https://github.com/fcossio/s3-selective-copy