
I have the following code snippet to combine a Seq of (DataFrame, BloomFilter) tuples. The DFs are in chronological order (oldest first), and the goal is to union them and deduplicate by id, always keeping the latest record. These are the input DFs in that order:

+---+----------------+
|id |data            |
+---+----------------+
|d  |[8]             |
+---+----------------+ 

+---+----------------+
|id |data            |
+---+----------------+
|b  |[4]             |
|c  |[4]             |
+---+----------------+ 

+---+----------------+
|id |data            |
+---+----------------+
|b  |[4, 16]         |
+---+----------------+

And this is the code:

import org.apache.spark.sql.functions.{col, udf}

println("print input dfs")
dfList.foreach(_._1.show(20, false))
// Reverse so the newest (DataFrame, BloomFilter) pair becomes the initial accumulator
val combined = dfList.reverse.reduceLeft((tupNewer, tupOlder) => {
  println("tupNewer")
  tupNewer._1.show(20, false)
  println("tupOlder")
  tupOlder._1.show(20, false)
  // Keep only older rows whose id is (probably) not present in the newer data
  val newerDfBloomFilterUDF = udf((s: String) => !tupNewer._2.mightContain(s))
  val filteredOlderDf = tupOlder._1.filter(newerDfBloomFilterUDF(col("id")))
  val unionedDf = tupNewer._1.union(filteredOlderDf)
  println("unionedDf")
  unionedDf.show(20, false)
  // mergeInPlace ORs tupOlder._2 into tupNewer._2 and returns that same, now mutated, instance
  val mergedBloomFilter = tupNewer._2.mergeInPlace(tupOlder._2)
  (unionedDf, mergedBloomFilter)
})
println("print combined")
combined._1.show(20, false)

However, when I run it, the output makes no sense:

tupNewer
+---+----------------+
|id |data            |
+---+----------------+
|b  |[4, 16]         |
+---+----------------+

tupOlder
+---+----------------+
|id |data            |
+---+----------------+
|b  |[4]             |
|c  |[4]             |
+---+----------------+

unionedDf
+---+----------------+
|id |data            |
+---+----------------+
|b  |[4, 16]         |
|c  |[4]             |
+---+----------------+

tupNewer
+---+----------------+
|id |data            |
+---+----------------+
|b  |[4, 16]         |
+---+----------------+

tupOlder
+---+----------------+
|id |data            |
+---+----------------+
|d  |[8]             |
+---+----------------+

unionedDf
+---+----------------+
|id |data            |
+---+----------------+
|b  |[4, 16]         |
|d  |[8]             |
+---+----------------+

print combined
+---+----------------+
|id |data            |
+---+----------------+
|b  |[4, 16]         |
+---+----------------+

At the very least, each tupNewer value should equal the unionedDf from the previous iteration, since that is what is returned as the accumulator after every iteration. Instead, the second tupNewer contains only b (row c has disappeared), and the final result contains only b rather than b, c and d. It seems as though the accumulator of the reduceLeft is not being updated, so each DF is compared against the first one while the intermediate results are lost. This is driving me up the wall. Any help is much appreciated.
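One possible explanation for this trace, offered as a hypothesis rather than a confirmed diagnosis: `BloomFilter.mergeInPlace` (in `org.apache.spark.util.sketch`) ORs the other filter into the receiver and returns the receiver itself, so `mergedBloomFilter` is the same object as `tupNewer._2`, which the UDF from the previous iteration still references. The standalone check below assumes only the `spark-sketch` module on the classpath; the names `newer` and `older` and the size 1000 are illustrative.

```scala
import org.apache.spark.util.sketch.BloomFilter

// Illustrative filters mirroring the ids of the newest and middle DataFrames
val newer = BloomFilter.create(1000)
newer.putString("b")
val older = BloomFilter.create(1000) // same size, so the two filters are compatible
older.putString("b")
older.putString("c")

// mergeInPlace mutates `newer` and returns that very instance
val merged = newer.mergeInPlace(older)

val sameObject = merged eq newer
val newerNowSeesC = newer.mightContainString("c") // Bloom filters have no false negatives
println(s"same object: $sameObject, newer now contains c: $newerNowSeesC")
```

If `sameObject` is true, the UDF built in the first iteration reads a filter that later iterations keep growing. Because `show` re-executes the whole lazy plan, the second iteration's `tupNewer._1.show(...)` would then filter `c` out, which would match the printed output.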

I've tried writing the same loop with just DFs rather than tuples, with all lines referencing the BloomFilter logic commented out, and that code worked as expected. I then rewrote it with tuples of primitives instead of DFs and BloomFilters, to make sure there wasn't something wrong with using tuples, and that worked too. In other words, I can't isolate which part of the above code is causing such strange behavior from reduceLeft.
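One hypothesis consistent with these experiments is that the failure needs two ingredients at once: a deferred computation, and a mutable object that it reads and that changes before the computation runs again. Neither primitives nor DataFrames without the BloomFilter provide the second ingredient. The plain-Scala model below (no Spark needed) reproduces the question's printouts under that assumption. `LazyDf`, `Filter` and the sample rows are hypothetical stand-ins: each `LazyDf` recomputes its rows on every call, loosely like Spark re-running an uncached plan on each action, and a mutable `Set` plays the role of the BloomFilter.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a lazy DataFrame: rows are recomputed on every call
final case class LazyDf(rows: () => Seq[(String, String)])

// Stand-in for a BloomFilter: a mutable set of ids that can be merged in place
type Filter = mutable.Set[String]

// Same data as the question, oldest first
val inputs: List[(LazyDf, Filter)] = List(
  (LazyDf(() => Seq("d" -> "[8]")), mutable.Set("d")),
  (LazyDf(() => Seq("b" -> "[4]", "c" -> "[4]")), mutable.Set("b", "c")),
  (LazyDf(() => Seq("b" -> "[4, 16]")), mutable.Set("b"))
)

// Ids "shown" for tupNewer and unionedDf in each iteration
val shownNewer = mutable.ArrayBuffer.empty[Seq[String]]
val shownUnioned = mutable.ArrayBuffer.empty[Seq[String]]

val combined = inputs.reverse.reduceLeft { (tupNewer, tupOlder) =>
  val (newerDf, newerFilter) = tupNewer
  val (olderDf, olderFilter) = tupOlder
  shownNewer += newerDf.rows().map(_._1)
  // Like the UDF, this closure holds a reference to newerFilter, not a snapshot of it
  val unioned = LazyDf(() =>
    newerDf.rows() ++ olderDf.rows().filterNot(row => newerFilter.contains(row._1)))
  shownUnioned += unioned.rows().map(_._1)
  newerFilter ++= olderFilter // in-place merge, analogous to mergeInPlace
  (unioned, newerFilter)
}

val finalIds = combined._1.rows().map(_._1)
println(s"tupNewer: $shownNewer, unionedDf: $shownUnioned, combined: $finalIds")
```

The model prints tupNewer as `b` in both iterations, unionedDf as `b, c` and then `b, d`, and the combined result as `b` alone, which is the same sequence as the Spark output in the question.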

My best guess at this point is some kind of typing issue, though I am at a loss as to how to proceed.
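If the in-place merge turns out to be the culprit rather than typing, one way to test that is to make the merge non-mutating: since `mergeInPlace` changes the filter it is called on, copy the newer filter first and merge into the copy, so a filter captured by an earlier UDF never changes afterwards. A minimal sketch, assuming a `writeTo`/`readFrom` round trip is an acceptable way to copy a `spark-sketch` BloomFilter; `copyFilter` and `mergedCopy` are hypothetical helper names, not Spark API.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.apache.spark.util.sketch.BloomFilter

// Hypothetical helper: deep-copies a filter by serializing and deserializing it
def copyFilter(bf: BloomFilter): BloomFilter = {
  val out = new ByteArrayOutputStream()
  bf.writeTo(out)
  BloomFilter.readFrom(new ByteArrayInputStream(out.toByteArray))
}

// Hypothetical helper: returns a new filter holding both inputs, leaving both untouched
def mergedCopy(a: BloomFilter, b: BloomFilter): BloomFilter =
  copyFilter(a).mergeInPlace(b)

val newer = BloomFilter.create(1000)
newer.putString("b")
val older = BloomFilter.create(1000) // same size, so the two filters are compatible
older.putString("c")

// Snapshot of `newer` taken before merging, to compare against afterwards
val newerBefore = copyFilter(newer)
val merged = mergedCopy(newer, older)
println(s"newer unchanged: ${newer == newerBefore}, merged has c: ${merged.mightContainString("c")}")
```

Inside the question's `reduceLeft`, the `mergeInPlace` line would become `val mergedBloomFilter = mergedCopy(tupNewer._2, tupOlder._2)`. Each iteration then allocates a fresh filter, so re-executing an earlier plan sees the filter its UDF was built with. Caching each `unionedDf` alone would not guarantee this, since cached blocks can be evicted and recomputed.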

  • Welcome to SO! For me, your question has way too much cognitive load. I know you said you couldn't reproduce it with just DFs or primitive tuples, and that's a great direction. Keep narrowing down until you reach the exact point where the problem occurs, then strip away everything unnecessary. Commented Sep 13, 2024 at 9:42
  • Your code seems correct and I am also baffled about the error. My only guess would be that the BloomFilter somehow mutates the DF but that doesn't make sense AFAIK. Can you create a Scastie reproducing the issue? Or a github repo? - BTW, fun fact, reverse.reduceLeft is the same as reduceRight :) Commented Sep 13, 2024 at 16:08
  • Try to persist (write-and-read) each new dataframe, to make sure there is no lineage. If it works, it might be a Spark bug. I've encountered some performance bugs related to union(), but this one is a real gem :) Commented Sep 16, 2024 at 14:19
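The second comment's remark that `reverse.reduceLeft` is the same as `reduceRight` holds once the operator's parameters are swapped: `reduceLeft` passes the accumulator first, while `reduceRight` passes the element first and the accumulator second. A small check on plain strings (the sample list is illustrative):

```scala
val xs = List("a", "b", "c")

// reduceLeft on the reversed list: ("c" + "b") + "a", accumulator first
val viaReverse = xs.reverse.reduceLeft((acc, x) => acc + x)

// reduceRight evaluates op("a", op("b", "c")) with op(x, acc) = acc + x
val viaRight = xs.reduceRight((x, acc) => acc + x)

println(s"$viaReverse $viaRight")
```

So the question's combine could equally be written as `dfList.reduceRight((tupOlder, tupNewer) => ...)` with the same body; this changes only the form, not the behavior described above.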
