datacompy.spark package

Submodules

datacompy.spark.helper module

Helper function module contributed by Capital One’s Hydra Team.

Helper functions to assist in specific usecases where there is no columns to join and use the row order of the datasets.

datacompy.spark.helper.compare_by_row(spark_session: SparkSession, base_dataframe: DataFrame, compare_dataframe: DataFrame, string2double_cols: list[str] | None, abs_tol: float = 0, rel_tol: float = 0, df1_name: str = 'df1', df2_name: str = 'df2', ignore_spaces: bool = False, ignore_case: bool = False, cast_column_names_lower: bool = True) SparkSQLCompare

Run a detailed analysis on specific usecases where there is no columns to join and use the row order of the datasets.

If you know which columns to join on then please use SparkSQLCompare directly as this is meant to help support very specific helper usecases using row order contributed by Capital One’s Hydra Team.

Parameters:
  • spark_session (pyspark.sql.SparkSession) – A SparkSession to be used to execute Spark commands in the comparison.

  • base_dataframe (pyspark.sql.DataFrame) – Dataset to be compared against

  • compare_dataframe (pyspark.sql.DataFrame) – dataset to compare

  • string2double_cols (List[str], optional) – The columns that contain numeric values but are stored as string types

  • abs_tol (float, optional) – Absolute tolerance between two values.

  • rel_tol (float, optional) – Relative tolerance between two values.

  • df1_name (str, optional) – A string name for the first dataframe. This allows the reporting to print out an actual name instead of “df1”, and allows human users to more easily track the dataframes.

  • df2_name (str, optional) – A string name for the second dataframe

  • ignore_spaces (bool, optional) – Flag to strip whitespace (including newlines) from string columns (including any join columns)

  • ignore_case (bool, optional) – Flag to ignore the case of string columns

  • cast_column_names_lower (bool, optional) – Boolean indicator that controls of column names will be cast into lower case

Return type:

datacompy.spark.sql.SparkSQLCompare

datacompy.spark.helper.format_numeric_fields(df: DataFrame) DataFrame

Round and truncate numeric fields to 5 decimal places.

Parameters:

df (pyspark.sql.DataFrame) – The DataFrame to be formatted

Return type:

pyspark.sql.DataFrame

datacompy.spark.helper.handle_numeric_strings(df: DataFrame, field_list: list[str]) DataFrame

Convert columns in field_list from numeric strings to DoubleType.

Parameters:
  • df (pyspark.sql.DataFrame) – The DataFrame to be converted

  • field_list (List[str]) – List of StringType columns to be converted to DoubleType

Return type:

pyspark.sql.DataFrame

datacompy.spark.helper.sort_columns(base_df: DataFrame, compare_df: DataFrame) DataFrame

Sort both DataFrames by their columns to ensure consistent order.

Parameters:
  • base_df (pyspark.sql.DataFrame) – The base DataFrame to be sorted

  • compare_df (pyspark.sql.DataFrame) – The compare DataFrame to be sorted

Return type:

pyspark.sql.DataFrame, pyspark.sql.DataFrame

datacompy.spark.helper.sort_rows(base_df: DataFrame, compare_df: DataFrame) DataFrame

Add new column to each DataFrame that numbers the rows, so they can be compared by row number.

Parameters:
  • base_df (pyspark.sql.DataFrame) – The base DataFrame to be sorted

  • compare_df (pyspark.sql.DataFrame) – The compare DataFrame to be sorted

Return type:

pyspark.sql.DataFrame, pyspark.sql.DataFrame

datacompy.spark.sql module

Compare two PySpark SQL DataFrames.

Originally this package was meant to provide similar functionality to PROC COMPARE in SAS - i.e. human-readable reporting on the difference between two dataframes.

class datacompy.spark.sql.SparkSQLCompare(spark_session: SparkSession, df1: DataFrame, df2: DataFrame, join_columns: List[str] | str, abs_tol: float | Dict[str, float] = 0, rel_tol: float | Dict[str, float] = 0, df1_name: str = 'df1', df2_name: str = 'df2', ignore_spaces: bool = False, ignore_case: bool = False, cast_column_names_lower: bool = True)

Bases: BaseCompare

Comparison class to be used to compare whether two Spark SQL dataframes are equal.

Both df1 and df2 should be dataframes containing all of the join_columns, with unique column names. Differences between values are compared to abs_tol + rel_tol * abs(df2[‘value’]).

