7

I am trying to setup dynamic sequence etl jobs that will use XCOM to get data from the first task that runs. Here is the current code:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime as dt, timedelta as td, date
from airflow.models import BaseOperator
from airflow.operators.sensors import ExternalTaskSensor
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable

START_DT = dt.combine(dt.today(), dt.min.time())
END_DT = dt.combine(dt.today(), dt.max.time())
NOW = dt.now()
CURRENT_EXEC = '{{ execution_date }}'
TODAY_MD = dt.today().strftime("%m%d")

def datetime_range(start, end, delta):
    """Generates the date range with time separation"""
    current = start
    if not isinstance(delta, td):
            delta = td(**delta)
    while current < end:
        yield current
        current += delta

default_args = {
        'owner': 'test',
        'depends_on_past': False,
        'start_date': START_DT, 
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False,
        'queue': 'etl',
        'retries': 1,
        'retry_delay': td(minutes=1),
}

dag_name = 'SEQ_TEST_01'

dag = DAG(dag_id=dag_name, default_args=default_args, schedule_interval=td(minutes=30))

def seq_job(sq_dt, **kwargs):
    for count, dt_in in enumerate(datetime_range(START_DT, END_DT, {'minutes':30}), 1):
        if sq_dt < str(dt_in):
            curr_seq = count, dt_in, dt_in + td(minutes=29, seconds=59)
            sequence = int(curr_seq[0])
            return sequence

pycall = PythonOperator(
    task_id='seq_sensor',
    provide_context=True,
    python_callable=seq_job,
    op_kwargs={'sq_dt': CURRENT_EXEC},
    dag=dag)

def group(grp, **context):
    sequence = context['task_instance'].xcom_pull(task_ids='seq_sensor')
    grp = '%0.2d' % grp
    database = 'TEST'
    today_date = '{{ ds_nodash }}'
    return BashOperator(
           task_id='ETL_GRP{}_{}_{}'.format(database, sequence, gap),
           bash_command='script.sh {} {} {} {}'.format(today_date, sequence, database, grp), 
           dag=dag)

complete = DummyOperator(
        task_id='All_Sequences_complete',
        dag=dag)

pycall >> group(1) >> complete
pycall >> group(2) >> complete
pycall >> group(3) >> complete

Issue is that no matter what i try, i keep getting this error:

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 263, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/opt/airflow/incubator-airflow/airflow/dags/new_dag_seq.py", line 66, in <module>
    pycall >> group(1) >> complete
  File "/opt/airflow/incubator-airflow/airflow/dags/new_dag_seq.py", line 56, in group
    sequence = context['task_instance'].xcom_pull(task_ids='seq_sensor')
KeyError: 'task_instance'

Not sure if it's something small I am missing, or if I have everything wrong. Still new to airflow and trying to setup our ETL test env to run every 30 minutes with unique sequence number which is generated by datetime_range and is based on on execution_date variable.

3 Answers 3

4

Try to use context['ti'] instead.

Sign up to request clarification or add additional context in comments.

1 Comment

Tried that and still no go :(
4

I solved it by moving the bash operator to another function and pull the data from python operator via:

def bash_out(group, **kwargs):
        sequence = "{{ task_instance.xcom_pull(task_ids='seq_sensor') }}"
        return BashOperator(task_id='ETL_{}_GRP{}'.format(database, group), bash_command='script.sh {} {} {} {}'.format(today_date, database, sequence, group), dag=dag)

and setting the dependancies:

pycall >> [bash_out('01'), bash_out('02'), bash_out('03')] >> complete

Comments

2

Make sure 'provide_context': True is present in the default_args.

1 Comment

This fixed my case, thanks!

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.