datacompy.spark package¶
Submodules¶
datacompy.spark.legacy module¶
Legacy spark comparison.
- class datacompy.spark.legacy.LegacySparkCompare(spark_session: SparkSession, base_df: DataFrame, compare_df: DataFrame, join_columns: List[str | Tuple[str, str]], column_mapping: List[Tuple[str, str]] | None = None, cache_intermediates: bool = False, known_differences: List[Dict[str, Any]] | None = None, rel_tol: float = 0, abs_tol: float = 0, show_all_columns: bool = False, match_rates: bool = False)¶
Bases:
object
Comparison class used to compare two Spark Dataframes.
Extends the Compare functionality to the wide world of Spark and out-of-memory data.
- Parameters:
spark_session (pyspark.sql.SparkSession) – A SparkSession to be used to execute Spark commands in the comparison.
base_df (pyspark.sql.DataFrame) – The dataframe to serve as a basis for comparison. While you will ultimately get the same results comparing A to B as you will comparing B to A, by convention base_df should be the canonical, gold standard reference dataframe in the comparison.
compare_df (pyspark.sql.DataFrame) – The dataframe to be compared against base_df.
join_columns (list) – A list of columns comprising the join key(s) of the two dataframes. If the column names are the same in the two dataframes, the names of the columns can be given as strings. If the names differ, the join_columns list should include tuples of the form (base_column_name, compare_column_name).
column_mapping (list[tuple], optional) – If columns to be compared have different names in the base and compare dataframes, a list should be provided in column_mapping consisting of tuples of the form (base_column_name, compare_column_name) for each set of differently-named columns to be compared against each other.
cache_intermediates (bool, optional) – Whether or not SparkCompare will cache intermediate dataframes (such as the deduplicated version of dataframes, or the joined comparison). This will take a large amount of cache, proportional to the size of your dataframes, but will significantly speed up performance, as multiple steps will not have to recompute transformations. False by default.
known_differences (list[dict], optional) –
A list of dictionaries that define transformations to apply to the compare dataframe to match values when there are known differences between base and compare. The dictionaries should contain:
- name: A name that describes the transformation.
- types: The types that the transformation should be applied to. This prevents certain transformations from being applied to types that don’t make sense and would cause exceptions.
- transformation: A Spark SQL statement to apply to the column in the compare dataset. The string “{input}” will be replaced by the variable in question.
abs_tol (float, optional) – Absolute tolerance between two values.
rel_tol (float, optional) – Relative tolerance between two values.
show_all_columns (bool, optional) – If true, all columns will be shown in the report including columns with a 100% match rate.
match_rates (bool, optional) – If true, match rates by column will be shown in the column summary.
- Returns:
Instance of a SparkCompare object, ready to do some comparin’. Note that if cache_intermediates=True, this instance will already have done some work deduping the input dataframes. If cache_intermediates=False, the instantiation of this object is lazy.
- Return type:
SparkCompare
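The argument shapes described for join_columns, column_mapping and known_differences can be sketched in plain Python. Every column name, the type name "string", the trim transformation and the helper apply_known_difference are hypothetical; the helper only illustrates the “{input}” substitution described above and is not the library's implementation.

```python
# Join keys: a plain string when both dataframes share the name,
# a (base_column_name, compare_column_name) tuple when they differ.
join_columns = ["account_id", ("txn_date", "transaction_date")]

# Non-key columns whose names differ between base and compare.
column_mapping = [("amount", "amt")]

# Each known difference carries a name, the types it applies to, and a
# Spark SQL transformation containing the "{input}" placeholder.
known_differences = [
    {
        "name": "Strip surrounding whitespace",  # hypothetical description
        "types": ["string"],                     # assumed type name
        "transformation": "trim({input})",       # hypothetical transformation
    },
]


def apply_known_difference(difference, column_name):
    """Illustrative only: fill the "{input}" placeholder with a column name."""
    return difference["transformation"].replace("{input}", column_name)


print(apply_known_difference(known_differences[0], "customer_name"))
```

Under these assumptions, the three values would be passed as the join_columns, column_mapping and known_differences arguments of the constructor.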
- property base_row_count: int¶
Get the count of rows in the de-duped base dataframe.
- Type:
int
- property columns_compared: List[str]¶
Get columns to be compared in both dataframes.
All columns in both excluding the join key(s).
- property columns_in_both: Set[str]¶
Get columns in both dataframes.
- Type:
set[str]
- property columns_only_base: Set[str]¶
Get columns that are unique to the base dataframe.
- Type:
set[str]
- property columns_only_compare: Set[str]¶
Get columns that are unique to the compare dataframe.
- Type:
set[str]
- property common_row_count: int¶
Get the count of rows in common between base and compare dataframes.
- Type:
int
- property compare_row_count: int¶
Get the count of rows in the de-duped compare dataframe.
- Type:
int
- report(file: TextIO = sys.stdout) None ¶
Create a comparison report and print it to the file specified.
Prints to stdout by default.
- Parameters:
file (file, optional) – A filehandle to write the report to. By default, this is sys.stdout, printing the report to stdout. You can also redirect this to an output file, as in the example.
Examples
>>> with open('my_report.txt', 'w') as report_file:
...     comparison.report(file=report_file)
- property rows_both_all: DataFrame | None¶
Returns all rows in both dataframes.
- Type:
pyspark.sql.DataFrame
- property rows_both_mismatch: DataFrame | None¶
Returns all rows in both dataframes that have mismatches.
- Type:
pyspark.sql.DataFrame
- property rows_only_base: DataFrame¶
Returns rows only in the base dataframe.
- Type:
pyspark.sql.DataFrame
- property rows_only_compare: DataFrame | None¶
Returns rows only in the compare dataframe.
- Type:
pyspark.sql.DataFrame
- class datacompy.spark.legacy.MatchType(value)¶
Bases:
Enum
Types of matches.
- KNOWN_DIFFERENCE = 2¶
- MATCH = 1¶
- MISMATCH = 0¶
- datacompy.spark.legacy.decimal_comparator() str ¶
Check equality with decimal(X, Y) types.
Otherwise treated as the string “decimal”.
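One way to get the behaviour described here is a str subclass with a custom equality check. The sketch below is an assumption about how such a comparator could work, not the library's code; the names decimal_comparator_sketch, DecimalComparator and numeric_types are hypothetical.

```python
def decimal_comparator_sketch():
    """Return a string that compares equal to any "decimal(X, Y)" type name."""

    class DecimalComparator(str):
        def __eq__(self, other):
            # Match "decimal" itself and parameterised forms such as "decimal(10,2)".
            return isinstance(other, str) and other.startswith("decimal")

        # Defining __eq__ clears __hash__, so restore the str hash.
        __hash__ = str.__hash__

    return DecimalComparator("decimal")


comparator = decimal_comparator_sketch()
numeric_types = ["int", "double", comparator]  # hypothetical type-name list

print(comparator == "decimal(10,2)")
print("decimal(38,18)" in numeric_types)
print("string" in numeric_types)
```

Because the object is still a str, it can sit in a list of type names and be used anywhere the plain string “decimal” would be.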
datacompy.spark.pandas module¶
Compare two Pandas on Spark DataFrames.
Originally this package was meant to provide similar functionality to PROC COMPARE in SAS - i.e. human-readable reporting on the difference between two dataframes.
- class datacompy.spark.pandas.SparkPandasCompare(df1: DataFrame, df2: DataFrame, join_columns: List[str] | str, abs_tol: float = 0, rel_tol: float = 0, df1_name: str = 'df1', df2_name: str = 'df2', ignore_spaces: bool = False, ignore_case: bool = False, cast_column_names_lower: bool = True)¶
Bases:
BaseCompare
Comparison class to be used to compare whether two Pandas on Spark dataframes are equal.
Both df1 and df2 should be dataframes containing all of the join_columns, with unique column names. Differences between values are compared to abs_tol + rel_tol * abs(df2['value']).
- Parameters:
df1 (pyspark.pandas.frame.DataFrame) – First dataframe to check
df2 (pyspark.pandas.frame.DataFrame) – Second dataframe to check
join_columns (list or str, optional) – Column(s) to join dataframes on. If a string is passed in, that one column will be used.
abs_tol (float, optional) – Absolute tolerance between two values.
rel_tol (float, optional) – Relative tolerance between two values.
df1_name (str, optional) – A string name for the first dataframe. This allows the reporting to print out an actual name instead of “df1”, and allows human users to more easily track the dataframes.
df2_name (str, optional) – A string name for the second dataframe
ignore_spaces (bool, optional) – Flag to strip whitespace (including newlines) from string columns (including any join columns)
ignore_case (bool, optional) – Flag to ignore the case of string columns
cast_column_names_lower (bool, optional) – Boolean indicator that controls whether column names will be cast into lower case
- Variables:
df1_unq_rows (pyspark.pandas.frame.DataFrame) – All records that are only in df1 (based on a join on join_columns)
df2_unq_rows (pyspark.pandas.frame.DataFrame) – All records that are only in df2 (based on a join on join_columns)
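The tolerance rule stated in the class description, abs_tol + rel_tol * abs(df2['value']), can be worked through on single values. This is a minimal sketch: the helper name within_tolerance is hypothetical, and the use of "less than or equal" is an assumption, since the text above only says that differences are compared to the threshold.

```python
def within_tolerance(value_1, value_2, abs_tol=0.0, rel_tol=0.0):
    """Return True when |value_1 - value_2| <= abs_tol + rel_tol * |value_2|."""
    return abs(value_1 - value_2) <= abs_tol + rel_tol * abs(value_2)


# Both tolerances default to 0, so any difference is a mismatch.
print(within_tolerance(100.0, 100.5))
# Absolute tolerance alone: a difference of 0.5 is within 1.0.
print(within_tolerance(100.0, 100.5, abs_tol=1.0))
# Relative tolerance scales with the df2 value: the threshold is 0.01 * 200.
print(within_tolerance(199.0, 200.0, rel_tol=0.01))
# Same relative tolerance, smaller df2 value: the threshold is 0.01 * 50.
print(within_tolerance(51.0, 50.0, rel_tol=0.01))
```

Note that the relative term is based on the second value only, so swapping the two values can change the threshold.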
- all_columns_match() bool ¶
Whether the columns all match in the dataframes.
- all_mismatch(ignore_matching_cols: bool = False) DataFrame ¶
Get all rows with any columns that have a mismatch.
Returns all df1 and df2 versions of the columns and join columns.
- Parameters:
ignore_matching_cols (bool, optional) – Whether to exclude columns that fully match from the output. The default is False.
- Returns:
All rows of the intersection dataframe, containing any columns, that don’t match.
- Return type:
pyspark.pandas.frame.DataFrame
- all_rows_overlap() bool ¶
Whether the rows are all present in both dataframes.
- Returns:
True if all rows in df1 are in df2 and vice versa (based on existence for join option)
- Return type:
bool
- count_matching_rows() int ¶
Count the number of rows that match (on overlapping fields).
- Returns:
Number of matching rows
- Return type:
int
- property df1: DataFrame¶
Get the first dataframe.
- df1_unq_columns() OrderedSet[str] ¶
Get columns that are unique to df1.
- property df2: DataFrame¶
Get the second dataframe.
- df2_unq_columns() OrderedSet[str] ¶
Get columns that are unique to df2.
- intersect_columns() OrderedSet[str] ¶
Get columns that are shared between the two dataframes.
- intersect_rows_match() bool ¶
Check whether the intersect rows all match.
- matches(ignore_extra_columns: bool = False) bool ¶
Return True if the dataframes match, False otherwise.
- Parameters:
ignore_extra_columns (bool) – Ignores any columns in one dataframe and not in the other.
- report(sample_count: int = 10, column_count: int = 10, html_file: str | None = None) str ¶
Return a string representation of a report.
The representation can then be printed or saved to a file.
- Parameters:
sample_count (int, optional) – The number of sample records to return. Defaults to 10.
column_count (int, optional) – The number of columns to display in the sample records output. Defaults to 10.
html_file (str, optional) – HTML file name to save report output to. If None, the file creation will be skipped.
- Returns:
The report, formatted kinda nicely.
- Return type:
str
- sample_mismatch(column: str, sample_count: int = 10, for_display: bool = False) DataFrame ¶
Return sample mismatches.
Gets a sub-dataframe which contains the identifying columns, and df1 and df2 versions of the column.
- Parameters:
column (str) – The raw column name (i.e. without _df1 appended)
sample_count (int, optional) – The number of sample records to return. Defaults to 10.
for_display (bool, optional) – Whether this is just going to be used for display (overwrite the column names)
- Returns:
A sample of the intersection dataframe, containing only the “pertinent” columns, for rows that don’t match on the provided column.
- Return type:
pyspark.pandas.frame.DataFrame
- subset() bool ¶
Return True if dataframe 2 is a subset of dataframe 1.
Dataframe 2 is considered a subset if all of its columns are in dataframe 1, and all of its rows match rows in dataframe 1 for the shared columns.
- Returns:
True if dataframe 2 is a subset of dataframe 1.
- Return type:
bool
- datacompy.spark.pandas.calculate_max_diff(col_1: DataFrame, col_2: DataFrame) float ¶
Get a maximum difference between two columns.
- Parameters:
col_1 (pyspark.pandas.series.Series) – The first column
col_2 (pyspark.pandas.series.Series) – The second column
- Returns:
max diff
- Return type:
float
- datacompy.spark.pandas.columns_equal(col_1: Series, col_2: Series, rel_tol: float = 0, abs_tol: float = 0, ignore_spaces: bool = False, ignore_case: bool = False) Series ¶
Compare two columns from a dataframe.
Returns a True/False series, with the same index as column 1.
Two nulls (np.nan) will evaluate to True.
A null and a non-null value will evaluate to False.
Numeric values will use the relative and absolute tolerances.
Decimal values (decimal.Decimal) are converted to floats, where possible, before comparing.
Non-numeric values (i.e. where np.isclose can’t be used) will just trigger True on two nulls or exact matches.
- Parameters:
col_1 (pyspark.pandas.series.Series) – The first column to look at
col_2 (pyspark.pandas.series.Series) – The second column
rel_tol (float, optional) – Relative tolerance
abs_tol (float, optional) – Absolute tolerance
ignore_spaces (bool, optional) – Flag to strip whitespace (including newlines) from string columns
ignore_case (bool, optional) – Flag to ignore the case of string columns
- Returns:
A series of Boolean values. True == the values match, False == the values don’t match.
- Return type:
pyspark.pandas.series.Series
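The comparison rules listed for columns_equal can be sketched element by element in plain Python. The helpers is_null and values_equal are hypothetical, the function works on single values rather than on a Series, and the ignore_spaces and ignore_case options are left out; it is an illustration of the rules, not the library's implementation.

```python
import math
from decimal import Decimal


def is_null(value):
    """Treat None and float NaN as null."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def values_equal(value_1, value_2, rel_tol=0.0, abs_tol=0.0):
    """Element-wise sketch of the comparison rules listed above."""
    if is_null(value_1) or is_null(value_2):
        # Two nulls match; a null never matches a non-null.
        return is_null(value_1) and is_null(value_2)
    # Decimals are converted to floats before the numeric comparison.
    if isinstance(value_1, Decimal):
        value_1 = float(value_1)
    if isinstance(value_2, Decimal):
        value_2 = float(value_2)
    numeric = (int, float)
    if isinstance(value_1, numeric) and isinstance(value_2, numeric):
        return abs(value_1 - value_2) <= abs_tol + rel_tol * abs(value_2)
    # Non-numeric values need an exact match.
    return value_1 == value_2


col_1 = [1.0, None, float("nan"), Decimal("2.50"), "a", "a"]
col_2 = [1.05, None, 3.0, 2.5, "a", "b"]
result = [values_equal(a, b, abs_tol=0.1) for a, b in zip(col_1, col_2)]
print(result)
```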
- datacompy.spark.pandas.compare_string_and_date_columns(col_1: Series, col_2: Series) Series ¶
Compare a string column and date column, value-wise.
This tries to convert a string column to a date column and compare that way.
- Parameters:
col_1 (pyspark.pandas.series.Series) – The first column to look at
col_2 (pyspark.pandas.series.Series) – The second column
- Returns:
A series of Boolean values. True == the values match, False == the values don’t match.
- Return type:
pyspark.pandas.series.Series
- datacompy.spark.pandas.generate_id_within_group(dataframe: DataFrame, join_columns: List[str]) Series ¶
Generate an ID column that can be used to deduplicate identical rows.
The series generated is the order within a unique group, and it handles nulls.
- Parameters:
dataframe (pyspark.pandas.frame.DataFrame) – The dataframe to operate on
join_columns (list) – List of strings which are the join columns
- Returns:
The ID column that’s unique in each group.
- Return type:
pyspark.pandas.series.Series
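The idea of an ID that is "the order within a unique group" can be illustrated without Spark. In this sketch rows are plain dictionaries, the helper id_within_group is hypothetical, and zero-based numbering is an assumption; it shows the concept only, including null join keys falling into the same group.

```python
from collections import defaultdict


def id_within_group(rows, join_columns):
    """Number each row by its order of appearance within its join-key group."""
    seen = defaultdict(int)
    ids = []
    for row in rows:
        # None is a valid tuple element, so null keys group together.
        key = tuple(row[column] for column in join_columns)
        ids.append(seen[key])
        seen[key] += 1
    return ids


rows = [
    {"id": 1, "region": "east"},
    {"id": 1, "region": "east"},     # duplicate key: second in its group
    {"id": None, "region": "west"},
    {"id": None, "region": "west"},  # null keys still share a group
    {"id": 2, "region": "east"},
]
ids = id_within_group(rows, ["id", "region"])
print(ids)
```

Adding such an ID to the join keys makes otherwise identical rows distinguishable, so duplicates can be paired off one by one.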
- datacompy.spark.pandas.get_merged_columns(original_df: DataFrame, merged_df: DataFrame, suffix: str) List[str] ¶
Get the columns from an original dataframe in the new merged dataframe.
- Parameters:
original_df (pyspark.pandas.frame.DataFrame) – The original, pre-merge dataframe
merged_df (pyspark.pandas.frame.DataFrame) – Post-merge with another dataframe, with suffixes added in.
suffix (str) – What suffix was used to distinguish when the original dataframe was overlapping with the other merged dataframe.
- Returns:
Column list of the original dataframe pre suffix
- Return type:
List[str]
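A rough sketch of the lookup, working on lists of column names instead of dataframes. It assumes that a column which overlapped during the merge appears as the original name followed by an underscore and the suffix (the form suggested by the _df1 naming used elsewhere on this page); the helper merged_column_names and the sample columns are hypothetical.

```python
def merged_column_names(original_columns, merged_columns, suffix):
    """Map each original column to the name it carries after the merge."""
    found = []
    for column in original_columns:
        if column in merged_columns:
            # The column did not overlap, so it kept its name.
            found.append(column)
        elif f"{column}_{suffix}" in merged_columns:
            # The column overlapped and was given the suffix (assumed form).
            found.append(f"{column}_{suffix}")
        else:
            raise ValueError(f"Column not found: {column}")
    return found


original = ["id", "amount", "status"]
merged = ["id", "amount_df1", "amount_df2", "status_df1", "status_df2"]
names = merged_column_names(original, merged, "df1")
print(names)
```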
- datacompy.spark.pandas.render(filename: str, *fields: int | float | str) str ¶
Render out an individual template.
This basically just reads in a template file, and applies .format() on the fields.
- Parameters:
filename (str) – The file that contains the template. Will automagically prepend the templates directory before opening
fields (list) – Fields to be rendered out in the template
- Returns:
The fully rendered out file.
- Return type:
str
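The read-then-format behaviour described for render can be sketched with the standard library. The helper render_sketch, its explicit templates_dir argument, the template file name and the template text are all hypothetical; the real function locates its templates directory itself.

```python
import os
import tempfile


def render_sketch(templates_dir, filename, *fields):
    """Read a template file and fill its positional "{}" placeholders."""
    path = os.path.join(templates_dir, filename)
    with open(path, encoding="utf-8") as template_file:
        return template_file.read().format(*fields)


with tempfile.TemporaryDirectory() as templates_dir:
    # Write a small throwaway template to render.
    template_path = os.path.join(templates_dir, "row_summary.txt")
    with open(template_path, "w", encoding="utf-8") as template_file:
        template_file.write("Rows in common: {}\nRows only in {}: {}\n")
    rendered = render_sketch(templates_dir, "row_summary.txt", 40, "df1", 2)

print(rendered)
```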
datacompy.spark.sql module¶
Compare two PySpark SQL DataFrames.
Originally this package was meant to provide similar functionality to PROC COMPARE in SAS - i.e. human-readable reporting on the difference between two dataframes.
- class datacompy.spark.sql.SparkSQLCompare(spark_session: SparkSession, df1: DataFrame, df2: DataFrame, join_columns: List[str] | str, abs_tol: float = 0, rel_tol: float = 0, df1_name: str = 'df1', df2_name: str = 'df2', ignore_spaces: bool = False, ignore_case: bool = False, cast_column_names_lower: bool = True)¶
Bases:
BaseCompare
Comparison class to be used to compare whether two Spark SQL dataframes are equal.
Both df1 and df2 should be dataframes containing all of the join_columns, with unique column names. Differences between values are compared to abs_tol + rel_tol * abs(df2['value']).
- Parameters:
spark_session (pyspark.sql.SparkSession) – A SparkSession to be used to execute Spark commands in the comparison.
df1 (pyspark.sql.DataFrame) – First dataframe to check
df2 (pyspark.sql.DataFrame) – Second dataframe to check
join_columns (list or str, optional) – Column(s) to join dataframes on. If a string is passed in, that one column will be used.
abs_tol (float, optional) – Absolute tolerance between two values.
rel_tol (float, optional) – Relative tolerance between two values.
df1_name (str, optional) – A string name for the first dataframe. This allows the reporting to print out an actual name instead of “df1”, and allows human users to more easily track the dataframes.
df2_name (str, optional) – A string name for the second dataframe
ignore_spaces (bool, optional) – Flag to strip whitespace (including newlines) from string columns (including any join columns)
ignore_case (bool, optional) – Flag to ignore the case of string columns
cast_column_names_lower (bool, optional) – Boolean indicator that controls whether column names will be cast into lower case
- Variables:
df1_unq_rows (pyspark.sql.DataFrame) – All records that are only in df1 (based on a join on join_columns)
df2_unq_rows (pyspark.sql.DataFrame) – All records that are only in df2 (based on a join on join_columns)
intersect_rows (pyspark.sql.DataFrame) – All records that are in both df1 and df2
- all_columns_match() bool ¶
Whether the columns all match in the dataframes.
- Returns:
True if all columns in df1 are in df2 and vice versa
- Return type:
bool
- all_mismatch(ignore_matching_cols: bool = False) DataFrame ¶
Get all rows with any columns that have a mismatch.
Returns all df1 and df2 versions of the columns and join columns.
- Parameters:
ignore_matching_cols (bool, optional) – Whether to exclude columns that fully match from the output. The default is False.
- Returns:
All rows of the intersection dataframe, containing any columns, that don’t match.
- Return type:
pyspark.sql.DataFrame
- all_rows_overlap() bool ¶
Whether the rows are all present in both dataframes.
- Returns:
True if all rows in df1 are in df2 and vice versa (based on existence for join option)
- Return type:
bool
- count_matching_rows() int ¶
Count the number of rows that match (on overlapping fields).
- Returns:
Number of matching rows
- Return type:
int
- property df1: DataFrame¶
Get the first dataframe.
- df1_unq_columns() OrderedSet[str] ¶
Get columns that are unique to df1.
- property df2: DataFrame¶
Get the second dataframe.
- df2_unq_columns() OrderedSet[str] ¶
Get columns that are unique to df2.
- intersect_columns() OrderedSet[str] ¶
Get columns that are shared between the two dataframes.
- intersect_rows_match() bool ¶
Check whether the intersect rows all match.
- matches(ignore_extra_columns: bool = False) bool ¶
Return True if the dataframes match, False otherwise.
- Parameters:
ignore_extra_columns (bool) – Ignores any columns in one dataframe and not in the other.
- report(sample_count: int = 10, column_count: int = 10, html_file: str | None = None) str ¶
Return a string representation of a report.
The representation can then be printed or saved to a file.
- Parameters:
sample_count (int, optional) – The number of sample records to return. Defaults to 10.
column_count (int, optional) – The number of columns to display in the sample records output. Defaults to 10.
html_file (str, optional) – HTML file name to save report output to. If None, the file creation will be skipped.
- Returns:
The report, formatted kinda nicely.
- Return type:
str
- sample_mismatch(column: str, sample_count: int = 10, for_display: bool = False) DataFrame ¶
Return sample mismatches.
Gets a sub-dataframe which contains the identifying columns, and df1 and df2 versions of the column.
- Parameters:
column (str) – The raw column name (i.e. without _df1 appended)
sample_count (int, optional) – The number of sample records to return. Defaults to 10.
for_display (bool, optional) – Whether this is just going to be used for display (overwrite the column names)
- Returns:
A sample of the intersection dataframe, containing only the “pertinent” columns, for rows that don’t match on the provided column.
- Return type:
pyspark.sql.DataFrame
- subset() bool ¶
Return True if dataframe 2 is a subset of dataframe 1.
Dataframe 2 is considered a subset if all of its columns are in dataframe 1, and all of its rows match rows in dataframe 1 for the shared columns.
- Returns:
True if dataframe 2 is a subset of dataframe 1.
- Return type:
bool
- datacompy.spark.sql.calculate_max_diff(dataframe: DataFrame, col_1: str, col_2: str) float ¶
Get a maximum difference between two columns.
- Parameters:
dataframe (pyspark.sql.DataFrame) – DataFrame to do comparison on
col_1 (str) – The first column to look at
col_2 (str) – The second column
- Returns:
max diff
- Return type:
float
- datacompy.spark.sql.calculate_null_diff(dataframe: DataFrame, col_1: str, col_2: str) int ¶
Get the null differences between two columns.
- Parameters:
dataframe (pyspark.sql.DataFrame) – DataFrame to do comparison on
col_1 (str) – The first column to look at
col_2 (str) – The second column
- Returns:
null diff
- Return type:
int
- datacompy.spark.sql.columns_equal(dataframe: DataFrame, col_1: str, col_2: str, col_match: str, rel_tol: float = 0, abs_tol: float = 0, ignore_spaces: bool = False, ignore_case: bool = False) DataFrame ¶
Compare two columns from a dataframe.
Returns a True/False series with the same index as column 1.
Two nulls (np.nan) will evaluate to True.
A null and a non-null value will evaluate to False.
Numeric values will use the relative and absolute tolerances.
Decimal values (decimal.Decimal) are converted to floats, where possible, before comparing.
Non-numeric values (i.e. where np.isclose can’t be used) will just trigger True on two nulls or exact matches.
- Parameters:
dataframe (pyspark.sql.DataFrame) – DataFrame to do comparison on
col_1 (str) – The first column to look at
col_2 (str) – The second column
col_match (str) – The matching column denoting if the compare was a match or not
rel_tol (float, optional) – Relative tolerance
abs_tol (float, optional) – Absolute tolerance
ignore_spaces (bool, optional) – Flag to strip whitespace (including newlines) from string columns
ignore_case (bool, optional) – Flag to ignore the case of string columns
- Returns:
A column of Boolean values is added. True == the values match, False == the values don’t match.
- Return type:
pyspark.sql.DataFrame
- datacompy.spark.sql.decimal_comparator()¶
Check equality with decimal(X, Y) types.
Otherwise treated as the string “decimal”.
- datacompy.spark.sql.get_merged_columns(original_df: DataFrame, merged_df: DataFrame, suffix: str) List[str] ¶
Get the columns from an original dataframe, in the new merged dataframe.
- Parameters:
original_df (pyspark.sql.DataFrame) – The original, pre-merge dataframe
merged_df (pyspark.sql.DataFrame) – Post-merge with another dataframe, with suffixes added in.
suffix (str) – What suffix was used to distinguish when the original dataframe was overlapping with the other merged dataframe.
- Returns:
Column list of the original dataframe pre suffix
- Return type:
List[str]
- datacompy.spark.sql.render(filename: str, *fields: int | float | str) str ¶
Render out an individual template.
This basically just reads in a template file, and applies .format() on the fields.
- Parameters:
filename (str) – The file that contains the template. Will automagically prepend the templates directory before opening
fields (list) – Fields to be rendered out in the template
- Returns:
The fully rendered out file.
- Return type:
str
Module contents¶
Spark comparisons.