
Disclaimer: I'm VERY new to Spark and Scala. I am working on a document-similarity project in Scala with Spark. I have a DataFrame which looks like this:

+--------+--------------------+------------------+
|    text|            shingles|   hashed_shingles|
+--------+--------------------+------------------+
|  qwerty|[qwe, wer, ert, rty]|  [-4, -6, -1, -9]|
|qwerasfg|[qwe, wer, era, r...|[-4, -6, 6, -2, 2]|
+--------+--------------------+------------------+

Here I split each document's text into shingles and computed a hash value for each one.

Imagine I have a hash_function(integer, seed) -> integer. Now I want to apply n different hash functions of this form to the hashed_shingles arrays, i.e. obtain an array of n arrays such that the i-th inner array is hash_function applied element-wise to hashed_shingles with seed i, for seeds 1 to n.
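
To pin down the shape I'm after, here is a plain-Scala sketch (hashFunction is a hypothetical stand-in for hash_function, with a dummy body just for illustration):

def hashFunction(x: Int, seed: Int): Int = ((seed + 1) * x + seed) % 104729 // dummy body
val hashedShingles: Seq[Int] = Seq(-4, -6, -1, -9)
val n = 3
// One inner array per seed, each the element-wise hash of hashed_shingles
val expected: Seq[Seq[Int]] =
  (1 to n).map(seed => hashedShingles.map(x => hashFunction(x, seed)))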

I'm trying something like this, but I cannot get it to work:

val n = 3
df = df.withColumn("tmp", array_repeat($"hashed_shingles", n)) // repeat the hashed_shingles array n times
val minhash_expr = "transform(tmp, (x, i) -> hash_function(x, i))"
df = df.withColumn("tmp", expr(minhash_expr)) // apply a seed-dependent hash to each inner array
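
I suspect the expr fails because hash_function is not a built-in SQL function, so Spark cannot resolve it. Registering it first would presumably make the expression parse, but registration is itself a UDF, which defeats the purpose. A sketch with a dummy hash body (spark is the active SparkSession):

// Each x in the lambda above is one inner array produced by array_repeat,
// and i is the 0-based element index, used here as the seed.
spark.udf.register("hash_function",
  (xs: Seq[Int], seed: Int) => xs.map(x => ((seed + 1) * x + seed) % 104729))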

I know how to do it with a UDF, but as I understand it, UDFs are opaque to Spark's optimizer and should be avoided when possible, so I am trying to do everything with org.apache.spark.sql.functions.

Any ideas on how to approach this without a UDF?

The UDF which achieves the same goal is this:

import org.apache.spark.sql.functions.udf

// Family of hash functions of the form ((a*x + b) % p) % max_val
class Hasher(seed: Int, max_val: Int, p: Int = 104729) {
  private val random_generator = new scala.util.Random(seed)
  val a = 1 + 2 * random_generator.nextInt((p - 2) / 2) // a odd in [1, p-1]
  val b = 1 + random_generator.nextInt(p - 2)           // b in [1, p-1]
  def getHash(x: Int): Int = ((a * x + b) % p) % max_val
}

// Compute a list of minhashes from a list of hashers given a set of ids
class MinHasher(hashes: List[Hasher]) {
  def getMinHash(set: Seq[Int])(hasher: Hasher): Int = set.map(hasher.getHash).min
  def getMinHashes(set: Seq[Int]): Seq[Int] = hashes.map(getMinHash(set))
}

// Minhasher (shingle_bins, the number of hash buckets, is defined elsewhere)
val minhash_len = 100
val hashes = List.tabulate(minhash_len)(n => new Hasher(n, shingle_bins))
val minhasher = new MinHasher(hashes)

// Compute minhashes
val minhasherUDF = udf[Seq[Int], Seq[Int]](minhasher.getMinHashes)
df = df.withColumn("minhashes", minhasherUDF('hashed_shingles))
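
For reference, since each Hasher is just an affine map ((a*x + b) % p) % max_val, the same minhashes could plausibly be built from built-in functions only: precompute the (a, b) pairs on the driver and emit one array_min(transform(...)) column per seed. This is an untested sketch, assuming Spark 3.0+ for the Column-based transform and reusing minhash_len and shingle_bins from above:

import org.apache.spark.sql.functions.{array, array_min, col, lit, transform}

val p = 104729
val abPairs = List.tabulate(minhash_len) { seed =>
  val rnd = new scala.util.Random(seed)
  (1 + 2 * rnd.nextInt((p - 2) / 2), 1 + rnd.nextInt(p - 2))
}

val minhashCols = abPairs.map { case (a, b) =>
  // Same ((a*x + b) % p) % shingle_bins as Hasher.getHash, applied
  // element-wise, then reduced to the per-seed minimum
  array_min(transform(col("hashed_shingles"),
    x => ((x * lit(a) + lit(b)) % lit(p)) % lit(shingle_bins)))
}

df = df.withColumn("minhashes", array(minhashCols: _*))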
Comments:

  • It would help to understand the exact problem/requirement if you could provide the failing result/error along with the expected result. Commented Nov 5, 2020 at 20:18
  • You are right, I added the udf which achieves the same goal. Commented Nov 6, 2020 at 17:05
  • Higher-order functions like transform (or aggregate, as in this SO answer) are for transforming data of a complex type (e.g. Array) "element-wise" with a user-provided function. In your use case the entire Array is consumed as a whole by your custom function, so transform isn't suitable. I would go with your UDF approach. Commented Nov 6, 2020 at 18:43
  • Thank you, good to know. Commented Nov 6, 2020 at 19:44
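
To make the distinction in that comment concrete, here is a small sketch contrasting the two built-ins (both transform and aggregate are standard Spark SQL functions since 2.4):

import org.apache.spark.sql.functions.expr

// transform maps a function over each element of the array...
df.withColumn("plus_one", expr("transform(hashed_shingles, x -> x + 1)"))
// ...while aggregate folds the whole array into a single value
// (2147483647 = Int.MaxValue as the starting accumulator)
df.withColumn("min_shingle",
  expr("aggregate(hashed_shingles, 2147483647, (acc, x) -> least(acc, x))"))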
