Here is an example that will illustrate what's going on.
Let's consider what happens when you call reduce on a list with some function f:
reduce(f, [a,b,c]) = f(f(a,b),c)
If we take your example, f = lambda u, v: u[1] + v[1], then the above expression breaks down into:
reduce(f, [a,b,c]) = f(f(a,b),c) = f(a[1]+b[1],c) = (a[1]+b[1])[1] + c[1]
But a[1] + b[1] is an integer, and integers have no __getitem__ method, so indexing it with [1] in the second call raises your error.
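The breakdown above can be reproduced without Spark. This is a minimal sketch using Python's built-in functools.reduce as a stand-in for the reduction step. The sample tuples are taken from the question's data, and the names two and error are only illustrative.

```python
from functools import reduce

# Values shaped like the ones in the question: (name, count) tuples.
a, b, c = ('toto', 10), ('titi', 30), ('toto', 10)

f = lambda u, v: u[1] + v[1]

# Two elements work: f is called once, on two tuples.
two = reduce(f, [a, b])

# Three elements fail: the second call is f(40, c), and an int
# cannot be indexed with [1].
try:
    reduce(f, [a, b, c])
    error = None
except TypeError as exc:
    error = str(exc)

print(two)
print(error)
```

The exact wording of the TypeError message depends on the Python version, which is why it is printed rather than hard-coded here.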
In general, the better approach (as shown below) is to use map() to first extract the data in the format that you want, and then apply reduceByKey().
An MCVE with your data
element = sc.parallelize(
    [
        ('A', ('toto', 10)),
        ('A', ('titi', 30)),
        ('5', ('tata', 10)),
        ('A', ('toto', 10)),
    ]
)
You can almost get your desired output with a more sophisticated reduce function:
def add_tuple_values(a, b):
    # a and b may each be a (name, count) tuple or an int partial sum.
    try:
        u = a[1]
    except TypeError:
        u = a
    try:
        v = b[1]
    except TypeError:
        v = b
    return u + v

print(element.reduceByKey(add_tuple_values).collect())
Except that this results in:
[('A', 50), ('5', ('tata', 10))]
Why? Because there's only one value for the key '5', so there is nothing to reduce: the function is never called for that key, and its value is returned unchanged.
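The same behaviour can be seen in plain Python. The sketch below uses functools.reduce as an analogy for what happens to a key with a single value. The counting_add function and the calls list are hypothetical helpers added only to show that the reducer is never invoked.

```python
from functools import reduce

calls = []  # records every invocation of the reducer

def counting_add(u, v):
    calls.append((u, v))
    return u + v

# A one-element sequence: there is no pair to combine, so the
# element comes back untouched and the reducer is never called.
single = reduce(counting_add, [('tata', 10)])

print(single)
print(len(calls))
```

Because the tuple is passed through as-is, any cleanup that lives only inside the reduce function is skipped for such keys.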
For these reasons, it's best to first call map. To get your desired output, you could do:
>>> print(element.map(lambda x: (x[0], x[1][1])).reduceByKey(lambda u, v: u+v).collect())
[('A', 50), ('5', 10)]
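To see why mapping first is robust, here is a Spark-free sketch of the same two steps. Note that reduce_by_key is a hypothetical local stand-in written for this illustration, not the PySpark implementation. It groups values by key and folds each group with functools.reduce.

```python
from collections import defaultdict
from functools import reduce

def reduce_by_key(pairs, func):
    """Hypothetical local stand-in for reduceByKey, for illustration only."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return [(key, reduce(func, values)) for key, values in groups.items()]

data = [
    ('A', ('toto', 10)),
    ('A', ('titi', 30)),
    ('5', ('tata', 10)),
    ('A', ('toto', 10)),
]

# Step 1 (map): keep only the number from each value tuple.
mapped = [(key, value[1]) for key, value in data]

# Step 2 (reduce): every value is already an int, so plain addition
# works for any number of records per key, including just one.
result = reduce_by_key(mapped, lambda u, v: u + v)
print(result)
```

After the map step, the reducer's inputs and its output share one type, so it makes no difference how many times (if at all) it is called for a given key.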
Update 1
Here's one more approach:
You could create tuples in your reduce function, and then call map to extract the value you want. (Essentially reverse the order of map and reduce.)
print(
    element.reduceByKey(lambda u, v: (0, u[1] + v[1]))
    .map(lambda x: (x[0], x[1][1]))
    .collect()
)
[('A', 50), ('5', 10)]
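This variant works because the reducer returns a value of the same shape as its inputs. A small sketch in plain Python, again with functools.reduce standing in for the per-key reduction (keep_shape is an illustrative name, not from the question):

```python
from functools import reduce

# The reducer returns a tuple, so its result can safely be fed back
# in as u on the next call. The 0 is just a placeholder for the name.
keep_shape = lambda u, v: (0, u[1] + v[1])

# Key with several values: the reducer runs twice.
total = reduce(keep_shape, [('toto', 10), ('titi', 30), ('toto', 10)])

# Key with one value: the reducer never runs, but the untouched
# tuple still has the number at index 1.
single = reduce(keep_shape, [('tata', 10)])

print(total[1], single[1])
```

In both cases index 1 holds the number, which is what lets the final map extract it uniformly.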
Notes
- Had there been at least 2 records for each key, using
add_tuple_values() would have given you the correct output.
x[1] + y[1] is an int, not a tuple (which is what reduceByKey expects in the next iteration).
(A, 50) (5, 10), but why should reduceByKey expect a tuple in the next iteration? Should it keep the same type as the values being reduced?