I am trying to run an Airflow script that consists of a couple of Python functions. These functions query a database and perform a few tasks. I am running this in Airflow so that I can monitor each of these functions separately. Below is the code I am trying to execute; it fails with the following error:

Subtask: NameError: name 'task_instance' is not defined

## Third party Library Imports

import psycopg2
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io


# Following are defaults which can be overridden later on
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 23, 12),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

dag = DAG('sample_dag', default_args=default_args, catchup=False, schedule_interval="@once")


#######################
## Login to DB


def db_log(**kwargs):
    global db_con
    try:
    db_con = psycopg2.connect(
" dbname = 'name' user = 'user' password = 'pass' host = 'host' port = 'port' sslmode = 'require' ")
    except:
        print("Connection Failed.")
        print('Connected successfully')
        task_instance = kwargs['task_instance']
        task_instance.xcom_push(value="db_con", key="db_log")
        return (db_con)

def insert_data(**kwargs):
    v1 = task_instance.xcom_pull(key="db_con", task_ids='db_log')
    return (v1)
    cur = db_con.cursor()
    cur.execute("""insert into tbl_1 select id,bill_no,status from tbl_2 limit 2;""")

#def job_run():
#    db_log()
#    insert_data()


##########################################

t1 = PythonOperator(
task_id='Connect',
python_callable=db_log,provide_context=True,
dag=dag)

t2 = PythonOperator(
task_id='Query',
python_callable=insert_data,provide_context=True,
dag=dag)


t1 >> t2

Could anyone help with this? Thanks.

Update 1:

After applying the fix, I encountered this error:

AttributeError: 'NoneType' object has no attribute 'execute'

It points to the last line of the code above:

cur.execute("""insert into tbl_1 select id,bill_no,status from tbl_2 limit 2;""")

Complete code:

## Third party Library Imports
import pandas as pd
import psycopg2
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io

# Following are defaults which can be overridden later on
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 5, 29, 12),
'email': ['[email protected]']
}

dag = DAG('sample1', default_args=default_args)

## Login to DB

def db_log(**kwargs):
  global db_con
  try:
    db_con = psycopg2.connect(
       " dbname = 'name' user = 'user' password = 'pass' host = 'host' port = '5439'")
  except:
    print("I am unable to connect")
    print('Connection Task Complete')
    task_instance = kwargs['task_instance']
    task_instance.xcom_push(key="dwh_connection" , value = "dwh_connection")
    return (dwh_connection)



t1 = PythonOperator(
  task_id='DWH_Connect',
  python_callable=data_warehouse_login,provide_context=True,
  dag=dag)

#######################

def insert_data(**kwargs):
  task_instance = kwargs['task_instance']
  db_con_xcom = task_instance.xcom_pull(key="dwh_connection", task_ids='DWH_Connect')
  cur = db_con_xcom
  cur.execute("""insert into tbl_1 select limit 2 """)


##########################################

t2 = PythonOperator(
  task_id='DWH_Connect1',
  python_callable=insert_data,provide_context=True,dag=dag)

t1 >> t2
2 Comments

  • @tobi6, could you please advise on the above? Thanks. Commented May 30, 2018 at 6:49
  • @tobi6, I fixed the indentation of the code (I have pasted the function in the original message) and added the error message. Commented May 30, 2018 at 14:56

2 Answers

5

This is a basic error message from Python.

NameError: name 'task_instance' is not defined

tells you that the name task_instance has not been defined in the scope where you try to use it.
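The same error can be reproduced without Airflow. The following is a minimal sketch; the function names `push_side` and `pull_side` are made up for illustration and stand in for the two callables in the question. A name assigned inside one function is local to that function, so a second function that uses the name without defining it raises a NameError.

```python
def push_side(**kwargs):
    # task_instance is a local name; it exists only inside push_side
    task_instance = kwargs['task_instance']
    return task_instance


def pull_side(**kwargs):
    # Nothing in this function assigns task_instance, so the lookup fails
    return task_instance.upper()


def run_demo():
    push_side(task_instance='ti')
    try:
        pull_side(task_instance='ti')
    except NameError as err:
        return str(err)
    return None


message = run_demo()
print(message)
```

The value passed in through `kwargs` is available in both functions, but each one has to read it out of `kwargs` itself.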

The task instance is provided in the context which is already being passed to the function.

Airflow sends the context with the setting

provide_context=True,

within the task. Also the definition accepts kwargs:

def insert_data(**kwargs):

which is also correct.

Correction

You first need to take the task instance out of the context like so:

task_instance = kwargs['task_instance']

Then you can call xcom_pull on the task instance. It should look like this (I added a few comments as well):

def insert_data(**kwargs):
    task_instance = kwargs['task_instance']
    db_con_xcom = task_instance.xcom_pull(key="db_con", task_ids='db_log')
    #return (v1)  # wrong, why return here?
    #cur = db_con.cursor()  # wrong, db_con might not be available
    cur = db_con_xcom
    cur.execute("""insert into tbl_1 select id,bill_no,status from tbl_2 limit 2;""")
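One thing to check when running this: `xcom_pull` returns None when no value matches the given `key` and `task_ids`, and `task_ids` refers to the operator's `task_id`, not to the name of the Python function. In the question's first listing the value was pushed under `key="db_log"` by the task with `task_id='Connect'`. The sketch below imitates that lookup with a hypothetical `FakeTaskInstance` class (a plain-Python stand-in written for this example, not Airflow's implementation) to show how a mismatch would lead to the 'NoneType' error.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance, for illustration only."""

    _store = {}  # shared by all instances, like XComs shared between tasks

    def __init__(self, task_id):
        self.task_id = task_id

    def xcom_push(self, key, value):
        FakeTaskInstance._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key):
        # Like the real xcom_pull, return None when nothing matches
        return FakeTaskInstance._store.get((task_ids, key))


def db_log(**kwargs):
    # Same push as in the question: the key is "db_log", the value is a string
    kwargs['task_instance'].xcom_push(key="db_log", value="db_con")


def insert_data(**kwargs):
    ti = kwargs['task_instance']
    # Key and task id do not match what was pushed
    mismatched = ti.xcom_pull(key="db_con", task_ids='db_log')
    # Key and task id match the push made by the 'Connect' task
    matched = ti.xcom_pull(key="db_log", task_ids='Connect')
    return mismatched, matched


db_log(task_instance=FakeTaskInstance('Connect'))
mismatched, matched = insert_data(task_instance=FakeTaskInstance('Query'))
print(mismatched, matched)
```

Even the matching pull only returns the string that was pushed, not a connection, so calling `execute` on it would still fail.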

1 Comment

Thanks. I encountered an AttributeError when running the updated code and have pasted the error into my initial message. Could you please advise? Thanks.
1

Since the question is becoming bigger I think it is appropriate to add a second answer.

Even after the edit from the comment "I removed the indentation portion of the code" I am still not sure about this bit of code:

def db_log(**kwargs):
  global db_con
  try:
    db_con = psycopg2.connect(
       " dbname = 'name' user = 'user' password = 'pass' host = 'host' port = '5439'")
  except:
    print("I am unable to connect")
    print('Connection Task Complete')
    task_instance = kwargs['task_instance']
    task_instance.xcom_push(key="dwh_connection" , value = "dwh_connection")
    return (dwh_connection)

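In this version everything after the first print is indented under except, so the xcom_push only runs when the connection fails. It should look like this: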

def db_log(**kwargs):
  global db_con
  try:
    db_con = psycopg2.connect(
       " dbname = 'name' user = 'user' password = 'pass' host = 'host' port = '5439'")
  except:
    print("I am unable to connect")

  print('Connection Task Complete')
  task_instance = kwargs['task_instance']
  task_instance.xcom_push(key="dwh_connection" , value = "dwh_connection")
  #return (dwh_connection)  # don't need a return here
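Note that this pushes the string "dwh_connection", not the connection object, so the downstream task pulls a plain `str`. A live database connection generally cannot be handed over through XCom anyway, since XCom values are serialized and tasks may run in separate processes. One possible alternative, sketched here under assumptions, is to open the connection inside the task that needs it. To keep the example runnable without a database, it uses the standard library's `sqlite3` as a stand-in for `psycopg2`; the `connect` and `dsn` parameters and the demo rows are hypothetical.

```python
import sqlite3


def insert_data(connect=sqlite3.connect, dsn=":memory:", **kwargs):
    # Open the connection inside the task that uses it, instead of pulling it from XCom
    con = connect(dsn)
    try:
        cur = con.cursor()
        # Demo setup only: create the two tables and some rows to copy
        cur.execute("create table tbl_2 (id integer, bill_no text, status text)")
        cur.execute("create table tbl_1 (id integer, bill_no text, status text)")
        cur.executemany(
            "insert into tbl_2 values (?, ?, ?)",
            [(1, "B-1", "open"), (2, "B-2", "paid"), (3, "B-3", "open")],
        )
        # The statement from the question
        cur.execute("insert into tbl_1 select id, bill_no, status from tbl_2 limit 2")
        con.commit()
        cur.execute("select count(*) from tbl_1")
        return cur.fetchone()[0]
    finally:
        con.close()


copied = insert_data()
print(copied)
```

With PostgreSQL, the same shape would apply with `psycopg2.connect` and a real connection string in place of the stand-ins, and without the demo setup statements.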

Besides that, the idea in your other question (Python - AttributeError: 'NoneType' object has no attribute 'execute') to use a PostgresHook seems interesting to me. You might want to pursue that thought in the other question.

2 Comments

Thanks. I was about to send you an update on the indentation, as that was throwing an error earlier. I have modified the code as per your latest version and was able to establish the connection successfully. However, it throws an error in the insert_data function, where I used the code from your previous answer. I get AttributeError: 'str' object has no attribute 'execute' on the line cur.execute("""insert into tbl_1 select id,bill_no,status from tbl_2 limit 2;"""). Could you please guide me on this? Thanks.
Again, this is another question. Again, please do not add logs in comments. It would be more helpful to update your other question stackoverflow.com/questions/50603012/… since the error is very, very similar.
