datacompy.spark package

Submodules

datacompy.spark.legacy module

Legacy spark comparison.

class datacompy.spark.legacy.LegacySparkCompare(spark_session: SparkSession, base_df: DataFrame, compare_df: DataFrame, join_columns: List[str | Tuple[str, str]], column_mapping: List[Tuple[str, str]] | None = None, cache_intermediates: bool = False, known_differences: List[Dict[str, Any]] | None = None, rel_tol: float = 0, abs_tol: float = 0, show_all_columns: bool = False, match_rates: bool = False)

Bases: object

Comparison class used to compare two Spark Dataframes.

Extends the Compare functionality to the wide world of Spark and out-of-memory data.

Parameters:
  • spark_session (pyspark.sql.SparkSession) – A SparkSession to be used to execute Spark commands in the comparison.

  • base_df (pyspark.sql.DataFrame) – The dataframe to serve as a basis for comparison. While you will ultimately get the same results comparing A to B as you will comparing B to A, by convention base_df should be the canonical, gold standard reference dataframe in the comparison.

  • compare_df (pyspark.sql.DataFrame) – The dataframe to be compared against base_df.

  • join_columns (list) – A list of columns comprising the join key(s) of the two dataframes. If the column names are the same in the two dataframes, the names of the columns can be given as strings. If the names differ, the join_columns list should include tuples of the form (base_column_name, compare_column_name).

  • column_mapping (list[tuple], optional) – If columns to be compared have different names in the base and compare dataframes, a list should be provided in columns_mapping consisting of tuples of the form (base_column_name, compare_column_name) for each set of differently-named columns to be compared against each other.

  • cache_intermediates (bool, optional) – Whether or not SparkCompare will cache intermediate dataframes (such as the deduplicated version of dataframes, or the joined comparison). This will take a large amount of cache, proportional to the size of your dataframes, but will significantly speed up performance, as multiple steps will not have to recompute transformations. False by default.

  • known_differences (list[dict], optional) –

    A list of dictionaries that define transformations to apply to the compare dataframe to match values when there are known differences between base and compare. The dictionaries should contain:

    • name: A name that describes the transformation

    • types: The types that the transformation should be applied to.

      This prevents certain transformations from being applied to types that don’t make sense and would cause exceptions.

    • transformation: A Spark SQL statement to apply to the column

      in the compare dataset. The string “{input}” will be replaced by the variable in question.

  • abs_tol (float, optional) – Absolute tolerance between two values.

  • rel_tol (float, optional) – Relative tolerance between two values.

  • show_all_columns (bool, optional) – If true, all columns will be shown in the report including columns with a 100% match rate.

  • match_rates (bool, optional) – If true, match rates by column will be shown in the column summary.

Returns:

Instance of a SparkCompare object, ready to do some comparin’. Note that if cache_intermediates=True, this instance will already have done some work deduping the input dataframes. If cache_intermediates=False, the instantiation of this object is lazy.

Return type:

SparkCompare

property base_row_count: int

Get the count of rows in the de-duped base dataframe.

Type:

int

property columns_compared: List[str]

Get columns to be compared in both dataframes.

All columns in both excluding the join key(s).

property columns_in_both: Set[str]

Get columns in both dataframes.

Type:

set[str]

property columns_only_base: Set[str]

Get columns that are unique to the base dataframe.

Type:

set[str]

property columns_only_compare: Set[str]

Get columns that are unique to the compare dataframe.

Type:

set[str]

property common_row_count: int

Get the count of rows in common between base and compare dataframes.

Type:

int

property compare_row_count: int

Get the count of rows in the de-duped compare dataframe.

Type:

int

report(file: ~typing.TextIO = <_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>) None

Create a comparison report and print it to the file specified.

Prints to stdout by default.

Parameters:

file (file, optional) – A filehandle to write the report to. By default, this is sys.stdout, printing the report to stdout. You can also redirect this to an output file, as in the example.

Examples

>>> with open('my_report.txt', 'w') as report_file:
...     comparison.report(file=report_file)
property rows_both_all: DataFrame | None

Returns all rows in both dataframes.

Type:

pyspark.sql.DataFrame

property rows_both_mismatch: DataFrame | None

Returns all rows in both dataframes that have mismatches.

Type:

pyspark.sql.DataFrame

