4

I'm trying to generate a list of all S3 files in a bucket/folder. There are usually in the magnitude of millions of files in the folder. I use boto right now and it's able to retrieve around 33k files per minute, which for even a million files, takes half an hour. I also load these files into a dataframe, but generate and use this list as a way to track which files are being processed.

What I've noticed is that when I ask Spark to read all files in the folder, it does a listing of its own and is able to list them out much faster than the boto call can, and then process those files. I looked up a way to do this in PySpark, but found no good examples. The closest I got was some Java and Scala code to list out the files using the HDFS library.

Is there a way we can do this in Python and Spark? For reference, I'm trying to replicate the following code snippet:

def get_s3_files(source_directory, file_type="json"):
    s3_resource = boto3.resource("s3")

    file_prepend_path = f"/{'/'.join(source_directory.parts[1:4])}"
    bucket_name = str(source_directory.parts[3])
    prefix = "/".join(source_directory.parts[4:])

    bucket = s3_resource.Bucket(bucket_name)

    s3_source_files = []

    for object in bucket.objects.filter(Prefix=prefix):
        if object.key.endswith(f".{file_type}"):
            s3_source_files.append(
                (
                    f"{file_prepend_path}/{object.key}",
                    object.size,
                    str(source_directory),
                    str(datetime.now()),
                )
            )

    return s3_source_files
2
  • Take a look at this stackoverflow.com/a/40258750/1386551 Commented Nov 10, 2021 at 22:30
  • @blackbishop - I did, but it seemed specific to HDFS and so wasn't sure if we could apply it to S3 as well. Commented Nov 10, 2021 at 23:09

2 Answers 2

5

This can be achievable very simply by dbutils.

def get_dir_content(ls_path):
  dir_paths = dbutils.fs.ls(ls_path)
  subdir_paths = [get_dir_content(p.path) for p in dir_paths if p.isDir() and p.path != ls_path]
  flat_subdir_paths = [p for subdir in subdir_paths for p in subdir]
  return list(map(lambda p: p.path, dir_paths)) + flat_subdir_paths
    

paths = get_dir_content('s3 location')
[print(p) for p in paths]
Sign up to request clarification or add additional context in comments.

5 Comments

We used to use dbutils before, but it was too slow. We found boto to be quicker, but are trying to see if we can make it even quicker.
means , You are looking option , listing the files from given s3 location path instead of data read . am i right ? please confirm .
Yeah, as long as I can replicate the piece of code I have posted using PySpark (and have it run faster), that'd be perfect. That said, I used the AWS CLI to list objects and it was significantly faster. Weird that the CLI would take less time than boto.
dbutils.fs.ls is far worse than just listing the files with pathlib and glob.
Doing listing and then filtering is prohibit if you work with a huge amount of keys, s3api list-objects-v2 with prefix is the best option so far.
1

For some reason, using the AWS CLI command was roughly 15 times(!) faster than using boto. Not sure exactly why this is the case, but here's the code I am currently using, in case someone might find it handy. Basically, use s3api to list the objects, and then use jq to manipulate the output and get it into a form of my liking.

def get_s3_files(source_directory, schema, file_type="json"):

    file_prepend_path = f"/{'/'.join(source_directory.parts[1:4])}"
    bucket = str(source_directory.parts[3])
    prefix = "/".join(source_directory.parts[4:])

    s3_list_cmd = f"aws s3api list-objects-v2 --bucket {bucket} --prefix {prefix} | jq -r '.Contents[] | select(.Key | endswith(\".{file_type}\")) | [\"{file_prepend_path}/\"+.Key, .Size, \"{source_directory}\", (now | strftime(\"%Y-%m-%d %H:%M:%S.%s\"))] | @csv'"

    s3_list = subprocess.check_output(s3_list_cmd, shell=True, universal_newlines=True)

    with open(f"s3_file_paths.csv", "w") as f:
        f.truncate()
        f.write(s3_list)

    s3_source_files_df = spark.read.option("header", False).schema(schema).csv(f"s3_file_paths.csv")

    return s3_source_files_df

1 Comment

bucket.objects.filter(Prefix=prefix) is probably getting all the keys and then the filtering, the s3api list-objects-v2 filters the keys (I'm pretty sure it uses a index) before returning it

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.