
I'm using a combination of the GCS Python SDK and the Google API client to loop through a version-enabled bucket and download specific objects based on metadata.

from google.cloud import storage
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials

bucket_name = 'bucketname'
restore_epoch = '1519189202'

# JSON API client, used to list object versions and their metadata
credentials = GoogleCredentials.get_application_default()
service = discovery.build('storage', 'v1', credentials=credentials)

# google-cloud-storage client, used for the actual downloads
storage_client = storage.Client()
source_bucket = storage_client.get_bucket(bucket_name)


def downloadepoch_objects():
    # List every object version in the bucket
    request = service.objects().list(
        bucket=bucket_name,
        versions=True
    )
    response = request.execute()

    # Download only the versions whose custom metadata matches the epoch
    for item in response['items']:
        if item['metadata']['epoch'] == restore_epoch:
            print(item['bucket'])
            print(item['name'])
            print(item['metadata']['epoch'])
            print(item['updated'])
            blob = source_bucket.blob(item['name'])
            blob.download_to_filename(
                '/Users/admin/git/data-processing/{}'.format(item['name']))


downloadepoch_objects()

The above function works properly for a blob that is not within a directory (gs://bucketname/test1.txt), since the name that gets passed in is simply test1.txt. The issue I am running into is when trying to download files from a deeper directory tree (gs://bucketname/nfs/media/docs/test1.txt): the name that gets passed is nfs/media/docs/test1.txt. Is it possible to have the .download_to_filename() method create the directories if they are not present?
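For context, a minimal sketch of the failure (the object name and local path are just examples):

from google.cloud import storage

client = storage.Client()
bucket = client.bucket('bucketname')
blob = bucket.blob('nfs/media/docs/test1.txt')

# Raises FileNotFoundError: download_to_filename() opens the target file
# for writing but does not create the missing nfs/media/docs/ directories.
blob.download_to_filename('/Users/admin/git/data-processing/nfs/media/docs/test1.txt')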

3 Answers


Below is the working solution. I ended up stripping the path from the object name and creating the directory structure on the fly. A better way might be, as @Brandon Yarbrough suggested, to use prefix + response['prefixes'][0], but I couldn't quite figure that out. Hope this helps others out.

#!/usr/local/bin/python3

from google.cloud import storage
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials
import json
import os
import pathlib

bucket_name = 'test-bucket'
restore_epoch = '1519189202'
restore_location = '/Users/admin/data/'

credentials = GoogleCredentials.get_application_default()
service = discovery.build('storage', 'v1', credentials=credentials)

storage_client = storage.Client()
source_bucket = storage_client.get_bucket(bucket_name)


def listall_objects():
    request = service.objects().list(
        bucket=bucket_name,
        versions=True
    )
    response = request.execute()
    print(json.dumps(response, indent=2))


def listname_objects():
    request = service.objects().list(
        bucket=bucket_name,
        versions=True
    )
    response = request.execute()

    for item in response['items']:
        print(item['name'] + ' Uploaded on: ' + item['updated'] +
              ' Epoch: ' + item['metadata']['epoch'])


def downloadepoch_objects():
    request = service.objects().list(
        bucket=bucket_name,
        versions=True
    )
    response = request.execute()

    for item in response['items']:
        # Skip versions that do not carry the matching epoch metadata tag
        if item.get('metadata', {}).get('epoch') != restore_epoch:
            continue
        print('Downloading ' + item['name'] + ' from ' +
              item['bucket'] + '; Epoch= ' + item['metadata']['epoch'])
        print('Saving to: ' + restore_location)
        blob = source_bucket.blob(item['name'])
        # Recreate the object's "directory" structure locally first;
        # os.makedirs handles arbitrarily deep paths, unlike os.mkdir.
        destination = restore_location + item['name']
        os.makedirs(pathlib.Path(destination).parent, exist_ok=True)
        blob.download_to_filename(destination)
        print('Download complete')


# listall_objects()
# listname_objects()
downloadepoch_objects()



GCS does not have a notion of "directories," although tools like gsutil do a good job of pretending for convenience. If you want all of the objects under the "nfs/media/docs/" path, you can specify that as a prefix, like so:

request = service.objects().list(
    bucket=bucket_name,
    versions=True,
    prefix='nfs/media/docs/',  # Only show objects beginning like this
    delimiter='/'  # Consider this character a directory marker.
)
response = request.execute()
subdirectories = response['prefixes']
objects = response['items']

Because of the prefix parameter, only objects that begin with 'nfs/media/docs' will be returned in response['items']. Because of the delimiter parameter, "subdirectories" will be returned in response['prefixes']. You can get more details in the Python documentation of the objects.list method.
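For example (the object names here are hypothetical, and bucket_name and service are assumed to be set up as in the question):

# Suppose the bucket contains:
#   nfs/media/docs/test1.txt
#   nfs/media/docs/test2.txt
#   nfs/media/docs/archive/old.txt
response = service.objects().list(
    bucket=bucket_name,
    prefix='nfs/media/docs/',
    delimiter='/'
).execute()

print([o['name'] for o in response['items']])
# ['nfs/media/docs/test1.txt', 'nfs/media/docs/test2.txt']
print(response['prefixes'])
# ['nfs/media/docs/archive/']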

If you were to use the newer google-cloud Python library, which I'd recommend for new code, the same call would look pretty similar:

from google.cloud import storage

client = storage.Client()
bucket = client.bucket(bucket_name)
iterator = bucket.list_blobs(
    versions=True,
    prefix='nfs/media/docs/',
    delimiter='/'
)
objects = list(iterator)            # fetch the pages of results
subdirectories = iterator.prefixes  # populated as the pages are fetched

Comments

Thanks for the response. Just to explain my specific use case a bit further: I set up a cron job that uses gsutil to back up an NFS server file directory. The job runs every day against a version-enabled storage bucket. This bucket keeps 8 versions of each asset, allowing me to roll back the entire directory tree up to 8 days. Each object is tagged with a custom metadata tag called epoch.
(continuing above comment) Based on this tag I'm able to loop through the entire bucket and only download items that match the epoch metadata tag. I cannot use gsutil since it does not support downloading by specific version or metadata tag. My directory tree is always changing, so specifying the prefix will also not work. I need to figure out a way to loop through the bucket objects, match the epoch metadata tag, and then download each object while recreating a directory structure that is always changing. Thoughts?
Lemme see if I understand. You have an NFS directory tree. It's synced to a GCS bucket. Each day, you rsync the NFS directory tree to the bucket, but you leave versioning on. You'd like the ability to restore the state of the NFS directory tree from a certain day?
That is correct. I'm actually using a gsutil cp command to back up the directory tree, so I get a complete copy each time the backup job runs. This way, when I restore, all objects will be restored and not just the new/changed ones. Each time the gsutil cp command runs it attaches the same metadata tag (epoch) to every object. When I run a list on the bucket, I can loop through each object and only download the ones with the matching metadata tag. This works in my original code, but since the files are in "folders", the download fails because my code can't create the directory structure.
Why not also have your code create the directory structure? You can call list() again on prefix + response['prefixes'][0].
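Here is a minimal sketch of that suggestion, assuming the bucket_name, service, source_bucket, restore_epoch, and restore_location variables from the accepted answer above (pagination handling is omitted):

import os

def download_tree(prefix=''):
    # List one "directory" level at a time using prefix + delimiter
    response = service.objects().list(
        bucket=bucket_name,
        versions=True,
        prefix=prefix,
        delimiter='/'
    ).execute()

    for item in response.get('items', []):
        if item.get('metadata', {}).get('epoch') == restore_epoch:
            local_path = os.path.join(restore_location, item['name'])
            # Recreate the object's directory structure locally
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            source_bucket.blob(item['name']).download_to_filename(local_path)

    # Recurse into each "subdirectory" reported via the delimiter
    for subdir in response.get('prefixes', []):
        download_tree(subdir)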

The following solution worked for me. It recursively downloads all blobs under a path prefix into a model directory at the project root, while maintaining the folder structure. Multiple blobs are downloaded concurrently.

GCS client version google-cloud-storage==1.41.1

import os
import logging
from datetime import datetime
from google.cloud import storage
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)

BUCKET_NAME = "ml-model"
ML_MODEL_NAME = "my-model"  # placeholder: the model folder name used in the GCS path

def timer(func):
    def time_wrapper(*args, **kwargs):
        start = datetime.now()
        result = func(*args, **kwargs)
        diff = datetime.now() - start
        logger.info(f"{func.__name__} took {diff.seconds} s and {diff.microseconds // 1000} ms")
        return result
    return time_wrapper

def fetch_environment() -> str:
    env = os.environ.get("environment", "staging")
    return env


def create_custom_folder(dir_name: str):
    if not os.path.exists(dir_name):
        os.makedirs(dir_name)


def fetch_gcs_credential_file_path():
    return os.environ.get("GCS_CREDENTIAL_FILE_PATH")


class GCS:
    def __init__(self):
        cred_file_path = fetch_gcs_credential_file_path()
        self.client = storage.Client.from_service_account_json(cred_file_path)
        self.bucket = self.client.bucket(BUCKET_NAME)

    def download_blob(self, blob):
        filename = blob.name.replace(self.path_prefix, '')
        delimiter_based_splits = filename.split('/')
        if len(delimiter_based_splits) > 1:
            dir_name = "model/" + "/".join(delimiter_based_splits[: len(delimiter_based_splits)-1])
            create_custom_folder(dir_name)
            blob.download_to_filename(f"{dir_name}/{delimiter_based_splits[-1]}")
        else:
            blob.download_to_filename(f"model/" + filename)
    @timer
    def download_blobs_multithreaded(self, prefix: str):
        '''
        CREATE FOLDER IF NOT EXISTS
        '''
        create_custom_folder("model")

        blobs = self.bucket.list_blobs(prefix=prefix)

        self.path_prefix = prefix
        with ThreadPoolExecutor() as executor:
            # Consume the map() generator so any worker exceptions surface here
            list(executor.map(self.download_blob, blobs))


def download_model():
    env = fetch_environment()
    folder_path_prefix = f"ml/{env}/{ML_MODEL_NAME}/v1/tf-saved-model/"
    gcs = GCS()
    gcs.download_blobs_multithreaded(folder_path_prefix)

if __name__ == '__main__':
    download_model()

