Spark Usage¶
Important
With v0.12.0, the original SparkCompare was replaced with a Pandas on Spark implementation. The original SparkCompare implementation differed from all the other native implementations, so to align the API and keep behaviour consistent, it was deprecated into a new module, LegacySparkCompare. Subsequently, in v0.13.0 a PySpark DataFrame class was introduced (SparkSQLCompare), which accepts pyspark.sql.DataFrame and should provide better performance. With this version, the Pandas on Spark implementation was renamed to SparkPandasCompare, and all the Spark logic now lives under the spark submodule.
If you wish to use the old SparkCompare moving forward you can import it like so:
from datacompy.spark.legacy import LegacySparkCompare
Important
Starting with v0.14.1, SparkPandasCompare is slated for deprecation. SparkSQLCompare is preferred and much more performant. Note that if you continue to use SparkPandasCompare, numpy 2+ is not supported due to dependency issues.
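If you must keep using SparkPandasCompare in the meantime, one way to avoid the numpy 2+ incompatibility is to pin numpy in your environment (a suggested workaround, not an official requirement of the package):

```shell
# Constrain numpy below 2.x alongside datacompy
pip install "numpy<2" datacompy
```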
For SparkSQLCompare:

- on_index is not supported.
- Joining is done using <=>, which is the equality test that is safe for null values.
- SparkSQLCompare compares pyspark.sql.DataFrame objects.
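To illustrate the null-safe semantics of <=>, here is a minimal pandas sketch (not datacompy or Spark code) of how a null-safe equality test behaves: two nulls compare equal, while a null never equals a non-null value.

```python
import pandas as pd

def null_safe_eq(s1: pd.Series, s2: pd.Series) -> pd.Series:
    # Mimics Spark SQL's <=> operator: NULL <=> NULL is True,
    # and NULL <=> <anything non-null> is False.
    both_null = s1.isna() & s2.isna()
    return (s1 == s2) | both_null

a = pd.Series([1.0, None, 3.0, None])
b = pd.Series([1.0, None, 4.0, 5.0])
print(null_safe_eq(a, b).tolist())  # [True, True, False, False]
```

In PySpark itself, the same semantics are available on columns via Column.eqNullSafe.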
SparkSQLCompare¶
There is currently only one supported method for joining your dataframes - by join column(s).
from io import StringIO
import pandas as pd
from datacompy import SparkSQLCompare
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data1 = """acct_id,dollar_amt,name,float_fld,date_fld
10000001234,123.45,George Maharis,14530.1555,2017-01-01
10000001235,0.45,Michael Bluth,1,2017-01-01
10000001236,1345,George Bluth,,2017-01-01
10000001237,123456,Bob Loblaw,345.12,2017-01-01
10000001239,1.05,Lucille Bluth,,2017-01-01
"""
data2 = """acct_id,dollar_amt,name,float_fld
10000001234,123.4,George Michael Bluth,14530.155
10000001235,0.45,Michael Bluth,
10000001236,1345,George Bluth,1
10000001237,123456,Robert Loblaw,345.12
10000001238,1.05,Loose Seal Bluth,111
"""
# SparkSQLCompare
df1 = spark.createDataFrame(pd.read_csv(StringIO(data1)))
df2 = spark.createDataFrame(pd.read_csv(StringIO(data2)))
compare = SparkSQLCompare(
spark,
df1,
df2,
join_columns='acct_id', # You can also specify a list of columns
abs_tol=0, # Optional, defaults to 0
rel_tol=0, # Optional, defaults to 0
df1_name='Original', # Optional, defaults to 'df1'
df2_name='New' # Optional, defaults to 'df2'
)
compare.matches(ignore_extra_columns=False)
# False
# This method prints out a human-readable report summarizing and sampling differences
print(compare.report())
Reports¶
A report is generated by calling report(), which returns a string. Here is a sample report generated by datacompy for the two tables above, joined on acct_id. (Note: if you don’t specify df1_name and/or df2_name, then any instance of “original” or “new” in the report is replaced with “df1” and/or “df2”.)
DataComPy Comparison
--------------------
DataFrame Summary
-----------------
DataFrame Columns Rows
0 Original 5 5
1 New 4 5
Column Summary
--------------
Number of columns in common: 4
Number of columns in Original but not in New: 1
Number of columns in New but not in Original: 0
Row Summary
-----------
Matched on: acct_id
Any duplicates on match values: No
Absolute Tolerance: 0
Relative Tolerance: 0
Number of rows in common: 4
Number of rows in Original but not in New: 1
Number of rows in New but not in Original: 1
Number of rows with some compared columns unequal: 4
Number of rows with all compared columns equal: 0
Column Comparison
-----------------
Number of columns compared with some values unequal: 3
Number of columns compared with all values equal: 1
Total number of values which compare unequal: 6
Columns with Unequal Values or Types
------------------------------------
Column Original dtype New dtype # Unequal Max Diff # Null Diff
0 dollar_amt float64 float64 1 0.0500 0
2 float_fld float64 float64 3 0.0005 2
1 name object object 2 NaN 0
Sample Rows with Unequal Values
-------------------------------
acct_id dollar_amt (Original) dollar_amt (New)
0 10000001234 123.45 123.4
acct_id name (Original) name (New)
0 10000001234 George Maharis George Michael Bluth
3 10000001237 Bob Loblaw Robert Loblaw
acct_id float_fld (Original) float_fld (New)
0 10000001234 14530.1555 14530.155
1 10000001235 1.0000 NaN
2 10000001236 NaN 1.000
Sample Rows Only in Original (First 10 Columns)
-----------------------------------------------
acct_id_df1 dollar_amt_df1 name_df1 float_fld_df1 date_fld_df1 _merge_left
5 10000001239 1.05 Lucille Bluth NaN 2017-01-01 True
Sample Rows Only in New (First 10 Columns)
------------------------------------------
acct_id_df2 dollar_amt_df2 name_df2 float_fld_df2 _merge_right
4 10000001238 1.05 Loose Seal Bluth 111.0 True
Convenience Methods¶
There are a few convenience methods and attributes available after the comparison has been run:
print(compare.intersect_rows[['name_df1', 'name_df2', 'name_match']])
# name_df1 name_df2 name_match
# 0 George Maharis George Michael Bluth False
# 1 Michael Bluth Michael Bluth True
# 2 George Bluth George Bluth True
# 3 Bob Loblaw Robert Loblaw False
print(compare.df1_unq_rows)
# acct_id_df1 dollar_amt_df1 name_df1 float_fld_df1 date_fld_df1 _merge_left
# 5 10000001239 1.05 Lucille Bluth NaN 2017-01-01 True
print(compare.df2_unq_rows)
# acct_id_df2 dollar_amt_df2 name_df2 float_fld_df2 _merge_right
# 4 10000001238 1.05 Loose Seal Bluth 111.0 True
print(compare.intersect_columns())
# OrderedSet(['acct_id', 'dollar_amt', 'name', 'float_fld'])
print(compare.df1_unq_columns())
# OrderedSet(['date_fld'])
print(compare.df2_unq_columns())
# OrderedSet()
Duplicate rows¶
Datacompy will try to handle rows that are duplicated in the join columns. It does this behind the scenes by generating a unique ID within each unique group of the join columns. For example, if you have two dataframes you’re trying to join on acct_id:
| acct_id | name |
|---|---|
| 1 | George Maharis |
| 1 | Michael Bluth |
| 2 | George Bluth |

| acct_id | name |
|---|---|
| 1 | George Maharis |
| 1 | Michael Bluth |
| 1 | Tony Wonder |
| 2 | George Bluth |
Datacompy will generate a unique temporary ID for joining:
| acct_id | name | temp_id |
|---|---|---|
| 1 | George Maharis | 0 |
| 1 | Michael Bluth | 1 |
| 2 | George Bluth | 0 |

| acct_id | name | temp_id |
|---|---|---|
| 1 | George Maharis | 0 |
| 1 | Michael Bluth | 1 |
| 1 | Tony Wonder | 2 |
| 2 | George Bluth | 0 |
And then merge the two dataframes on a combination of the join_columns you specified and the temporary ID, before dropping the temp_id again. So the first two rows in the first dataframe will match the first two rows in the second dataframe, and the third row in the second dataframe will be recognized as unique to the second.
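The temporary-ID trick above can be sketched in a few lines of pandas (a simplified illustration, not datacompy’s actual implementation):

```python
import pandas as pd

# The two example dataframes above, with duplicate acct_id values
df1 = pd.DataFrame({"acct_id": [1, 1, 2],
                    "name": ["George Maharis", "Michael Bluth", "George Bluth"]})
df2 = pd.DataFrame({"acct_id": [1, 1, 1, 2],
                    "name": ["George Maharis", "Michael Bluth", "Tony Wonder",
                             "George Bluth"]})

# Assign a per-group ordinal (0, 1, 2, ...) within each set of duplicate join keys
for df in (df1, df2):
    df["temp_id"] = df.groupby("acct_id").cumcount()

# Join on the original key plus the temporary ID; indicator marks unmatched rows
merged = df1.merge(df2, on=["acct_id", "temp_id"], how="outer", indicator=True)
print(merged[["acct_id", "temp_id", "_merge"]])
# The (acct_id=1, temp_id=2) row (Tony Wonder) comes out as right_only:
# it exists only in the second dataframe
```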