Spark Usage =========== .. important:: With version ``v0.12.0`` the original ``SparkCompare`` was replaced with a Pandas on Spark implementation The original ``SparkCompare`` implementation differs from all the other native implementations. To align the API better, and keep behaviour consistent we are deprecating the original ``SparkCompare`` into a new module ``LegacySparkCompare`` Subsequently in ``v0.13.0`` a PySaprk DataFrame class has been introduced (``SparkSQLCompare``) which accepts ``pyspark.sql.DataFrame`` and should provide better performance. With this version the Pandas on Spark implementation has been renamed to ``SparkPandasCompare`` and all the spark logic is now under the ``spark`` submodule. If you wish to use the old SparkCompare moving forward you can import it like so: .. code-block:: python from datacompy.spark.legacy import LegacySparkCompare .. important:: Starting with ``v0.14.1``, ``SparkPandasCompare`` is slated for deprecation. ``SparkSQLCompare`` is the prefered and much more performant. It should be noted that if you continue to use ``SparkPandasCompare`` that ``numpy`` 2+ is not supported due to dependnecy issues. For ``SparkSQLCompare`` - ``on_index`` is not supported. - Joining is done using ``<=>`` which is the equality test that is safe for null values. - ``SparkSQLCompare`` compares ``pyspark.sql.DataFrame``'s SparkSQLCompare --------------- There is currently only one supported method for joining your dataframes - by join column(s). .. code-block:: python from io import StringIO import pandas as pd import pyspark.pandas as ps from datacompy import SparkSQLCompare from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() data1 = """acct_id,dollar_amt,name,float_fld,date_fld 10000001234,123.45,George Maharis,14530.1555,2017-01-01 10000001235,0.45,Michael Bluth,1,2017-01-01 10000001236,1345,George Bluth,,2017-01-01 10000001237,123456,Bob Loblaw,345.12,2017-01-01 10000001239,1.05,Lucille Bluth,,2017-01-01 """ data2 = """acct_id,dollar_amt,name,float_fld 10000001234,123.4,George Michael Bluth,14530.155 10000001235,0.45,Michael Bluth, 10000001236,1345,George Bluth,1 10000001237,123456,Robert Loblaw,345.12 10000001238,1.05,Loose Seal Bluth,111 """ # SparkSQLCompare df1 = spark.createDataFrame(pd.read_csv(StringIO(data1))) df2 = spark.createDataFrame(pd.read_csv(StringIO(data2))) compare = SparkSQLCompare( spark, df1, df2, join_columns='acct_id', # You can also specify a list of columns abs_tol=0, # Optional, defaults to 0 rel_tol=0, # Optional, defaults to 0 df1_name='Original', # Optional, defaults to 'df1' df2_name='New' # Optional, defaults to 'df2' ) compare.matches(ignore_extra_columns=False) # False # This method prints out a human-readable report summarizing and sampling differences print(compare.report()) Reports ------- A report is generated by calling ``report()``, which returns a string. Here is a sample report generated by ``datacompy`` for the two tables above, joined on ``acct_id`` (Note: if you don't specify ``df1_name`` and/or ``df2_name``, then any instance of "original" or "new" in the report is replaced with "df1" and/or "df2".):: DataComPy Comparison -------------------- DataFrame Summary ----------------- DataFrame Columns Rows 0 Original 5 5 1 New 4 5 Column Summary -------------- Number of columns in common: 4 Number of columns in Original but not in New: 1 Number of columns in New but not in Original: 0 Row Summary ----------- Matched on: acct_id Any duplicates on match values: No Absolute Tolerance: 0 Relative Tolerance: 0 Number of rows in common: 4 Number of rows in Original but not in New: 1 Number of rows in New but not in Original: 1 Number of rows with some compared columns unequal: 4 Number of rows with all compared columns equal: 0 Column Comparison ----------------- Number of columns compared with some values unequal: 3 Number of columns compared with all values equal: 1 Total number of values which compare unequal: 6 Columns with Unequal Values or Types ------------------------------------ Column Original dtype New dtype # Unequal Max Diff # Null Diff 0 dollar_amt float64 float64 1 0.0500 0 2 float_fld float64 float64 3 0.0005 2 1 name object object 2 NaN 0 Sample Rows with Unequal Values ------------------------------- acct_id dollar_amt (Original) dollar_amt (New) 0 10000001234 123.45 123.4 acct_id name (Original) name (New) 0 10000001234 George Maharis George Michael Bluth 3 10000001237 Bob Loblaw Robert Loblaw acct_id float_fld (Original) float_fld (New) 0 10000001234 14530.1555 14530.155 1 10000001235 1.0000 NaN 2 10000001236 NaN 1.000 Sample Rows Only in Original (First 10 Columns) ----------------------------------------------- acct_id_df1 dollar_amt_df1 name_df1 float_fld_df1 date_fld_df1 _merge_left 5 10000001239 1.05 Lucille Bluth NaN 2017-01-01 True Sample Rows Only in New (First 10 Columns) ------------------------------------------ acct_id_df2 dollar_amt_df2 name_df2 float_fld_df2 _merge_right 4 10000001238 1.05 Loose Seal Bluth 111.0 True Convenience Methods ------------------- There are a few convenience methods and attributes available after the comparison has been run: .. code-block:: python print(compare.intersect_rows[['name_df1', 'name_df2', 'name_match']]) # name_df1 name_df2 name_match # 0 George Maharis George Michael Bluth False # 1 Michael Bluth Michael Bluth True # 2 George Bluth George Bluth True # 3 Bob Loblaw Robert Loblaw False print(compare.df1_unq_rows) # acct_id_df1 dollar_amt_df1 name_df1 float_fld_df1 date_fld_df1 _merge_left # 5 10000001239 1.05 Lucille Bluth NaN 2017-01-01 True print(compare.df2_unq_rows) # acct_id_df2 dollar_amt_df2 name_df2 float_fld_df2 _merge_right # 4 10000001238 1.05 Loose Seal Bluth 111.0 True print(compare.intersect_columns()) # OrderedSet(['acct_id', 'dollar_amt', 'name', 'float_fld']) print(compare.df1_unq_columns()) # OrderedSet(['date_fld']) print(compare.df2_unq_columns()) # OrderedSet() Duplicate rows -------------- Datacompy will try to handle rows that are duplicate in the join columns. It does this behind the scenes by generating a unique ID within each unique group of the join columns. For example, if you have two dataframes you're trying to join on acct_id: =========== ================ acct_id name =========== ================ 1 George Maharis 1 Michael Bluth 2 George Bluth =========== ================ =========== ================ acct_id name =========== ================ 1 George Maharis 1 Michael Bluth 1 Tony Wonder 2 George Bluth =========== ================ Datacompy will generate a unique temporary ID for joining: =========== ================ ======== acct_id name temp_id =========== ================ ======== 1 George Maharis 0 1 Michael Bluth 1 2 George Bluth 0 =========== ================ ======== =========== ================ ======== acct_id name temp_id =========== ================ ======== 1 George Maharis 0 1 Michael Bluth 1 1 Tony Wonder 2 2 George Bluth 0 =========== ================ ======== And then merge the two dataframes on a combination of the join_columns you specified and the temporary ID, before dropping the temp_id again. So the first two rows in the first dataframe will match the first two rows in the second dataframe, and the third row in the second dataframe will be recognized as uniquely in the second.