Disclaimer: I'm VERY new to Spark and Scala. I am working on a document similarity project in Scala with Spark. I have a DataFrame which looks like this:
+--------+--------------------+------------------+
| text| shingles| hashed_shingles|
+--------+--------------------+------------------+
| qwerty|[qwe, wer, ert, rty]| [-4, -6, -1, -9]|
|qwerasfg|[qwe, wer, era, r...|[-4, -6, 6, -2, 2]|
+--------+--------------------+------------------+
Here I split each document's text into shingles and computed a hash value for each shingle.
Imagine I have a hash_function(integer, seed) -> integer.
Now I want to apply n different hash functions of this form to each hashed_shingles array, i.e. obtain an array of n arrays, where the k-th array is hash_function applied element-wise to hashed_shingles with seed k, for seeds 1 to n.
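For example, with n = 2 the first row's hashed_shingles = [-4, -6, -1, -9] should become two arrays, one per seed:

[ [hash_function(-4, 1), hash_function(-6, 1), hash_function(-1, 1), hash_function(-9, 1)],
  [hash_function(-4, 2), hash_function(-6, 2), hash_function(-1, 2), hash_function(-9, 2)] ]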
I'm trying something like this, but I cannot get it to work:
val n = 3
// Repeat the whole hashed_shingles array n times, one copy per seed
df = df.withColumn("tmp", array_repeat($"hashed_shingles", n))
// Intended: hash the i-th copy with seed i. This fails because hash_function
// is a Scala method; expr can only resolve SQL functions, built-in or registered
val minhash_expr = "transform(tmp, (x, i) -> hash_function(x, i))"
df = df.withColumn("tmp", expr(minhash_expr))
I know how to do it with a UDF, but as I understand UDFs are a black box to the Catalyst optimizer and should be avoided when possible, so I'm trying to do everything with org.apache.spark.sql.functions.
Any ideas on how to approach this without a UDF?
The UDF which achieves the same goal is this:
// Family of hashing functions of the form ((a*x + b) mod p) mod max_val
class Hasher(seed: Int, max_val: Int, p: Int = 104729) {
  private val random_generator = new scala.util.Random(seed)
  val a = 1 + 2 * random_generator.nextInt((p - 2) / 2) // a odd in [1, p-1]
  val b = 1 + random_generator.nextInt(p - 2)           // b in [1, p-1]
  // Scala's % keeps the sign of the dividend, so negative shingle hashes would
  // yield negative buckets; floorMod (plus a Long cast against overflow) fixes that
  def getHash(x: Int): Int = (Math.floorMod(a.toLong * x + b, p.toLong) % max_val).toInt
}
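For instance, a single Hasher is deterministic per seed (a minimal sketch; max_val = 1000 is an arbitrary bucket count chosen for illustration):

val h = new Hasher(seed = 42, max_val = 1000)
h.getHash(-4) // non-negative bucket in [0, 1000)
h.getHash(-4) // same input and same seed -> same value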
// Compute a list of minhashes from a list of hashers given a set of ids
class MinHasher(hashes: List[Hasher]) {
  // Smallest hash of any element in the set, under one hash function
  def getMinHash(set: Seq[Int])(hasher: Hasher): Int = set.map(hasher.getHash).min
  // One minhash per hasher: the set's minhash signature
  def getMinHashes(set: Seq[Int]): Seq[Int] = hashes.map(getMinHash(set))
}
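As a small sanity check on the shapes involved (a sketch; the 1000-bucket size is arbitrary):

val demo = new MinHasher(List(new Hasher(0, 1000), new Hasher(1, 1000)))
demo.getMinHashes(Seq(-4, -6, -1, -9)) // Seq of length 2: one minhash per Hasher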
// Minhasher: one Hasher per position of the signature
val minhash_len = 100
val hashes = List.tabulate(minhash_len)(seed => new Hasher(seed, shingle_bins)) // shingle_bins defined elsewhere
val minhasher = new MinHasher(hashes)
// Compute minhashes for every document
val minhasherUDF = udf[Seq[Int], Seq[Int]](minhasher.getMinHashes)
df = df.withColumn("minhashes", minhasherUDF('hashed_shingles))
transform (or aggregate, like in this SO answer) is for transforming data of a complex type (e.g. Array) "element-wise" with a user-provided function. In your use case, the entire array is consumed as a whole by your custom function, so transform isn't suitable here. I would go with your UDF approach.
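For illustration, transform only ever sees one element of the array at a time, so something purely element-wise works fine (a minimal sketch, assuming the same imports as your code):

df.withColumn("shifted", expr("transform(hashed_shingles, x -> x + 1)"))

but a whole-array computation like getMinHashes has no natural place inside that per-element lambda.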