Parameters:
  • spark_session (pyspark.sql.SparkSession) – A SparkSession to be used to execute Spark commands in the comparison.

  • df1 (pyspark.sql.DataFrame) – First dataframe to check

  • df2 (pyspark.sql.DataFrame) – Second dataframe to check

  • join_columns (list or str, optional) – Column(s) to join dataframes on. If a string is passed in, that one column will be used.

  • abs_tol (float or dict, optional) – Absolute tolerance between two values. Can be either a float value applied to all columns, or a dictionary mapping column names to specific tolerance values. The special key “default” in the dictionary specifies the tolerance for columns not explicitly listed.

  • rel_tol (float or dict, optional) – Relative tolerance between two values. Can be either a float value applied to all columns, or a dictionary mapping column names to specific tolerance values. The special key “default” in the dictionary specifies the tolerance for columns not explicitly listed.

  • df1_name (str, optional) – A string name for the first dataframe. This allows the reporting to print out an actual name instead of “df1”, and allows human users to more easily track the dataframes.

  • df2_name (str, optional) – A string name for the second dataframe

  • ignore_spaces (bool, optional) – Flag to strip whitespace (including newlines) from string columns (including any join columns)

  • ignore_case (bool, optional) – Flag to ignore the case of string columns

  • cast_column_names_lower (bool, optional) – Boolean indicator that controls of column names will be cast into lower case

Variables:
  • df1_unq_rows (pyspark.sql.DataFrame) – All records that are only in df1 (based on a join on join_columns)

  • df2_unq_rows (pyspark.sql.DataFrame) – All records that are only in df2 (based on a join on join_columns)

  • intersect_rows (pyspark.sql.DataFrame) – All records that are in both df1 and df2

all_columns_match() bool

Whether the columns all match in the dataframes.

Returns:

True if all columns in df1 are in df2 and vice versa

Return type:

bool

all_mismatch(ignore_matching_cols: bool = False) DataFrame

Get all rows with any columns that have a mismatch.

Returns all df1 and df2 versions of the columns and join columns.

Parameters:

ignore_matching_cols (bool, optional) – Whether showing the matching columns in the output or not. The default is False.

Returns:

All rows of the intersection dataframe, containing any columns, that don’t match.

Return type:

pyspark.sql.DataFrame

all_rows_overlap() bool

Whether the rows are all present in both dataframes.

Returns:

True if all rows in df1 are in df2 and vice versa (based on existence for join option)

Return type:

bool

count_matching_rows() int

Count the number of rows match (on overlapping fields).

Returns:

Number of matching rows

Return type:

int

property df1: DataFrame

Get the first dataframe.

df1_unq_columns() OrderedSet[str]

Get columns that are unique to df1.

property df2: DataFrame

Get the second dataframe.

df2_unq_columns() OrderedSet[str]

Get columns that are unique to df2.

intersect_columns() OrderedSet[str]

Get columns that are shared between the two dataframes.

intersect_rows_match() bool

Check whether the intersect rows all match.

matches(ignore_extra_columns: bool = False) bool

Return True or False if the dataframes match.

Parameters:

ignore_extra_columns (bool) – Ignores any columns in one dataframe and not in the other.

report(sample_count: int = 10, column_count: int = 10, html_file: str | None = None, template_path: str | None = None) str

Return a string representation of a report.

The representation can then be printed or saved to a file. You can customize the report’s appearance by providing a custom Jinja2 template.

Parameters:
  • sample_count (int, optional) – The number of sample records to return. Defaults to 10.

  • column_count (int, optional) – The number of columns to display in the sample records output. Defaults to 10.

  • html_file (str, optional) – HTML file name to save report output to. If None the file creation will be skipped.

  • template_path (str, optional) –

    Path to a custom Jinja2 template file to use for report generation. If None, the default template will be used. The template receives the following context variables:

    • column_summary: Dict with column statistics including: common_columns, df1_unique, df2_unique, df1_name, df2_name

    • row_summary: Dict with row statistics including: match_columns, equal_rows, unequal_rows

    • column_comparison: Dict with column comparison statistics including: unequal_columns, equal_columns, unequal_values

    • mismatch_stats: Dict containing:
      • stats: List of dicts with column mismatch statistics (column, match, mismatch, null_diff, etc.)

      • samples: Sample rows with mismatched values

      • has_samples: Boolean indicating if there are any mismatch samples

      • has_mismatches: Boolean indicating if there are any mismatches

    • df1_unique_rows: Dict with unique rows in df1 including: has_rows, rows, columns

    • df2_unique_rows: Dict with unique rows in df2 including: has_rows, rows, columns