property rows_only_base: DataFrame

Returns rows only in the base dataframe.

Type:

pyspark.sql.DataFrame

property rows_only_compare: DataFrame | None

Returns rows only in the compare dataframe.

Type:

pyspark.sql.DataFrame

class datacompy.spark.legacy.MatchType(value)

Bases: Enum

Types of matches.

KNOWN_DIFFERENCE = 2
MATCH = 1
MISMATCH = 0
datacompy.spark.legacy.decimal_comparator() str

Check equality with decimal(X, Y) types.

Otherwise treated as the string “decimal”.

datacompy.spark.pandas module

Compare two Pandas on Spark DataFrames.

Originally this package was meant to provide similar functionality to PROC COMPARE in SAS - i.e. human-readable reporting on the difference between two dataframes.

class datacompy.spark.pandas.SparkPandasCompare(df1: DataFrame, df2: DataFrame, join_columns: List[str] | str, abs_tol: float = 0, rel_tol: float = 0, df1_name: str = 'df1', df2_name: str = 'df2', ignore_spaces: bool = False, ignore_case: bool = False, cast_column_names_lower: bool = True)

Bases: BaseCompare

Comparison class to be used to compare whether two Pandas on Spark dataframes are equal.

Both df1 and df2 should be dataframes containing all of the join_columns, with unique column names. Differences between values are compared to abs_tol + rel_tol * abs(df2[‘value’]).

Parameters:
  • df1 (pyspark.pandas.frame.DataFrame) – First dataframe to check

  • df2 (pyspark.pandas.frame.DataFrame) – Second dataframe to check

  • join_columns (list or str, optional) – Column(s) to join dataframes on. If a string is passed in, that one column will be used.

  • abs_tol (float, optional) – Absolute tolerance between two values.

  • rel_tol (float, optional) – Relative tolerance between two values.

  • df1_name (str, optional) – A string name for the first dataframe. This allows the reporting to print out an actual name instead of “df1”, and allows human users to more easily track the dataframes.

  • df2_name (str, optional) – A string name for the second dataframe

  • ignore_spaces (bool, optional) – Flag to strip whitespace (including newlines) from string columns (including any join columns)

  • ignore_case (bool, optional) – Flag to ignore the case of string columns

  • cast_column_names_lower (bool, optional) – Boolean indicator that controls of column names will be cast into lower case

Variables:
  • df1_unq_rows (pyspark.pandas.frame.DataFrame) – All records that are only in df1 (based on a join on join_columns)

  • df2_unq_rows (pyspark.pandas.frame.DataFrame) – All records that are only in df2 (based on a join on join_columns)

all_columns_match() bool

Whether the columns all match in the dataframes.

all_mismatch(ignore_matching_cols: bool = False) DataFrame

Get all rows with any columns that have a mismatch.

Returns all df1 and df2 versions of the columns and join columns.

Parameters:

ignore_matching_cols (bool, optional) – Whether showing the matching columns in the output or not. The default is False.

Returns:

All rows of the intersection dataframe, containing any columns, that don’t match.

Return type:

pyspark.pandas.frame.DataFrame

all_rows_overlap() bool

Whether the rows are all present in both dataframes.

Returns:

True if all rows in df1 are in df2 and vice versa (based on existence for join option)

Return type:

bool

count_matching_rows() bool

Count the number of rows match (on overlapping fields).

Returns:

Number of matching rows

Return type:

int

property df1: DataFrame

Get the first dataframe.

df1_unq_columns() OrderedSet[str]

Get columns that are unique to df1.

property df2: DataFrame

Get the second dataframe.

df2_unq_columns() OrderedSet[str]

Get columns that are unique to df2.

intersect_columns() OrderedSet[str]

Get columns that are shared between the two dataframes.

intersect_rows_match() bool

Check whether the intersect rows all match.

matches(ignore_extra_columns: bool = False) bool

Return True or False if the dataframes match.

Parameters:

ignore_extra_columns (bool) – Ignores any columns in one dataframe and not in the other.

report(sample_count: int = 10, column_count: int = 10, html_file: str | None = None) str

Return a string representation of a report.

The representation can then be printed or saved to a file.

