locopy package

Submodules

Module contents

A Python library to assist with ETL processing.

class locopy.Database(dbapi, config_yaml=None, **kwargs)[source]

Bases: object

Base class for all DBAPI 2 database connectors which will inherit this functionality.

The Database class will manage connections and handle executing queries. Most of the functionality should work out of the box for classes which inherit from it, with the exception of the abstract connect method, which may vary across databases.

Parameters:
  • dbapi (DBAPI 2 module, optional) – A database adapter which is Python DB API 2.0 compliant (psycopg2, pg8000, etc.)

  • config_yaml (str, optional) – String representing the YAML file location of the database connection keyword arguments. It is worth noting that this should only contain valid arguments for the database connector you plan on using. It will throw an exception if something is passed through which isn’t valid.

  • **kwargs – Database connection keyword arguments.

dbapi

database adapter which is Python DBAPI 2.0 compliant

Type:

DBAPI 2 module

connection

Dictionary of database connection items

Type:

dict

conn

DBAPI connection instance

Type:

dbapi.connection

cursor

DBAPI cursor instance

Type:

dbapi.cursor

Raises:

CredentialsError – If database credentials are not provided, are invalid, or if both kwargs and a YAML config are provided.

column_names()[source]

Pull column names out of the cursor description.

Depending on the DBAPI, it could return column names as bytes: b'column_name'.

Returns:

List of column names, all in lower-case

Return type:

list

connect()[source]

Create a connection to a database.

Sets the values of the conn and cursor attributes.

Raises:

DBError – If there is a problem establishing a connection.

disconnect()[source]

Terminate the connection.

Closes the conn and cursor attributes.

Raises:

DBError – If there is a problem disconnecting from the database.

execute(sql, commit=True, params=(), many=False, verbose=True)[source]

Execute some sql against the connection.

Parameters:
  • sql (str) – SQL to run against the connection. Could be one or multiple statements.

  • commit (Boolean, default True) – Whether to “commit” the commands to the cluster immediately or not.

  • params (iterable of parameters) – Parameters to submit with the query. The exact syntax will depend on the database adapter you are using.

  • many (bool, default False) – Whether to execute the script as an “execute many”

  • verbose (bool, default True) – Whether to print executed query

Raises:
  • DBError – If a problem occurs executing the sql statement

  • DBError – If a connection to the database cannot be made
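
For example, a minimal sketch of running a parameterized query. The adapter choice, the config.yml path, the table name, and the %s placeholder style are illustrative assumptions rather than part of the API:

    import pg8000
    import locopy

    # Assumes config.yml holds valid pg8000 connection kwargs
    # (host, port, database, user, password).
    db = locopy.Database(dbapi=pg8000, config_yaml="config.yml")
    db.connect()

    # Placeholder syntax depends on the adapter's paramstyle; pg8000 uses %s by default.
    db.execute(
        "SELECT * FROM my_schema.my_table WHERE updated_at > %s",
        params=("2024-01-01",),
    )
    print(db.column_names())

    db.disconnect()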

to_dataframe(df_type='pandas', size=None)[source]

Return a dataframe of the last query results.

Parameters:
  • df_type (Literal["pandas","polars"], optional) – Output dataframe format. Defaults to pandas.

  • size (int, optional) – Chunk size to fetch. Defaults to None.

Returns:

Dataframe with lowercase column names. Returns None if no fetched result.

Return type:

pandas.DataFrame or polars.DataFrame

to_dict()[source]

Generate dictionaries of rows.

Yields:

dict – Each row, encoded as a dict.
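
A short, self-contained sketch of pulling results back; the connection details, table, and column names are placeholders:

    import pg8000
    import locopy

    db = locopy.Database(dbapi=pg8000, config_yaml="config.yml")
    db.connect()

    # Fetch the full result set of the last query as a pandas DataFrame
    # (pass df_type="polars" for a polars DataFrame).
    db.execute("SELECT id, name FROM my_schema.my_table")
    df = db.to_dataframe()

    # Or re-run the query and stream rows one at a time as dictionaries.
    db.execute("SELECT id, name FROM my_schema.my_table")
    for row in db.to_dict():
        print(row["id"], row["name"])

    db.disconnect()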

class locopy.Redshift(profile=None, kms_key=None, dbapi=None, config_yaml=None, **kwargs)[source]

Bases: S3, Database

Locopy class which manages connections to Redshift.

Inherits Database and implements the specific COPY and UNLOAD functionality.

If any of host, port, dbname, user and password are not provided, a config_yaml file must be provided with those parameters in it. Please note ssl is always enforced when connecting.

Parameters:
  • profile (str, optional) – The name of the AWS profile to use, which is typically stored in the credentials file. You can also set the environment variable AWS_DEFAULT_PROFILE, which will be used instead.

  • kms_key (str, optional) – The KMS key to use for encryption. If kms_key is None (the default), AES256 server-side encryption will be used.

  • dbapi (DBAPI 2 module, optional) – A database adapter which is Python DB API 2.0 compliant (psycopg2, pg8000, etc.)

  • config_yaml (str, optional) – String representing the YAML file location of the database connection keyword arguments. It is worth noting that this should only contain valid arguments for the database connector you plan on using. It will throw an exception if something is passed through which isn’t valid.

  • **kwargs – Database connection keyword arguments.

profile

String representing the AWS profile for authentication

Type:

str

kms_key

String representing the s3 kms key

Type:

str

session

Hold the AWS session credentials / info

Type:

boto3.Session

s3

Hold the S3 client object which is used to upload/delete files to S3

Type:

botocore.client.S3

dbapi

database adapter which is Python DBAPI 2.0 compliant

Type:

DBAPI 2 module

connection

Dictionary of database connection items

Type:

dict

conn

DBAPI connection instance

Type:

dbapi.connection

cursor

DBAPI cursor instance

Type:

dbapi.cursor

connect()[source]

Create a connection to the Redshift cluster.

Sets the values of the conn and cursor attributes.

Raises:

DBError – If there is a problem establishing a connection to Redshift.

copy(table_name, s3path, delim='|', copy_options=None)[source]

Execute the COPY command to load files from S3 into a Redshift table.

Parameters:
  • table_name (str) – The Redshift table name which is being loaded

  • s3path (str) – S3 path of the input file. eg: s3://path/to/file.csv

  • delim (str, optional) – Delimiter of the file. Use None for non-delimited file types. Defaults to |.

  • copy_options (list) – List of strings of copy options to provide to the COPY command. Will have default options added in.

Raises:

DBError – If there is a problem executing the COPY command, a connection has not been initialized, or credentials are wrong.
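
A minimal sketch of copying a file that already sits in S3, assuming a psycopg2 adapter and a hypothetical redshift.yml config; the table, bucket, and copy options are illustrative:

    import psycopg2
    import locopy

    # Assumes redshift.yml contains host, port, dbname, user and password.
    rs = locopy.Redshift(dbapi=psycopg2, config_yaml="redshift.yml", profile="my-aws-profile")
    rs.connect()

    # Load a pipe-delimited file from S3 into an existing table, skipping the header row.
    rs.copy(
        table_name="my_schema.my_table",
        s3path="s3://my-bucket/path/to/file.csv",
        delim="|",
        copy_options=["IGNOREHEADER 1"],
    )

    rs.disconnect()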

insert_dataframe_to_table(dataframe, table_name, columns=None, create=False, metadata=None, batch_size=1000, verbose=False)[source]

Insert a Pandas or Polars dataframe to an existing table or a new table.

executemany in psycopg2 and pg8000 has very poor performance in terms of running speed. To overcome this issue, we instead format the insert query and then run it with execute.

Parameters:
  • dataframe (pandas.DataFrame or polars.DataFrame) – The pandas or polars dataframe which needs to be inserted.

  • table_name (str) – The name of the Redshift table which is being inserted into.

  • columns (list, optional) – The list of columns which will be uploaded.

  • create (bool, default False) – Whether a new table needs to be created and inserted into.

  • metadata (dictionary, optional) – Table metadata. If metadata is None, it will be generated based on the data.

  • batch_size (int, default 1000) – The number of records to insert in each batch

  • verbose (bool, default False) – Whether or not to print out insert query
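
A hedged sketch of inserting a small dataframe; the table name and config file are hypothetical:

    import pandas as pd
    import psycopg2
    import locopy

    df = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})

    rs = locopy.Redshift(dbapi=psycopg2, config_yaml="redshift.yml")
    rs.connect()

    # Create the table if it does not exist and insert the rows in batches of 1000.
    rs.insert_dataframe_to_table(df, "my_schema.my_table", create=True, batch_size=1000)

    rs.disconnect()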

load_and_copy(local_file, s3_bucket, table_name, delim='|', copy_options=None, delete_s3_after=False, splits=1, compress=True, s3_folder=None)[source]

Load a file to S3, then copy it into Redshift.

Has options to split a single file into multiple files, compress using gzip, and upload to an S3 bucket with folders within the bucket.

Notes

If you are using folders in your S3 bucket, please be aware of special characters or backslashes (\). These may allow the file to upload but cause the COPY command to fail.

By default locopy will handle the splitting of files for you, in order to reduce complexity in uploading to s3 and generating the COPY command.

It is critical to ensure that the S3 location you are using contains only the files you want to load. In the case of a “folder” it should contain only the files you want to load. For a bucket, the file name should be unique enough, as any extensions get stripped out in favour of the file prefix.

Parameters:
  • local_file (str) – The local file which you wish to copy. This can be a folder for non-delimited file type like parquet

  • s3_bucket (str) – The AWS S3 bucket which you are copying the local file to.

  • table_name (str) – The Redshift table name which is being loaded

  • delim (str, optional) – Delimiter for Redshift COPY command. None for non-delimited files. Defaults to |.

  • copy_options (list, optional) – A list (str) of copy options that should be appended to the COPY statement. The class will insert defaults for DATEFORMAT, COMPUPDATE and TRUNCATECOLUMNS if they are not provided in this list and PARQUET is not part of the options passed in. See http://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-data-conversion.html for options which can be passed.

  • delete_s3_after (bool, optional) – Whether to delete the S3 file after the transfer. Defaults to False.

  • splits (int, optional) – Number of splits to perform for parallel loading into Redshift. Must be greater than 0. It is recommended that this number be less than 100. Defaults to 1.

  • compress (bool, optional) – Whether to compress the output file with gzip or leave it raw. Defaults to True

  • s3_folder (str, optional) – The AWS S3 folder of the bucket which you are copying the local file to. Defaults to None. Please note that you must follow the / convention when using subfolders.
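
A sketch of an end-to-end load using the same hypothetical connection details; the local file, bucket, folder, and table names are placeholders:

    import psycopg2
    import locopy

    rs = locopy.Redshift(dbapi=psycopg2, config_yaml="redshift.yml", profile="my-aws-profile")
    rs.connect()

    # Split the local file into 4 gzipped parts, upload them to
    # s3://my-bucket/staging/, COPY them into the table, then delete the S3 files.
    rs.load_and_copy(
        local_file="data.csv",
        s3_bucket="my-bucket",
        table_name="my_schema.my_table",
        delim=",",
        copy_options=["IGNOREHEADER 1"],
        delete_s3_after=True,
        splits=4,
        compress=True,
        s3_folder="staging",
    )

    rs.disconnect()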

unload(query, s3path, unload_options=None)[source]

Execute the UNLOAD command to export a query from Redshift to S3.

Parameters:
  • query (str) – A query to be unloaded to S3.

  • s3path (str) – S3 path for the output files.

  • unload_options (list) – List of string unload options.

Raises:

DBError – If there is a problem executing the UNLOAD command, a connection has not been initialized, or credentials are wrong.

unload_and_copy(query, s3_bucket, s3_folder=None, raw_unload_path=None, export_path=False, delim=',', delete_s3_after=True, parallel_off=False, unload_options=None)[source]

Unload data from Redshift.

With options to write to a flat file and store on S3.

Parameters:
  • query (str) – A SELECT query to be unloaded to S3.

  • s3_bucket (str) – The AWS S3 bucket where the data from the query will be unloaded.

  • s3_folder (str, optional) – The AWS S3 folder of the bucket where the data from the query will be unloaded. Defaults to None. Please note that you must follow the / convention when using subfolders.

  • raw_unload_path (str, optional) – The local path where the files will be copied to. Defaults to the current working directory (os.getcwd()).

  • export_path (str, optional) – If an export_path is provided, the function will concatenate and write the unloaded files to this path as a single file. If your file is very large, you may not want to use this option.

  • delim (str, optional) – Delimiter for unloading and file writing. Defaults to a comma. If None, this option will be ignored

  • delete_s3_after (bool, optional) – Delete the files from S3 after unloading. Defaults to True.

  • parallel_off (bool, optional) – Unload data to S3 as a single file. Defaults to False. Not recommended as it will decrease speed.

  • unload_options (list, optional) – A list of unload options that should be appended to the UNLOAD statement.

Raises:

Exception – If no files are generated from the unload, if the column names from the query cannot be retrieved, or if there is an issue with the execution of any of the queries.
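
A sketch of exporting a query to a single local CSV, again with hypothetical connection details and names:

    import psycopg2
    import locopy

    rs = locopy.Redshift(dbapi=psycopg2, config_yaml="redshift.yml", profile="my-aws-profile")
    rs.connect()

    # UNLOAD the query results to S3, download the parts, concatenate them into
    # one local CSV, and clean up the S3 files afterwards.
    rs.unload_and_copy(
        query="SELECT * FROM my_schema.my_table WHERE load_date = '2024-01-01'",
        s3_bucket="my-bucket",
        s3_folder="exports",
        export_path="my_table_extract.csv",
        delim=",",
        delete_s3_after=True,
        parallel_off=False,
    )

    rs.disconnect()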

class locopy.S3(profile=None, kms_key=None, **kwargs)[source]

Bases: object

S3 wrapper class.

Utilizes the boto3 library to push files to an S3 bucket.

Parameters:
  • profile (str, optional) – The name of the AWS profile to use, which is typically stored in the credentials file. You can also set the environment variable AWS_DEFAULT_PROFILE, which will be used instead.

  • kms_key (str, optional) – The KMS key to use for encryption. If kms_key is None (the default), AES256 server-side encryption will be used.

  • **kwargs – Optional keyword arguments.

profile

String representing the AWS profile for authentication

Type:

str

kms_key

String representing the s3 kms key

Type:

str

session

Hold the AWS session credentials / info

Type:

boto3.Session

s3

Hold the S3 client object which is used to upload/delete files to S3

Type:

botocore.client.S3

delete_from_s3(bucket, key)[source]

Delete a file from an S3 bucket.

Parameters:
  • bucket (str) – The AWS S3 bucket from which you are deleting the file.

  • key (str) – The name of the S3 object.

Raises:

S3DeletionError – If there is an issue deleting from the S3 bucket.

delete_list_from_s3(s3_list)[source]

Delete a list of files from an S3 bucket.

Parameters:

s3_list (list) – List of strings with the s3 paths of the files to delete. The strings should not include the s3:// scheme.

download_from_s3(bucket, key, local)[source]

Download a file from an S3 bucket.

Parameters:
  • bucket (str) – The AWS S3 bucket which you are downloading the file from.

  • key (str) – The key of the S3 object to download.

  • local (str) – The local path which you wish to copy the file to.

Raises:

S3DownloadError – If there is an issue downloading from the S3 bucket.

download_list_from_s3(s3_list, local_path=None)[source]

Download a list of files from s3.

Parameters:
  • s3_list (list) – List of strings with the s3 paths of the files to download

  • local_path (str, optional) – The local path where the files will be copied to. Defaults to the current working directory (os.getcwd()).

Returns:

Returns a list of strings of the local file names

Return type:

list

parse_s3_url(s3_url)[source]

Extract the bucket and key from a s3 url.

Parameters:

s3_url (str) – s3 url. The string can include the s3:// scheme (which is discarded).

Returns:

  • bucket (str) – s3 bucket

  • key (str) – s3 key
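
For example, with a hypothetical URL:

    import locopy

    s3 = locopy.S3(profile="my-aws-profile")
    bucket, key = s3.parse_s3_url("s3://my-bucket/folder/file.csv.gz")
    # bucket == "my-bucket", key == "folder/file.csv.gz"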

upload_list_to_s3(local_list, bucket, folder=None)[source]

Upload a list of files to an S3 bucket.

Parameters:
  • local_list (list) – List of strings with the file paths of the files to upload

  • bucket (str) – The AWS S3 bucket which you are copying the local file to.

  • folder (str, optional) – The AWS S3 folder of the bucket which you are copying the local files to. Defaults to None. Please note that you must follow the / convention when using subfolders.

Returns:

Returns a list of the generated S3 bucket and keys of the files which were uploaded. The s3:// prefix is NOT included. The output would look like the following: ["my-bucket/key1", "my-bucket/key2", ...]

Return type:

list

Notes

There is an assumption that if you are loading multiple files (via splits) they follow a structure such as file_name.extension.# (# = split number). This allows the COPY statement to use the key prefix rather than specifying an exact file name. The returned list helps with this process downstream.

upload_to_s3(local, bucket, key)[source]

Upload a file to an S3 bucket.

Parameters:
  • local (str) – The local file which you wish to copy.

  • bucket (str) – The AWS S3 bucket which you are copying the local file to.

  • key (str) – The key to name the S3 object.

Raises:

S3UploadError – If there is an issue uploading to the S3 bucket.
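
A minimal sketch of the upload/download/delete cycle; the profile, bucket, and key names are placeholders:

    import locopy

    s3 = locopy.S3(profile="my-aws-profile")

    # Push a local file up, pull it back down under a new name, then remove it from S3.
    s3.upload_to_s3("data.csv", "my-bucket", "raw/data.csv")
    s3.download_from_s3("my-bucket", "raw/data.csv", "data_copy.csv")
    s3.delete_from_s3("my-bucket", "raw/data.csv")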

class locopy.Snowflake(profile=None, kms_key=None, dbapi=None, config_yaml=None, **kwargs)[source]

Bases: S3, Database

Locopy class which manages connections to Snowflake. Inherits Database.

Implements the specific COPY INTO functionality.

Parameters:
  • profile (str, optional) – The name of the AWS profile to use, which is typically stored in the credentials file. You can also set the environment variable AWS_DEFAULT_PROFILE, which will be used instead.

  • kms_key (str, optional) – The KMS key to use for encryption. If kms_key is None (the default), AES256 server-side encryption will be used.

  • dbapi (DBAPI 2 module, optional) – A database adapter which is Python DB API 2.0 compliant (snowflake.connector)

  • config_yaml (str, optional) – String representing the YAML file location of the database connection keyword arguments. It is worth noting that this should only contain valid arguments for the database connector you plan on using. It will throw an exception if something is passed through which isn’t valid.

  • **kwargs – Database connection keyword arguments.

profile

String representing the AWS profile for authentication

Type:

str

kms_key

String representing the s3 kms key

Type:

str

session

Hold the AWS session credentials / info

Type:

boto3.Session

s3

Hold the S3 client object which is used to upload/delete files to S3

Type:

botocore.client.S3

dbapi

database adapter which is Python DBAPI 2.0 compliant (snowflake.connector)

Type:

DBAPI 2 module

connection

Dictionary of database connection items

Type:

dict

conn

DBAPI connection instance

Type:

dbapi.connection

cursor

DBAPI cursor instance

Type:

dbapi.cursor

connect()[source]

Create a connection to the Snowflake cluster.

Sets the values of the conn and cursor attributes.

Raises:

DBError – If there is a problem establishing a connection to Snowflake.

copy(table_name, stage, file_type='csv', format_options=None, copy_options=None)[source]

Load files from a stage into a Snowflake table.

Execute the COPY INTO <table> command. If file_type == csv and format_options == None, format_options will default to: ["FIELD_DELIMITER='|'", "SKIP_HEADER=0"].

Parameters:
  • table_name (str) – The Snowflake table name which is being loaded. Must be fully qualified: <namespace>.<table_name>

  • stage (str) – Stage location of the load file. This can be an internal or external stage.

  • file_type (str) – The file type. One of csv, json, or parquet

  • format_options (list) – List of strings of format options to provide to the COPY INTO command. The options will typically be in the format of ["a=b", "c=d"]

  • copy_options (list) – List of strings of copy options to provide to the COPY INTO command.

Raises:

DBError – If there is a problem executing the COPY command, or a connection has not been initialized.
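
A minimal sketch of staging a local file and copying it into a table, assuming a hypothetical snowflake.yml config; the .gz suffix on the staged file assumes the default auto_compress=True behaviour of upload_to_internal:

    import snowflake.connector
    import locopy

    # Assumes snowflake.yml holds valid snowflake.connector kwargs
    # (account, user, password, warehouse, database, schema, ...).
    sf = locopy.Snowflake(dbapi=snowflake.connector, config_yaml="snowflake.yml")
    sf.connect()

    # Stage the local file in the user stage, then COPY it into a fully qualified table.
    sf.upload_to_internal("data.csv", "@~/staging/")
    sf.copy(
        table_name="my_schema.my_table",
        stage="@~/staging/data.csv.gz",
        file_type="csv",
        format_options=["FIELD_DELIMITER=','", "SKIP_HEADER=1"],
    )

    sf.disconnect()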

download_from_internal(stage, local=None, parallel=10)[source]

Download file(s) from an internal stage via the GET command.

Parameters:
  • stage (str) – Internal stage location to load the file.

  • local (str, optional) – The local directory path where files will be downloaded to. Defaults to the current working directory (os.getcwd()). Otherwise it must be the absolute path.

  • parallel (int, optional) – Specifies the number of threads to use for downloading files.

insert_dataframe_to_table(dataframe, table_name, columns=None, create=False, metadata=None)[source]

Insert a Pandas or Polars dataframe to an existing table or a new table.

In newer versions of the python snowflake connector (v2.1.2+), users can call the write_pandas method from the cursor directly; insert_dataframe_to_table is a custom implementation and does not use write_pandas. Instead of using COPY INTO, the method builds a list of tuples to insert directly into the table. There are also options to create the table if it doesn’t exist and to supply your own metadata. If your data is significantly large, then using COPY INTO <table> is more appropriate.

Parameters:
  • dataframe (Pandas or Polars Dataframe) – The pandas or polars dataframe which needs to be inserted.

  • table_name (str) – The name of the Snowflake table which is being inserted.

  • columns (list, optional) – The list of columns which will be uploaded.

  • create (bool, optional) – Whether a new table needs to be created and inserted into.

  • metadata (dictionary, optional) – Table metadata. If metadata is None, it will be generated based on the data.
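
A hedged sketch with a hypothetical table name and config file:

    import pandas as pd
    import snowflake.connector
    import locopy

    df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})

    sf = locopy.Snowflake(dbapi=snowflake.connector, config_yaml="snowflake.yml")
    sf.connect()

    # Create the table if needed and insert the rows directly (no COPY INTO).
    sf.insert_dataframe_to_table(df, "my_schema.my_table", create=True)

    sf.disconnect()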

to_dataframe(df_type='pandas', size=None)[source]

Return a dataframe of the last query results.

This is just a convenience method. It overrides the base class implementation in favour of the snowflake connector’s built-in fetch_pandas_all when size == None. If size != None, the existing functionality is used: we iterate through the cursor and build the dataframe.

Parameters:
  • df_type (Literal["pandas","polars"], optional) – Output dataframe format. Defaults to pandas.

  • size (int, optional) – Chunk size to fetch. Defaults to None.

Returns:

Dataframe with lowercase column names. Returns None if no fetched result.

Return type:

pandas.DataFrame or polars.DataFrame

unload(stage, table_name, file_type='csv', format_options=None, header=False, copy_options=None)[source]

Export a query/table from Snowflake to a stage.

Execute the COPY INTO <location> command.

If file_type == csv and format_options == None, format_options will default to: ["FIELD_DELIMITER='|'"].

Parameters:
  • stage (str) – Stage location (internal or external) where the data files are unloaded

  • table_name (str) – The Snowflake table name which is being unloaded. Must be fully qualified: <namespace>.<table_name>

  • file_type (str) – The file type. One of csv, json, or parquet

  • format_options (list) – List of strings of format options to provide to the COPY INTO command.

  • header (bool, optional) – Boolean flag if header is included in the file(s)

  • copy_options (list) – List of strings of copy options to provide to the COPY INTO command.

Raises:

DBError – If there is a problem executing the UNLOAD command, or a connection has not been initialized.
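
A sketch of unloading a table to the user stage and downloading the result; the stage path and table name are illustrative:

    import snowflake.connector
    import locopy

    sf = locopy.Snowflake(dbapi=snowflake.connector, config_yaml="snowflake.yml")
    sf.connect()

    # Export the table to the user stage as CSV with a header row,
    # then GET the resulting file(s) into the current working directory.
    sf.unload(
        stage="@~/exports/my_table_",
        table_name="my_schema.my_table",
        file_type="csv",
        header=True,
    )
    sf.download_from_internal("@~/exports/")

    sf.disconnect()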

upload_to_internal(local, stage, parallel=4, auto_compress=True, overwrite=True)[source]

Upload file(s) to an internal stage via the PUT command.

Parameters:
  • local (str) – The local directory path to the file to upload. Wildcard characters (*, ?) are supported to enable uploading multiple files in a directory. Otherwise it must be the absolute path.

  • stage (str) – Internal stage location to load the file.

  • parallel (int, optional) – Specifies the number of threads to use for uploading files.

  • auto_compress (bool, optional) – Specifies whether Snowflake uses gzip to compress files during upload. If True, the files are compressed (if they are not already compressed); if False, the files are uploaded as-is.

  • overwrite (bool, optional) – Specifies whether Snowflake overwrites an existing file with the same name during upload. If True, an existing file with the same name is overwritten; if False, it is not.
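
A sketch of a wildcard upload to a named internal stage; the directory and stage names are hypothetical:

    import snowflake.connector
    import locopy

    sf = locopy.Snowflake(dbapi=snowflake.connector, config_yaml="snowflake.yml")
    sf.connect()

    # Upload every CSV in the directory to a named internal stage, gzip-compressing
    # on the way up and overwriting any files with the same name.
    sf.upload_to_internal(
        local="/tmp/extracts/*.csv",
        stage="@my_schema.my_stage",
        parallel=4,
        auto_compress=True,
        overwrite=True,
    )

    sf.disconnect()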