
I'm working on Windows, with Airflow set up via Docker. I've got a number of Python scripts on Windows that read from and write to multiple locations (SSH connections, Windows folders, etc.). It would be a lot of work to replicate all of these inputs inside my Docker image, so what I'm looking to do is get Airflow to execute these scripts as if they were running in Windows.

Is this possible, and if so, how?

Here's the script that I'm running as my DAG:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# Following are defaults which can be overridden later on
default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': datetime(2018, 11, 27),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('Helloworld', default_args=default_args)

###########################################################
# Here's where I want to execute my windows python script #
###########################################################
t1 = PythonOperator(
    dag=dag,
    task_id='my_task_powered_by_python',
    provide_context=False,
    python_callable=r"C:\Users\user\Documents\script.py")

t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Hello World from Task 2"',
    dag=dag)

t3 = BashOperator(
    task_id='task_3',
    bash_command='echo "Hello World from Task 3"',
    dag=dag)

t4 = BashOperator(
    task_id='task_4',
    bash_command='echo "Hello World from Task 4"',
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream(t2)
t4.set_upstream(t3)

1 Answer

You are passing a Python script path into the PythonOperator, but the PythonOperator expects a Python callable, not a script file path.

You have two options for calling these Python scripts.

Call Script Directly Via Bash and BashOperator

You can use the BashOperator, as you already do above, to call the Python script directly.

You can accomplish this by calling the Python script the same way you would if you weren't using Airflow, using the following command in your BashOperator:

python script.py
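
As a minimal sketch, it might look like this (the container path /usr/local/airflow/scripts/script.py is hypothetical; the script has to live somewhere the Airflow worker can actually see, such as a volume mounted into the container):

t1 = BashOperator(
    task_id='run_script',
    # Hypothetical path inside the container; adjust to wherever
    # the script is copied or mounted.
    bash_command='python /usr/local/airflow/scripts/script.py',
    dag=dag)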

Move Script and use PythonOperator

If these scripts are only called from Airflow, I would consider moving them into your Python code base and calling whichever entrypoint function you have as needed.

Ex

airflowHome/
  dags/
  plugins/
  scripts/
    __init__.py
    script1.py
    script2.py

You will now be able to access your scripts in the scripts module with Python imports. From there you can call a specific function from inside your DAG using the PythonOperator.
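
As a sketch, assuming script1.py exposes an entrypoint function called main() (the function name is hypothetical, and this assumes airflowHome is on the PYTHONPATH so the scripts module is importable):

from scripts.script1 import main

t1 = PythonOperator(
    task_id='my_task_powered_by_python',
    python_callable=main,  # a callable object, not a file path
    dag=dag)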


4 Comments

Thanks @andscoop for your response. I've modified my code so that Python is correctly called from the scripts folder: t1 = BashOperator(task_id='task_1', bash_command='python /usr/local/airflow/scripts/script1.py', dag=dag). I'm still having the same issue, however, in that my Python script is being run as a bash process in Docker, rather than as a Windows process.
I did not catch that specific issue the first time I read through. I would not think what you're asking is possible, as Docker containers are meant to contain the processes within them.
Ah, that's a shame. I'm intrigued by how people use Airflow within Docker, then. For me, if everything needs to be handled within Docker, I think it would be easier to just install a full version of Ubuntu and develop everything in there. I'd be surprised if people are developing complex ETL pipelines all within Docker.
Why wouldn't you be able to develop within Docker? I have built several ETL pipelines within Docker. I have also seen pipelines which leverage Airflow inside of Docker but also PySpark in a separate container to handle some of the more complex operators. Reach out to me via the link in my profile if you'd like to discuss further.
