1

I want to use Python to add an activity to a pipeline in Azure Data Factory. With the following code I am replacing the actual activity but not adding a new one:

p_name = 'test'
act_name = 'Wait4'

Wait_activity = WaitActivity(name=act_name,wait_time_in_seconds=5)


p_obj = PipelineResource(activities=[Wait_activity])
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)

This is the pipeline before running the code : enter image description here

After running the code:

enter image description here

Expected :

enter image description here

1 Answer 1

2

Researched the statements in source code:

enter image description here

So when you update the pipeline, the activities property should be the list of activities in pipeline, not single one.

For example:

wait_activity = WaitActivity(name="waittest", type="Wait", wait_time_in_seconds=100, )
ActivityDependency = [{"activity":"waittest","dependencyConditions":["Succeeded"]}]
wait_activity1 = WaitActivity(name="waittest1", type="Wait", wait_time_in_seconds=100,depends_on=ActivityDependency)


p_name = 'testforadf'
p_obj = PipelineResource(
        activities=[wait_activity, wait_activity1])
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)

Please note two lines:

activities=[wait_activity, wait_activity1])

This property should contains all of your activities.

ActivityDependency = [{"activity":"waittest","dependencyConditions":["Succeeded"]}]

This is the dependency conditions between your activities.

My output:

enter image description here

Any concern, please let me know.


Well,please see my sample code:

The premise is that I already have the above two wait activities

adftest = adf_client.pipelines.get(rg_name,df_name,p_name)
print(adftest)
for activity in adftest.activities :
    print(activity.name)
    print(activity.type)

Then output is :

{'additional_properties': None, 'id': '/subscriptions/b83c1ed3-c5b6-44fb-b5ba-2b83a074c23f/resourceGroups/v-jugong-ChinaCXPTeam/providers/Microsoft.DataFactory/factories/jaygongadf/pipelines/testforadf', 'name': 'testforadf', 'type': 'Microsoft.DataFactory/factories/pipelines', 'etag': 'ed006cf3-0000-0800-0000-5da970600000', 'description': None, 'activities': [<azure.mgmt.datafactory.models.wait_activity_py3.WaitActivity object at 0x000001C05FEDE0F0>, <azure.mgmt.datafactory.models.wait_activity_py3.WaitActivity object at 0x000001C05FED6DA0>], 'parameters': None, 'variables': None, 'concurrency': None, 'annotations': None, 'folder': None}
waittest
Wait
waittest1
Wait

Then you could see the objects in above activities property. Besides,you could see their types: 'activities': [<azure.mgmt.datafactory.models.wait_activity_py3.WaitActivity object at 0x000001C05FEDE0F0>, <azure.mgmt.datafactory.models.wait_activity_py3.WaitActivity object at 0x000001C05FED6DA0>]

They are WaitActivity type, so you could view their loop the activity to get every item in it using :

for activity in adftest.activities :
        print(activity.name)
        print(activity.type)

You could view what properties the WaitActivity type contains, likename,type in source code statements.(For me, i used Pycharm to test code,the IDE could detect source code directly)

enter image description here

Then if you want to add one more activity,for example, one more WaitActivity:

wait_activity2 = WaitActivity(name="waittest2", type="Wait", wait_time_in_seconds=100, )
adftest.activities.append(wait_activity2)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, adftest)

Please see above code, i created a new WaitActivity named wait_activity2 ,then append it into activities array. Then update the pipeline as normal, you will find the new activity :

enter image description here

Sign up to request clarification or add additional context in comments.

12 Comments

It is not exactly what I am looking for. Let say that I have a Pipeline that already exist with several activities and I want to add an activity to this Pipeline with Python. I don't want to recreate the entire pipeline. So with your solution, if I am right, you are suggesting me to get all the current activities of the pipeline and recreate the entire pipeline. So is there a way to get the list of activities with all the property of a Pipeline ?
Firstly,i have to say that sdk only provides create_or_update method, no any methods like append or add. I think you could use adf_client.pipelines.get() to get the pipeline then replace the activities property in it,then update the entire pipeline.
After getting the pipeline, you could get the value of activities, then replace the value and update the pipeline.
If you already have some activities already, the activities property should be an array. Just append the new activity into it and update the pipeline. Hope i'm clear here.
Yes thank you for all these informations, I will try the adf_client.pipelines.get() method.
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.