
I am calling CUDA code to sum all the values for each key. The goal is to reduce the time the reducer takes by parallelizing the operation. However, the values in the reducer arrive as IntWritable objects, so I have to convert them to an array of integers before passing them to the CUDA code. Here is my reducer code:

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
   private IntWritable result = new IntWritable();

   public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
       List<Integer> numbers = new ArrayList<Integer>();
       for (IntWritable val : values) {
            numbers.add(val.get());
       }
       int[] ret = ArrayUtils.toPrimitive(numbers.toArray(new Integer[numbers.size()]));
       result.set(Main.sumNumbers(ret));
       context.write(key,result);
   }
}

The problem is that to convert the IntWritable values to an int array, I have to iterate over every value, which is a serial operation, so it increases the running time even more. Is there any way to convert the values directly to an int array without iterating over each one?
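
One cost that can be trimmed without changing the overall approach is the boxing: the reducer above fills a List<Integer> and then copies it into an int[]. Below is a minimal sketch, assuming a hypothetical helper class named IntArrayCollector (it is not part of Hadoop), that gathers the values straight into a growable int[]. It still visits every value once, because the reducer only receives the values through an Iterable.

```java
import java.util.Arrays;

// Hypothetical helper (not part of Hadoop): collects ints into a growable
// primitive array, avoiding the Integer boxing of List<Integer>.
final class IntArrayCollector {
    private int[] data = new int[16];
    private int size = 0;

    // Append one value, doubling the backing array when it is full.
    void add(int value) {
        if (size == data.length) {
            data = Arrays.copyOf(data, data.length * 2);
        }
        data[size++] = value;
    }

    // Return a copy trimmed to the number of values actually added.
    int[] toArray() {
        return Arrays.copyOf(data, size);
    }
}
```

In the reducer, the loop body would become collector.add(val.get()), and the array passed to Main.sumNumbers would be collector.toArray().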

Here is the mapper code:

public static class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

Here is my CUDA code:

#include <stdio.h>
#ifndef _ADDARRAY_KERNEL_H_
#define _ADDARRAY_KERNEL_H_

#ifdef __cplusplus
extern "C"
{
#endif
__global__ void add_array(int *a, int *c, int N)
{
  *c = 0;
  int i;
   for(i = 0; i<N;i++)
   {
    *c = *c + a[i];
   }
}
#ifdef __cplusplus
}
#endif 
#endif // #ifndef _ADDARRAY_KERNEL_H_

#ifdef __cplusplus
extern "C"
{
#endif

int cuda_sum(int *a_h, int N)
{   
    int *a_d, c = 0;
    int *dev_c;
    size_t size = N * sizeof(int);

    // Allocate device memory for the result and for the input array.
    cudaMalloc((void **)&dev_c, sizeof(int));
    cudaMalloc((void **)&a_d, size);
    cudaMemcpy(a_d, a_h, size, cudaMemcpyHostToDevice);

    // One block with one thread: the kernel sums the array serially.
    add_array<<<1, 1>>>(a_d, dev_c, N);
    cudaMemcpy(&c, dev_c, sizeof(int), cudaMemcpyDeviceToHost);

    // Release both device buffers.
    cudaFree(a_d);
    cudaFree(dev_c);
    return c;
}
#ifdef __cplusplus
}
#endif 

Thanks

  • Can you please share a link to the "cuda code"? Is it a library that adds numbers more efficiently? I would assume you could use the built-in IntSumReducer. Commented Apr 21, 2014 at 10:27
  • The CUDA code will be a custom library for adding the numbers efficiently. Commented Apr 21, 2014 at 11:21
  • Can you provide me a link to this library, please? Also, @Pradyumna's reply seems OK. Commented Apr 21, 2014 at 11:41
  • I am creating a CUDA library with a .so extension. It sums all the integers using a kernel call (just like a simple add operation in any CUDA code), so I need to pass an integer array as input to this library. Commented Apr 21, 2014 at 14:05
  • Are there any shortcomings in the answer already provided? Commented Apr 21, 2014 at 14:07

2 Answers


I suggest doing something like the following:

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, ArrayPrimitiveWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<ArrayPrimitiveWritable> values, Context context) throws IOException, InterruptedException {
        int[] ret = values.next();
        result.set(Main.sumNumbers(ret));
        context.write(key, result);
    }
}

The ArrayPrimitiveWritable will do the job for you.


3 Comments

There is no method "next()" available for values.
It is values.iterator().next()
It shows an error and suggests changing the type of 'ret' to 'ArrayPrimitiveWritable'.

public static class IntSumReducer extends Reducer<Text, ArrayPrimitiveWritable, Text, IntWritable>{
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<ArrayPrimitiveWritable> values, Context context) throws IOException,InterruptedException {
            ArrayPrimitiveWritable arrayOfInts = values.iterator().next();
            final int[] ret = (int [])arrayOfInts.get();
            result.set(Main.sumNumbers(ret));
            context.write(key,result);
        }
    }

I have modified the code slightly so that it should work; however, I have not run it. Can you give it a shot? I am sticking with ArrayPrimitiveWritable and have only made updates so that the compilation errors go away.

5 Comments

I am getting errors with the above-mentioned code. Here is a screenshot
Paste your mapper code too; I think you need to make a couple of changes there as well.
I am not sure why you are going to so much trouble. You are sending a hard-coded 1 from your mapper, so why do you want to extract the values and sum them? Why can't you just increment a counter in the loop: for (IntWritable val : values) { sum++; } Something like that in the reducer should be enough for your case.
That is not my objective. I want to reduce the time for summing the values from O(n) to O(log n). For this, I will be using CUDA's Thrust library in the future, which will do the sum in O(log n). That is why I want the conversion in the reducer to be O(1).
Oh well, I do not think you can achieve this, especially with a WordCount kind of problem. To achieve O(1) in the reducer, it needs to be fed a collection iterable.
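
To make the O(n) versus O(log n) point in these comments concrete: a pairwise (tree) reduction finishes in about log2(n) rounds only when the additions inside each round run in parallel; the total number of additions is still n - 1. Below is a minimal sketch in plain Java, run sequentially purely for illustration. The class name TreeSum and its methods are hypothetical, and this is not a description of how Thrust or any CUDA library is implemented.

```java
// Hypothetical illustration: sums an array with a pairwise (tree) reduction.
// Assumes the array has fewer than 2^30 elements so the stride cannot overflow.
final class TreeSum {
    static int sum(int[] input) {
        int[] a = input.clone();   // work on a copy, leave the input intact
        // In each round, element i absorbs element i + stride. The additions
        // within one round touch disjoint elements, so a GPU could run them
        // in parallel; here they simply run one after another.
        for (int stride = 1; stride < a.length; stride *= 2) {
            for (int i = 0; i + stride < a.length; i += 2 * stride) {
                a[i] += a[i + stride];
            }
        }
        return a.length == 0 ? 0 : a[0];
    }

    // Number of rounds the loop above performs for n elements.
    static int rounds(int n) {
        int r = 0;
        for (int stride = 1; stride < n; stride *= 2) {
            r++;
        }
        return r;
    }
}
```

For 8 values this takes 3 rounds but still performs 7 additions, so any gain depends on running each round's additions in parallel, which the add_array<<<1, 1>>> launch in the question does not do. Copying the array to the device is also linear in n.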
