6

I'm new to Scala and am having problems writing unit tests.

I'm trying to compare and check equality for two Spark DataFrames in Scala for unit testing, and realized that there is no easy way to check equality for two Spark DataFrames.

The C++ equivalent code would be (assuming that the DataFrames are represented as two-dimensional arrays in C++):

    int expected[10][2];
    int result[10][2];
    for (int row = 0; row < 10; row++) {
        for (int col = 0; col < 2; col++) {
            if (expected[row][col] != result[row][col]) return false;
        }
    }

The actual test would involve testing for equality based on the data types of the columns of the DataFrames (testing with precision tolerance for floats, etc).
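As a minimal sketch of what such a type-dependent comparison could look like on plain Scala values (for example, rows that were already collected to the driver): `cellsMatch` and `rowsMatch` are hypothetical helper names, and the default tolerance is an arbitrary assumption.

```scala
// Hypothetical helper: compares two cell values according to their runtime type.
// Floating-point values are compared with an absolute tolerance,
// everything else (ints, strings, nulls, ...) with plain equality.
def cellsMatch(expected: Any, actual: Any, tolerance: Double = 1e-6): Boolean =
  (expected, actual) match {
    case (e: Double, a: Double) => math.abs(e - a) <= tolerance
    case (e: Float, a: Float)   => math.abs(e - a) <= tolerance
    case (e, a)                 => e == a
  }

// Hypothetical helper: two rows match when they have the same length
// and every pair of cells matches.
def rowsMatch(expected: Seq[Any], actual: Seq[Any], tolerance: Double = 1e-6): Boolean =
  expected.length == actual.length &&
    expected.zip(actual).forall { case (e, a) => cellsMatch(e, a, tolerance) }
```

With Spark, each collected `Row` could be turned into such a sequence with `row.toSeq` before comparing.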

There does not seem to be an easy way to loop over all the elements of a DataFrame in Scala. The other solutions for checking equality of two DataFrames, such as df1.except(df2), do not work in my case because I need to support testing equality with a tolerance for floats and doubles.

Of course, I could try to round all the elements beforehand and compare the results afterwards, but I would like to see if there are any other solutions that would allow me to iterate through the DataFrames to check for equality.

5
  • How big are your DataFrames? If they are not so big, you could sort/collect them and then easily compare them. Commented Nov 8, 2016 at 23:02
  • Since those are unit-test data frames, those should be quite small. Just collect them into a List and compare. Commented Nov 9, 2016 at 6:20
  • Yeah, my test currently collects the data frames into a list and compares them, but I was hoping to create tools that could also test on bigger data frames as well. I'm guessing that there's no easy way of accomplishing this? Commented Nov 9, 2016 at 18:24
  • *** Asked 3 years, 4 months ago Active 5 months ago Viewed 7k times --- YET still no Answer accepted ... Commented Apr 2, 2020 at 20:43
  • This question is similar to: DataFrame equality in Apache Spark. If you believe it’s different, please edit the question, make it clear how it’s different and/or how the answers on that question are not helpful for your problem. Commented Feb 9 at 20:46

3 Answers 3

10
import org.scalatest.{BeforeAndAfterAll, FeatureSpec, Matchers}

outDf.collect() should contain theSameElementsAs (dfComparable.collect())
// or (note: order matters!)

// outDf.except(dfComparable).toDF().count should be(0)
outDf.except(dfComparable).count should be(0)   

3 Comments

The except function already returns a DataFrame, so there is no need for toDF.
outDf.except(dfComparable).count should be(0) is not a good choice because .except returns the rows from the left that are not in the right. If some elements are missing from the left, the test will not fail. assertSmallDataFrameEquality is a better alternative; see stackoverflow.com/questions/31197353/…
assertDataFrameEquals from spark-testing-base is also an alternative.
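To illustrate the one-directional behaviour of except described in the comments above, a check in both directions could be sketched as follows. `sameDistinctRows` is a hypothetical helper name, not part of Spark or of the libraries mentioned, and it still compares values exactly, without any tolerance.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: true when neither DataFrame contains a distinct row
// that the other one lacks.
// Caveat: except has set semantics, so differing duplicate counts go unnoticed.
def sameDistinctRows(left: DataFrame, right: DataFrame): Boolean =
  left.except(right).count() == 0 && right.except(left).count() == 0
```

In a test this could be used as `sameDistinctRows(outDf, dfComparable) should be(true)`.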
1

If you want to check whether both DataFrames are equal for testing purposes, you can use the subtract() method of the DataFrame (supported in version 1.3 and above).

You can check whether the difference between the two DataFrames is empty, e.g. df1.subtract(df2).count() == 0

1 Comment

Thanks for the suggestion, but df1.except(df2) that I mentioned in my question has the same functionality as the df1.subtract(df2) and does not really work in this situation, where I am hoping to compare the values with precision tolerance.
0

Assuming that you have a fixed number of columns and rows, one solution could be to join both DataFrames by row index (in case you do not have ids for the records), and then iterate directly over the final DataFrame [with all the columns of both DataFrames]. Something like this:

Schemas
DF1
root
 |-- col1: double (nullable = true)
 |-- col2: double (nullable = true)
 |-- col3: double (nullable = true)

DF2
root
 |-- col1: double (nullable = true)
 |-- col2: double (nullable = true)
 |-- col3: double (nullable = true)

df1
+----------+-----------+------+
|      col1|       col2|  col3|
+----------+-----------+------+
|1.20000001|       1.21|   1.2|
|    2.1111|        2.3|  22.2|
|       3.2|2.330000001| 2.333|
|    2.2444|      2.344|2.3331|
+----------+-----------+------+

df2
+------+-----+------+
|  col1| col2|  col3|
+------+-----+------+
|   1.2| 1.21|   1.2|
|2.1111|  2.3|  22.2|
|   3.2| 2.33| 2.333|
|2.2444|2.344|2.3331|
+------+-----+------+

Added row index
df1
+----------+-----------+------+---+
|      col1|       col2|  col3|row|
+----------+-----------+------+---+
|1.20000001|       1.21|   1.2|  0|
|    2.1111|        2.3|  22.2|  1|
|       3.2|2.330000001| 2.333|  2|
|    2.2444|      2.344|2.3331|  3|
+----------+-----------+------+---+

df2
+------+-----+------+---+
|  col1| col2|  col3|row|
+------+-----+------+---+
|   1.2| 1.21|   1.2|  0|
|2.1111|  2.3|  22.2|  1|
|   3.2| 2.33| 2.333|  2|
|2.2444|2.344|2.3331|  3|
+------+-----+------+---+

Combined DF
+---+----------+-----------+------+------+-----+------+
|row|      col1|       col2|  col3|  col1| col2|  col3|
+---+----------+-----------+------+------+-----+------+
|  0|1.20000001|       1.21|   1.2|   1.2| 1.21|   1.2|
|  1|    2.1111|        2.3|  22.2|2.1111|  2.3|  22.2|
|  2|       3.2|2.330000001| 2.333|   3.2| 2.33| 2.333|
|  3|    2.2444|      2.344|2.3331|2.2444|2.344|2.3331|
+---+----------+-----------+------+------+-----+------+

This is how you can do that:

    import org.apache.spark.sql.functions.monotonically_increasing_id

    println("Schemas")
    println("DF1")
    df1.printSchema()
    println("DF2")
    df2.printSchema()
    println("df1")
    df1.show
    println("df2")
    df2.show
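    // Caveat: the generated ids are unique but not necessarily consecutive, and they
    // only line up across the two DataFrames when both have the same partitioning and row order.
    val finaldf1 = df1.withColumn("row", monotonically_increasing_id())
    val finaldf2 = df2.withColumn("row", monotonically_increasing_id())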
    println("Added row index")
    println("df1")
    finaldf1.show()
    println("df2")
    finaldf2.show()

    val joinedDfs = finaldf1.join(finaldf2, "row")
    println("Combined DF")
    joinedDfs.show()

    val tolerance = 0.001
    def isInValidRange(a: Double, b: Double): Boolean ={
      Math.abs(a-b)<=tolerance
    }
    joinedDfs.take(10).foreach(row => {
      assert( isInValidRange(row.getDouble(1), row.getDouble(4)) , "Col1 validation. Row %s".format(row.getLong(0)+1))
      assert( isInValidRange(row.getDouble(2), row.getDouble(5)) , "Col2 validation. Row %s".format(row.getLong(0)+1))
      assert( isInValidRange(row.getDouble(3), row.getDouble(6)) , "Col3 validation. Row %s".format(row.getLong(0)+1))
    })

Note: the asserts are not serializable, so a workaround is to use take() to bring the rows to the driver and avoid errors.
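For larger DataFrames, the same tolerance check can be expressed with column expressions so that it runs on the executors instead of on collected rows. A rough sketch, assuming both DataFrames share a key column (such as the row column above) and numeric value columns; `mismatchCount` and its parameters are hypothetical names, not part of Spark:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{abs, col}

// Hypothetical helper: counts joined rows in which at least one value column
// differs by more than the tolerance.
// Caveats: rows whose key exists on only one side are dropped by the inner join,
// and null values are not reported as mismatches; check those separately.
def mismatchCount(left: DataFrame, right: DataFrame, key: String,
                  valueCols: Seq[String], tolerance: Double): Long = {
  require(valueCols.nonEmpty, "at least one value column is needed")
  val l = left.alias("l")
  val r = right.alias("r")
  // One boolean column expression per value column, combined with OR
  val outOfRange = valueCols
    .map(c => abs(col(s"l.$c") - col(s"r.$c")) > tolerance)
    .reduce(_ || _)
  l.join(r, col(s"l.$key") === col(s"r.$key")).filter(outOfRange).count()
}
```

With the DataFrames from this answer, `assert(mismatchCount(finaldf1, finaldf2, "row", Seq("col1", "col2", "col3"), tolerance) == 0)` would replace the three per-column asserts, and comparing `finaldf1.count()` with `finaldf2.count()` covers the missing-row case.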