Parameters:
  • sample_count (int, optional) – The number of sample records to return. Defaults to 10.

  • column_count (int, optional) – The number of columns to display in the sample records output. Defaults to 10.

  • html_file (str, optional) – HTML file name to save report output to. If None the file creation will be skipped.

Returns:

The report, formatted kinda nicely.

Return type:

str

sample_mismatch(column: str, sample_count: int = 10, for_display: bool = False) DataFrame

Return sample mismatches.

Gets a sub-dataframe which contains the identifying columns, and df1 and df2 versions of the column.

Parameters:
  • column (str) – The raw column name (i.e. without _df1 appended)

  • sample_count (int, optional) – The number of sample records to return. Defaults to 10.

  • for_display (bool, optional) – Whether this is just going to be used for display (overwrite the column names)

Returns:

A sample of the intersection dataframe, containing only the “pertinent” columns, for rows that don’t match on the provided column.

Return type:

pyspark.pandas.frame.DataFrame

subset() bool

Return True if dataframe 2 is a subset of dataframe 1.

Dataframe 2 is considered a subset if all of its columns are in dataframe 1, and all of its rows match rows in dataframe 1 for the shared columns.

Returns:

True if dataframe 2 is a subset of dataframe 1.

Return type:

bool

datacompy.spark.pandas.calculate_max_diff(col_1: DataFrame, col_2: DataFrame) float

Get a maximum difference between two columns.

Parameters:
  • col_1 (pyspark.pandas.series.Series) – The first column

  • col_2 (pyspark.pandas.series.Series) – The second column

Returns:

max diff

Return type:

float

datacompy.spark.pandas.columns_equal(col_1: Series, col_2: Series, rel_tol: float = 0, abs_tol: float = 0, ignore_spaces: bool = False, ignore_case: bool = False) Series

Compare two columns from a dataframe.

Returns a True/False series, with the same index as column 1.

  • Two nulls (np.nan) will evaluate to True.

  • A null and a non-null value will evaluate to False.

  • Numeric values will use the relative and absolute tolerances.

  • Decimal values (decimal.Decimal) will attempt to be converted to floats before comparing

  • Non-numeric values (i.e. where np.isclose can’t be used) will just trigger True on two nulls or exact matches.

Parameters:
  • col_1 (pyspark.pandas.series.Series) – The first column to look at

  • col_2 (pyspark.pandas.series.Series) – The second column

  • rel_tol (float, optional) – Relative tolerance

  • abs_tol (float, optional) – Absolute tolerance

  • ignore_spaces (bool, optional) – Flag to strip whitespace (including newlines) from string columns

  • ignore_case (bool, optional) – Flag to ignore the case of string columns

Returns:

A series of Boolean values. True == the values match, False == the values don’t match.

Return type:

pyspark.pandas.series.Series

datacompy.spark.pandas.compare_string_and_date_columns(col_1: Series, col_2: Series) Series

Compare a string column and date column, value-wise.

This tries to convert a string column to a date column and compare that way.

Parameters:
  • col_1 (pyspark.pandas.series.Series) – The first column to look at

  • col_2 (pyspark.pandas.series.Series) – The second column

Returns:

A series of Boolean values. True == the values match, False == the values don’t match.

Return type:

pyspark.pandas.series.Series

datacompy.spark.pandas.generate_id_within_group(dataframe: DataFrame, join_columns: List[str]) Series

Generate an ID column that can be used to deduplicate identical rows.

The series generated is the order within a unique group, and it handles nulls.

Parameters:
  • dataframe (pyspark.pandas.frame.DataFrame) – The dataframe to operate on

  • join_columns (list) – List of strings which are the join columns

Returns:

The ID column that’s unique in each group.

Return type:

pyspark.pandas.series.Series

datacompy.spark.pandas.get_merged_columns(original_df: DataFrame, merged_df: DataFrame, suffix: str) List[str]

Get the columns from an original dataframe in the new merged dataframe.

Parameters:
  • original_df (pyspark.pandas.frame.DataFrame) – The original, pre-merge dataframe

  • merged_df (pyspark.pandas.frame.DataFrame) – Post-merge with another dataframe, with suffixes added in.

  • suffix (str) – What suffix was used to distinguish when the original dataframe was overlapping with the other merged dataframe.

Returns:

Column list of the original dataframe pre suffix

Return type:

List[str]

datacompy.spark.pandas.render(filename: str, *fields: int | float | str) str

