
I just began learning Airflow, but it is quite difficult to grasp the concept of XCom. Therefore I wrote a DAG like this:

from airflow import DAG
from airflow.utils.edgemodifier import Label

from datetime import datetime
from datetime import timedelta

from airflow.operators.bash import BashOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.ssh.hooks.ssh import SSHHook

#For more default argument for a task (or creating templates), please check this website
#https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/index.html#airflow.models.BaseOperator

default_args = {
    'owner': '...',
    'email': ['...'],
    'email_on_retry': False,
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2021, 6, 10, 23, 0, 0, 0),
}

hook = SSHHook(
    remote_host='...',
    username='...',
    password='...',
    port=22,
)

with DAG(
    'test_dag',
    description='This is my first DAG to learn BASH operation, SSH connection, and transfer data among jobs',
    default_args=default_args,
    start_date=datetime(2021, 6, 10, 23, 0, 0, 0),
    schedule_interval="0 * * * *",
    tags = ['Testing', 'Tutorial'],
) as dag:
    # Declare Tasks
    Read_my_IP = BashOperator(
        # Task IDs may contain only alphanumeric characters, dashes, dots, and underscores
        task_id='Read_my_IP',
        # The last line will be pushed to next task
        bash_command="hostname -i | awk '{print $1}'",
    )

    Read_remote_IP = SSHOperator(
        task_id='Read_remote_IP',
        ssh_hook=hook,
        environment={
            'Pi_IP': Read_my_IP.xcom_pull('Read_my_IP'),
        },
        command="echo {{Pi_IP}}",
    )

    # Declare Relationship between tasks
    Read_my_IP >> Label("PI's IP address") >> Read_remote_IP

The first task ran successfully, but I could not obtain the XCom return_value from the Read_my_IP task, which is the IP address of the local machine. This might be very basic, but the documentation does not explain how to reference task_instance.

Please help me complete the XCom flow and pass the IP address from the local machine to the remote machine for further processing.

1 Answer
The command parameter of SSHOperator is templated, so you can pull the XCom directly:

Read_remote_IP = SSHOperator(
    task_id='Read_remote_IP',
    ssh_hook=hook,
    command="echo {{ ti.xcom_pull(task_ids='Read_my_IP') }}"
)

Note that you also need to explicitly ask for the XCom to be pushed from the BashOperator (see the operator description):

Read_my_IP = BashOperator(
    # Task IDs may contain only alphanumeric characters, dashes, dots, and underscores
    task_id='Read_my_IP',
    # The last line will be pushed to next task
    bash_command="hostname -i | awk '{print $1}'",
    do_xcom_push=True
)

3 Comments

I would also like to ask whether there is a way to assign the xcom_pull result to an environment variable and use that in command, since inlining everything could get messy as the number of variables increases.
@Elad, I have been facing the same issue with Airflow XCom. I am not able to pass my value from xcom_push to SQL. `ti.xcom_push(key="QueryTimeStamp_{}".format(country), value=data)` doesn't seem to work when I pull the data using `ti.xcom_pull(task_ids='Match_Updated_date_{}', key='QueryTimeStamp_{}')".format(country,country)`.
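One likely culprit in the snippet in the comment above: the `.format(country, country)` call sits outside the closing quote of the `xcom_pull` call, so the `{}` placeholders in `task_ids` and `key` are never filled in and the pull looks up keys that were never pushed. A minimal demonstration with plain string formatting (illustrative only, no Airflow needed):

```python
country = "DE"

# Buggy shape: .format never applies to the argument strings,
# so the literal '{}' placeholders are what gets looked up.
buggy_task_ids = 'Match_Updated_date_{}'
print(buggy_task_ids)  # → Match_Updated_date_{}

# Fixed: format each argument before passing it, so the pull key
# matches the key that xcom_push actually wrote.
task_ids = 'Match_Updated_date_{}'.format(country)
key = 'QueryTimeStamp_{}'.format(country)
print(task_ids, key)  # → Match_Updated_date_DE QueryTimeStamp_DE
```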
