
I need to find a specific folder in S3 and pass it on to a PythonOperator in an Airflow DAG. I find the correct directory with another PythonOperator. Inside that operator's callable I can successfully either xcom_push() or Variable.set() the value and read it back. The problem is that I need to pass the value on to a separate PythonOperator whose callable lives in a Python library, so I need to Variable.get() or xcom_pull() it in the main (top-level) part of the DAG file. I have searched quite a bit and can't figure out whether this is possible. Below is some code for reference:

    from airflow.models import Variable
    from airflow.operators.python_operator import PythonOperator

    def check_for_done_file(**kwargs):

        ### This function does a bunch of stuff to find the correct S3 path to
        ### populate target_dir; this has been verified and works

        Variable.set("target_dir", done_file_list.pop())
        test = Variable.get("target_dir")
        print("TEST: ", test)

    #### END OF METHOD, BEGIN MAIN

    with my_dag:

        ### CALLING METHOD FROM MAIN, POPULATING VARIABLE

        check_for_done_file_task = PythonOperator(
            task_id = 'check_for_done_file',
            python_callable = check_for_done_file,
            dag = my_dag,
            op_kwargs = {
                "source_bucket" : "my_source_bucket",
                "source_path" : "path/to/the/s3/folder/I/need"
            }
        )

        target_dir = Variable.get("target_dir") # I NEED THIS VAR HERE.

        move_data_to_in_progress_task = PythonOperator(
            task_id = 'move-from-incoming-to-in-progress',
            python_callable = FileOps.move, # <--- PYTHON LIBRARY THAT COPIES FILES FROM SRC TO DEST
            dag = my_dag,
            op_kwargs = {
                "source_bucket" : "source_bucket",
                "source_path" : "path/to/my/s3/folder/" + target_dir,
                "destination_bucket" : "destination_bucket",
                "destination_path" : "path/to/my/s3/folder/" + target_dir,
                "recurse" : True
            }
        )

So, is the only way to accomplish this to modify the library so that it looks up the "target_dir" variable itself? I don't think the top level of an Airflow DAG file has a task context, so what I want may not be possible. Any Airflow experts, please weigh in and let me know what my options are.
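The reason the top-level Variable.get() cannot see the value is timing: the scheduler runs the DAG file's top-level code when it parses the file, so the op_kwargs dictionary is built right then, before check_for_done_file has ever run. The stdlib-only toy below models that ordering; a plain dict stands in for the Variable store and every name in it is hypothetical, not Airflow's API:

```python
# Toy model (not Airflow code): a dict stands in for the Variable store,
# and "parsing the DAG file" means running its top-level code once.
variable_store = {}


def variable_get(key, default=None):
    # Hypothetical stand-in for Variable.get(); returns default if unset
    return variable_store.get(key, default)


def check_for_done_file():
    # Runs later, when the first task actually executes
    variable_store["target_dir"] = "2019-11-11"


def parse_dag_file():
    # Top-level code: evaluated at parse time, before any task has run,
    # so target_dir has not been stored yet and the path is concatenated now
    target_dir = variable_get("target_dir", default="")
    return {"source_path": "path/to/my/s3/folder/" + target_dir}


op_kwargs = parse_dag_file()     # parse time
check_for_done_file()            # run time of the first task
print(op_kwargs["source_path"])  # path/to/my/s3/folder/  (value arrived too late)
```

In real Airflow, Variable.get() without a default raises KeyError if the variable does not exist yet, and once it does exist each re-parse picks up whatever an earlier run stored, so the path would be stale rather than current.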

Answer

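op_kwargs is a templated field, so you can push the value to XCom in the first task:

    def check_for_done_file(**kwargs):
        ...
        # xcom_pull() reads the key 'return_value' by default, so push under that key
        # (simply returning the value from the callable has the same effect)
        kwargs['ti'].xcom_push(key='return_value', value=done_file_list.pop())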

and then use a Jinja template in op_kwargs to pull the value when the second task runs:

    move_data_to_in_progress_task = PythonOperator(
        task_id = 'move-from-incoming-to-in-progress',
        python_callable = FileOps.move, # <--- PYTHON LIBRARY THAT COPIES FILES FROM SRC TO DEST
        dag = my_dag,
        op_kwargs = {
            "source_bucket" : "source_bucket",
            "source_path" : "path/to/my/s3/folder/{{ ti.xcom_pull(task_ids='check_for_done_file') }}",
            "destination_bucket" : "destination_bucket",
            "destination_path" : "path/to/my/s3/folder/{{ ti.xcom_pull(task_ids='check_for_done_file') }}",
            "recurse" : True
        }
    )
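The template works because templated fields are rendered just before the task executes, after the upstream task has already stored its result. A rough stdlib model of that ordering follows. It uses string.Template with $target_dir as a stand-in for the Jinja expression and a plain dict as a hypothetical XCom store, so it only mirrors the timing, not Airflow's actual rendering code:

```python
from string import Template

# Hypothetical in-memory XCom store keyed by (task_id, key)
xcom_store = {}


def run_check_for_done_file():
    # Stands in for the first task; its result is stored under "return_value"
    xcom_store[("check_for_done_file", "return_value")] = "2019-11-11"


# Built at parse time: the values are still unrendered template strings
op_kwargs = {
    "source_path": "path/to/my/s3/folder/$target_dir",
    "destination_path": "path/to/my/s3/folder/$target_dir",
    "recurse": True,
}


def render(kwargs, context):
    # Substitute into string values only, leaving other values (e.g. True) as-is
    return {
        key: Template(value).substitute(context) if isinstance(value, str) else value
        for key, value in kwargs.items()
    }


run_check_for_done_file()  # the upstream task runs first
# Rendering happens when the second task starts, so the value exists by now
context = {"target_dir": xcom_store[("check_for_done_file", "return_value")]}
rendered = render(op_kwargs, context)
print(rendered["source_path"])  # path/to/my/s3/folder/2019-11-11
```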

Also, add provide_context=True to check_for_done_file_task so that Airflow passes the context dictionary (which includes ti) to the callable as keyword arguments.
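Without provide_context=True, Airflow 1.x calls the callable with op_kwargs only, so kwargs['ti'] fails with KeyError: 'ti'. The toy below imitates that branching with a hypothetical FakeTaskInstance class and execute() helper; it is a simplified model for illustration, not PythonOperator's source:

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance; only stores XComs."""

    def __init__(self):
        self.xcoms = {}

    def xcom_push(self, key, value):
        self.xcoms[key] = value


def execute(python_callable, op_kwargs, context, provide_context=False):
    # Simplified model of Airflow 1.x behaviour: the context (which holds "ti")
    # is merged into the callable's keyword arguments only when provide_context=True
    kwargs = {**context, **op_kwargs} if provide_context else dict(op_kwargs)
    return python_callable(**kwargs)


def check_for_done_file(**kwargs):
    # Raises KeyError: 'ti' when the context was not passed in
    kwargs["ti"].xcom_push(key="return_value", value="2019-11-11")


ti = FakeTaskInstance()
context = {"ti": ti}
op_kwargs = {"source_bucket": "my_source_bucket"}

missing = None
try:
    execute(check_for_done_file, op_kwargs, context)
except KeyError as err:
    missing = err.args[0]
print("without provide_context, missing key:", missing)  # ti

execute(check_for_done_file, op_kwargs, context, provide_context=True)
print(ti.xcoms)  # {'return_value': '2019-11-11'}
```

In Airflow 2 the provide_context flag is no longer needed: the context is passed automatically to callables that accept **kwargs.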


Comments

The line kwargs['ti'].xcom_push(value=done_file_list.pop()) (changed to push my particular value) results in a stack trace:

    Subtask check_for_done_file KeyError: 'ti'
    [2019-11-11 09:52:26,241] {logging_mixin.py:95} INFO - [2019-11-11 09:52:26,240] {jobs.py:2562} INFO - Task exited with return code 1
add provide_context=True to your check_for_done_file_task task and try again
This was very helpful and solved my problem. Thank you. Just for clarification, I needed to return the value in 'check_for_done_file_task', as opposed to doing an xcom_push().
Oh yes :) You can use xcom just by returning a value or specifying a key. Example: github.com/apache/airflow/blob/…
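The exchange above comes down to XCom keys: xcom_pull() looks up the key return_value unless told otherwise, and a callable's return value is stored under exactly that key, so returning the value works while a push under any other key has to be pulled with key=. Here is a small stdlib model of that lookup; the dict keyed by task_id and key, and the task names, are hypothetical and not Airflow's storage:

```python
# Toy XCom table: {(task_id, key): value}. Hypothetical, not Airflow's storage.
XCOM_RETURN_KEY = "return_value"  # default key used for return values
xcom_table = {}


def xcom_push(task_id, key, value):
    xcom_table[(task_id, key)] = value


def xcom_pull(task_ids, key=XCOM_RETURN_KEY):
    # Default key is "return_value"; a missing entry gives None
    return xcom_table.get((task_ids, key))


def run_task(task_id, python_callable):
    result = python_callable()
    if result is not None:
        # In this toy, only a non-None return value is pushed under "return_value"
        xcom_push(task_id, XCOM_RETURN_KEY, result)


def returns_value():
    return "2019-11-11"


def pushes_custom_key():
    xcom_push("pushes_custom_key", "target_dir", "2019-11-12")


run_task("returns_value", returns_value)
run_task("pushes_custom_key", pushes_custom_key)

print(xcom_pull(task_ids="returns_value"))                        # 2019-11-11
print(xcom_pull(task_ids="pushes_custom_key"))                    # None
print(xcom_pull(task_ids="pushes_custom_key", key="target_dir"))  # 2019-11-12
```

With a custom key (target_dir here is just an illustrative name), the template in the answer would become {{ ti.xcom_pull(task_ids='check_for_done_file', key='target_dir') }}.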
