
Is there a way to create a table in snowflake from a pandas dataframe in python just using the snowflake connector and pandas library? Main goal here is to just take a pandas dataframe and use the schema to create a new table in a specific data warehouse/database/schema in snowflake. I have seen examples of how to do this with sqlalchemy which I am trying to avoid but worst case I will just use that.

I have tried other methods, including sqlalchemy and the Snowflake upload route using the PUT command, but wanted to ask if anyone had an alternative that just uses the snowflake connector and pandas, and does not require me to save the data to my drive locally or use sqlalchemy.

Appreciate any input, or feedback on how to write better questions too.

Note:

write_pandas - the snowflake connector function can only append to tables that already exist.

df.to_sql - only works with sqlalchemy or sqlite3 connections, so I don't think a snowflake conn would work, but I could be wrong?
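As a hedged aside on the first note: newer releases of snowflake-connector-python (roughly 2.4 and later) reportedly add an auto_create_table flag to write_pandas that creates the table from the dataframe's schema before loading, which may remove the append-only limitation; check the flag against your installed connector version. A minimal sketch (ctx and the table name are placeholders):

```python
# Hedged sketch: auto_create_table is an assumption about newer connector
# versions; the live call is commented out because it needs a connection.
import pandas as pd

df_final = pd.DataFrame({"NAME": ["Stephen"], "CITY": ["Oslo"]})

# from snowflake.connector.pandas_tools import write_pandas
# success, nchunks, nrows, _ = write_pandas(
#     ctx, df_final, "NEW_TABLE", auto_create_table=True
# )
```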

I have used the snowflake connector functions write_pandas() and pd_writer() with the pandas function to_sql(). The issue here is that the pandas to_sql() docs state that the connection can only be "sqlalchemy.engine.(Engine or Connection) or sqlite3.Connection". I would prefer to keep using the snowflake-connector for python. Without using sqlalchemy, I know I could do the following:

A .ini config file for connecting to the database (named db.ini):

[database_name]
user = user7
pass = s$cret
acc = jhn675f
wh = 22jb7tyo5
db = dev_env546
db_schema = hubspot

Python module for connecting to the Snowflake db and code to execute:

import configparser
import pandas as pd
import snowflake.connector

config = configparser.ConfigParser()
config.read('db.ini')

sn_user = config['database_name']['user']
sn_password = config['database_name']['pass']
sn_account = config['database_name']['acc']
sn_warehouse = config['database_name']['wh']
sn_database = config['database_name']['db']
sn_schema = config['database_name']['db_schema']

ctx = snowflake.connector.connect(
    user=sn_user,
    password=sn_password,
    account=sn_account,
    warehouse=sn_warehouse,
    database=sn_database,
    schema=sn_schema
)

cs = ctx.cursor()

query_extract = '''
    select table1.field1,
    table1.field2,
    table1.field3,
    table1.field4,
    table1.field5,
    table1.field6,
    table1.field7,
    table2.field2,
    table2.field5,
    table2.field7,
    table2.field9,
    table3.field1,
    table3.field6
    from database.schema.table1
    left join database.schema.table2
    on table1.field3 = table2.field1
    left join database.schema.table3
    on table1.field5 = table3.field1
'''
try:
    cs.execute(query_extract)
    df = cs.fetch_pandas_all()
except snowflake.connector.errors.ProgrammingError as e:
    print(e)
    raise

# clean data in the dataframe and perform some calcs
# store results in new dataframe called df_final
# would like to just use df_final to create a table in snowflake based on df_final schema and datatypes
# right now I am not sure how to do that

Current methods or alternatives

import configparser
import pandas as pd
import snowflake.connector

config = configparser.ConfigParser()
config.read('db.ini')

sn_user = config['database_name']['user']
sn_password = config['database_name']['pass']
sn_account = config['database_name']['acc']
sn_warehouse = config['database_name']['wh']
sn_database = config['database_name']['db']
sn_schema = config['database_name']['db_schema']

ctx = snowflake.connector.connect(
    user=sn_user,
    password=sn_password,
    account=sn_account,
    warehouse=sn_warehouse,
    database=sn_database,
    schema=sn_schema
)

cs = ctx.cursor()

query_extract = '''
    select table1.field1,
    table1.field2,
    table1.field3,
    table1.field4,
    table1.field5,
    table1.field6,
    table1.field7,
    table2.field2,
    table2.field5,
    table2.field7,
    table2.field9,
    table3.field1,
    table3.field6
    from database.schema.table1
    left join database.schema.table2
    on table1.field3 = table2.field1
    left join database.schema.table3
    on table1.field5 = table3.field1
'''
try:
    cs.execute(query_extract)
    df = cs.fetch_pandas_all()
except snowflake.connector.errors.ProgrammingError as e:
    print(e)
    raise

df_final.to_csv('data/processed_data.csv', index=False)  # match the filename used in the PUT below

create_stage = '''create stage processed_data_stage
  copy_options = (on_error='skip_file');'''

create_file_format = '''create or replace file format processed_data_format
type = 'csv' field_delimiter = ',';'''

upload_file = '''put file://data/processed_data.csv @processed_data_stage;'''
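A hedged sketch of wiring that route together: each statement runs through the cursor in order, followed by a COPY INTO to move the staged file into the target table (the table, stage, and format names here are placeholders, and the target table must already exist):

```python
# Sketch only: running these requires a live Snowflake connection, so the
# execution loop is commented out. Statement text mirrors the draft above.
statements = [
    "create or replace stage processed_data_stage "
    "copy_options = (on_error='skip_file');",
    "create or replace file format processed_data_format "
    "type = 'csv' field_delimiter = ',' skip_header = 1;",
    "put file://data/processed_data.csv @processed_data_stage;",
    "copy into NEW_TABLE from @processed_data_stage "
    "file_format = (format_name = processed_data_format);",
]

# for stmt in statements:
#     cs.execute(stmt)  # cs is the cursor from the connection above
```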

The other alternative is just embracing sqlalchemy and the pandas to_sql() function:

from snowflake.connector.pandas_tools import pd_writer
import pandas as pd
from sqlalchemy import create_engine

account_identifier = '<account_identifier>'
user = '<user_login_name>'
password = '<password>'
database_name = '<database_name>'
schema_name = '<schema_name>'

conn_string = f"snowflake://{user}:{password}@{account_identifier}/{database_name}/{schema_name}"
engine = create_engine(conn_string)

#Create your DataFrame

table_name = 'cities'
df = pd.DataFrame(data=[['Stephen','Oslo'],['Jane','Stockholm']],columns=['Name','City'])

#What to do if the table exists? replace, append, or fail?

if_exists = 'replace'

#Write the data to Snowflake, using pd_writer to speed up loading

with engine.connect() as con:
    df.to_sql(name=table_name.lower(), con=con, if_exists=if_exists, method=pd_writer)
  • You can use the .json format. From what I read on the Snowflake page, they can read .json; on the other side, pandas can write .json. Commented Feb 2, 2023 at 20:05

1 Answer


So, for anyone doing something similar: I ended up finding a largely undocumented pandas function that can extract the schema from a dataframe.

The pandas library has a function called pd.io.sql.get_schema() that returns a CREATE TABLE statement as a string, based on a dataframe and a table name. So you can do:

import configparser
import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

config = configparser.ConfigParser()
config.read('db.ini')

sn_user = config['database_name']['user']
sn_password = config['database_name']['pass']
sn_account = config['database_name']['acc']
sn_warehouse = config['database_name']['wh']
sn_database = config['database_name']['db']
sn_schema = config['database_name']['db_schema']

ctx = snowflake.connector.connect(
    user=sn_user,
    password=sn_password,
    account=sn_account,
    warehouse=sn_warehouse,
    database=sn_database,
    schema=sn_schema
)

cs = ctx.cursor()

query_extract = '''
    select table1.field1,
    table1.field2,
    table1.field3,
    table1.field4,
    table1.field5,
    table1.field6,
    table1.field7,
    table2.field2,
    table2.field5,
    table2.field7,
    table2.field9,
    table3.field1,
    table3.field6
    from database.schema.table1
    left join database.schema.table2
    on table1.field3 = table2.field1
    left join database.schema.table3
    on table1.field5 = table3.field1
'''
try:
    cs.execute(query_extract)
    df = cs.fetch_pandas_all()
except snowflake.connector.errors.ProgrammingError as e:
    print(e)
    raise

# clean data in the dataframe and perform some calcs
# store results in new dataframe called df_final

df_final.columns = [str(x).upper() for x in df_final.columns]

tb_name = 'NEW_TABLE'
df_schema = pd.io.sql.get_schema(df_final, tb_name)
df_schema = str(df_schema).replace('TEXT', 'VARCHAR(256)')

#use replace to remove characters and quotes that might give you syntax errors

cs.execute(df_schema)

success, nchunks, nrows, _ = write_pandas(ctx, df_final, tb_name)

cs.close()
ctx.close()

I skipped over parts but basically you can do:

  1. Establish a connection to Snowflake.
  2. Extract tables from Snowflake into a pandas dataframe.
  3. Clean the dataframe, perform transformations, and save the results into a new dataframe.
  4. Use pd.io.sql.get_schema to get a string SQL query based on the pandas dataframe you want to load into Snowflake.
  5. Use the string of the SQL query with your connection and database cursor to execute the create table command based on the df schema.
  6. Use the snowflake write_pandas command to write your df to the newly created Snowflake table.
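For illustration, a minimal, connection-free sketch of what pd.io.sql.get_schema returns (the column names and data here are made up, not from the question):

```python
# Minimal demonstration of pd.io.sql.get_schema; no Snowflake needed.
import pandas as pd

df_final = pd.DataFrame(
    {"FIELD1": [1, 2], "FIELD2": ["a", "b"], "FIELD3": [1.5, 2.5]}
)

df_schema = pd.io.sql.get_schema(df_final, "NEW_TABLE")
# get_schema maps object columns to TEXT; swap that for a
# Snowflake-friendly VARCHAR as in the answer above.
df_schema = df_schema.replace("TEXT", "VARCHAR(256)")
print(df_schema)
```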