locopy.snowflake module

Module to wrap a database adapter into a Snowflake class which can be used to connect to Snowflake and run arbitrary code.

class locopy.snowflake.Snowflake(profile=None, kms_key=None, dbapi=None, config_yaml=None, **kwargs)[source]

Bases: S3, Database

Locopy class which manages connections to Snowflake. Inherits from S3 and Database and implements the Snowflake-specific COPY INTO functionality. A usage sketch follows the parameter list below.

Parameters:
  • profile (str, optional) – The name of the AWS profile to use, which is typically stored in the credentials file. You can also set the AWS_DEFAULT_PROFILE environment variable, which would be used instead.

  • kms_key (str, optional) – The KMS key to use for encryption. If kms_key is None (the default), AES256 server-side encryption will be used.

  • dbapi (DBAPI 2 module, optional) – A database adapter which is Python DB API 2.0 compliant (snowflake.connector)

  • config_yaml (str, optional) – String representing the YAML file location of the database connection keyword arguments. Note that this should only contain valid arguments for the database connector you plan on using; an exception will be raised if an invalid argument is passed through.

  • **kwargs – Database connection keyword arguments.
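
For illustration, a minimal usage sketch. The example.yaml path, schema, and table names are hypothetical; the YAML file should contain valid snowflake.connector.connect() arguments (account, user, password, etc.):

    import snowflake.connector
    import locopy

    # Used as a context manager, the class connects on entry and
    # cleans up the connection on exit. "example.yaml" is hypothetical.
    with locopy.Snowflake(dbapi=snowflake.connector, config_yaml="example.yaml") as sf:
        sf.execute("SELECT * FROM myschema.mytable")  # hypothetical table
        df = sf.to_dataframe()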

profile

String representing the AWS profile for authentication

Type:

str

kms_key

String representing the S3 KMS key

Type:

str

session

Holds the AWS session credentials/info

Type:

boto3.Session

s3

Holds the S3 client object used to upload/delete files in S3

Type:

botocore.client.S3

dbapi

Database adapter which is Python DB API 2.0 compliant (snowflake.connector)

Type:

DBAPI 2 module

connection

Dictionary of database connection items

Type:

dict

conn

DBAPI connection instance

Type:

dbapi.connection

cursor

DBAPI cursor instance

Type:

dbapi.cursor

connect()[source]

Creates a connection to the Snowflake cluster by setting the values of the conn and cursor attributes.

Raises:

DBError – If there is a problem establishing a connection to Snowflake.
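
For instance, a connection can be opened and closed explicitly rather than via the context manager (a minimal sketch, reusing the hypothetical example.yaml config from the class example above):

    import snowflake.connector
    import locopy

    sf = locopy.Snowflake(dbapi=snowflake.connector, config_yaml="example.yaml")
    sf.connect()                            # sets sf.conn and sf.cursor
    sf.execute("SELECT CURRENT_TIMESTAMP")
    print(sf.cursor.fetchone())
    sf.disconnect()                         # close the cursor and connection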

copy(table_name, stage, file_type='csv', format_options=None, copy_options=None)[source]

Executes the COPY INTO <table> command to load files from a stage into a Snowflake table. If file_type == csv and format_options == None, format_options will default to: ["FIELD_DELIMITER='|'", "SKIP_HEADER=0"]

Parameters:
  • table_name (str) – The Snowflake table name which is being loaded. Must be fully qualified: <namespace>.<table_name>

  • stage (str) – Stage location of the load file. This can be an internal or external stage

  • file_type (str) – The file type. One of csv, json, or parquet

  • format_options (list) – List of strings of format options to provide to the COPY INTO command. The options will typically be in the format of ["a=b", "c=d"]

  • copy_options (list) – List of strings of copy options to provide to the COPY INTO command.

Raises:

DBError – If there is a problem executing the COPY command, or a connection has not been initialized.
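
A sketch of a typical load; the file, stage, and table names are hypothetical:

    import snowflake.connector
    import locopy

    with locopy.Snowflake(dbapi=snowflake.connector, config_yaml="example.yaml") as sf:
        # PUT the local file to the user stage (gzipped by default), then load it
        sf.upload_to_internal("/path/to/file.csv", "@~/staged/")
        sf.copy(
            "myschema.mytable",
            "@~/staged/file.csv.gz",
            file_type="csv",
            format_options=["FIELD_DELIMITER=','", "SKIP_HEADER=1"],
            copy_options=["PURGE = TRUE"],
        )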

download_from_internal(stage, local=None, parallel=10)[source]

Download file(s) from an internal stage via the GET command.

Parameters:
  • stage (str) – Internal stage location to load the file.

  • local (str, optional) – The local directory path where files will be downloaded to. Defaults to the current working directory (os.getcwd()). Otherwise it must be the absolute path.

  • parallel (int, optional) – Specifies the number of threads to use for downloading files.
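
For example (the stage path and local directory are hypothetical):

    import snowflake.connector
    import locopy

    with locopy.Snowflake(dbapi=snowflake.connector, config_yaml="example.yaml") as sf:
        sf.download_from_internal("@~/unload/", local="/tmp/downloads", parallel=10)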

insert_dataframe_to_table(dataframe, table_name, columns=None, create=False, metadata=None)[source]

Insert a Pandas dataframe into an existing table or a new table. In newer versions of the Python Snowflake connector (v2.1.2+) users can call the write_pandas method from the cursor directly; insert_dataframe_to_table is a custom implementation and does not use write_pandas. Instead of using COPY INTO, the method builds a list of tuples to insert directly into the table. There are also options to create the table if it doesn't exist and to supply your own metadata. If your data is significantly large, then using COPY INTO <table> is more appropriate.

Parameters:
  • dataframe (Pandas Dataframe) – The pandas dataframe which needs to be inserted.

  • table_name (str) – The name of the Snowflake table which is being inserted.

  • columns (list, optional) – The list of columns which will be uploaded.

  • create (bool, optional) – Boolean flag indicating whether a new table needs to be created and inserted into.

  • metadata (dictionary, optional) – If metadata is None, it will be generated based on the data.
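
A sketch of inserting a small dataframe into a new table (all names are hypothetical):

    import pandas as pd
    import snowflake.connector
    import locopy

    df = pd.DataFrame({"id": [1, 2], "name": ["foo", "bar"]})

    with locopy.Snowflake(dbapi=snowflake.connector, config_yaml="example.yaml") as sf:
        # create=True builds the table from generated metadata before inserting
        sf.insert_dataframe_to_table(df, "myschema.new_table", create=True)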

to_dataframe(size=None)[source]

Return a dataframe of the last query results. This is just a convenience method. It overrides the base class's implementation in favour of the Snowflake connector's built-in fetch_pandas_all when size is None. If size is not None, the existing behaviour of iterating through the cursor and building the dataframe is used.

Parameters:

size (int, optional) – Chunk size to fetch. Defaults to None.

Returns:

Dataframe with lowercase column names. Returns None if no fetched result.

Return type:

pandas.DataFrame
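
For example (query and table are hypothetical):

    import snowflake.connector
    import locopy

    with locopy.Snowflake(dbapi=snowflake.connector, config_yaml="example.yaml") as sf:
        sf.execute("SELECT id, name FROM myschema.mytable")
        df = sf.to_dataframe()               # size=None: uses fetch_pandas_all
        # df = sf.to_dataframe(size=1000)    # or fetch up to 1000 rows via the cursor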

unload(stage, table_name, file_type='csv', format_options=None, header=False, copy_options=None)[source]

Executes the COPY INTO <location> command to export a query/table from Snowflake to a stage. If file_type == csv and format_options == None, format_options will default to: ["FIELD_DELIMITER='|'"]

Parameters:
  • stage (str) – Stage location (internal or external) where the data files are unloaded

  • table_name (str) – The Snowflake table name which is being unloaded. Must be fully qualified: <namespace>.<table_name>

  • file_type (str) – The file type. One of csv, json, or parquet

  • format_options (list) – List of strings of format options to provide to the COPY INTO command.

  • header (bool, optional) – Boolean flag if header is included in the file(s)

  • copy_options (list) – List of strings of copy options to provide to the COPY INTO command.

Raises:

DBError – If there is a problem executing the UNLOAD command, or a connection has not been initialized.
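
A sketch of unloading a table to the user stage (the stage prefix and table name are hypothetical):

    import snowflake.connector
    import locopy

    with locopy.Snowflake(dbapi=snowflake.connector, config_yaml="example.yaml") as sf:
        sf.unload(
            "@~/unload/mytable_",        # output file prefix in the stage
            "myschema.mytable",
            file_type="csv",
            header=True,
            copy_options=["OVERWRITE = TRUE"],
        )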

upload_to_internal(local, stage, parallel=4, auto_compress=True, overwrite=True)[source]

Upload file(s) to an internal stage via the PUT command.

Parameters:
  • local (str) – The local path of the file to upload. Wildcard characters (*, ?) are supported to enable uploading multiple files in a directory. Otherwise it must be the absolute path.

  • stage (str) – Internal stage location to load the file.

  • parallel (int, optional) – Specifies the number of threads to use for uploading files.

  • auto_compress (bool, optional) – Specifies if Snowflake uses gzip to compress files during upload. If True, the files are compressed (if they are not already compressed); if False, the files are uploaded as-is.

  • overwrite (bool, optional) – Specifies whether Snowflake overwrites an existing file with the same name during upload. If True, an existing file with the same name is overwritten; if False, it is not.
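
For example (paths are hypothetical; the wildcard uploads every matching file):

    import snowflake.connector
    import locopy

    with locopy.Snowflake(dbapi=snowflake.connector, config_yaml="example.yaml") as sf:
        # PUT each matching file to the user stage, gzipping on the way up
        sf.upload_to_internal("/path/to/data_*.csv", "@~/staged/", parallel=4)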

locopy.snowflake.combine_options(options=None)[source]

Return the copy_options or format_options list as a single string with the items separated by spaces. If options is None, an empty string is returned.

Parameters:

options (list, optional) – List of strings which is to be converted into a single string with spaces in between. Defaults to None

Returns:

The options joined into a single space-separated string

Return type:

str
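
For example (a doctest-style sketch of the expected behaviour):

    >>> from locopy.snowflake import combine_options
    >>> combine_options(["FIELD_DELIMITER='|'", "SKIP_HEADER=0"])
    "FIELD_DELIMITER='|' SKIP_HEADER=0"
    >>> combine_options(None)
    ''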