diff --git a/README.md b/README.md
index be9dbd4..eeab92c 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,10 @@
 ---
-services: batch, storage
-platforms: python
-author: dlepow
+page_type: sample
+description: "A Python application that uses Batch to process media files in parallel with the ffmpeg open-source tool."
+languages:
+- python
+products:
+- azure
 ---
 
 # Batch Python File Processing with ffmpeg
diff --git a/src/batch_python_tutorial_ffmpeg.py b/src/batch_python_tutorial_ffmpeg.py
index f5d999b..d875cdc 100644
--- a/src/batch_python_tutorial_ffmpeg.py
+++ b/src/batch_python_tutorial_ffmpeg.py
@@ -4,6 +4,7 @@
 import os
 import sys
 import time
+import config
 
 try:
     input = raw_input
@@ -18,23 +19,10 @@
 sys.path.append('.')
 sys.path.append('..')
 
-
-# Update the Batch and Storage account credential strings below with the values
+# Update the Batch and Storage account credential strings in config.py with values
 # unique to your accounts. These are used when constructing connection strings
 # for the Batch and Storage client objects.
 
-# global
-_BATCH_ACCOUNT_NAME =''
-_BATCH_ACCOUNT_KEY = ''
-_BATCH_ACCOUNT_URL = ''
-_STORAGE_ACCOUNT_NAME = ''
-_STORAGE_ACCOUNT_KEY = ''
-_POOL_ID = 'LinuxFfmpegPool'
-_DEDICATED_POOL_NODE_COUNT = 0
-_LOW_PRIORITY_POOL_NODE_COUNT = 5
-_POOL_VM_SIZE = 'STANDARD_A1_v2'
-_JOB_ID = 'LinuxFfmpegJob'
-
 
 def query_yes_no(question, default="yes"):
     """
@@ -105,18 +93,18 @@ def upload_file_to_container(block_blob_client, container_name, file_path):
 
     block_blob_client.create_blob_from_path(container_name,
                                             blob_name,
                                             file_path)
-    
+
     # Obtain the SAS token for the container.
     sas_token = get_container_sas_token(block_blob_client,
-                                container_name, azureblob.BlobPermissions.READ)
-    
+                                        container_name, azureblob.BlobPermissions.READ)
     sas_url = block_blob_client.make_blob_url(container_name,
                                               blob_name,
                                               sas_token=sas_token)
 
     return batchmodels.ResourceFile(file_path=blob_name,
-                                    blob_source=sas_url)
+                                    http_url=sas_url)
+
 
 def get_container_sas_token(block_blob_client,
                             container_name, blob_permissions):
@@ -143,9 +131,8 @@
 
     return container_sas_token
 
-
 def get_container_sas_url(block_blob_client,
-                        container_name, blob_permissions):
+                          container_name, blob_permissions):
     """
     Obtains a shared access signature URL that provides write access to the
     ouput container to which the tasks will upload their output.
@@ -159,10 +146,11 @@
     """
     # Obtain the SAS token for the container.
     sas_token = get_container_sas_token(block_blob_client,
-                                container_name, azureblob.BlobPermissions.WRITE)
+                                        container_name, azureblob.BlobPermissions.WRITE)
 
     # Construct SAS URL for the container
-    container_sas_url = "https://{}.blob.core.windows.net/{}?{}".format(_STORAGE_ACCOUNT_NAME, container_name, sas_token)
+    container_sas_url = "https://{}.blob.core.windows.net/{}?{}".format(
+        config._STORAGE_ACCOUNT_NAME, container_name, sas_token)
 
     return container_sas_url
 
@@ -187,31 +175,33 @@ def create_pool(batch_service_client, pool_id):
     # The start task installs ffmpeg on each node from an available repository, using
     # an administrator user identity.
-    
+
     new_pool = batch.models.PoolAddParameter(
         id=pool_id,
         virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
             image_reference=batchmodels.ImageReference(
-            publisher="Canonical",
-            offer="UbuntuServer",
-            sku="16.04.0-LTS",
-            version="latest"
-        ),
-        node_agent_sku_id="batch.node.ubuntu 16.04"),
-        vm_size=_POOL_VM_SIZE,
-        target_dedicated_nodes=_DEDICATED_POOL_NODE_COUNT,
-        target_low_priority_nodes=_LOW_PRIORITY_POOL_NODE_COUNT,
+                publisher="Canonical",
+                offer="UbuntuServer",
+                sku="18.04-LTS",
+                version="latest"
+            ),
+            node_agent_sku_id="batch.node.ubuntu 18.04"),
+        vm_size=config._POOL_VM_SIZE,
+        target_dedicated_nodes=config._DEDICATED_POOL_NODE_COUNT,
+        target_low_priority_nodes=config._LOW_PRIORITY_POOL_NODE_COUNT,
         start_task=batchmodels.StartTask(
             command_line="/bin/bash -c \"apt-get update && apt-get install -y ffmpeg\"",
             wait_for_success=True,
             user_identity=batchmodels.UserIdentity(
                 auto_user=batchmodels.AutoUserSpecification(
-                scope=batchmodels.AutoUserScope.pool,
-                elevation_level=batchmodels.ElevationLevel.admin)),
-    )
+                    scope=batchmodels.AutoUserScope.pool,
+                    elevation_level=batchmodels.ElevationLevel.admin)),
+        )
     )
+
     batch_service_client.pool.add(new_pool)
 
+
 def create_job(batch_service_client, job_id, pool_id):
     """
     Creates a job with the specified ID, associated with the specified pool.
@@ -224,11 +214,12 @@
     print('Creating job [{}]...'.format(job_id))
 
     job = batch.models.JobAddParameter(
-        job_id,
-        batch.models.PoolInformation(pool_id=pool_id))
+        id=job_id,
+        pool_info=batch.models.PoolInformation(pool_id=pool_id))
 
     batch_service_client.job.add(job)
-    
+
+
 def add_tasks(batch_service_client, job_id, input_files, output_container_sas_url):
     """
     Adds a task for each input file in the collection to the specified job.
@@ -246,24 +237,26 @@ def add_tasks(batch_service_client, job_id, input_files, output_container_sas_ur
 
     tasks = list()
 
-    for idx, input_file in enumerate(input_files): 
-        input_file_path=input_file.file_path
-        output_file_path="".join((input_file_path).split('.')[:-1]) + '.mp3'
-        command = "/bin/bash -c \"ffmpeg -i {} {} \"".format(input_file_path, output_file_path)
+    for idx, input_file in enumerate(input_files):
+        input_file_path = input_file.file_path
+        output_file_path = "".join((input_file_path).split('.')[:-1]) + '.mp3'
+        command = "/bin/bash -c \"ffmpeg -i {} {} \"".format(
+            input_file_path, output_file_path)
         tasks.append(batch.models.TaskAddParameter(
             id='Task{}'.format(idx),
             command_line=command,
             resource_files=[input_file],
-            output_files=[batchmodels.OutputFile(output_file_path,
-                          destination=batchmodels.OutputFileDestination(
-                              container=batchmodels.OutputFileBlobContainerDestination(output_container_sas_url)),
-                          upload_options=batchmodels.OutputFileUploadOptions(
-                              batchmodels.OutputFileUploadCondition.task_success))]
-        )
-        )
+            output_files=[batchmodels.OutputFile(
+                file_pattern=output_file_path,
+                destination=batchmodels.OutputFileDestination(
+                    container=batchmodels.OutputFileBlobContainerDestination(
+                        container_url=output_container_sas_url)),
+                upload_options=batchmodels.OutputFileUploadOptions(
+                    upload_condition=batchmodels.OutputFileUploadCondition.task_success))]
+        )
+        )
     batch_service_client.task.add_collection(job_id, tasks)
 
-
 def wait_for_tasks_to_complete(batch_service_client, job_id, timeout):
     """
@@ -299,7 +292,6 @@
                        "timeout period of " + str(timeout))
 
 
-
 if __name__ == '__main__':
 
     start_time = datetime.datetime.now().replace(microsecond=0)
@@ -309,14 +301,13 @@ def wait_for_tasks_to_complete(batch_service_client, job_id, timeout):
 
     # Create the blob client, for use in obtaining references to
     # blob storage containers and uploading files to containers.
-
     blob_client = azureblob.BlockBlobService(
-        account_name=_STORAGE_ACCOUNT_NAME,
-        account_key=_STORAGE_ACCOUNT_KEY)
+        account_name=config._STORAGE_ACCOUNT_NAME,
+        account_key=config._STORAGE_ACCOUNT_KEY)
 
     # Use the blob client to create the containers in Azure Storage if they
     # don't yet exist.
-    
+
     input_container_name = 'input'
     output_container_name = 'output'
     blob_client.create_container(input_container_name, fail_on_exist=False)
@@ -324,22 +315,23 @@
     print('Container [{}] created.'.format(input_container_name))
     print('Container [{}] created.'.format(output_container_name))
 
-    # Create a list of all MP4 files in the InputFiles directory. 
+    # Create a list of all MP4 files in the InputFiles directory.
     input_file_paths = []
-    
-    for folder, subs, files in os.walk('./InputFiles/'):
+
+    for folder, subs, files in os.walk(os.path.join(sys.path[0], 'InputFiles')):
         for filename in files:
             if filename.endswith(".mp4"):
-                input_file_paths.append(os.path.abspath(os.path.join(folder, filename)))
+                input_file_paths.append(os.path.abspath(
+                    os.path.join(folder, filename)))
 
-    # Upload the input files. This is the collection of files that are to be processed by the tasks. 
+    # Upload the input files. This is the collection of files that are to be processed by the tasks.
     input_files = [
         upload_file_to_container(blob_client, input_container_name, file_path)
         for file_path in input_file_paths]
 
     # Obtain a shared access signature URL that provides write access to the output
     # container to which the tasks will upload their output.
-    
+
     output_container_sas_url = get_container_sas_url(
         blob_client,
         output_container_name,
@@ -347,41 +339,41 @@
 
     # Create a Batch service client. We'll now be interacting with the Batch
     # service in addition to Storage
-    credentials = batchauth.SharedKeyCredentials(_BATCH_ACCOUNT_NAME,
-                                                 _BATCH_ACCOUNT_KEY)
+    credentials = batchauth.SharedKeyCredentials(config._BATCH_ACCOUNT_NAME,
+                                                 config._BATCH_ACCOUNT_KEY)
 
     batch_client = batch.BatchServiceClient(
         credentials,
-        base_url=_BATCH_ACCOUNT_URL)
+        batch_url=config._BATCH_ACCOUNT_URL)
 
     try:
         # Create the pool that will contain the compute nodes that will execute the
        # tasks.
-        create_pool(batch_client, _POOL_ID)
-        
+        create_pool(batch_client, config._POOL_ID)
+
         # Create the job that will run the tasks.
-        create_job(batch_client, _JOB_ID, _POOL_ID)
+        create_job(batch_client, config._JOB_ID, config._POOL_ID)
 
-        # Add the tasks to the job. Pass the input files and a SAS URL 
+        # Add the tasks to the job. Pass the input files and a SAS URL
         # to the storage container for output files.
-        add_tasks(batch_client, _JOB_ID, input_files, output_container_sas_url)
+        add_tasks(batch_client, config._JOB_ID,
+                  input_files, output_container_sas_url)
 
         # Pause execution until tasks reach Completed state.
         wait_for_tasks_to_complete(batch_client,
-                                   _JOB_ID,
-                                   datetime.timedelta(minutes=30))
+                                   config._JOB_ID,
+                                   datetime.timedelta(minutes=30))
 
         print(" Success! All tasks reached the 'Completed' state within the "
-              "specified timeout period.")
+              "specified timeout period.")
 
-    except batchmodels.batch_error.BatchErrorException as err:
-        print_batch_exception(err)
-        raise
+    except batchmodels.BatchErrorException as err:
+        print_batch_exception(err)
+        raise
 
     # Delete input container in storage
     print('Deleting container [{}]...'.format(input_container_name))
     blob_client.delete_container(input_container_name)
 
-
     # Print out some timing info
     end_time = datetime.datetime.now().replace(microsecond=0)
@@ -392,10 +384,10 @@
 
     # Clean up Batch resources (if the user so chooses).
     if query_yes_no('Delete job?') == 'yes':
-        batch_client.job.delete(_JOB_ID)
+        batch_client.job.delete(config._JOB_ID)
 
     if query_yes_no('Delete pool?') == 'yes':
-        batch_client.pool.delete(_POOL_ID)
+        batch_client.pool.delete(config._POOL_ID)
 
     print()
     input('Press ENTER to exit...')
diff --git a/src/config.py b/src/config.py
new file mode 100644
index 0000000..a1fbc7b
--- /dev/null
+++ b/src/config.py
@@ -0,0 +1,31 @@
+# -------------------------------------------------------------------------
+#
+# THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND,
+# EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES
+# OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.
+# ----------------------------------------------------------------------------------
+# The example companies, organizations, products, domain names,
+# e-mail addresses, logos, people, places, and events depicted
+# herein are fictitious. No association with any real company,
+# organization, product, domain name, email address, logo, person,
+# places, or events is intended or should be inferred.
+# --------------------------------------------------------------------------
+
+# Global constant variables (Azure Storage account/Batch details)
+
+# import "config.py" in "batch_python_tutorial_ffmpeg.py"
+
+# Update the Batch and Storage account credential strings below with the values
+# unique to your accounts. These are used when constructing connection strings
+# for the Batch and Storage client objects.
+
+_BATCH_ACCOUNT_NAME = ''
+_BATCH_ACCOUNT_KEY = ''
+_BATCH_ACCOUNT_URL = ''
+_STORAGE_ACCOUNT_NAME = ''
+_STORAGE_ACCOUNT_KEY = ''
+_POOL_ID = 'LinuxFfmpegPool'
+_DEDICATED_POOL_NODE_COUNT = 0
+_LOW_PRIORITY_POOL_NODE_COUNT = 5
+_POOL_VM_SIZE = 'STANDARD_A1_v2'
+_JOB_ID = 'LinuxFfmpegJob'
diff --git a/src/requirements.txt b/src/requirements.txt
index 15f1dba..f1e798d 100644
--- a/src/requirements.txt
+++ b/src/requirements.txt
@@ -1,2 +1,2 @@
-azure-batch==4.0.0
-azure-storage==0.36.0
\ No newline at end of file
+azure-batch==6.0.0
+azure-storage-blob==1.4.0
\ No newline at end of file
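
Reviewer note: a minimal smoke test of the reworked configuration flow, under the assumption that src/config.py has been filled in with real account values (every credential stays a placeholder here). The client construction mirrors the diff: SharedKeyCredentials plus the batch_url keyword that replaced base_url in azure-batch==6.0.0. The script name smoke_test.py and the pool listing are illustrative, not part of this change.

# smoke_test.py -- hypothetical helper, not part of this PR.
# Verifies that config.py and the pinned azure-batch==6.0.0 SDK wire up
# before running the full sample (python src/batch_python_tutorial_ffmpeg.py).
import azure.batch as batch
import azure.batch.batch_auth as batchauth

import config  # the new src/config.py; fill in its credential strings first

# Build credentials exactly as the updated sample does.
credentials = batchauth.SharedKeyCredentials(config._BATCH_ACCOUNT_NAME,
                                             config._BATCH_ACCOUNT_KEY)

# azure-batch 6.0.0 renamed the base_url keyword to batch_url.
batch_client = batch.BatchServiceClient(credentials,
                                        batch_url=config._BATCH_ACCOUNT_URL)

# Cheap round trip to the service: list existing pools.
for pool in batch_client.pool.list():
    print('Found pool:', pool.id)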