datacompy.spark package¶
Submodules¶
datacompy.spark.helper module¶
Helper function module contributed by Capital One’s Hydra Team.
Helper functions to assist in specific use cases where there are no columns to join on and the row order of the datasets must be used instead.
- datacompy.spark.helper.compare_by_row(spark_session: SparkSession, base_dataframe: DataFrame, compare_dataframe: DataFrame, string2double_cols: list[str] | None, abs_tol: float = 0, rel_tol: float = 0, df1_name: str = 'df1', df2_name: str = 'df2', ignore_spaces: bool = False, ignore_case: bool = False, cast_column_names_lower: bool = True) SparkSQLCompare ¶
Run a detailed analysis for the specific use case where there are no columns to join on and the row order of the datasets must be used instead.
If you know which columns to join on, please use
SparkSQLCompare
directly; this helper supports the specific row-order use case contributed by Capital One’s Hydra Team.
- Parameters:
spark_session (pyspark.sql.SparkSession) – A
SparkSession
to be used to execute Spark commands in the comparison.
base_dataframe (pyspark.sql.DataFrame) – Dataset to be compared against
compare_dataframe (pyspark.sql.DataFrame) – Dataset to compare
string2double_cols (List[str], optional) – The columns that contain numeric values but are stored as string types
abs_tol (float, optional) – Absolute tolerance between two values.
rel_tol (float, optional) – Relative tolerance between two values.
df1_name (str, optional) – A string name for the first dataframe. This allows the reporting to print out an actual name instead of “df1”, and allows human users to more easily track the dataframes.
df2_name (str, optional) – A string name for the second dataframe
ignore_spaces (bool, optional) – Flag to strip whitespace (including newlines) from string columns (including any join columns)
ignore_case (bool, optional) – Flag to ignore the case of string columns
cast_column_names_lower (bool, optional) – Boolean indicator that controls whether column names will be cast into lower case
- Return type:
SparkSQLCompare
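Example
A minimal sketch (assuming a local SparkSession; the data and column names are illustrative only):
>>> from pyspark.sql import SparkSession
>>> from datacompy.spark.helper import compare_by_row
>>> spark = SparkSession.builder.getOrCreate()
>>> base = spark.createDataFrame([(1, "10.5"), (2, "20.0")], ["id", "amount"])
>>> compare = spark.createDataFrame([(1, "10.5"), (2, "20.1")], ["id", "amount"])
>>> comparison = compare_by_row(
...     spark_session=spark,
...     base_dataframe=base,
...     compare_dataframe=compare,
...     string2double_cols=["amount"],  # string columns holding numeric values
...     abs_tol=0.01,
... )
>>> print(comparison.report())  # doctest: +SKIP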
- datacompy.spark.helper.format_numeric_fields(df: DataFrame) DataFrame ¶
Round and truncate numeric fields to 5 decimal places.
- Parameters:
df (pyspark.sql.DataFrame) – The DataFrame to be formatted
- Return type:
pyspark.sql.DataFrame
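Example
A minimal sketch (illustrative data only):
>>> from pyspark.sql import SparkSession
>>> from datacompy.spark.helper import format_numeric_fields
>>> spark = SparkSession.builder.getOrCreate()
>>> df = spark.createDataFrame([(1, 3.14159265)], ["id", "value"])
>>> formatted = format_numeric_fields(df)  # numeric fields rounded/truncated to 5 decimal places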
- datacompy.spark.helper.handle_numeric_strings(df: DataFrame, field_list: list[str]) DataFrame ¶
Convert columns in field_list from numeric strings to DoubleType.
- Parameters:
df (pyspark.sql.DataFrame) – The DataFrame to be converted
field_list (List[str]) – List of StringType columns to be converted to DoubleType
- Return type:
pyspark.sql.DataFrame
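Example
A minimal sketch (illustrative data only):
>>> from pyspark.sql import SparkSession
>>> from datacompy.spark.helper import handle_numeric_strings
>>> spark = SparkSession.builder.getOrCreate()
>>> df = spark.createDataFrame([("1", "10.5"), ("2", "20.0")], ["id", "amount"])
>>> converted = handle_numeric_strings(df, field_list=["amount"])  # "amount" becomes DoubleType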
- datacompy.spark.helper.sort_columns(base_df: DataFrame, compare_df: DataFrame) DataFrame ¶
Sort both DataFrames by their columns to ensure consistent order.
- Parameters:
base_df (pyspark.sql.DataFrame) – The base DataFrame to be sorted
compare_df (pyspark.sql.DataFrame) – The compare DataFrame to be sorted
- Return type:
pyspark.sql.DataFrame, pyspark.sql.DataFrame
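Example
A minimal sketch (illustrative data only; both inputs share the same columns, in different order):
>>> from pyspark.sql import SparkSession
>>> from datacompy.spark.helper import sort_columns
>>> spark = SparkSession.builder.getOrCreate()
>>> base = spark.createDataFrame([(1, "a")], ["id", "val"])
>>> compare = spark.createDataFrame([("a", 1)], ["val", "id"])
>>> base_sorted, compare_sorted = sort_columns(base, compare)  # column order now consistent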
- datacompy.spark.helper.sort_rows(base_df: DataFrame, compare_df: DataFrame) DataFrame ¶
Add a new column to each DataFrame that numbers the rows, so they can be compared by row number.
- Parameters:
base_df (pyspark.sql.DataFrame) – The base DataFrame to be sorted
compare_df (pyspark.sql.DataFrame) – The compare DataFrame to be sorted
- Return type:
pyspark.sql.DataFrame, pyspark.sql.DataFrame
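Example
A minimal sketch (illustrative data only):
>>> from pyspark.sql import SparkSession
>>> from datacompy.spark.helper import sort_rows
>>> spark = SparkSession.builder.getOrCreate()
>>> base = spark.createDataFrame([(2, "b"), (1, "a")], ["id", "val"])
>>> compare = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
>>> base_rows, compare_rows = sort_rows(base, compare)  # each DataFrame gains a row-number column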
datacompy.spark.sql module¶
Compare two PySpark SQL DataFrames.
Originally this package was meant to provide similar functionality to PROC COMPARE in SAS - i.e. human-readable reporting on the difference between two dataframes.
- class datacompy.spark.sql.SparkSQLCompare(spark_session: SparkSession, df1: DataFrame, df2: DataFrame, join_columns: List[str] | str, abs_tol: float | Dict[str, float] = 0, rel_tol: float | Dict[str, float] = 0, df1_name: str = 'df1', df2_name: str = 'df2', ignore_spaces: bool = False, ignore_case: bool = False, cast_column_names_lower: bool = True)¶
Bases:
BaseCompare
Comparison class to be used to compare whether two Spark SQL dataframes are equal.
Both df1 and df2 should be dataframes containing all of the join_columns, with unique column names. Differences between values are compared to abs_tol + rel_tol * abs(df2[‘value’]).
- Parameters:
spark_session (pyspark.sql.SparkSession) – A
SparkSession
to be used to execute Spark commands in the comparison.
df1 (pyspark.sql.DataFrame) – First dataframe to check
df2 (pyspark.sql.DataFrame) – Second dataframe to check
join_columns (list or str, optional) – Column(s) to join dataframes on. If a string is passed in, that one column will be used.
abs_tol (float or dict, optional) – Absolute tolerance between two values. Can be either a float value applied to all columns, or a dictionary mapping column names to specific tolerance values. The special key “default” in the dictionary specifies the tolerance for columns not explicitly listed.
rel_tol (float or dict, optional) – Relative tolerance between two values. Can be either a float value applied to all columns, or a dictionary mapping column names to specific tolerance values. The special key “default” in the dictionary specifies the tolerance for columns not explicitly listed.
df1_name (str, optional) – A string name for the first dataframe. This allows the reporting to print out an actual name instead of “df1”, and allows human users to more easily track the dataframes.
df2_name (str, optional) – A string name for the second dataframe
ignore_spaces (bool, optional) – Flag to strip whitespace (including newlines) from string columns (including any join columns)
ignore_case (bool, optional) – Flag to ignore the case of string columns
cast_column_names_lower (bool, optional) – Boolean indicator that controls whether column names will be cast into lower case
- Variables:
df1_unq_rows (pyspark.sql.DataFrame) – All records that are only in df1 (based on a join on join_columns)
df2_unq_rows (pyspark.sql.DataFrame) – All records that are only in df2 (based on a join on join_columns)
intersect_rows (pyspark.sql.DataFrame) – All records that are in both df1 and df2
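Example
A minimal usage sketch (assuming a local SparkSession; the data, column names, and shown output are illustrative only):
>>> from pyspark.sql import SparkSession
>>> from datacompy.spark.sql import SparkSQLCompare
>>> spark = SparkSession.builder.getOrCreate()
>>> df1 = spark.createDataFrame([(1, "A", 10.0), (2, "B", 20.0)], ["id", "name", "amount"])
>>> df2 = spark.createDataFrame([(1, "A", 10.001), (3, "C", 30.0)], ["id", "name", "amount"])
>>> comparison = SparkSQLCompare(spark, df1, df2, join_columns="id", abs_tol=0.01)
>>> comparison.all_columns_match()  # doctest: +SKIP
True
>>> comparison.matches()  # doctest: +SKIP
False
>>> comparison.df1_unq_rows.show()  # rows only present in df1  # doctest: +SKIP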
- all_columns_match() bool ¶
Whether the columns all match in the dataframes.
- Returns:
True if all columns in df1 are in df2 and vice versa
- Return type:
bool
- all_mismatch(ignore_matching_cols: bool = False) DataFrame ¶
Get all rows with any columns that have a mismatch.
Returns all df1 and df2 versions of the columns and join columns.
- Parameters:
ignore_matching_cols (bool, optional) – Whether to show the matching columns in the output. The default is False.
- Returns:
All rows of the intersection dataframe that contain any columns that don’t match.
- Return type:
pyspark.sql.DataFrame
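Example
Continuing the illustrative comparison built in the class-level sketch above:
>>> comparison.all_mismatch(ignore_matching_cols=True).show()  # doctest: +SKIP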
- all_rows_overlap() bool ¶
Whether the rows are all present in both dataframes.
- Returns:
True if all rows in df1 are in df2 and vice versa (based on existence of the join columns)
- Return type:
bool
- count_matching_rows() int ¶
Count the number of rows that match (on overlapping fields).
- Returns:
Number of matching rows
- Return type:
int
- property df1: DataFrame¶
Get the first dataframe.
- df1_unq_columns() OrderedSet[str] ¶
Get columns that are unique to df1.
- property df2: DataFrame¶
Get the second dataframe.
- df2_unq_columns() OrderedSet[str] ¶
Get columns that are unique to df2.
- intersect_columns() OrderedSet[str] ¶
Get columns that are shared between the two dataframes.
- intersect_rows_match() bool ¶
Check whether the intersect rows all match.
- matches(ignore_extra_columns: bool = False) bool ¶
Return True if the dataframes match, False otherwise.
- Parameters:
ignore_extra_columns (bool) – Ignore any columns that are in one dataframe but not in the other.
- report(sample_count: int = 10, column_count: int = 10, html_file: str | None = None, template_path: str | None = None) str ¶
Return a string representation of a report.
The representation can then be printed or saved to a file. You can customize the report’s appearance by providing a custom Jinja2 template.
- Parameters:
sample_count (int, optional) – The number of sample records to return. Defaults to 10.
column_count (int, optional) – The number of columns to display in the sample records output. Defaults to 10.
html_file (str, optional) – HTML file name to save report output to. If None, the file creation will be skipped.
template_path (str, optional) – Path to a custom Jinja2 template file to use for report generation. If None, the default template will be used. The template receives the following context variables:
column_summary: Dict with column statistics, including common_columns, df1_unique, df2_unique, df1_name, df2_name
row_summary: Dict with row statistics, including match_columns, equal_rows, unequal_rows
column_comparison: Dict with column comparison statistics, including unequal_columns, equal_columns, unequal_values
mismatch_stats: Dict containing:
stats: List of dicts with column mismatch statistics (column, match, mismatch, null_diff, etc.)
samples: Sample rows with mismatched values
has_samples: Boolean indicating if there are any mismatch samples
has_mismatches: Boolean indicating if there are any mismatches
df1_unique_rows: Dict with unique rows in df1, including has_rows, rows, columns
df2_unique_rows: Dict with unique rows in df2, including has_rows, rows, columns
- Returns:
The report, formatted according to the template.
- Return type:
str
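Example
A sketch of generating a report, continuing the illustrative comparison from the class-level sketch above (the file names below are hypothetical):
>>> report_text = comparison.report(sample_count=5, column_count=5)
>>> print(report_text)  # doctest: +SKIP
>>> # Optionally write an HTML copy, or render with a custom Jinja2 template
>>> _ = comparison.report(html_file="compare_report.html")  # doctest: +SKIP
>>> _ = comparison.report(template_path="my_report_template.j2")  # doctest: +SKIP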
- sample_mismatch(column: str, sample_count: int = 10, for_display: bool = False) DataFrame ¶
Return sample mismatches.
Gets a sub-dataframe which contains the identifying columns, and df1 and df2 versions of the column.
- Parameters:
column (str) – The raw column name (i.e. without _df1 appended)
sample_count (int, optional) – The number of sample records to return. Defaults to 10.
for_display (bool, optional) – Whether this is just going to be used for display (overwrites the column names)
- Returns:
A sample of the intersection dataframe, containing only the “pertinent” columns, for rows that don’t match on the provided column.
- Return type:
pyspark.sql.DataFrame
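Example
Continuing the illustrative comparison from the class-level sketch above (the column name is illustrative):
>>> comparison.sample_mismatch("amount", sample_count=3, for_display=True).show()  # doctest: +SKIP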
- subset() bool ¶
Return True if dataframe 2 is a subset of dataframe 1.
Dataframe 2 is considered a subset if all of its columns are in dataframe 1, and all of its rows match rows in dataframe 1 for the shared columns.
- Returns:
True if dataframe 2 is a subset of dataframe 1.
- Return type:
bool
- datacompy.spark.sql.calculate_max_diff(dataframe: DataFrame, col_1: str, col_2: str) float ¶
Get the maximum difference between two columns.
- Parameters:
dataframe (pyspark.sql.DataFrame) – DataFrame to do comparison on
col_1 (str) – The first column to look at
col_2 (str) – The second column
- Returns:
max diff
- Return type:
float
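Example
A minimal sketch (illustrative data only; the shown output is indicative):
>>> from pyspark.sql import SparkSession
>>> from datacompy.spark.sql import calculate_max_diff
>>> spark = SparkSession.builder.getOrCreate()
>>> df = spark.createDataFrame([(1.0, 1.1), (2.0, 2.5)], ["a", "b"])
>>> calculate_max_diff(df, "a", "b")  # doctest: +SKIP
0.5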
- datacompy.spark.sql.calculate_null_diff(dataframe: DataFrame, col_1: str, col_2: str) int ¶
Get the null differences between two columns.
- Parameters:
dataframe (pyspark.sql.DataFrame) – DataFrame to do comparison on
col_1 (str) – The first column to look at
col_2 (str) – The second column
- Returns:
null diff
- Return type:
int
- datacompy.spark.sql.columns_equal(dataframe: DataFrame, col_1: str, col_2: str, rel_tol: float = 0, abs_tol: float = 0, ignore_spaces: bool = False, ignore_case: bool = False) Column ¶
Compare whether two columns are considered equal; returns a boolean Spark Column to be used in a .withColumn(…) statement.
Returns a True/False Column with one value per row of the input dataframe.
Two nulls (np.nan) will evaluate to True.
A null and a non-null value will evaluate to False.
Numeric values will use the relative and absolute tolerances.
Decimal values (decimal.Decimal) will be converted to floats, where possible, before comparing.
Non-numeric values (i.e. where np.isclose can’t be used) will just trigger True on two nulls or exact matches.
- Parameters:
dataframe (pyspark.sql.DataFrame) – DataFrame to do comparison on
col_1 (str) – The first column to look at
col_2 (str) – The second column
rel_tol (float, optional) – Relative tolerance
abs_tol (float, optional) – Absolute tolerance
ignore_spaces (bool, optional) – Flag to strip whitespace (including newlines) from string columns
ignore_case (bool, optional) – Flag to ignore the case of string columns
- Returns:
Boolean Spark Column: True if values match according to the rules above, False otherwise.
- Return type:
pyspark.sql.Column
Example
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> df = spark.createDataFrame([
...     (1, 1.1, "ABC", None),
...     (2, 2.0, "DEF ", 4.0),
...     (3, None, "ghi", 5.0)
... ], ["id", "col1", "col2", "col3"])
>>> # Compare numeric columns with tolerance
>>> df = df.withColumn("col1_equals_col3", columns_equal(df, "col1", "col3", rel_tol=0.1))
>>> # Compare string columns ignoring case and spaces
>>> df = df.withColumn("col2_normalized", columns_equal(df, "col2", "col2", ignore_spaces=True, ignore_case=True))
Note
Starting in version 0.18.0, the behavior of this function changed: rather than returning a DataFrame, a Column expression is returned.
- datacompy.spark.sql.decimal_comparator()¶
Check equality with decimal(X, Y) types.
Otherwise treated as the string “decimal”.
- datacompy.spark.sql.get_merged_columns(original_df: DataFrame, merged_df: DataFrame, suffix: str) List[str] ¶
Get the columns from an original dataframe, in the new merged dataframe.
- Parameters:
original_df (pyspark.sql.DataFrame) – The original, pre-merge dataframe
merged_df (pyspark.sql.DataFrame) – The post-merge dataframe, with suffixes added in.
suffix (str) – The suffix used to distinguish columns of the original dataframe that overlapped with the other merged dataframe.
- Returns:
List of columns from the original dataframe, as they appear in the merged dataframe.
- Return type:
List[str]
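Example
A minimal sketch (illustrative data only; the "_df1" suffix convention shown here is an assumption, not a documented default):
>>> from pyspark.sql import SparkSession
>>> from datacompy.spark.sql import get_merged_columns
>>> spark = SparkSession.builder.getOrCreate()
>>> original = spark.createDataFrame([(1, 10.0)], ["id", "value"])
>>> merged = spark.createDataFrame([(1, 10.0, 10.5)], ["id", "value_df1", "value_df2"])
>>> get_merged_columns(original, merged, "df1")  # doctest: +SKIP
['id', 'value_df1']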
Module contents¶
Spark comparisons.