locopy package¶
Submodules¶
- locopy.database module
- locopy.errors module
- locopy.logger module
- locopy.redshift module
- locopy.s3 module
- locopy.snowflake module
- locopy.utility module
Module contents¶
A Python library to assist with ETL processing.
- class locopy.Database(dbapi, config_yaml=None, **kwargs)[source]¶
Bases:
object
Base class for all DBAPI 2 database connectors which will inherit this functionality.
The Database class will manage connections and handle executing queries. Most of the functionality should work out of the box for classes which inherit it, except for the abstract connect method, which may vary across databases.
- Parameters:
dbapi (DBAPI 2 module, optional) – A database adapter which is Python DB API 2.0 compliant (psycopg2, pg8000, etc.)
config_yaml (str, optional) – String representing the YAML file location of the database connection keyword arguments. It is worth noting that this should only contain valid arguments for the database connector you plan on using. It will throw an exception if something is passed through which isn’t valid.
**kwargs – Database connection keyword arguments.
- dbapi¶
database adapter which is Python DBAPI 2.0 compliant
- Type:
DBAPI 2 module
- conn¶
DBAPI connection instance
- Type:
dbapi.connection
- cursor¶
DBAPI cursor instance
- Type:
dbapi.cursor
- Raises:
CredentialsError – Database credentials are not provided or not valid, or both kwargs and a YAML config were provided.
- column_names()[source]¶
Pull column names out of the cursor description.
Depending on the DBAPI, it could return column names as bytes: b'column_name'.
- Returns:
List of column names, all in lower-case
- Return type:
list
- connect()[source]¶
Create a connection to a database.
Sets the values of the conn and cursor attributes.
- Raises:
DBError – If there is a problem establishing a connection.
- disconnect()[source]¶
Terminate the connection.
Closes the conn and cursor attributes.
- Raises:
DBError – If there is a problem disconnecting from the database.
- execute(sql, commit=True, params=(), many=False, verbose=True)[source]¶
Execute some sql against the connection.
- Parameters:
sql (str) – SQL to run against the connection. Could be one or multiple statements.
commit (Boolean, default True) – Whether to “commit” the commands to the cluster immediately or not.
params (iterable of parameters) – Parameters to submit with the query. The exact syntax will depend on the database adapter you are using
many (bool, default False) – Whether to execute the script as an “execute many”
verbose (bool, default True) – Whether to print executed query
- Raises:
DBError – If there is a problem executing the SQL or a connection has not been initialized.
- to_dataframe(df_type='pandas', size=None)[source]¶
Return a dataframe of the last query results.
- Parameters:
df_type (Literal["pandas","polars"], optional) – Output dataframe format. Defaults to pandas.
size (int, optional) – Chunk size to fetch. Defaults to None.
- Returns:
Dataframe with lowercase column names. Returns None if no fetched result.
- Return type:
pandas.DataFrame or polars.DataFrame
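A minimal usage sketch of the Database workflow (the YAML file name, schema, table, and query below are hypothetical placeholders; the parameter placeholder style depends on the adapter you pass in):
```python
import pg8000
import locopy

# "example.yml" is a hypothetical YAML file holding the connection keyword
# arguments (host, port, dbname, user, password) for the adapter.
db = locopy.Database(dbapi=pg8000, config_yaml="example.yml")
db.connect()
# pg8000 and psycopg2 use %s-style placeholders; other adapters may differ.
db.execute("SELECT * FROM my_schema.my_table WHERE id = %s", params=(42,))
df = db.to_dataframe()  # pandas.DataFrame by default; df_type="polars" for polars
db.disconnect()
```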
- class locopy.Redshift(profile=None, kms_key=None, dbapi=None, config_yaml=None, **kwargs)[source]¶
Bases: S3, Database
Locopy class which manages connections to Redshift.
Inherits Database and implements the specific COPY and UNLOAD functionality.
If any of host, port, dbname, user, and password are not provided, a config_yaml file must be provided with those parameters in it. Please note SSL is always enforced when connecting.
- Parameters:
- Parameters:
profile (str, optional) – The name of the AWS profile to use which is typically stored in the credentials file. You can also set the environment variable AWS_DEFAULT_PROFILE which would be used instead.
kms_key (str, optional) – The KMS key to use for encryption. If kms_key defaults to None then AES256 ServerSideEncryption will be used.
dbapi (DBAPI 2 module, optional) – A database adapter which is Python DB API 2.0 compliant (psycopg2, pg8000, etc.)
config_yaml (str, optional) – String representing the YAML file location of the database connection keyword arguments. It is worth noting that this should only contain valid arguments for the database connector you plan on using. It will throw an exception if something is passed through which isn’t valid.
**kwargs – Database connection keyword arguments.
- session¶
Hold the AWS session credentials / info
- Type:
boto3.Session
- s3¶
Hold the S3 client object which is used to upload/delete files to S3
- Type:
botocore.client.S3
- dbapi¶
database adapter which is Python DBAPI 2.0 compliant
- Type:
DBAPI 2 module
- conn¶
DBAPI connection instance
- Type:
dbapi.connection
- cursor¶
DBAPI cursor instance
- Type:
dbapi.cursor
- Raises:
CredentialsError – Database credentials are not provided or valid
S3Error – Error initializing AWS Session (ex: invalid profile)
S3CredentialsError – Issue with AWS credentials
S3InitializationError – Issue initializing S3 session
- connect()[source]¶
Create a connection to the Redshift cluster.
Sets the values of the conn and cursor attributes.
- Raises:
DBError – If there is a problem establishing a connection to Redshift.
- copy(table_name, s3path, delim='|', copy_options=None)[source]¶
Execute the COPY command to load files from S3 into a Redshift table.
- Parameters:
table_name (str) – The Redshift table name which is being loaded
s3path (str) – S3 path of the input file, e.g. s3://path/to/file.csv
delim (str, optional) – None for non-delimited file type. Defaults to |
copy_options (list) – List of strings of copy options to provide to the COPY command. Will have default options added in.
- Raises:
DBError – If there is a problem executing the COPY command, a connection has not been initialized, or credentials are wrong.
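A hedged sketch of a typical COPY call (the YAML file, profile, bucket, table, and option values are placeholders, not library defaults):
```python
import pg8000
import locopy

rs = locopy.Redshift(dbapi=pg8000, config_yaml="example.yml", profile="my-aws-profile")
rs.connect()
rs.copy(
    table_name="my_schema.my_table",
    s3path="s3://my-bucket/staging/my_file.csv",
    delim="|",
    copy_options=["IGNOREHEADER AS 1"],  # appended to the default COPY options
)
rs.disconnect()
```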
- insert_dataframe_to_table(dataframe, table_name, columns=None, create=False, metadata=None, batch_size=1000, verbose=False)[source]¶
Insert a Pandas or Polars dataframe to an existing table or a new table.
executemany in psycopg2 and pg8000 has very poor performance in terms of running speed. To overcome this issue, we instead format the insert query and then run execute.
- Parameters:
dataframe (pandas.DataFrame or polars.DataFrame) – The pandas dataframe which needs to be inserted.
table_name (str) – The name of the Redshift table which is being inserted.
columns (list, optional) – The list of columns which will be uploaded.
create (bool, default False) – Boolean flag whether a new table needs to be created and inserted into.
metadata (dictionary, optional) – If metadata is None, it will be generated based on the data
batch_size (int, default 1000) – The number of records to insert in each batch
verbose (bool, default False) – Whether or not to print out insert query
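For example (the connection file, table name, and dataframe are hypothetical placeholders):
```python
import pandas as pd
import pg8000
import locopy

df = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})

rs = locopy.Redshift(dbapi=pg8000, config_yaml="example.yml")
rs.connect()
rs.insert_dataframe_to_table(
    dataframe=df,
    table_name="my_schema.my_table",
    create=True,      # create the table from generated (or supplied) metadata
    batch_size=1000,  # rows inserted per INSERT statement
)
rs.disconnect()
```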
- load_and_copy(local_file, s3_bucket, table_name, delim='|', copy_options=None, delete_s3_after=False, splits=1, compress=True, s3_folder=None)[source]¶
Load a file to S3, then copy it into Redshift.
Has options to split a single file into multiple files, compress using gzip, and upload to an S3 bucket with folders within the bucket.
Notes
If you are using folders in your S3 bucket please be aware of having special chars or backward slashes (\). These may cause the file to upload but fail on the COPY command.
By default locopy will handle the splitting of files for you, in order to reduce complexity in uploading to S3 and generating the COPY command.
It is critical to ensure that the S3 location you are using only contains the files you want to load. In the case of a “folder” it should only contain the files you want to load. For a bucket, the file name should be unique enough, as any extensions get stripped out in favour of the file prefix.
- Parameters:
local_file (str) – The local file which you wish to copy. This can be a folder for non-delimited file type like parquet
s3_bucket (str) – The AWS S3 bucket which you are copying the local file to.
table_name (str) – The Redshift table name which is being loaded
delim (str, optional) – Delimiter for the Redshift COPY command. None for non-delimited files. Defaults to |.
copy_options (list, optional) – A list (str) of copy options that should be appended to the COPY statement. The class will insert a default for DATEFORMAT, COMPUPDATE and TRUNCATECOLUMNS if they are not provided in this list and PARQUET is not part of the options passed in. See http://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-data-conversion.html for options which could be passed.
delete_s3_after (bool, optional) – Lets you specify to delete the S3 file after transfer if you want.
splits (int, optional) – Number of splits to perform for parallel loading into Redshift. Must be greater than 0. Recommended that this number should be less than 100. Defaults to 1.
compress (bool, optional) – Whether to compress the output file with gzip or leave it raw. Defaults to True
s3_folder (str, optional) – The AWS S3 folder of the bucket which you are copying the local file to. Defaults to None. Please note that you must follow the / convention when using subfolders.
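A sketch of a split, compressed load (file, bucket, folder, and table names are hypothetical placeholders):
```python
import pg8000
import locopy

rs = locopy.Redshift(dbapi=pg8000, config_yaml="example.yml")
rs.connect()
rs.load_and_copy(
    local_file="data.csv",
    s3_bucket="my-bucket",
    table_name="my_schema.my_table",
    delim=",",
    copy_options=["IGNOREHEADER AS 1"],
    delete_s3_after=True,
    splits=4,            # upload four gzip parts so COPY can load in parallel
    compress=True,
    s3_folder="staging",
)
rs.disconnect()
```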
- unload(query, s3path, unload_options=None)[source]¶
Execute the UNLOAD command to export a query from Redshift to S3.
- unload_and_copy(query, s3_bucket, s3_folder=None, raw_unload_path=None, export_path=False, delim=',', delete_s3_after=True, parallel_off=False, unload_options=None)[source]¶
Unload data from Redshift.
With options to write to a flat file and store on S3.
- Parameters:
query (str) – A query to be unloaded to S3. A SELECT query
s3_bucket (str) – The AWS S3 bucket where the data from the query will be unloaded.
s3_folder (str, optional) – The AWS S3 folder of the bucket where the data from the query will be unloaded. Defaults to None. Please note that you must follow the / convention when using subfolders.
raw_unload_path (str, optional) – The local path where the files will be copied to. Defaults to the current working directory (os.getcwd()).
export_path (str, optional) – If an export_path is provided, the function will concatenate and write the unloaded files to this path as a single file. If your file is very large you may not want to use this option.
delim (str, optional) – Delimiter for unloading and file writing. Defaults to a comma. If None, this option will be ignored
delete_s3_after (bool, optional) – Delete the files from S3 after unloading. Defaults to True.
parallel_off (bool, optional) – Unload data to S3 as a single file. Defaults to False. Not recommended as it will decrease speed.
unload_options (list, optional) – A list of unload options that should be appended to the UNLOAD statement.
- Raises:
Exception – If no files are generated from the unload, if the column names from the query cannot be retrieved, or if there is an issue with the execution of any of the queries.
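A sketch of unloading a query and concatenating the parts locally (query, bucket, folder, and paths are hypothetical placeholders):
```python
import pg8000
import locopy

rs = locopy.Redshift(dbapi=pg8000, config_yaml="example.yml")
rs.connect()
rs.unload_and_copy(
    query="SELECT * FROM my_schema.my_table",
    s3_bucket="my-bucket",
    s3_folder="exports",
    export_path="my_table_export.csv",  # concatenate the unloaded parts into one file
    delim=",",
    delete_s3_after=True,
    parallel_off=False,
)
rs.disconnect()
```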
- class locopy.S3(profile=None, kms_key=None, **kwargs)[source]¶
Bases:
object
S3 wrapper class.
Utilizes the boto3 library to push files to an S3 bucket.
- Parameters:
profile (str, optional) – The name of the AWS profile to use which is typically stored in the credentials file. You can also set the environment variable AWS_DEFAULT_PROFILE which would be used instead.
kms_key (str, optional) – The KMS key to use for encryption. If kms_key defaults to None then AES256 ServerSideEncryption will be used.
**kwargs – Optional keyword arguments.
- session¶
Hold the AWS session credentials / info
- Type:
boto3.Session
- s3¶
Hold the S3 client object which is used to upload/delete files to S3
- Type:
botocore.client.S3
- Raises:
S3Error – Error initializing AWS Session (ex: invalid profile)
S3CredentialsError – Issue with AWS credentials
S3InitializationError – Issue initializing S3 session
- delete_from_s3(bucket, key)[source]¶
Delete a file from an S3 bucket.
- Parameters:
bucket (str) – The AWS S3 bucket from which the file will be deleted.
key (str) – The key (path and file name) of the S3 object to delete.
- Raises:
S3DeletionError – If there is an issue deleting from the S3 bucket
- delete_list_from_s3(s3_list)[source]¶
Delete a list of files from an S3 bucket.
- Parameters:
s3_list (list) – List of strings with the s3 paths of the files to delete. The strings should not include the s3:// scheme.
- download_from_s3(bucket, key, local)[source]¶
Download a file from an S3 bucket.
- Parameters:
bucket (str) – The AWS S3 bucket from which the file will be downloaded.
key (str) – The key (path and file name) of the S3 object to download.
local (str) – The local path the file will be downloaded to.
- Raises:
S3DownloadError – If there is an issue downloading from the S3 bucket
- parse_s3_url(s3_url)[source]¶
Extract the bucket and key from an s3 url.
- Parameters:
s3_url (str) – s3 url. The string can include the s3:// scheme (which is discarded)
- Returns:
bucket (str) – s3 bucket
key (str) – s3 key
- upload_list_to_s3(local_list, bucket, folder=None)[source]¶
Upload a list of files to an S3 bucket.
- Parameters:
local_list (list) – List of strings with the file paths of the files to upload
bucket (str) – The AWS S3 bucket which you are copying the local file to.
folder (str, optional) – The AWS S3 folder of the bucket which you are copying the local files to. Defaults to None. Please note that you must follow the / convention when using subfolders.
- Returns:
Returns a list of the generated S3 bucket and keys of the files which were uploaded. The S3:// part is NOT included. The output would look like the following: ["my-bucket/key1", "my-bucket/key2", ...]
- Return type:
list
Notes
There is an assumption that if you are loading multiple files (via splits) they follow a structure such as file_name.extension.# (# splits). It allows for the COPY statement to use the key prefix vs specifying an exact file name. The returned list helps with this process downstream.
- upload_to_s3(local, bucket, key)[source]¶
Upload a file to an S3 bucket.
- Parameters:
local (str) – The local file which you wish to upload.
bucket (str) – The AWS S3 bucket which you are uploading the file to.
key (str) – The key (path and file name) to assign to the file in S3.
- Raises:
S3UploadError – If there is an issue uploading to the S3 bucket
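A sketch tying the helpers together (profile, bucket, keys, and file names are hypothetical placeholders):
```python
import locopy

s3 = locopy.S3(profile="my-aws-profile")
s3.upload_to_s3("data.csv", "my-bucket", "staging/data.csv")
bucket, key = s3.parse_s3_url("s3://my-bucket/staging/data.csv")
s3.download_from_s3(bucket, key, "data_copy.csv")
s3.delete_from_s3(bucket, key)
```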
- class locopy.Snowflake(profile=None, kms_key=None, dbapi=None, config_yaml=None, **kwargs)[source]¶
Bases: S3, Database
Locopy class which manages connections to Snowflake. Inherits Database.
Implements the specific COPY INTO functionality.
- Parameters:
profile (str, optional) – The name of the AWS profile to use which is typically stored in the credentials file. You can also set the environment variable AWS_DEFAULT_PROFILE which would be used instead.
kms_key (str, optional) – The KMS key to use for encryption. If kms_key defaults to None then AES256 ServerSideEncryption will be used.
dbapi (DBAPI 2 module, optional) – A database adapter which is Python DB API 2.0 compliant (snowflake.connector)
config_yaml (str, optional) – String representing the YAML file location of the database connection keyword arguments. It is worth noting that this should only contain valid arguments for the database connector you plan on using. It will throw an exception if something is passed through which isn’t valid.
**kwargs – Database connection keyword arguments.
- session¶
Hold the AWS session credentials / info
- Type:
boto3.Session
- s3¶
Hold the S3 client object which is used to upload/delete files to S3
- Type:
botocore.client.S3
- dbapi¶
database adapter which is Python DBAPI 2.0 compliant (snowflake.connector)
- Type:
DBAPI 2 module
- conn¶
DBAPI connection instance
- Type:
dbapi.connection
- cursor¶
DBAPI cursor instance
- Type:
dbapi.cursor
- Raises:
CredentialsError – Database credentials are not provided or valid
S3Error – Error initializing AWS Session (ex: invalid profile)
S3CredentialsError – Issue with AWS credentials
S3InitializationError – Issue initializing S3 session
- connect()[source]¶
Create a connection to the Snowflake cluster.
Sets the values of the conn and cursor attributes.
- Raises:
DBError – If there is a problem establishing a connection to Snowflake.
- copy(table_name, stage, file_type='csv', format_options=None, copy_options=None)[source]¶
Load files from a stage into a Snowflake table.
Execute the COPY INTO <table> command. If file_type == csv and format_options == None, format_options will default to: ["FIELD_DELIMITER='|'", "SKIP_HEADER=0"].
- Parameters:
table_name (str) – The Snowflake table name which is being loaded. Must be fully qualified: <namespace>.<table_name>
stage (str) – Stage location of the load file. This can be an internal or external stage
file_type (str) – The file type. One of csv, json, or parquet
format_options (list) – List of strings of format options to provide to the COPY INTO command. The options will typically be in the format of ["a=b", "c=d"]
copy_options (list) – List of strings of copy options to provide to the COPY INTO command.
- Raises:
DBError – If there is a problem executing the COPY command, or a connection has not been initialized.
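A sketch of staging a file and loading it (the YAML file, user stage path, table, and format options are hypothetical placeholders; with the default auto compression the uploaded file gains a .gz suffix):
```python
import snowflake.connector
import locopy

sf = locopy.Snowflake(dbapi=snowflake.connector, config_yaml="example.yml")
sf.connect()
sf.upload_to_internal("data.csv", "@~/staging/")  # PUT; gzip-compressed by default
sf.copy(
    table_name="my_schema.my_table",
    stage="@~/staging/data.csv.gz",
    file_type="csv",
    format_options=["FIELD_DELIMITER=','", "SKIP_HEADER=1"],
)
sf.disconnect()
```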
- download_from_internal(stage, local=None, parallel=10)[source]¶
Download file(s) from an internal stage via the GET command.
- Parameters:
stage (str) – Internal stage location to load the file.
local (str, optional) – The local directory path where files will be downloaded to. Defaults to the current working directory (os.getcwd()). Otherwise it must be the absolute path.
parallel (int, optional) – Specifies the number of threads to use for downloading files.
- insert_dataframe_to_table(dataframe, table_name, columns=None, create=False, metadata=None)[source]¶
Insert a Pandas or Polars dataframe to an existing table or a new table.
In newer versions of the python snowflake connector (v2.1.2+) users can call the write_pandas method from the cursor directly. insert_dataframe_to_table is a custom implementation and does not use write_pandas. Instead of using COPY INTO, the method builds a list of tuples to insert directly into the table. There are also options to create the table if it doesn’t exist and use your own metadata. If your data is significantly large then using COPY INTO <table> is more appropriate.
- Parameters:
dataframe (Pandas or Polars Dataframe) – The pandas or polars dataframe which needs to be inserted.
table_name (str) – The name of the Snowflake table which is being inserted.
columns (list, optional) – The list of columns which will be uploaded.
create (bool, optional) – Boolean flag whether a new table needs to be created and inserted into.
metadata (dictionary, optional) – If metadata is None, it will be generated based on the data
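For example (a small hypothetical dataframe, connection file, and table):
```python
import pandas as pd
import snowflake.connector
import locopy

df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})

sf = locopy.Snowflake(dbapi=snowflake.connector, config_yaml="example.yml")
sf.connect()
sf.insert_dataframe_to_table(df, "my_schema.my_table", create=True)
sf.disconnect()
```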
- to_dataframe(df_type='pandas', size=None)[source]¶
Return a dataframe of the last query results.
This is just a convenience method. This method overrides the base class’s implementation in favour of the snowflake connector’s built-in fetch_pandas_all when size == None. If size != None then we will continue to use the existing functionality where we iterate through the cursor and build the dataframe.
- Parameters:
df_type (Literal["pandas","polars"], optional) – Output dataframe format. Defaults to pandas.
size (int, optional) – Chunk size to fetch. Defaults to None.
- Returns:
Dataframe with lowercase column names. Returns None if no fetched result.
- Return type:
pandas.DataFrame or polars.DataFrame
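For example (connection file and query are hypothetical placeholders):
```python
import snowflake.connector
import locopy

sf = locopy.Snowflake(dbapi=snowflake.connector, config_yaml="example.yml")
sf.connect()
sf.execute("SELECT * FROM my_schema.my_table")
df = sf.to_dataframe()  # size is None, so the connector's fetch_pandas_all is used
sf.disconnect()
```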
- unload(stage, table_name, file_type='csv', format_options=None, header=False, copy_options=None)[source]¶
Export a query/table from Snowflake to a stage.
Execute the COPY INTO <location> command. If file_type == csv and format_options == None, format_options will default to: ["FIELD_DELIMITER='|'"].
- Parameters:
stage (str) – Stage location (internal or external) where the data files are unloaded
table_name (str) – The Snowflake table name which is being unloaded. Must be fully qualified: <namespace>.<table_name>
file_type (str) – The file type. One of csv, json, or parquet
format_options (list) – List of strings of format options to provide to the COPY INTO command.
header (bool, optional) – Boolean flag if header is included in the file(s)
copy_options (list) – List of strings of copy options to provide to the COPY INTO command.
- Raises:
DBError – If there is a problem executing the UNLOAD command, or a connection has not been initialized.
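A sketch that unloads a table to the user stage and then pulls the files down with download_from_internal (connection file, stage paths, table, and local path are hypothetical placeholders):
```python
import snowflake.connector
import locopy

sf = locopy.Snowflake(dbapi=snowflake.connector, config_yaml="example.yml")
sf.connect()
sf.unload(
    stage="@~/exports/my_table_",
    table_name="my_schema.my_table",
    file_type="csv",
    format_options=["FIELD_DELIMITER='|'"],
    header=True,
)
sf.download_from_internal("@~/exports/", local="/tmp/exports")
sf.disconnect()
```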
- upload_to_internal(local, stage, parallel=4, auto_compress=True, overwrite=True)[source]¶
Upload file(s) to an internal stage via the PUT command.
- Parameters:
local (str) – The local directory path to the file to upload. Wildcard characters (*, ?) are supported to enable uploading multiple files in a directory. Otherwise it must be the absolute path.
stage (str) – Internal stage location to load the file.
parallel (int, optional) – Specifies the number of threads to use for uploading files.
auto_compress (bool, optional) – Specifies if Snowflake uses gzip to compress files during upload. If True, the files are compressed (if they are not already compressed). If False, the files are uploaded as-is.
overwrite (bool, optional) – Specifies whether Snowflake overwrites an existing file with the same name during upload. If True, an existing file with the same name is overwritten. If False, an existing file with the same name is not overwritten.
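A sketch of uploading several files with a wildcard (connection file, local paths, and stage are hypothetical placeholders):
```python
import snowflake.connector
import locopy

sf = locopy.Snowflake(dbapi=snowflake.connector, config_yaml="example.yml")
sf.connect()
# Wildcards let PUT pick up multiple files; gzip compression is applied by default.
sf.upload_to_internal(
    "/data/exports/part_*.csv",
    "@~/staging/",
    parallel=4,
    auto_compress=True,
    overwrite=True,
)
sf.disconnect()
```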