
I have a PySpark DataFrame named DF with (K,V) pairs. I would like to apply multiple functions with reduceByKey. For example, I have the following three simple functions:

def sumFunc(a,b): return a+b

def maxFunc(a,b): return max(a,b)

def minFunc(a,b): return min(a,b)

When I apply only one function, it works. For example, all three of the following work:

DF.reduceByKey(sumFunc)  #works
DF.reduceByKey(maxFunc)  #works
DF.reduceByKey(minFunc)  #works

But when I apply more than one function, it does not work. For example, none of the following work:

DF.reduceByKey(sumFunc, maxFunc, minFunc)  #does not work
DF.reduceByKey(sumFunc, maxFunc)  #does not work
DF.reduceByKey(maxFunc, minFunc)  #does not work
DF.reduceByKey(sumFunc, minFunc)  #does not work

I do not want to use groupByKey because it slows down the computation.

1 Answer


If the input is a DataFrame, just use agg:

import pyspark.sql.functions as sqlf

df = sc.parallelize([
   ("foo", 1.0), ("foo", 2.5), ("bar", -1.0), ("bar", 99.0)
]).toDF(["k", "v"])

df.groupBy("k").agg(sqlf.min("v"), sqlf.max("v"), sqlf.sum("v")).show()

## +---+------+------+------+
## |  k|min(v)|max(v)|sum(v)|
## +---+------+------+------+
## |bar|  -1.0|  99.0|  98.0|
## |foo|   1.0|   2.5|   3.5|
## +---+------+------+------+

With RDDs you can use StatCounter:

from pyspark.statcounter import StatCounter

rdd = df.rdd
stats = rdd.aggregateByKey(
    StatCounter(), StatCounter.merge, StatCounter.mergeStats
).mapValues(lambda s: (s.min(), s.max(), s.sum()))

stats.collect()
## [('bar', (-1.0, 99.0, 98.0)), ('foo', (1.0, 2.5, 3.5))]
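
One of the comments below reports a `TypeError` about an unbound `merge()` method when running this on Python 2, and the reply suggests using functions that call the methods on the instances received in the seq-op and merge-op. A minimal sketch of that workaround follows. It assumes only that `merge` and `mergeStats` are called on the accumulator that `aggregateByKey` passes in; the names `seq_op` and `comb_op` are illustrative, not part of the original answer.

```python
def seq_op(acc, value):
    # acc is the accumulator built so far for this key (a StatCounter);
    # value is one raw number belonging to that key
    return acc.merge(value)

def comb_op(left, right):
    # left and right are partial accumulators for the same key
    return left.mergeStats(right)

# Assumed usage, with rdd and StatCounter as in the snippet above:
# stats = rdd.aggregateByKey(StatCounter(), seq_op, comb_op)
```

Because both functions call bound methods on the objects they are handed, they should not depend on how the interpreter treats `StatCounter.merge` when it is accessed on the class.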

Using your functions you could do something like this:

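def apply(x, y, funs=(minFunc, maxFunc, sumFunc)):
    return [f(x_, y_) for f, x_, y_ in zip(funs, x, y)]

# mergeValue receives (accumulator, raw value), so lift the raw value to a triple first
rdd.combineByKey(
    lambda v: (v, v, v), lambda acc, v: apply(acc, (v, v, v)), apply
).collect()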
## [('bar', [-1.0, 99.0, 98.0]), ('foo', [1.0, 2.5, 3.5])]
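
The slot-by-slot accumulator logic above can be checked without a cluster. Here is a minimal plain-Python sketch, assuming the three functions from the question. `reduce_by_key` is a hypothetical local stand-in for what a per-key reduction does, not a Spark API, and `apply_all` is an illustrative name.

```python
from collections import defaultdict
from functools import reduce

def sumFunc(a, b): return a + b
def maxFunc(a, b): return max(a, b)
def minFunc(a, b): return min(a, b)

def apply_all(x, y, funs=(minFunc, maxFunc, sumFunc)):
    # x and y are both (min, max, sum) triples; combine them slot by slot
    return tuple(f(x_, y_) for f, x_, y_ in zip(funs, x, y))

def reduce_by_key(pairs, func):
    # Hypothetical stand-in: group values by key, then fold each group with func
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return {k: reduce(func, vs) for k, vs in groups.items()}

pairs = [("foo", 1.0), ("foo", 2.5), ("bar", -1.0), ("bar", 99.0)]
# Lift every value to a triple so both arguments of apply_all have the same shape
triples = [(k, (v, v, v)) for k, v in pairs]
result = reduce_by_key(triples, apply_all)
print(result)
```

On an RDD, the same shape could be written as `rdd.mapValues(lambda v: (v, v, v)).reduceByKey(apply_all)`, which keeps a single binary function and avoids `groupByKey`.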

4 Comments

Can you also use the RDD method for multiple StatCounter instances at once? For instance if you want min/max for different columns in the same aggregation?
I tried to use your StatCounter example. But when I try it on a key-value RDD with string-float, then I get this error: TypeError: unbound method merge() must be called with NoneType instance as first argument (got StatCounter instance instead)
@Matthias I don't use Python 2 anymore. If you use 2.x you'll have to use functions which use specific instances you receive in seq-op and merge-op.
Thanks for the hint. After searching for weeks, I finally found a really good explanation here.
