
I am trying to run a Python script that logs into an Amazon Redshift DB and then executes a SQL command. I use a tool called Airflow for workflow management. When running the code below, I am able to log in to the DB fine, but when trying to execute the SQL command I get the following error.

**AttributeError: 'NoneType' object has no attribute 'execute'**

Code:

```python
## Login to DB

def db_log(**kwargs):
  global db_con
  try:
    db_con = psycopg2.connect(
       " dbname = 'name' user = 'user' password = 'pass' host = 'host' port = '5439'")
  except:
    print("I am unable to connect")
    print('Connection Task Complete')
    task_instance = kwargs['task_instance']
    task_instance.xcom_push(key="dwh_connection" , value = "dwh_connection")
    return (dwh_connection)


def insert_data(**kwargs):
  task_instance = kwargs['task_instance']
  db_con_xcom = task_instance.xcom_pull(key="dwh_connection", task_ids='DWH_Connect')
  cur = db_con_xcom
  cur.execute("""insert into tbl_1 select limit 2 ;""")
```

Could anyone help me fix this? Thanks.

Complete code:

```python
## Third party Library Imports
import pandas as pd
import psycopg2
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io

# Following are defaults which can be overridden later on
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 29, 12),
    'email': ['[email protected]']
}

dag = DAG('sample1', default_args=default_args)

## Login to DB

def db_log(**kwargs):
  global db_con
  try:
    db_con = psycopg2.connect(
       " dbname = 'name' user = 'user' password = 'pass' host = 'host' port = '5439'")
  except:
    print("I am unable to connect")
    print('Connection Task Complete')
    task_instance = kwargs['task_instance']
    task_instance.xcom_push(key="dwh_connection" , value = "dwh_connection")
    return (dwh_connection)


t1 = PythonOperator(
  task_id='DWH_Connect',
  python_callable=data_warehouse_login,
  provide_context=True,
  dag=dag)

#######################

def insert_data(**kwargs):
  task_instance = kwargs['task_instance']
  db_con_xcom = task_instance.xcom_pull(key="dwh_connection", task_ids='DWH_Connect')
  cur = db_con_xcom
  cur.execute("""insert into tbl_1 select limit 2 """)

##########################################

t2 = PythonOperator(
  task_id='DWH_Connect1',
  python_callable=insert_data,
  provide_context=True,
  dag=dag)

t1 >> t2
```
  • The reason you get that error is that the `cur` object you created didn't get a value. Check whether the `task_instance.xcom_pull` worked. Commented May 30, 2018 at 11:05
  • Please post the full code. For example, I don't know what your `DWH_Connect` task does or how you have coded it. Commented May 30, 2018 at 11:14
  • @kaxil, I have updated the initial message with the complete code. Commented May 30, 2018 at 11:25
  • I would not recommend sending a connection object in XCom. Use XCom to send metadata, not a connection object (see the sketch below). Commented May 30, 2018 at 16:07
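
Following that last comment, a minimal sketch of pushing metadata through XCom instead of the connection itself: the upstream task pushes the DSN string and the downstream task reconnects from it. (This reuses the DSN and SQL from the question; in practice keep credentials out of XCom and use an Airflow Connection instead.)

```python
def db_log(**kwargs):
    dsn = "dbname='name' user='user' password='pass' host='host' port='5439'"
    # Push only metadata (the DSN string), never the live connection object.
    kwargs['task_instance'].xcom_push(key="dwh_dsn", value=dsn)

def insert_data(**kwargs):
    # Rebuild the connection inside this task from the pulled metadata.
    dsn = kwargs['task_instance'].xcom_pull(key="dwh_dsn", task_ids='DWH_Connect')
    db_con = psycopg2.connect(dsn)
    cur = db_con.cursor()
    cur.execute("insert into tbl_1 select limit 2;")
    db_con.commit()
    db_con.close()
```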

1 Answer


Are you sure you've added the entirety of your code? The first task's `python_callable` refers to a `data_warehouse_login` function, but that is never defined. Assuming it is meant to be `db_log`, and assuming the first task succeeded, you're still not actually XCom-ing anything to the second task: your `xcom_push` sits inside the `except` branch, so it only runs when the connection *fails*, and even then it pushes the literal string `"dwh_connection"` rather than the connection.
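
If you want to stay with plain `psycopg2`, the simplest fix is to open the connection and execute the statement inside the same task, so nothing has to travel through XCom at all. A minimal sketch, reusing the connection string and SQL from your question plus the `PythonOperator`/`dag` setup from your DAG file (credentials, host and table are placeholders):

```python
import psycopg2

def connect_and_insert(**kwargs):
    # Connect and execute in one task: no connection object
    # ever needs to be shared between tasks.
    db_con = psycopg2.connect(
        "dbname='name' user='user' password='pass' host='host' port='5439'")
    try:
        cur = db_con.cursor()
        cur.execute("insert into tbl_1 select limit 2;")
        db_con.commit()
    finally:
        db_con.close()

t1 = PythonOperator(
    task_id='DWH_Connect_And_Insert',
    python_callable=connect_and_insert,
    provide_context=True,
    dag=dag)
```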

Generally I wouldn't advise XCom-ing a connection object anyway: XCom values are serialized into Airflow's metadata database, so a live database connection can't survive the round trip between tasks. Alternatively, you may want to consider using the included `PostgresHook`, which should cover your use case and works equally well with Amazon Redshift, since Redshift speaks the PostgreSQL protocol.

https://github.com/apache/incubator-airflow/blob/master/airflow/hooks/postgres_hook.py
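
For example, a minimal sketch of the hook-based approach, assuming you have created a connection in the Airflow UI under the (hypothetical) conn id `redshift_dwh`:

```python
from airflow.hooks.postgres_hook import PostgresHook

def insert_data(**kwargs):
    # The hook reads host and credentials from the Airflow connection,
    # so nothing sensitive lives in the DAG file or in XCom.
    hook = PostgresHook(postgres_conn_id='redshift_dwh')  # hypothetical conn id
    # run() opens a connection, executes the statement and commits.
    hook.run("insert into tbl_1 select limit 2;")

t2 = PythonOperator(
    task_id='DWH_Insert',
    python_callable=insert_data,
    provide_context=True,
    dag=dag)
```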