Render out an individual template.

This basically just reads in a template file, and applies .format() on the fields.

Parameters:
  • filename (str) – The file that contains the template. Will automagically prepend the templates directory before opening

  • fields (list) – Fields to be rendered out in the template

Returns:

The fully rendered out file.

Return type:

str

datacompy.spark.sql module

Compare two PySpark SQL DataFrames.

Originally this package was meant to provide similar functionality to PROC COMPARE in SAS - i.e. human-readable reporting on the difference between two dataframes.

class datacompy.spark.sql.SparkSQLCompare(spark_session: SparkSession, df1: DataFrame, df2: DataFrame, join_columns: List[str] | str, abs_tol: float = 0, rel_tol: float = 0, df1_name: str = 'df1', df2_name: str = 'df2', ignore_spaces: bool = False, ignore_case: bool = False, cast_column_names_lower: bool = True)

Bases: BaseCompare

Comparison class to be used to compare whether two Spark SQL dataframes are equal.

Both df1 and df2 should be dataframes containing all of the join_columns, with unique column names. Differences between values are compared to abs_tol + rel_tol * abs(df2[‘value’]).

Parameters:
  • spark_session (pyspark.sql.SparkSession) – A SparkSession to be used to execute Spark commands in the comparison.

  • df1 (pyspark.sql.DataFrame) – First dataframe to check

  • df2 (pyspark.sql.DataFrame) – Second dataframe to check

  • join_columns (list or str, optional) – Column(s) to join dataframes on. If a string is passed in, that one column will be used.

  • abs_tol (float, optional) – Absolute tolerance between two values.

  • rel_tol (float, optional) – Relative tolerance between two values.

  • df1_name (str, optional) – A string name for the first dataframe. This allows the reporting to print out an actual name instead of “df1”, and allows human users to more easily track the dataframes.

  • df2_name (str, optional) – A string name for the second dataframe

  • ignore_spaces (bool, optional) – Flag to strip whitespace (including newlines) from string columns (including any join columns)

  • ignore_case (bool, optional) – Flag to ignore the case of string columns

  • cast_column_names_lower (bool, optional) – Boolean indicator that controls of column names will be cast into lower case

Variables:
  • df1_unq_rows (pyspark.sql.DataFrame) – All records that are only in df1 (based on a join on join_columns)

  • df2_unq_rows (pyspark.sql.DataFrame) – All records that are only in df2 (based on a join on join_columns)

  • intersect_rows (pyspark.sql.DataFrame) – All records that are in both df1 and df2

all_columns_match() bool

Whether the columns all match in the dataframes.

Returns:

True if all columns in df1 are in df2 and vice versa

Return type:

bool

all_mismatch(ignore_matching_cols: bool = False) DataFrame

Get all rows with any columns that have a mismatch.

Returns all df1 and df2 versions of the columns and join columns.

Parameters:

ignore_matching_cols (bool, optional) – Whether showing the matching columns in the output or not. The default is False.

Returns:

All rows of the intersection dataframe, containing any columns, that don’t match.

Return type:

pyspark.sql.DataFrame

all_rows_overlap() bool

Whether the rows are all present in both dataframes.

Returns:

True if all rows in df1 are in df2 and vice versa (based on existence for join option)

Return type:

bool

count_matching_rows() int

Count the number of rows match (on overlapping fields).

Returns:

Number of matching rows

Return type:

int

property df1: DataFrame

Get the first dataframe.

df1_unq_columns() OrderedSet[str]

Get columns that are unique to df1.

property df2: DataFrame

Get the second dataframe.

df2_unq_columns() OrderedSet[str]

Get columns that are unique to df2.

intersect_columns() OrderedSet[str]

Get columns that are shared between the two dataframes.

intersect_rows_match() bool

Check whether the intersect rows all match.

matches(ignore_extra_columns: bool = False) bool

Return True or False if the dataframes match.

Parameters:

ignore_extra_columns (bool) – Ignores any columns in one dataframe and not in the other.

report(sample_count: int = 10, column_count: int = 10, html_file: str | None = None) str

Return a string representation of a report.

The representation can then be printed or saved to a file.

Parameters:
  • sample_count (int, optional) – The number of sample records to return. Defaults to 10.

  • column_count (int, optional) – The number of columns to display in the sample records output. Defaults to 10.

  • html_file (str, optional) – HTML file name to save report output to. If None the file creation will be skipped.

