I want to create parallel running. For each id in ids list create a new PythonOperator
how can I pass the ids list outside prepare_parameters function? or alternatively can I create the operations list in prepare_parameters call
prepare_parameters_operator >> operation >> end_tasks?
def prepare_parameters(**context):
dag_conf = context['dag_run'].conf
ids = dag_conf['ids'] if 'ids' in dag_conf else [1,2,3]
context['ti'].xcom_push(key='ids', value=ids)
def do_something(id, **context):
### do something###
prepare_parameters_operator = PythonOperator(python_callable=prepare_parameters,
task_id='prepare_parameters',
queue='default',
dag=dag)
operations = []
for id in ids: ### how to get the ids from Xcom? ###
operations.append(PythonOperator(python_callable=do_something,
task_id='do_something_{}'.format(id),
queue='default',
op_kwargs={'id': id},
dag=dag))
for operation in operations:
prepare_parameters_operator >> operation >> end_tasks

dag_confinitially?prepare_parameters(edited now)