Spark Usage

Important

With version v0.12.0 the original SparkCompare was replaced with a Pandas on Spark implementation. The original SparkCompare implementation differs from all the other native implementations, so to align the API and keep behaviour consistent, the original SparkCompare has been deprecated and moved into a new module, LegacySparkCompare.

Subsequently, in v0.13.0 a PySpark DataFrame class (SparkSQLCompare) was introduced; it accepts pyspark.sql.DataFrame objects and should provide better performance. With this version the Pandas on Spark implementation has been renamed to SparkPandasCompare, and all the Spark logic now lives under the spark submodule.

If you wish to use the old SparkCompare moving forward you can import it like so:

from datacompy.spark.legacy import LegacySparkCompare

Important

Starting with v0.14.1, SparkPandasCompare is slated for deprecation. SparkSQLCompare is the preferred implementation and is much more performant. Note that if you continue to use SparkPandasCompare, numpy 2+ is not supported due to dependency issues, so pin numpy below 2 (e.g. numpy<2) in your environment.

For SparkSQLCompare:

  • on_index is not supported.

  • Joining is done using <=>, Spark's null-safe equality operator (see the snippet after this list).

  • SparkSQLCompare compares pyspark.sql.DataFrames.
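
To illustrate why <=> matters, here is a standalone Spark SQL snippet (not part of datacompy's API) showing how it treats nulls compared to plain =:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Plain equality evaluates to NULL when either side is null...
spark.sql("SELECT NULL = NULL AS eq").show()    # eq: null
# ...while null-safe equality treats two nulls as equal.
spark.sql("SELECT NULL <=> NULL AS eq").show()  # eq: true
spark.sql("SELECT 1 <=> NULL AS eq").show()     # eq: false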

SparkSQLCompare

There is currently only one supported method for joining your dataframes - by join column(s).

from io import StringIO
import pandas as pd
from datacompy import SparkSQLCompare
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data1 = """acct_id,dollar_amt,name,float_fld,date_fld
10000001234,123.45,George Maharis,14530.1555,2017-01-01
10000001235,0.45,Michael Bluth,1,2017-01-01
10000001236,1345,George Bluth,,2017-01-01
10000001237,123456,Bob Loblaw,345.12,2017-01-01
10000001239,1.05,Lucille Bluth,,2017-01-01
"""

data2 = """acct_id,dollar_amt,name,float_fld
10000001234,123.4,George Michael Bluth,14530.155
10000001235,0.45,Michael Bluth,
10000001236,1345,George Bluth,1
10000001237,123456,Robert Loblaw,345.12
10000001238,1.05,Loose Seal Bluth,111
"""

# SparkSQLCompare
df1 = spark.createDataFrame(pd.read_csv(StringIO(data1)))
df2 = spark.createDataFrame(pd.read_csv(StringIO(data2)))

compare = SparkSQLCompare(
    spark,
    df1,
    df2,
    join_columns='acct_id',  # You can also specify a list of columns
    abs_tol=0,  # Optional, defaults to 0
    rel_tol=0,  # Optional, defaults to 0
    df1_name='Original',  # Optional, defaults to 'df1'
    df2_name='New'  # Optional, defaults to 'df2'
)
compare.matches(ignore_extra_columns=False)
# False
# This method prints out a human-readable report summarizing and sampling differences
print(compare.report())

Reports

A report is generated by calling report(), which returns a string. Here is a sample report generated by datacompy for the two tables above, joined on acct_id. (Note: if you don't specify df1_name and/or df2_name, the report will show "df1" and/or "df2" instead of "Original" and "New".)

DataComPy Comparison
--------------------

DataFrame Summary
-----------------

  DataFrame  Columns  Rows
0  Original        5     5
1       New        4     5

Column Summary
--------------

Number of columns in common: 4
Number of columns in Original but not in New: 1
Number of columns in New but not in Original: 0

Row Summary
-----------

Matched on: acct_id
Any duplicates on match values: No
Absolute Tolerance: 0
Relative Tolerance: 0
Number of rows in common: 4
Number of rows in Original but not in New: 1
Number of rows in New but not in Original: 1

Number of rows with some compared columns unequal: 4
Number of rows with all compared columns equal: 0

Column Comparison
-----------------

Number of columns compared with some values unequal: 3
Number of columns compared with all values equal: 1
Total number of values which compare unequal: 6

Columns with Unequal Values or Types
------------------------------------

       Column Original dtype New dtype  # Unequal  Max Diff  # Null Diff
0  dollar_amt        float64   float64          1    0.0500            0
2   float_fld        float64   float64          3    0.0005            2
1        name         object    object          2       NaN            0

Sample Rows with Unequal Values
-------------------------------

       acct_id  dollar_amt (Original)  dollar_amt (New)
0  10000001234                 123.45             123.4

       acct_id name (Original)            name (New)
0  10000001234  George Maharis  George Michael Bluth
3  10000001237      Bob Loblaw         Robert Loblaw

       acct_id  float_fld (Original)  float_fld (New)
0  10000001234            14530.1555        14530.155
1  10000001235                1.0000              NaN
2  10000001236                   NaN            1.000

Sample Rows Only in Original (First 10 Columns)
-----------------------------------------------

   acct_id_df1  dollar_amt_df1       name_df1  float_fld_df1 date_fld_df1  _merge_left
5  10000001239            1.05  Lucille Bluth            NaN   2017-01-01         True

Sample Rows Only in New (First 10 Columns)
------------------------------------------

   acct_id_df2  dollar_amt_df2          name_df2  float_fld_df2  _merge_right
4  10000001238            1.05  Loose Seal Bluth          111.0          True
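
The sample above uses report()'s defaults. Assuming your datacompy version exposes them (check help(compare.report) on your install), there are also parameters controlling how much detail is sampled:

# sample_count and column_count are assumptions to verify against your
# installed version's signature; they control sampling in the report.
report_str = compare.report(
    sample_count=10,   # max sample rows shown per unequal column
    column_count=10,   # max columns shown in the "rows only in" samples
)
print(report_str)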

Convenience Methods

There are a few convenience methods and attributes available after the comparison has been run:

print(compare.intersect_rows[['name_df1', 'name_df2', 'name_match']])
#          name_df1              name_df2  name_match
# 0  George Maharis  George Michael Bluth       False
# 1   Michael Bluth         Michael Bluth        True
# 2    George Bluth          George Bluth        True
# 3      Bob Loblaw         Robert Loblaw       False

print(compare.df1_unq_rows)
#    acct_id_df1  dollar_amt_df1       name_df1  float_fld_df1 date_fld_df1  _merge_left
# 5  10000001239            1.05  Lucille Bluth            NaN   2017-01-01         True

print(compare.df2_unq_rows)
#    acct_id_df2  dollar_amt_df2          name_df2  float_fld_df2  _merge_right
# 4  10000001238            1.05  Loose Seal Bluth          111.0          True

print(compare.intersect_columns())
# OrderedSet(['acct_id', 'dollar_amt', 'name', 'float_fld'])

print(compare.df1_unq_columns())
# OrderedSet(['date_fld'])

print(compare.df2_unq_columns())
# OrderedSet()
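
SparkSQLCompare largely mirrors the pandas Compare API, so (assuming your version exposes it; verify before relying on it) you can also pull every mismatched row into a single pyspark.sql.DataFrame:

# all_mismatch() is assumed to be available on your datacompy version; it
# returns the intersecting rows where at least one compared column is unequal.
compare.all_mismatch().show()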

Duplicate rows

Datacompy will try to handle rows that are duplicates on the join columns. It does this behind the scenes by generating a unique ID within each group of rows that share the same join-column values. For example, if you have two dataframes you're trying to join on acct_id:

df1:

acct_id  name
1        George Maharis
1        Michael Bluth
2        George Bluth

df2:

acct_id  name
1        George Maharis
1        Michael Bluth
1        Tony Wonder
2        George Bluth

Datacompy will generate a unique temporary ID for joining:

df1:

acct_id  name            temp_id
1        George Maharis  0
1        Michael Bluth   1
2        George Bluth    0

df2:

acct_id  name            temp_id
1        George Maharis  0
1        Michael Bluth   1
1        Tony Wonder     2
2        George Bluth    0

And then merge the two dataframes on a combination of the join_columns you specified and the temporary ID, before dropping the temp_id again. So the first two rows in the first dataframe will match the first two rows in the second dataframe, and the third row in the second dataframe (Tony Wonder) will be recognized as unique to the second dataframe.
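
The snippet below is a minimal sketch of that idea, not datacompy's actual internals: it numbers rows within each group of join-column values using a window function, then joins on the join column(s) plus the temporary ID.

from pyspark.sql import functions as F, Window

# Number rows 0, 1, 2, ... within each acct_id group; the ordering column
# here is arbitrary -- any deterministic ordering would do.
w = Window.partitionBy("acct_id").orderBy(F.monotonically_increasing_id())

df1_ids = df1.withColumn("temp_id", F.row_number().over(w) - 1)
df2_ids = df2.withColumn("temp_id", F.row_number().over(w) - 1)

# Join on the original join column(s) plus the temporary ID, then drop it.
joined = df1_ids.join(df2_ids, on=["acct_id", "temp_id"], how="full").drop("temp_id")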