Find out if 2 tables (`tbl_spark`) are equal without collecting them using sparklyr

Consider two tables (or table references) in Spark that you want to compare, e.g. to ensure that a backup worked correctly. Is there a way to do that remotely in Spark? Copying all the data to R with collect() is not practical.

library(sparklyr)
library(dplyr)
library(DBI)

##### create spark connection here
# sc <- spark_connect(<yourcodehere>)
spark_connection(sc)
spark_context(sc)

trees1_tbl <- sdf_copy_to(sc, trees, "trees1")
trees2_tbl <- sdf_copy_to(sc, trees, "trees2")
identical(trees1_tbl, trees2_tbl) # FALSE
identical(collect(trees1_tbl), collect(trees2_tbl)) # TRUE
setequal(trees1_tbl, trees2_tbl) # FALSE
setequal(collect(trees1_tbl), collect(trees2_tbl)) # TRUE

spark_disconnect(sc)

It would be nice if dplyr::setequal() could be used directly on the Spark tables.

Turtleback answered 26/7, 2018 at 8:51

Thanks @Cosmin for the hint!

First use setdiff(), which (unlike setequal()) has a method for tbl_lazy objects provided by dplyr, then count the remaining rows and compare that count with 0.

trees1_tbl %>% setdiff(trees2_tbl) %>% sdf_nrow() == 0
## TRUE

This results in TRUE if all rows from trees1_tbl are contained in trees2_tbl. If they differ, one can leave out the == 0 to get the number of rows missing from trees2_tbl.
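
To check equality in both directions (neither table contains rows the other lacks), a minimal sketch along the same lines; note that setdiff() is set-based, so duplicate rows are not detected, which is why the raw row counts are compared as well:

only_in_1 <- trees1_tbl %>% setdiff(trees2_tbl) %>% sdf_nrow()  # rows missing from trees2_tbl
only_in_2 <- trees2_tbl %>% setdiff(trees1_tbl) %>% sdf_nrow()  # rows missing from trees1_tbl
same_size <- sdf_nrow(trees1_tbl) == sdf_nrow(trees2_tbl)       # guards against duplicated rows
only_in_1 == 0 && only_in_2 == 0 && same_size
## TRUE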

Turtleback answered 31/10, 2018 at 10:38

It is just not gonna work. The main point to remember here is that Spark DataFrames* are not data containers. They are descriptions of the transformations that will be applied to the data once the pipeline is executed. That means the result can be different every time you evaluate the data. The only meaningful question you can ask here is whether both DataFrames describe the same execution plan, which is obviously not useful in your case.

So how to compare the data? There is really no universal answer here.

Testing

If it is part of a unit test, collecting the data and comparing local objects is the way to go (although please keep in mind that using sets can miss some subtle but common problems, such as duplicated rows).
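
For example, a possible local comparison (a sketch, reusing trees1_tbl and trees2_tbl from the question): sorting by all columns makes the check order-independent while still keeping duplicate rows, unlike a pure set comparison:

local1 <- trees1_tbl %>% collect() %>% arrange(across(everything()))
local2 <- trees2_tbl %>% collect() %>% arrange(across(everything()))
isTRUE(all.equal(local1, local2))
## TRUE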

Production

Outside of unit tests, you can try to check whether

  • The size of A is equal to the size of B
  • A EXCEPT B IS ∅ AND B EXCEPT A IS ∅

This, however, is very expensive and, even if feasible, might significantly increase the cost of the process. So in practice you might prefer methods which don't provide strict guarantees but have a better performance profile. These will differ depending on the input and output source, as well as on the failure model (for example, file-based sources are more reliable than ones using databases or message queues).
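
For completeness, the strict check above could be expressed through Spark SQL with DBI (a sketch, assuming the tables are registered as trees1 and trees2 as in the question; EXCEPT is set-based, hence the extra count comparison):

n1 <- dbGetQuery(sc, "SELECT COUNT(*) AS n FROM trees1")$n
n2 <- dbGetQuery(sc, "SELECT COUNT(*) AS n FROM trees2")$n
a_minus_b <- dbGetQuery(sc, "SELECT COUNT(*) AS n FROM (SELECT * FROM trees1 EXCEPT SELECT * FROM trees2) d")$n
b_minus_a <- dbGetQuery(sc, "SELECT COUNT(*) AS n FROM (SELECT * FROM trees2 EXCEPT SELECT * FROM trees1) d")$n
n1 == n2 && a_minus_b == 0 && b_minus_a == 0
## TRUE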

In the simplest case you can manually inspect basic invariants, like the number of rows read and written, using the Spark web UI. For more advanced monitoring you can implement your own Spark listeners (see for example Spark: how to get the number of written rows?), query listeners, or accumulators, but none of these components are exposed in sparklyr, and they require writing native (Scala or Java) code.


* I refer here to Spark, but using dplyr with a database backend is not that different.

Justen answered 26/7, 2018 at 13:30

I wrote an example of how I think you can do it. Basically, you union both tables and then apply distinct() to the result of the union. After distinct(), compare the number of rows of the resulting DataFrame with the initial number of rows.

>>> rdd = spark.sparkContext.parallelize([("test","test1")])
>>> rdd.collect()
[('test', 'test1')]
>>> df1 = spark.createDataFrame(rdd).toDF("col1","col2")
>>> df1.show()
+----+-----+
|col1| col2|
+----+-----+
|test|test1|
+----+-----+

>>> df2 = spark.createDataFrame(rdd).toDF("col1","col2")
>>> df2.show()
+----+-----+
|col1| col2|
+----+-----+
|test|test1|
+----+-----+

>>> df3 = df1.union(df2)
>>> df3.show()
+----+-----+
|col1| col2|
+----+-----+
|test|test1|
|test|test1|
+----+-----+

>>> df3.distinct().show()
+----+-----+
|col1| col2|
+----+-----+
|test|test1|
+----+-----+

>>> df1.count()
1
>>> df3.distinct().count()
1
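
For reference, roughly the same idea expressed with sparklyr/dplyr on the tables from the question (a sketch, assuming, as in the PySpark example, that the tables themselves contain no duplicate rows; union_all() keeps duplicates, and distinct() then collapses rows that appear in both tables):

n_initial  <- sdf_nrow(trees1_tbl)
n_distinct <- trees1_tbl %>% union_all(trees2_tbl) %>% distinct() %>% sdf_nrow()
n_initial == n_distinct
## TRUE
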
Verlie answered 23/10, 2018 at 18:48
