
I am trying to read a list of files uploaded to a Google Cloud Storage bucket and load them into a file/buffer so that I can perform some aggregation on them.

So far, I am able to read the contents of all the files in a serial manner (iterating over each blob object returned by the iterator for the bucket). However, there are thousands of files uploaded to Cloud Storage, and even just reading them takes a considerable amount of time.

from google.cloud import storage
import json
import time

import multiprocessing
from multiprocessing import Pool, Manager

cpu_count = multiprocessing.cpu_count()
manager = Manager()
finalized_list = manager.list()

# Explicitly use service account credentials by specifying the private key file.
storage_client = storage.Client.from_service_account_json('.serviceAccountCredentials.json')
bucket_name = "bucket-name"

def list_blobs():
    blobs = storage_client.list_blobs(bucket_name)
    return blobs


def read_blob(blob):
    bucket = storage_client.bucket(bucket_name)
    blob_object = bucket.blob(blob)
    with blob_object.open("r") as f:
        converted_string = f.read()
        print(converted_string)
        finalized_list.append(converted_string)

def main():
    start_time = time.time()
    print("Start time: ", start_time)

    pool = Pool(processes=cpu_count)
    blobs = list_blobs()
    pool.map(read_blob, [blob for blob in blobs])
    
    end_time = time.time()
    elapsed_time = end_time - start_time
    print("Time taken: ", elapsed_time, " seconds")

if __name__ == "__main__":
    main()

As in the above code snippet, I thought of using multiprocessing in Python to read each blob object in the bucket. However, the blob objects returned by Cloud Storage hold a reference to the storage client and cannot be pickled, so I get an error that says "Pickling client objects is not explicitly supported".

Is there any other way that I could use to fetch and read thousands of files from cloud storage quickly using a python script?

  • You might want to take a look at gcloud.aio, which is an async implementation of a number of the GCP APIs. Commented Jan 5, 2023 at 9:30
  • You could try using multithreading, since the program is mostly waiting for network requests to complete (see the sketch below). By the way: (1) you can replace [blob for blob in blobs] with just blobs; (2) having manager = Manager(); finalized_list = manager.list() at global scope is a disaster if you run this under an OS that creates child processes using the spawn method, such as Windows: each process would append to its own copy of the list (assuming your blob could be pickled at all). Commented Jan 5, 2023 at 12:10
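
Following the multithreading suggestion above, here is a minimal sketch of a thread-based version. It reuses the bucket name and credentials file from the question and passes only blob names to the workers; the helper name read_blob_text and the max_workers value are illustrative choices, not fixed requirements:

from google.cloud import storage
from concurrent.futures import ThreadPoolExecutor

storage_client = storage.Client.from_service_account_json('.serviceAccountCredentials.json')
bucket_name = "bucket-name"
bucket = storage_client.bucket(bucket_name)

def read_blob_text(blob_name):
    # Threads share one client, so nothing has to be pickled.
    return bucket.blob(blob_name).download_as_text()

def main():
    # Collect plain blob names instead of Blob objects.
    blob_names = [blob.name for blob in storage_client.list_blobs(bucket_name)]
    # Threads work well here because the workload is network-bound.
    with ThreadPoolExecutor(max_workers=32) as executor:
        contents = list(executor.map(read_blob_text, blob_names))
    print("Read", len(contents), "files")

if __name__ == "__main__":
    main()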

1 Answer


Here is a solution I used a while ago with concurrent.futures.ProcessPoolExecutor (I had a CPU-heavy task; you can just as well use concurrent.futures.ThreadPoolExecutor if you're mostly waiting on network I/O).

from google.cloud import storage

# multi CPU
import concurrent.futures

# progress bar
from tqdm import tqdm

bucket_name = 'your_bucket'
path_to_folder = 'your_path_to_the_files'
file_ending = '.pkl'

kwargs_bucket={
    'bucket_or_name': bucket_name,
    #'max_results': 60, # comment if you want to run it on all files
    'prefix': path_to_folder
}

kwargs_process_pool={
    #'max_workers': 1 #comment if you want full speed
}

# a list to store the output
results = []

# connect to the bucket
client = storage.Client()
bucket = client.get_bucket(bucket_name)

# multi-CPU processing
futures = []
# progress bar
with tqdm(total=sum(1 for blob in client.list_blobs(**kwargs_bucket) if blob.name.endswith(file_ending)), position=0, leave=True) as pbar:
    #ProcessPoolExecutor
    with concurrent.futures.ProcessPoolExecutor(**kwargs_process_pool) as executor:
        # getting all the files from the bucket
        for blob in client.list_blobs(**kwargs_bucket):
            # skip the folder
            if not blob.name.endswith(file_ending):
                continue
            # submit only the blob name (not the Blob object) to your processing function
            futures.append(executor.submit(your_function, blob.name))

        # updating the progress bar and checking the return
        for future in concurrent.futures.as_completed(futures):
            pbar.update(1)
            if future.result() != '':
                results.append(future.result())

I figured out the hard way that you should only pass plain values, not client-bound objects, to your_function through the executor (they cannot be pickled). That's why I'm passing blob.name instead of the blob itself.
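
For completeness, your_function is not defined in the snippet above; a minimal sketch of what it could look like, assuming each worker creates its own client and each file is small enough to read into memory (the bucket name is the placeholder from the snippet):

from google.cloud import storage

def your_function(blob_name):
    # Each worker process creates its own client, so no client object is pickled.
    client = storage.Client()
    bucket = client.get_bucket('your_bucket')  # placeholder bucket name from the snippet
    try:
        # Read the file contents as text; adapt this to your aggregation needs.
        return bucket.blob(blob_name).download_as_text()
    except Exception:
        # An empty string is skipped by the result check above.
        return ''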

Hope that helps