Returns:

The report, formatted according to the template.

Return type:

str

sample_mismatch(column: str, sample_count: int = 10, for_display: bool = False) DataFrame

Return sample mismatches.

Gets a sub-dataframe which contains the identifying columns, and df1 and df2 versions of the column.

Parameters:
  • column (str) – The raw column name (i.e. without _df1 appended)

  • sample_count (int, optional) – The number of sample records to return. Defaults to 10.

  • for_display (bool, optional) – Whether this is just going to be used for display (overwrite the column names)

Returns:

A sample of the intersection dataframe, containing only the “pertinent” columns, for rows that don’t match on the provided column.

Return type:

pyspark.sql.DataFrame

subset() bool

Return True if dataframe 2 is a subset of dataframe 1.

Dataframe 2 is considered a subset if all of its columns are in dataframe 1, and all of its rows match rows in dataframe 1 for the shared columns.

Returns:

True if dataframe 2 is a subset of dataframe 1.

Return type:

bool

datacompy.spark.sql.calculate_max_diff(dataframe: DataFrame, col_1: str, col_2: str) float

Get a maximum difference between two columns.

Parameters:
  • dataframe (pyspark.sql.DataFrame) – DataFrame to do comparison on

  • col_1 (str) – The first column to look at

  • col_2 (str) – The second column

Returns:

max diff

Return type:

float

datacompy.spark.sql.calculate_null_diff(dataframe: DataFrame, col_1: str, col_2: str) int

Get the null differences between two columns.

Parameters:
  • dataframe (pyspark.sql.DataFrame) – DataFrame to do comparison on

  • col_1 (str) – The first column to look at

  • col_2 (str) – The second column

Returns:

null diff

Return type:

int

datacompy.spark.sql.columns_equal(dataframe: DataFrame, col_1: str, col_2: str, rel_tol: float = 0, abs_tol: float = 0, ignore_spaces: bool = False, ignore_case: bool = False) Column

Compare if two columns are considered equal, returns a boolean Spark Column to be used in a .withColumn(…) statement.

Returns a True/False series with the same index as column 1.

  • Two nulls (np.nan) will evaluate to True.

  • A null and a non-null value will evaluate to False.

  • Numeric values will use the relative and absolute tolerances.

  • Decimal values (decimal.Decimal) will attempt to be converted to floats before comparing

  • Non-numeric values (i.e. where np.isclose can’t be used) will just trigger True on two nulls or exact matches.

Parameters:
  • dataframe (pyspark.sql.DataFrame) – DataFrame to do comparison on

  • col_1 (str) – The first column to look at

  • col_2 (str) – The second column

  • rel_tol (float, optional) – Relative tolerance

  • abs_tol (float, optional) – Absolute tolerance

  • ignore_spaces (bool, optional) – Flag to strip whitespace (including newlines) from string columns

  • ignore_case (bool, optional) – Flag to ignore the case of string columns

Returns:

Boolean Spark Column: True if values match according to the rules above, False otherwise.

Return type:

pyspark.sql.Column

Example

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> df = spark.createDataFrame([
...     (1, 1.1, "ABC", None),
...     (2, 2.0, "DEF ", 4.0),
...     (3, None, "ghi", 5.0)
... ], ["id", "col1", "col2", "col3"])
>>> # Compare numeric columns with tolerance
>>> df = df.withColumn("col1_equals_col3", columns_equal(df, "col1", "col3", rel_tol=0.1))
>>> # Compare string columns ignoring case and spaces
>>> df = df.withColumn("col2_normalized", columns_equal(df, "col2", "col2", ignore_spaces=True, ignore_case=True))

Note

Starting in version 0.18.0, the behavior of this function was changed so rather than returning a DataFrame a Column expression is returned.

datacompy.spark.sql.decimal_comparator()

Check equality with decimal(X, Y) types.

Otherwise treated as the string “decimal”.

datacompy.spark.sql.get_merged_columns(original_df: DataFrame, merged_df: DataFrame, suffix: str) List[str]

Get the columns from an original dataframe, in the new merged dataframe.

Parameters:
  • original_df (pyspark.sql.DataFrame) – The original, pre-merge dataframe

  • merged_df (pyspark.sql.DataFrame) – Post-merge with another dataframe, with suffixes added in.

  • suffix (str) – What suffix was used to distinguish when the original dataframe was overlapping with the other merged dataframe.

Returns:

Column list of the original dataframe pre suffix

Return type:

List[str]

Module contents

Spark comparisons.