src/batch_python_tutorial_ffmpeg.py (94 changes: 48 additions & 46 deletions)
@@ -23,6 +23,7 @@
 # unique to your accounts. These are used when constructing connection strings
 # for the Batch and Storage client objects.
 
+
 def query_yes_no(question, default="yes"):
     """
     Prompts the user for yes/no input, displaying the specified question text.
@@ -92,11 +93,10 @@ def upload_file_to_container(block_blob_client, container_name, file_path):
     block_blob_client.create_blob_from_path(container_name,
                                             blob_name,
                                             file_path)
 
     # Obtain the SAS token for the container.
     sas_token = get_container_sas_token(block_blob_client,
-        container_name, azureblob.BlobPermissions.READ)
-
+                                        container_name, azureblob.BlobPermissions.READ)
 
     sas_url = block_blob_client.make_blob_url(container_name,
                                               blob_name,
Expand All @@ -105,6 +105,7 @@ def upload_file_to_container(block_blob_client, container_name, file_path):
return batchmodels.ResourceFile(file_path=blob_name,
http_url=sas_url)


def get_container_sas_token(block_blob_client,
container_name, blob_permissions):
"""
Expand All @@ -130,9 +131,8 @@ def get_container_sas_token(block_blob_client,
return container_sas_token



def get_container_sas_url(block_blob_client,
container_name, blob_permissions):
container_name, blob_permissions):
"""
Obtains a shared access signature URL that provides write access to the
ouput container to which the tasks will upload their output.
Expand All @@ -146,10 +146,11 @@ def get_container_sas_url(block_blob_client,
"""
# Obtain the SAS token for the container.
sas_token = get_container_sas_token(block_blob_client,
container_name, azureblob.BlobPermissions.WRITE)
container_name, azureblob.BlobPermissions.WRITE)

# Construct SAS URL for the container
container_sas_url = "https://{}.blob.core.windows.net/{}?{}".format(config._STORAGE_ACCOUNT_NAME, container_name, sas_token)
container_sas_url = "https://{}.blob.core.windows.net/{}?{}".format(
config._STORAGE_ACCOUNT_NAME, container_name, sas_token)

return container_sas_url

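Note that the body of get_container_sas_token is collapsed between the hunks above; only its tail is visible. For reference, a minimal sketch of such a helper against the legacy azure-storage-blob 2.x API this file aliases as azureblob; the two-hour expiry is this sketch's assumption, not something the PR shows:

import datetime

import azure.storage.blob as azureblob


def get_container_sas_token(block_blob_client, container_name, blob_permissions):
    # Generate a time-limited SAS token so tasks can read or write blobs in
    # the container without ever seeing the storage account key.
    return block_blob_client.generate_container_shared_access_signature(
        container_name,
        permission=blob_permissions,
        expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=2))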
Expand All @@ -174,16 +175,16 @@ def create_pool(batch_service_client, pool_id):

# The start task installs ffmpeg on each node from an available repository, using
# an administrator user identity.

new_pool = batch.models.PoolAddParameter(
id=pool_id,
virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
image_reference=batchmodels.ImageReference(
publisher="Canonical",
offer="UbuntuServer",
sku="18.04-LTS",
version="latest"
),
publisher="Canonical",
offer="UbuntuServer",
sku="18.04-LTS",
version="latest"
),
node_agent_sku_id="batch.node.ubuntu 18.04"),
vm_size=config._POOL_VM_SIZE,
target_dedicated_nodes=config._DEDICATED_POOL_NODE_COUNT,
Expand All @@ -193,13 +194,14 @@ def create_pool(batch_service_client, pool_id):
wait_for_success=True,
user_identity=batchmodels.UserIdentity(
auto_user=batchmodels.AutoUserSpecification(
scope=batchmodels.AutoUserScope.pool,
elevation_level=batchmodels.ElevationLevel.admin)),
)
scope=batchmodels.AutoUserScope.pool,
elevation_level=batchmodels.ElevationLevel.admin)),
)
)

batch_service_client.pool.add(new_pool)


def create_job(batch_service_client, job_id, pool_id):
"""
Creates a job with the specified ID, associated with the specified pool.
Expand All @@ -216,7 +218,8 @@ def create_job(batch_service_client, job_id, pool_id):
pool_info=batch.models.PoolInformation(pool_id=pool_id))

batch_service_client.job.add(job)



def add_tasks(batch_service_client, job_id, input_files, output_container_sas_url):
"""
Adds a task for each input file in the collection to the specified job.
Expand All @@ -234,26 +237,26 @@ def add_tasks(batch_service_client, job_id, input_files, output_container_sas_ur

tasks = list()

for idx, input_file in enumerate(input_files):
input_file_path=input_file.file_path
output_file_path="".join((input_file_path).split('.')[:-1]) + '.mp3'
command = "/bin/bash -c \"ffmpeg -i {} {} \"".format(input_file_path, output_file_path)
for idx, input_file in enumerate(input_files):
input_file_path = input_file.file_path
output_file_path = "".join((input_file_path).split('.')[:-1]) + '.mp3'
command = "/bin/bash -c \"ffmpeg -i {} {} \"".format(
input_file_path, output_file_path)
tasks.append(batch.models.TaskAddParameter(
id='Task{}'.format(idx),
command_line=command,
resource_files=[input_file],
output_files=[batchmodels.OutputFile(
file_pattern=output_file_path,
destination=batchmodels.OutputFileDestination(
container=batchmodels.OutputFileBlobContainerDestination(
container_url=output_container_sas_url)),
upload_options=batchmodels.OutputFileUploadOptions(
upload_condition=batchmodels.OutputFileUploadCondition.task_success))]
)
)
file_pattern=output_file_path,
destination=batchmodels.OutputFileDestination(
container=batchmodels.OutputFileBlobContainerDestination(
container_url=output_container_sas_url)),
upload_options=batchmodels.OutputFileUploadOptions(
upload_condition=batchmodels.OutputFileUploadCondition.task_success))]
)
)
batch_service_client.task.add_collection(job_id, tasks)



def wait_for_tasks_to_complete(batch_service_client, job_id, timeout):
"""
@@ -289,7 +292,6 @@ def wait_for_tasks_to_complete(batch_service_client, job_id, timeout):
                        "timeout period of " + str(timeout))
 
 
-
 if __name__ == '__main__':
 
     start_time = datetime.datetime.now().replace(microsecond=0)
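The body of wait_for_tasks_to_complete is collapsed above; only its timeout error is visible. A hypothetical sketch of the polling pattern such a helper implements, consistent with that error message (the one-second poll interval is an assumption of this sketch):

import datetime
import time

import azure.batch.models as batchmodels


def wait_for_tasks_to_complete(batch_service_client, job_id, timeout):
    # Poll the job's tasks until every one reaches the 'completed' state,
    # or raise once the caller-supplied timeout window has elapsed.
    timeout_expiration = datetime.datetime.now() + timeout
    while datetime.datetime.now() < timeout_expiration:
        tasks = batch_service_client.task.list(job_id)
        incomplete_tasks = [task for task in tasks
                            if task.state != batchmodels.TaskState.completed]
        if not incomplete_tasks:
            return True
        time.sleep(1)
    raise RuntimeError("ERROR: Tasks did not reach 'Completed' state within "
                       "timeout period of " + str(timeout))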
Expand All @@ -299,37 +301,37 @@ def wait_for_tasks_to_complete(batch_service_client, job_id, timeout):
# Create the blob client, for use in obtaining references to
# blob storage containers and uploading files to containers.


blob_client = azureblob.BlockBlobService(
account_name=config._STORAGE_ACCOUNT_NAME,
account_key=config._STORAGE_ACCOUNT_KEY)

# Use the blob client to create the containers in Azure Storage if they
# don't yet exist.

input_container_name = 'input'
output_container_name = 'output'
blob_client.create_container(input_container_name, fail_on_exist=False)
blob_client.create_container(output_container_name, fail_on_exist=False)
print('Container [{}] created.'.format(input_container_name))
print('Container [{}] created.'.format(output_container_name))

# Create a list of all MP4 files in the InputFiles directory.
# Create a list of all MP4 files in the InputFiles directory.
input_file_paths = []
for folder, subs, files in os.walk(os.path.join(sys.path[0],'InputFiles')):

for folder, subs, files in os.walk(os.path.join(sys.path[0], 'InputFiles')):
for filename in files:
if filename.endswith(".mp4"):
input_file_paths.append(os.path.abspath(os.path.join(folder, filename)))
input_file_paths.append(os.path.abspath(
os.path.join(folder, filename)))

# Upload the input files. This is the collection of files that are to be processed by the tasks.
# Upload the input files. This is the collection of files that are to be processed by the tasks.
input_files = [
upload_file_to_container(blob_client, input_container_name, file_path)
for file_path in input_file_paths]

# Obtain a shared access signature URL that provides write access to the output
# container to which the tasks will upload their output.

output_container_sas_url = get_container_sas_url(
blob_client,
output_container_name,
Expand All @@ -348,30 +350,30 @@ def wait_for_tasks_to_complete(batch_service_client, job_id, timeout):
# Create the pool that will contain the compute nodes that will execute the
# tasks.
create_pool(batch_client, config._POOL_ID)

# Create the job that will run the tasks.
create_job(batch_client, config._JOB_ID, config._POOL_ID)

# Add the tasks to the job. Pass the input files and a SAS URL
# Add the tasks to the job. Pass the input files and a SAS URL
# to the storage container for output files.
add_tasks(batch_client, config._JOB_ID, input_files, output_container_sas_url)
add_tasks(batch_client, config._JOB_ID,
input_files, output_container_sas_url)

# Pause execution until tasks reach Completed state.
wait_for_tasks_to_complete(batch_client,
config._JOB_ID,
datetime.timedelta(minutes=30))

print(" Success! All tasks reached the 'Completed' state within the "
"specified timeout period.")
"specified timeout period.")

except batchmodels.BatchErrorException as err:
print_batch_exception(err)
raise
print_batch_exception(err)
raise

# Delete input container in storage
print('Deleting container [{}]...'.format(input_container_name))
blob_client.delete_container(input_container_name)


# Print out some timing info
end_time = datetime.datetime.now().replace(microsecond=0)
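One detail worth checking in the reformatted add_tasks hunk above: splitting the .format() call across two lines must not change the command string each task runs. A standalone snippet, using a hypothetical file name, shows the result:

# 'sample01.mp4' is a hypothetical input name, for illustration only.
input_file_path = 'sample01.mp4'
output_file_path = "".join(input_file_path.split('.')[:-1]) + '.mp3'
command = "/bin/bash -c \"ffmpeg -i {} {} \"".format(
    input_file_path, output_file_path)
print(command)  # /bin/bash -c "ffmpeg -i sample01.mp4 sample01.mp3 "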
src/config.py (16 changes: 8 additions & 8 deletions)
@@ -1,15 +1,15 @@
-#-------------------------------------------------------------------------
-#
-# THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND,
-# EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES
+# -------------------------------------------------------------------------
+#
+# THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND,
+# EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES
 # OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.
-#----------------------------------------------------------------------------------
+# ----------------------------------------------------------------------------------
 # The example companies, organizations, products, domain names,
 # e-mail addresses, logos, people, places, and events depicted
 # herein are fictitious. No association with any real company,
 # organization, product, domain name, email address, logo, person,
 # places, or events is intended or should be inferred.
-#--------------------------------------------------------------------------
+# --------------------------------------------------------------------------
 
 # Global constant variables (Azure Storage account/Batch details)
 
Expand All @@ -19,7 +19,7 @@
# unique to your accounts. These are used when constructing connection strings
# for the Batch and Storage client objects.

_BATCH_ACCOUNT_NAME =''
_BATCH_ACCOUNT_NAME = ''
_BATCH_ACCOUNT_KEY = ''
_BATCH_ACCOUNT_URL = ''
_STORAGE_ACCOUNT_NAME = ''
Expand All @@ -28,4 +28,4 @@
_DEDICATED_POOL_NODE_COUNT = 0
_LOW_PRIORITY_POOL_NODE_COUNT = 5
_POOL_VM_SIZE = 'STANDARD_A1_v2'
_JOB_ID = 'LinuxFfmpegJob'
_JOB_ID = 'LinuxFfmpegJob'
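These placeholders are read by the tutorial script when it builds its service clients. A sketch of how they are consumed, assuming the azure-batch 6.x SDK this sample targets (client construction details vary across SDK versions):

import azure.batch as batch
import azure.batch.batch_auth as batch_auth

import config

# Shared-key auth against the Batch account; fill in config.py first.
credentials = batch_auth.SharedKeyCredentials(config._BATCH_ACCOUNT_NAME,
                                              config._BATCH_ACCOUNT_KEY)
batch_client = batch.BatchServiceClient(credentials,
                                        base_url=config._BATCH_ACCOUNT_URL)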