write_pandas snowflake connector function is not able to operate on table
I am working on a Python script that is designed to process some data, create a table if it does not exist, and truncate the table before inserting a refreshed dataset. I am using a role that has usage, read, write, and create table permissions, as well as stage permissions set as follows:

grant usage, read, write on future stages in schema <schema> to role <role>

I am using the write_pandas function in Python via the Snowflake connector. The documentation says that this function uses the PUT and COPY INTO commands:

To write the data to the table, the function saves the data to Parquet files, uses the PUT command to upload these files to a temporary stage, and uses the COPY INTO <table> command to copy the data from the files to the table. You can use some of the function parameters to control how the PUT and COPY INTO <table> statements are executed.

I still get the error message that I am unable to operate on the schema, and I am not sure what else I need to add. Does someone have the list of permissions that are required to run the write_pandas command?

Halfway answered 2/3, 2021 at 0:27 Comment(2)
Do you have a code sample you can share? What exactly is the error you're seeing?Almazan
#create table if not exists in schema sql = create_table_string cur.execute(sql) print('Table created successfully in Snowflake') #Truncate and write to table sql = "TRUNCATE IF EXISTS <table_name>;" cur.execute(sql) print('Table truncated successfully in Snowflake') write_pandas(conn, df2, "<table_name>") print('Table written successfully in Snowflake') # Close your cursor and your connection. cur.close() conn.close() Everything works up until the write_pandas function, where I get Insufficient privileges to operate on schema <schema_name>Halfway

write_pandas() does not create the table automatically. You need to create the table yourself if it does not already exist. Each time you run write_pandas(), it simply appends the dataframe to the table you specified.

On the other hand, if you use df.to_sql(..., method=pd_writer) to write a pandas dataframe into Snowflake, it will create the table automatically for you, and you can use if_exists in to_sql() to specify different behaviors - append, replace, or fail - if the table already exists.
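A minimal sketch of that to_sql() route, assuming the snowflake-sqlalchemy dialect is installed; the connection URL values are placeholders:

import pandas as pd
from sqlalchemy import create_engine
from snowflake.connector.pandas_tools import pd_writer

# placeholder credentials; requires the snowflake-sqlalchemy package
engine = create_engine(
    'snowflake://<user>:<password>@<account>/<database>/<schema>?warehouse=<warehouse>&role=<role>'
)

# uppercase column names avoid case-sensitivity surprises with quoted identifiers
df = pd.DataFrame({'TYPE': ['cats', 'dogs'], 'AGE': [10, 20]})

# if_exists controls what happens when the table already exists:
# 'fail' (default), 'replace', or 'append'
df.to_sql('table_test', engine, index=False, if_exists='replace', method=pd_writer)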

Foredate answered 4/5, 2021 at 7:26 Comment(1)
see my solution on how to create the tableHandy

I have a rather inelegant solution that gets the job done for table creation and appends, all without leaving my Jupyter notebook.

I keep this code in my SQL utility file. The get_col_types function builds a string of column name and dtype pairs, which is what you need to create the table.

def get_col_types(df):
    '''
        Helper function to create/modify Snowflake tables; gets the column and dtype pair for each item in the dataframe

        args:
            df: dataframe to evaluate
    '''
    import numpy as np

    # get dtypes and convert to df
    ct = df.dtypes.reset_index().rename(columns={0: 'col'})
    ct = ct.apply(lambda x: x.astype(str).str.upper())  # case matching, as Snowflake needs identifiers in uppercase

    # map pandas dtypes to Snowflake types
    # (only objects, dates, ints, and floats are considered at this point)
    ct['col'] = np.where(ct['col'] == 'OBJECT', 'VARCHAR', ct['col'])
    ct['col'] = np.where(ct['col'].str.contains('DATE'), 'DATETIME', ct['col'])
    ct['col'] = np.where(ct['col'].str.contains('INT'), 'NUMERIC', ct['col'])
    ct['col'] = np.where(ct['col'].str.contains('FLOAT'), 'FLOAT', ct['col'])

    # build the "column dtype" pairs and join them into a single string
    pairs = [row['index'] + ' ' + row['col'] for _, row in ct.iterrows()]
    return ', '.join(pairs).strip()


def create_table(table, action, col_type, df):
    '''
        Function to create/replace and append to tables in Snowflake

        args:
            table: name of the table to create/modify
            action: whether to do the initial create/replace or append; key to control logic
            col_type: string with column names and associated dtypes, each pair separated by a comma; comes from get_col_types() func
            df: dataframe to load

        dependencies: function get_col_types(); helper function to get the cols and dtypes to create a table
    '''
    import snowflake.connector as snow
    from snowflake.connector.pandas_tools import write_pandas

    # connection details (ACCOUNT, USER, PW, ROLE, warehouse, database, schema)
    # are assumed to be defined elsewhere, e.g. in a config module
    conn = snow.connect(
               account = ACCOUNT,
               user = USER,
               password = PW,
               warehouse = warehouse,
               database = database,
               schema = schema,
               role = ROLE)

    # set up cursor
    cur = conn.cursor()

    if action=='create_replace':

        # create (or replace) the target table with the generated column definitions
        cur.execute(
            """ CREATE OR REPLACE TABLE
            """ + table + """(""" + col_type + """)""")

        # prep to ensure proper case
        df.columns = [col.upper() for col in df.columns]

        # write df to table
        write_pandas(conn, df, table.upper())

    elif action=='append':

        # convert to a string list of tuples
        df = str(list(df.itertuples(index=False, name=None)))
        # get rid of the list brackets so it is a string of tuples
        df = df.replace('[','').replace(']','')

        # insert the literal values
        cur.execute(
            """ INSERT INTO """ + table + """
                VALUES """ + df + """
            """)

Working example:

# create df
l1 = ['cats','dogs','frogs']   
l2 = [10, 20, 30]
df = pd.DataFrame(zip(l1,l2), columns=['type','age'])
col_type = get_col_types(df)
create_table('table_test', 'create_replace', col_type, df)

# now that the table is created, append to it
l1 = ['cow','cricket']   
l2 = [45, 20]
df2 = pd.DataFrame(zip(l1,l2), columns=['type','age'])
create_table('table_test', 'append', None, df2)
  
Handy answered 24/8, 2021 at 23:27 Comment(2)
This works only if you are appending a data frame with fewer than 16,384 rows.Godly
In the append section, we can directly use write_pandas(conn, df, table.upper()) to copy the full data frame at once if the data frame contains more than 16,384 rows.Godly
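Following that suggestion, the append branch could be replaced by a small helper along these lines; a sketch, where append_df is a hypothetical name and conn is an open connector connection:

from snowflake.connector.pandas_tools import write_pandas

def append_df(conn, df, table):
    # write_pandas appends to an existing table by default,
    # so no INSERT ... VALUES string needs to be built
    # and the 16,384-expression limit does not apply
    df.columns = [col.upper() for col in df.columns]
    write_pandas(conn, df, table.upper())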

In the community spirit of StackOverflow, I made some changes / cleanup to the answer that @Cory Randolph posted.

import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas  

conn = snowflake.connector.connect(
    user=USER,
    password=PASSWORD,
    account=ACCOUNT,
    database=DATABASE,    
    role=ROLE,
    warehouse=WAREHOUSE, 
    schema=ML)


def get_table_metadata(df):
    def map_dtypes(x):
        if (x == 'object') or (x == 'category'):
            return 'VARCHAR'
        elif 'date' in x:
            return 'DATETIME'
        elif 'int' in x:
            return 'NUMERIC'
        elif 'float' in x:
            return 'FLOAT'
        else:
            print("cannot parse pandas dtype")
    sf_dtypes = [map_dtypes(str(s)) for s in df.dtypes]
    table_metadata = ", ".join([" ".join([y.upper(), x]) for x, y in zip(sf_dtypes, list(df.columns))])
    return table_metadata


def df_to_snowflake_table(table_name, operation, df, conn=conn): 
    if operation=='create_replace':
        df.columns = [c.upper() for c in df.columns]
        table_metadata = get_table_metadata(df)
        conn.cursor().execute(f"CREATE OR REPLACE TABLE {table_name} ({table_metadata})")
        write_pandas(conn, df, table_name.upper())
    elif operation=='insert':
        table_rows = str(list(df.itertuples(index=False, name=None))).replace('[','').replace(']','')
        conn.cursor().execute(f"INSERT INTO {table_name} VALUES {table_rows}")  

Then

df_to_snowflake_table('table_test', 'create_replace', df)  

and/or

df_to_snowflake_table('table_test', 'insert', df)
Ballerina answered 8/2, 2022 at 4:33 Comment(0)

Windows 10, Python 3.9.4, Snowflake-Connector-Python 2.4.2, Pandas 1.1.5

  • I have the same problem with the write_pandas function.

  • I have accountadmin privileges on Snowflake. Python code and error traceback are enclosed below.

  • However, if I explicitly write the data to a CSV file, I can upload it by using the two commands:

  1. "put file://" (into a Snowflake stage) and
  2. "copy into ... from" (the Snowflake stage).

So it's definitely something with the write_pandas function.

import pandas as pd
import snowflake.connector
...
from snowflake.connector.pandas_tools import write_pandas
conn = snowflake.connector.connect(
        user=strSnowflakeUserLogin,
        password=strSnowflakeUserPassword,
        account=strSnowflakeAccount,
        role=strSnowflakeUserRole,
        warehouse=strSnowflakeWarehouse,
        database=strSnowflakeDatabase,
        schema=strSnowflakeSchema
        )

Traceback (most recent call last):
  File "myPython.py", line xxx, in <module> myPythonModule()
    write_pandas(conn, df, strSnowflakeTable)
  File "C:\Users\<username>\AppData\Local\Programs\Python\Python39\lib\site-packages\snowflake\connector\pandas_tools.py", line 197, in write_pandas
    copy_results = cursor.execute(copy_into_sql, _is_internal=True).fetchall()
  File "C:\Users\<username>\AppData\Local\Programs\Python\Python39\lib\site-packages\snowflake\connector\cursor.py", line 692, in execute
    Error.errorhandler_wrapper(
  File "C:\Users\<username>\AppData\Local\Programs\Python\Python39\lib\site-packages\snowflake\connector\errors.py", line 258, in errorhandler_wrapper
    cursor.errorhandler(connection, cursor, error_class, error_value)
  File "C:\Users\<username>\AppData\Local\Programs\Python\Python39\lib\site-packages\snowflake\connector\errors.py", line 188, in default_errorhandler
    raise error_class(
snowflake.connector.errors.ProgrammingError: 001757 (42601): SQL compilation error:
Table 'mySnowflakeTable' does not exist

...
write_pandas(conn, df, strSnowflakeTable)
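For reference, the manual route described above (which did work) looks roughly like this; a sketch where the file path, table name, and file format options are placeholders:

cur = conn.cursor()
# upload the local CSV to the table's internal stage
cur.execute("PUT file://C:/temp/mySnowflakeTable.csv @%mySnowflakeTable")
# load from the table stage into the table
cur.execute("COPY INTO mySnowflakeTable FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)")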
Ezzo answered 11/4, 2021 at 4:59 Comment(1)
take a look at my solutionHandy

If you are not able to write to the table itself, I suspect the issue arises when Snowflake is not able to resolve the warehouse/database/schema. You can run these just before executing any command, in both the try and except sections:

session.use_warehouse('warehouse Name')
session.use_database('Database Name')
session.use_schema('schema Name')
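Those use_* calls are Snowpark Session methods. With the plain connector used elsewhere in this thread, a rough equivalent (a sketch; names are placeholders) is:

cur = conn.cursor()
cur.execute("USE WAREHOUSE my_warehouse")
cur.execute("USE DATABASE my_database")
cur.execute("USE SCHEMA my_schema")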
Quartered answered 10/11, 2023 at 17:23 Comment(0)

@Christopher's solution was very helpful for making this a repeatable/dynamic process.

I updated the get_col_types function a little, with the same behavior.

def get_col_types(df) -> str:
    '''
        Helper function to create/modify Snowflake tables; gets the column and dtype pair for each item in the dataframe
        
        Args:
            df: dataframe to evaluate
        
        Returns:
String with the formatted column name and the converted Snowflake data type.
            Example: 'COL_A FLOAT, COL_B DATETIME, COL_C FLOAT, COL_D NUMERIC, COL_E VARCHAR'        
    '''
        
    import numpy as np
    
    # Get dtypes and convert to df
    df_col_types = df.dtypes.reset_index()
    df_col_types = df_col_types.rename(columns={'index': 'col_name', 0:'dtype'})
    df_col_types = df_col_types.apply(lambda x: x.astype(str).str.upper()) # Case matching as snowflake needs it in uppers
        
    # Create the mapping from Dataframe types to Snowflake data types
    df_col_types['dtype'] = np.where(df_col_types['dtype']=='OBJECT', 'VARCHAR', df_col_types['dtype'])
    df_col_types['dtype'] = np.where(df_col_types['dtype'].str.contains('DATE'), 'DATETIME', df_col_types['dtype'])
    df_col_types['dtype'] = np.where(df_col_types['dtype'].str.contains('INT'), 'NUMERIC', df_col_types['dtype'])
    df_col_types['dtype'] = np.where(df_col_types['dtype'].str.contains('FLOAT'), 'FLOAT', df_col_types['dtype'])
    df_col_types['dtype'] = np.where(df_col_types['dtype'].str.contains('CATEGORY'), 'VARCHAR', df_col_types['dtype'])
    
    # Get the column dtype pairs and join them into a comma-separated string
    df_col_types['dtype_pairs'] = df_col_types.apply(lambda row: row['col_name'] + " " + row['dtype'], axis = 1)
    col_type_pair_str = ', '.join(df_col_types['dtype_pairs'])

    return col_type_pair_str
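A quick usage example (assuming pandas is imported as pd):

import pandas as pd

df = pd.DataFrame({
    'col_a': [1.5],
    'col_b': pd.to_datetime(['2021-09-03']),
    'col_d': [10],
    'col_e': ['cats'],
})

print(get_col_types(df))
# 'COL_A FLOAT, COL_B DATETIME, COL_D NUMERIC, COL_E VARCHAR'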
Esque answered 3/9, 2021 at 19:16 Comment(0)

Many of these answers seem outdated.

In terms of permissions, you need to be able to create schemas and tables. You also need to be able to write to the table (and in your case truncate it, but that has nothing to do with write_pandas).
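As a sketch, granting that via the connector might look like the following; conn is an open connection under a sufficiently privileged role, and the object/role names are placeholders:

cur = conn.cursor()
cur.execute("GRANT USAGE ON DATABASE my_db TO ROLE my_role")
cur.execute("GRANT USAGE, CREATE TABLE ON SCHEMA my_db.my_schema TO ROLE my_role")
cur.execute("GRANT INSERT, TRUNCATE ON FUTURE TABLES IN SCHEMA my_db.my_schema TO ROLE my_role")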

Currently the snowflake-connector-python's write_pandas function allows table creation with the auto_create_table=True setting.
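In its simplest form that looks like the sketch below; conn and df are assumed to exist:

from snowflake.connector.pandas_tools import write_pandas

# write_pandas returns (success, num_chunks, num_rows, copy_output)
success, nchunks, nrows, _ = write_pandas(
    conn,
    df,
    'TABLE_TEST',
    auto_create_table=True,  # create the table from the dataframe's schema if missing
)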

I cut out a lot of details of our implementation, but you should be able to do something like this:


import pandas as pd
from datetime import datetime
from snowflake.connector.pandas_tools import write_pandas

class SnowflakeDestination:
    # SnowflakeDestination is a class that writes data to or
    # interacts with Snowflake
    # ... other stuff here

    def _write_df(self, df, details):
        handler = DataFrameHandler()
        df = handler.infer_string_column_types(df=df)
        return write_pandas(conn=self._conn, df=df, use_logical_type=True, **details)
    
    def write_df(self, df, details):
        current_time = datetime.utcnow()
        df["_record_loaded_at"] = current_time

        database = details["database"] + self._db_suffix
        schema = details["schema"]
        table = details["table_name"]
        self._create_schema(f"{database}.{schema}")

        new_details = {
            "database": database,
            "schema": schema,
            "table_name": table,
            "quote_identifiers": False,
            "auto_create_table": True,
        }

        return self._write_df(df, new_details)

class DataFrameHandler:
    def __init__(self) -> None:
        pass

    def infer_string_column_types(self, df: pd.DataFrame):
        # Try to convert string columns to numeric, then to datetime;
        # columns that fail both conversions stay as strings.
        for column in df.columns:
            if pd.api.types.is_string_dtype(df[column]):
                try:
                    df[column] = pd.to_numeric(df[column])
                except ValueError:
                    try:
                        df[column] = pd.to_datetime(
                            df[column], utc=True, errors="ignore"
                        )
                    except Exception:
                        pass
        return df

write_pandas does not do any implicit conversion of strings to ints, floats, or date/datetime values, so it is helpful to catch those yourself; that is what the DataFrameHandler class is for.

Southwesterly answered 7/2 at 11:24 Comment(0)
