locopy.redshift module

Redshift Module.

Module to wrap a database adapter into a Redshift class which can be used to connect to Redshift, and run arbitrary code.

class locopy.redshift.Redshift(profile=None, kms_key=None, dbapi=None, config_yaml=None, **kwargs)[source]

Bases: S3, Database

Locopy class which manages connections to Redshift.

Inherits Database and implements the specific COPY and UNLOAD functionality.

If any of host, port, dbname, user and password are not provided, a config_yaml file must be provided with those parameters in it. Please note ssl is always enforced when connecting.

Parameters:
  • profile (str, optional) – The name of the AWS profile to use, which is typically stored in the credentials file. You can also set the environment variable AWS_DEFAULT_PROFILE, which would be used instead.

  • kms_key (str, optional) – The KMS key to use for encryption. Defaults to None, in which case AES256 ServerSideEncryption will be used.

  • dbapi (DBAPI 2 module, optional) – A database adapter which is Python DB API 2.0 compliant (psycopg2, pg8000, etc.)

  • config_yaml (str, optional) – String representing the YAML file location of the database connection keyword arguments. It is worth noting that this should only contain valid arguments for the database connector you plan on using. It will throw an exception if something is passed through which isn’t valid.

  • **kwargs – Database connection keyword arguments.
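As a rough illustration of the rule stated in the class description, the sketch below checks whether a set of keyword arguments covers the five connection parameters named there. The helper name missing_connection_keys is hypothetical and is not part of locopy; the host and database values are placeholders.

```python
# Connection parameters named in the class description.
REQUIRED_KEYS = ("host", "port", "dbname", "user", "password")


def missing_connection_keys(**kwargs):
    """Return the required connection keys absent from kwargs (hypothetical helper)."""
    return [key for key in REQUIRED_KEYS if kwargs.get(key) is None]


# Placeholder values for illustration only.
missing = missing_connection_keys(host="example-host", port=5439, dbname="dev")
if missing:
    # Per the documentation, these would then have to come from a config_yaml file.
    print("Supply via config_yaml:", ", ".join(missing))
```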

profile

String representing the AWS profile for authentication

Type:

str

kms_key

String representing the s3 kms key

Type:

str

session

Hold the AWS session credentials / info

Type:

boto3.Session

s3

Hold the S3 client object which is used to upload/delete files to S3

Type:

botocore.client.S3

dbapi

database adapter which is Python DBAPI 2.0 compliant

Type:

DBAPI 2 module

connection

Dictionary of database connection items

Type:

dict

conn

DBAPI connection instance

Type:

dbapi.connection

cursor

DBAPI cursor instance

Type:

dbapi.cursor

Raises:
connect()[source]

Create a connection to the Redshift cluster.

Sets the values of the conn and cursor attributes.

Raises:

DBError – If there is a problem establishing a connection to Redshift.

copy(table_name, s3path, delim='|', copy_options=None)[source]

Execute the COPY command to load files from S3 into a Redshift table.

Parameters:
  • table_name (str) – The Redshift table name which is being loaded

  • s3path (str) – S3 path of the input file. eg: s3://path/to/file.csv

  • delim (str, optional) – Delimiter for the Redshift COPY command. None for non-delimited file types. Defaults to |.

  • copy_options (list) – List of strings of copy options to provide to the COPY command. Will have default options added in.

Raises:

DBError – If there is a problem executing the COPY command, a connection has not been initialized, or credentials are wrong.
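A minimal usage sketch for copy, based only on the signature documented above. It assumes that redshift is an already-connected Redshift instance; the wrapper name copy_csv is hypothetical, and IGNOREHEADER 1 is a standard Redshift COPY parameter chosen here purely as an example option.

```python
def copy_csv(redshift, table_name, s3path):
    """Load a comma-delimited file with a header row (illustrative wrapper)."""
    redshift.copy(
        table_name,
        s3path,
        delim=",",                        # overrides the documented default of "|"
        copy_options=["IGNOREHEADER 1"],  # skip the header line of the file
    )
```

It could be called as copy_csv(redshift, "schema.table", "s3://bucket/file.csv"), where the table name and path are placeholders.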

insert_dataframe_to_table(dataframe, table_name, columns=None, create=False, metadata=None, batch_size=1000, verbose=False)[source]

Insert a Pandas or Polars dataframe to an existing table or a new table.

executemany in psycopg2 and pg8000 has very poor performance in terms of running speed. To overcome this issue, we instead format the insert query and then run execute.

Parameters:
  • dataframe (pandas.DataFrame or polars.DataFrame) – The pandas or polars dataframe to insert.

  • table_name (str) – The name of the Redshift table to insert into.

  • columns (list, optional) – The list of columns which will be uploaded.

  • create (bool, default False) – Whether a new table should be created before inserting.

  • metadata (dictionary, optional) – If metadata==None, it will be generated based on data

  • batch_size (int, default 1000) – The number of records to insert in each batch

  • verbose (bool, default False) – Whether or not to print out insert query
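The batching idea described above (one multi-row INSERT per batch instead of executemany) can be sketched as follows. This is not locopy's implementation: it uses bound parameters rather than formatting values into the query string, and it uses the standard-library sqlite3 module as a stand-in DB-API adapter so the example runs anywhere. The function name batched_insert and the demo table are hypothetical.

```python
import sqlite3


def batched_insert(cursor, table_name, columns, rows, batch_size=1000):
    """Insert rows using one multi-row INSERT per batch instead of executemany."""
    col_list = ", ".join(columns)
    # "?" is sqlite3's placeholder style; other adapters use a different one.
    row_placeholder = "(" + ", ".join("?" for _ in columns) + ")"
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        values_clause = ", ".join(row_placeholder for _ in batch)
        # Table and column names are interpolated, so they must be trusted.
        sql = f"INSERT INTO {table_name} ({col_list}) VALUES {values_clause}"
        # Flatten the batch so parameters line up with the placeholders.
        params = [value for row in batch for value in row]
        cursor.execute(sql, params)


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE demo (id INTEGER, name TEXT)")
batched_insert(cur, "demo", ["id", "name"], [(1, "a"), (2, "b"), (3, "c")], batch_size=2)
cur.execute("SELECT COUNT(*) FROM demo")
row_count = cur.fetchone()[0]
```

With three rows and batch_size=2, the sketch issues two INSERT statements: one carrying two rows and one carrying the remaining row.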

load_and_copy(local_file, s3_bucket, table_name, delim='|', copy_options=None, delete_s3_after=False, splits=1, compress=True, s3_folder=None)[source]

Load a file to S3, then copy it into Redshift.

Has options to split a single file into multiple files, compress using gzip, and upload to an S3 bucket with folders within the bucket.

Notes

If you are using folders in your S3 bucket, be careful with special characters and backslashes (\). These may allow the file to upload but cause the COPY command to fail.

By default locopy will handle the splitting of files for you, in order to reduce complexity in uploading to s3 and generating the COPY command.

It is critical to ensure that the S3 location you are using contains only the files you want to load. In the case of a “folder”, it should contain only the files you want to load. For a bucket, the file name should be unique enough, as any extensions get stripped out in favour of the file prefix.

Parameters:
  • local_file (str) – The local file which you wish to copy. This can be a folder for non-delimited file types such as Parquet.

  • s3_bucket (str) – The AWS S3 bucket which you are copying the local file to.

  • table_name (str) – The Redshift table name which is being loaded

  • delim (str, optional) – Delimiter for Redshift COPY command. None for non-delimited files. Defaults to |.

  • copy_options (list, optional) – A list of strings with copy options that should be appended to the COPY statement. Unless PARQUET is one of the options passed in, the class inserts a default for DATEFORMAT, COMPUPDATE and TRUNCATECOLUMNS when they are not provided in this list. See http://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-data-conversion.html for options which could be passed.

  • delete_s3_after (bool, optional) – Whether to delete the S3 file after the transfer. Defaults to False.

  • splits (int, optional) – Number of splits to perform for parallel loading into Redshift. Must be greater than 0. It is recommended that this number be less than 100. Defaults to 1.

  • compress (bool, optional) – Whether to compress the output file with gzip or leave it raw. Defaults to True

  • s3_folder (str, optional) – The AWS S3 folder of the bucket which you are copying the local file to. Defaults to None. Please note that you must follow the / convention when using subfolders.
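To make the splits and compress options more concrete, here is a small standard-library sketch of splitting a delimited file into several gzip-compressed parts. It is only an illustration of the general technique, not locopy's implementation: the function name split_and_gzip, the round-robin line assignment, and the output file naming are all assumptions made for this example.

```python
import gzip
import os
import tempfile


def split_and_gzip(local_file, splits):
    """Write lines round-robin into `splits` gzip files and return their paths."""
    if splits < 1:
        raise ValueError("splits must be greater than 0")
    # Hypothetical naming scheme: <file>.<index>.gz
    paths = [f"{local_file}.{i}.gz" for i in range(splits)]
    outputs = [gzip.open(path, "wt", encoding="utf-8") for path in paths]
    try:
        with open(local_file, encoding="utf-8") as source:
            for line_number, line in enumerate(source):
                outputs[line_number % splits].write(line)
    finally:
        for handle in outputs:
            handle.close()
    return paths


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "data.txt")
    with open(src, "w", encoding="utf-8") as fh:
        fh.write("1|a\n2|b\n3|c\n")
    parts = split_and_gzip(src, splits=2)
    # Read each part back to show how the lines were distributed.
    part_lines = []
    for part in parts:
        with gzip.open(part, "rt", encoding="utf-8") as fh:
            part_lines.append(fh.read().splitlines())
```

Because every part shares the original file name as a prefix, a single S3 prefix can address all of them, which is why the notes above stress keeping that prefix unique.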

unload(query, s3path, unload_options=None)[source]

Execute the UNLOAD command to export a query from Redshift to S3.

Parameters:
  • query (str) – A query to be unloaded to S3.

  • s3path (str) – S3 path for the output files.

  • unload_options (list) – List of string unload options.

Raises:

DBError – If there is a problem executing the UNLOAD command, a connection has not been initialized, or credentials are wrong.

unload_and_copy(query, s3_bucket, s3_folder=None, raw_unload_path=None, export_path=False, delim=',', delete_s3_after=True, parallel_off=False, unload_options=None)[source]

Unload data from Redshift.

With options to write to a flat file and store on S3.

Parameters:
  • query (str) – A SELECT query to be unloaded to S3.

  • s3_bucket (str) – The AWS S3 bucket where the data from the query will be unloaded.

  • s3_folder (str, optional) – The AWS S3 folder of the bucket where the data from the query will be unloaded. Defaults to None. Please note that you must follow the / convention when using subfolders.

  • raw_unload_path (str, optional) – The local path where the files will be copied to. Defaults to the current working directory (os.getcwd()).

  • export_path (str, optional) – If an export_path is provided, the function will concatenate the unloaded files and write them to this path as a single file. If your file is very large you may not want to use this option.

  • delim (str, optional) – Delimiter for unloading and file writing. Defaults to a comma. If None, this option will be ignored

  • delete_s3_after (bool, optional) – Delete the files from S3 after unloading. Defaults to True.

  • parallel_off (bool, optional) – Unload data to S3 as a single file. Defaults to False. Not recommended as it will decrease speed.

  • unload_options (list, optional) – A list of unload options that should be appended to the UNLOAD statement.

Raises:

Exception – If no files are generated from the unload, if the column names from the query cannot be retrieved, or if there is an issue with the execution of any of the queries.
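The concatenation step described for export_path can be pictured with the following standard-library sketch. It is not locopy's implementation; the function name concatenate_files and the part file names are hypothetical, and the sketch assumes the unloaded parts have already been downloaded to local paths.

```python
import os
import shutil
import tempfile


def concatenate_files(part_paths, export_path):
    """Append each part file, in order, to a single output file."""
    with open(export_path, "wb") as out:
        for part in part_paths:
            with open(part, "rb") as src:
                # Streams in chunks, so a part is never held fully in memory.
                shutil.copyfileobj(src, out)


with tempfile.TemporaryDirectory() as tmp:
    parts = []
    for index, text in enumerate(["1,a\n", "2,b\n"]):
        path = os.path.join(tmp, f"part_{index}")
        with open(path, "w", encoding="utf-8", newline="") as fh:
            fh.write(text)
        parts.append(path)
    export_path = os.path.join(tmp, "combined.csv")
    concatenate_files(parts, export_path)
    with open(export_path, "rb") as fh:
        combined_bytes = fh.read()
```

Even when streamed, the combined file takes as much disk space again as the parts, which is one reason the option may be unsuitable for very large unloads.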

locopy.redshift.add_default_copy_options(copy_options=None)[source]

Add in default options for the COPY job.

Unless those specific options have been provided in the request.

Parameters:

copy_options (list, optional) – List of copy options to be provided to the Redshift copy command

Returns:

List of strings with the default options appended. If copy_options is not provided, only the default options are returned.

Return type:

list
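The behaviour described for this function can be approximated as in the sketch below. It is an illustration under stated assumptions rather than the library's source: the function name with_default_copy_options is hypothetical, and the specific default values are assumed, since this page names DATEFORMAT, COMPUPDATE and TRUNCATECOLUMNS as defaults without giving their values.

```python
def with_default_copy_options(copy_options=None):
    """Append defaults for options the caller did not supply (illustrative sketch)."""
    options = list(copy_options or [])
    # Assumed default values; the documentation names the options but not their values.
    defaults = {
        "DATEFORMAT": "DATEFORMAT 'auto'",
        "COMPUPDATE": "COMPUPDATE ON",
        "TRUNCATECOLUMNS": "TRUNCATECOLUMNS",
    }
    for keyword, default in defaults.items():
        # Only add a default when no supplied option mentions the keyword.
        if not any(keyword in option.upper() for option in options):
            options.append(default)
    return options


result = with_default_copy_options(["DATEFORMAT 'YYYY-MM-DD'"])
only_defaults = with_default_copy_options()
```

Here the caller's DATEFORMAT is kept as given and only the two missing defaults are appended.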

locopy.redshift.combine_copy_options(copy_options)[source]

Return the copy_options attribute with spaces in between.

Converts to a string.

Parameters:

copy_options (list) – Copy options which are to be converted into a single string with spaces in between.

Returns:

copy_options attribute with spaces in between

Return type:

str
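The documented result, a single space-separated string, corresponds to a plain join. The sketch below assumes nothing beyond that description; the function name join_copy_options is hypothetical and the option values are examples.

```python
def join_copy_options(copy_options):
    """Join a list of option strings into one space-separated string."""
    return " ".join(copy_options)


combined = join_copy_options(["DATEFORMAT 'auto'", "COMPUPDATE ON", "TRUNCATECOLUMNS"])
print(combined)
```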