I am calling CUDA code to compute the sum of all the values for each key. The goal is to reduce the time taken by the reducer by parallelizing the operation. However, the values in the reducer arrive as IntWritable objects, so I have to convert them to an array of integers before passing them to the CUDA code. Here is my reducer code:
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
List<Integer> numbers = new ArrayList<Integer>();
for(IntWritable val : values)
numbers.add(val.get());
}
int[] ret = ArrayUtils.toPrimitive(numbers.toArray(new Integer[numbers.size()]));
result.set(Main.sumNumbers(ret));
context.write(key,result);
}
}
The problem is that, to convert the IntWritable values to an int array, I have to iterate over every value, which is a serial operation and makes the total time even longer. Is there any way to convert the values directly to an int array without iterating over each one?
Here is the mapper code :
/**
 * Mapper that splits each input value into tokens and emits one
 * (token, 1) pair per token.
 */
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    // Constant count attached to every emitted token.
    private final static IntWritable one = new IntWritable(1);
    // Reused output key; overwritten with each token before it is written.
    private Text word = new Text();

    /**
     * Tokenizes {@code value} with {@link StringTokenizer}'s default
     * delimiters and writes (token, 1) for every token found.
     *
     * @param key     input key; not used by this mapper
     * @param value   text to tokenize
     * @param context sink for the emitted (token, 1) pairs
     * @throws IOException          if writing an output record fails
     * @throws InterruptedException if writing an output record is interrupted
     */
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        final String line = value.toString();
        for (StringTokenizer tokens = new StringTokenizer(line); tokens.hasMoreTokens(); ) {
            word.set(tokens.nextToken());
            context.write(word, one);
        }
    }
}
Here is my CUDA code:
#include <stdio.h>
#ifndef _ADDARRAY_KERNEL_H_
#define _ADDARRAY_KERNEL_H_
#ifdef __cplusplus
extern "C"
{
#endif
/* Size of the shared scratch array used by add_array: one slot per thread of
 * the block. 1024 is the maximum number of threads a block can have, so any
 * block shape that launches successfully fits. */
#define ADD_ARRAY_MAX_THREADS 1024

/*
 * Sums the N integers in `a` and stores the total in `*c`.
 *
 * Launch layout: the reduction is performed by block (0,0,0) only; any
 * additional blocks return immediately. The block may have any shape and any
 * thread count (including a single thread) -- more threads simply means more
 * parallelism. Uses ADD_ARRAY_MAX_THREADS * sizeof(int) bytes of static
 * shared memory.
 *
 * a : device pointer to at least N readable ints
 * c : device pointer to one int; overwritten with the sum (it does not need
 *     to be initialised by the caller)
 * N : number of elements; N <= 0 yields a sum of 0
 *
 * The total is accumulated in `int`, so it can overflow for large inputs.
 */
__global__ void add_array(int *a, int *c, int N)
{
    __shared__ int partial[ADD_ARRAY_MAX_THREADS];

    /* A whole block leaves together, so the barriers below are never reached
     * by only part of a block. */
    if (blockIdx.x != 0 || blockIdx.y != 0 || blockIdx.z != 0)
        return;

    /* Flatten the (possibly multi-dimensional) block into a linear index so
     * that every thread owns a distinct slice of the input. */
    const unsigned int threads = blockDim.x * blockDim.y * blockDim.z;
    const unsigned int tid =
        threadIdx.x + blockDim.x * (threadIdx.y + blockDim.y * threadIdx.z);

    /* Phase 1: each thread accumulates a private partial sum in a register.
     * Neighbouring threads read neighbouring elements. The index is 64-bit
     * so that `i += threads` cannot overflow when N is close to INT_MAX. */
    int sum = 0;
    for (long long i = tid; i < N; i += threads)
        sum += a[i];
    partial[tid] = sum;
    __syncthreads();

    /* Phase 2: pairwise tree reduction in shared memory. `active` is the
     * number of slots still holding live partial sums; rounding `half` up
     * keeps this correct when the thread count is not a power of two. The
     * loop bounds depend only on blockDim, so every thread reaches every
     * barrier. */
    for (unsigned int active = threads; active > 1;)
    {
        const unsigned int half = (active + 1) / 2;
        if (tid + half < active)
            partial[tid] += partial[tid + half];
        __syncthreads();
        active = half;
    }

    if (tid == 0)
        *c = partial[0];
}
#ifdef __cplusplus
}
#endif
#endif // #ifndef _ADDARRAY_KERNEL_H_
#ifdef __cplusplus
extern "C"
{
#endif
/* Threads per block used by cuda_sum when launching add_array. */
#define CUDA_SUM_THREADS 256

/* Reports a failed CUDA call on stderr. Returns 1 on failure, 0 on success. */
static int cuda_sum_failed(cudaError_t status, const char *what)
{
    if (status == cudaSuccess)
        return 0;
    fprintf(stderr, "cuda_sum: %s failed: %s\n", what, cudaGetErrorString(status));
    return 1;
}

/*
 * Returns the sum of the N integers in the host array `a_h`, computed on the
 * GPU by add_array.
 *
 * a_h : host pointer to N ints (not modified)
 * N   : number of elements
 *
 * Returns 0 when a_h is NULL, when N <= 0, or when any CUDA call fails (the
 * failure is reported on stderr). All device memory allocated here is
 * released before returning, on success and on failure alike.
 */
int cuda_sum(int *a_h, int N)
{
    int *a_d = NULL;
    int *dev_c = NULL;
    int c = 0;
    int failed = 0;
    size_t size = 0;

    /* Nothing to add: skip the device round trip entirely. */
    if (a_h == NULL || N <= 0)
        return 0;

    size = (size_t)N * sizeof(int);

    failed = cuda_sum_failed(cudaMalloc((void **)&a_d, size), "cudaMalloc(input)");
    if (!failed)
        failed = cuda_sum_failed(cudaMalloc((void **)&dev_c, sizeof(int)),
                                 "cudaMalloc(result)");
    if (!failed)
        failed = cuda_sum_failed(cudaMemcpy(a_d, a_h, size, cudaMemcpyHostToDevice),
                                 "cudaMemcpy(host to device)");
    if (!failed)
    {
        add_array<<<1, CUDA_SUM_THREADS>>>(a_d, dev_c, N);
        /* A launch does not return a status; configuration errors are only
         * visible through cudaGetLastError(). */
        failed = cuda_sum_failed(cudaGetLastError(), "add_array launch");
    }
    if (!failed)
        /* This blocking copy also waits for the kernel to finish and reports
         * any error raised while it was running. */
        failed = cuda_sum_failed(cudaMemcpy(&c, dev_c, sizeof(int), cudaMemcpyDeviceToHost),
                                 "cudaMemcpy(device to host)");

    /* cudaFree(NULL) is a no-op, so this is safe whichever step failed. */
    cudaFree(a_d);
    cudaFree(dev_c);

    return failed ? 0 : c;
}
#ifdef __cplusplus
}
#endif
Thanks