Returns:

The report, formatted kinda nicely.

Return type:

str

sample_mismatch(column: str, sample_count: int = 10, for_display: bool = False) DataFrame

Return sample mismatches.

Gets a sub-dataframe which contains the identifying columns, and df1 and df2 versions of the column.

Parameters:
  • column (str) – The raw column name (i.e. without _df1 appended)

  • sample_count (int, optional) – The number of sample records to return. Defaults to 10.

  • for_display (bool, optional) – Whether this is just going to be used for display (overwrite the column names)

Returns:

A sample of the intersection dataframe, containing only the “pertinent” columns, for rows that don’t match on the provided column.

Return type:

pyspark.sql.DataFrame

subset() bool

Return True if dataframe 2 is a subset of dataframe 1.

Dataframe 2 is considered a subset if all of its columns are in dataframe 1, and all of its rows match rows in dataframe 1 for the shared columns.

Returns:

True if dataframe 2 is a subset of dataframe 1.

Return type:

bool

datacompy.spark.sql.calculate_max_diff(dataframe: DataFrame, col_1: str, col_2: str) float

Get a maximum difference between two columns.

Parameters:
  • dataframe (pyspark.sql.DataFrame) – DataFrame to do comparison on

  • col_1 (str) – The first column to look at

  • col_2 (str) – The second column

Returns:

max diff

Return type:

float

datacompy.spark.sql.calculate_null_diff(dataframe: DataFrame, col_1: str, col_2: str) int

Get the null differences between two columns.

Parameters:
  • dataframe (pyspark.sql.DataFrame) – DataFrame to do comparison on

  • col_1 (str) – The first column to look at

  • col_2 (str) – The second column

Returns:

null diff

Return type:

int

datacompy.spark.sql.columns_equal(dataframe: DataFrame, col_1: str, col_2: str, col_match: str, rel_tol: float = 0, abs_tol: float = 0, ignore_spaces: bool = False, ignore_case: bool = False) DataFrame

Compare two columns from a dataframe.

Returns a True/False series with the same index as column 1.

  • Two nulls (np.nan) will evaluate to True.

  • A null and a non-null value will evaluate to False.

  • Numeric values will use the relative and absolute tolerances.

  • Decimal values (decimal.Decimal) will attempt to be converted to floats before comparing

  • Non-numeric values (i.e. where np.isclose can’t be used) will just trigger True on two nulls or exact matches.

Parameters:
  • dataframe (pyspark.sql.DataFrame) – DataFrame to do comparison on

  • col_1 (str) – The first column to look at

  • col_2 (str) – The second column

  • col_match (str) – The matching column denoting if the compare was a match or not

  • rel_tol (float, optional) – Relative tolerance

  • abs_tol (float, optional) – Absolute tolerance

  • ignore_spaces (bool, optional) – Flag to strip whitespace (including newlines) from string columns

  • ignore_case (bool, optional) – Flag to ignore the case of string columns

Returns:

A column of boolean values are added. True == the values match, False == the values don’t match.

Return type:

pyspark.sql.DataFrame

datacompy.spark.sql.decimal_comparator()

Check equality with decimal(X, Y) types.

Otherwise treated as the string “decimal”.

datacompy.spark.sql.get_merged_columns(original_df: DataFrame, merged_df: DataFrame, suffix: str) List[str]

Get the columns from an original dataframe, in the new merged dataframe.

Parameters:
  • original_df (pyspark.sql.DataFrame) – The original, pre-merge dataframe

  • merged_df (pyspark.sql.DataFrame) – Post-merge with another dataframe, with suffixes added in.

  • suffix (str) – What suffix was used to distinguish when the original dataframe was overlapping with the other merged dataframe.

Returns:

Column list of the original dataframe pre suffix

Return type:

List[str]

datacompy.spark.sql.render(filename: str, *fields: int | float | str) str

Render out an individual template.

This basically just reads in a template file, and applies .format() on the fields.

Parameters:
  • filename (str) – The file that contains the template. Will automagically prepend the templates directory before opening

  • fields (list) – Fields to be rendered out in the template

Returns:

The fully rendered out file.

Return type:

str

Module contents

Spark comparisons.