
OpenMPI: I want to read a file on the root node and send its contents to all the other nodes. I have found that MPI_Bcast does that:

int MPI_Bcast(void *buffer, int count, MPI_Datatype datatype,
    int root, MPI_Comm comm)

All the examples that I have found assume the count value is already known, but in my case the count value is only known on the root. Other examples note that the same MPI_Bcast call receives the data on the other nodes.

I've added this:

typedef short Descriptor[128];
MPI_Datatype descriptorType;
MPI_Type_contiguous(sizeof(Descriptor), MPI_SHORT, &descriptorType);
MPI_Type_commit(&descriptorType);



if(world_rank == 0) {
    struct stat finfo;

    if(stat(argv[1], &finfo) == 0) {
        querySize = finfo.st_size/sizeof(Descriptor);
    }

    {
        //read binary query
        queryDescriptors = new Descriptor[querySize];
        fstream qFile(argv[1], ios::in | ios::binary);
        qFile.read((char*)queryDescriptors, querySize*sizeof(Descriptor));
        qFile.close();
    }
}

  MPI_Bcast((void*)&querySize, 1, MPI_INT, 0, MPI_COMM_WORLD);
  if (world_rank != 0)
  {
        queryDescriptors = new Descriptor[querySize];
  }
  MPI_Bcast((void*)queryDescriptors, querySize, descriptorType, 0, MPI_COMM_WORLD);

When I run it like this: mpirun -np 2 ./mpi_hello_world it works fine, but with more than 2 processes I get this:

mpi_hello_world: malloc.c:3096: sYSMALLOc: Assertion `(old_top == (((mbinptr) (((char *) &((av)->bins[((1) - 1) * 2])) - __builtin_offsetof (struct malloc_chunk, fd)))) && old_size == 0) || ((unsigned long) (old_size) >= (unsigned long)((((__builtin_offsetof (struct malloc_chunk, fd_nextsize))+((2 * (sizeof(size_t))) - 1)) & ~((2 * (sizeof(size_t))) - 1))) && ((old_top)->size & 0x1) && ((unsigned long)old_end & pagemask) == 0)' failed.
    So issue two broadcasts, the first with the count, the second with the buffer contents. Commented Jan 13, 2015 at 17:10
  • You are right, that is a solution. I was wondering if there is a mechanism in MPI for such situations. Commented Jan 13, 2015 at 17:32
  • Not that I'm aware of, but my MPI is getting a bit rusty. Commented Jan 13, 2015 at 18:05
  • Mark is right - the only solution is to use two broadcasts. Unlike with regular point-to-point communication, MPI provides no way to probe in advance for broadcast messages. As a matter of fact, that also applies to all collective calls, e.g. MPI_SCATTER, MPI_GATHER, etc. Commented Jan 14, 2015 at 8:12
  • I've used the solution Mark suggested, but I still get this error, caused by the second MPI_Bcast; querySize is 23. I am working on a single node, could that be an issue? Commented Jan 14, 2015 at 13:23

1 Answer


If qFile.read(...) is not enclosed in an if(rank==0){} test, all processes will read the file. And queryDescriptors = new Descriptor[querySize]; must be called after the first MPI_Bcast() on all processes except rank 0: before it, querySize is meaningless on those processes.

Process 0 must:

  • read the number of items
  • allocate
  • read the array
  • broadcast the number of items
  • broadcast the array

Other processes must:

  • receive the number of items
  • allocate
  • receive the array

Here is an example of how to read an array of floats using dynamic allocation:

#include <stdio.h>
#include <iostream>
#include <fstream>

#include <mpi.h>
using namespace std;

int main (int argc,  char *argv[])
{
    int rank;
    int size;

    MPI_Init(&argc, &argv);

    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if(rank == 0)
    {
        //creating the file
        ofstream myfile;
        myfile.open ("example.txt", ios::out |ios::binary);
        int nbitem=42;
        myfile.write((char*)&nbitem,sizeof(int));

        float a=0;
        for(int i=0;i<nbitem;i++){
            myfile.write((char*)&a,sizeof(float));
            a+=2;
        }
        myfile.close();    
    }


    //now reading the file
    int nbitemread=0;
    float* buffer;
    if(rank==0){
        ifstream file ("example.txt",  ios::in |ios::binary);
        file.read ((char*)&nbitemread, sizeof(int));
        buffer=new float[nbitemread];
        file.read ((char*)buffer,nbitemread* sizeof(float));
        file.close();
        //communication
        MPI_Bcast(&nbitemread, 1, MPI_INT, 0, MPI_COMM_WORLD);
        MPI_Bcast(buffer, nbitemread, MPI_FLOAT, 0, MPI_COMM_WORLD);
    }else{

        MPI_Bcast(&nbitemread, 1, MPI_INT, 0, MPI_COMM_WORLD);
        //nbitemread is meaningful now
        buffer=new float[nbitemread];
        MPI_Bcast(buffer, nbitemread, MPI_FLOAT, 0, MPI_COMM_WORLD);

    }

    //printing...
    cout<<"on rank "<<rank<<" read "<<buffer[nbitemread/2]<<" at position "<<nbitemread/2<<endl;

    delete[] buffer;
    MPI_Finalize();

    return 0;
}

Compile it with mpiCC main.cpp -o main and run it with mpirun -np 2 main

Another issue in your code is MPI_Type_contiguous(sizeof(Descriptor), MPI_SHORT, &descriptorType);. Since sizeof(Descriptor) is a size in bytes, not a number of shorts, it should be MPI_Type_contiguous(sizeof(Descriptor), MPI_CHAR, &descriptorType); (or equivalently MPI_Type_contiguous(128, MPI_SHORT, &descriptorType);). With MPI_SHORT and a byte count, the committed type is twice as large as a Descriptor, so the second broadcast writes past the end of the buffer, which is what triggers the malloc assertion. Here is a piece of code based on yours that should do the trick:

#include <stdio.h>
#include <iostream>
#include <fstream>

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>

#include <mpi.h>
using namespace std;

int main (int argc,  char *argv[])
{
    int world_rank;
    int size;

    MPI_Init(&argc, &argv);

    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    int querySize;


    typedef short Descriptor[128];
    MPI_Datatype descriptorType;
    MPI_Type_contiguous(sizeof(Descriptor), MPI_CHAR, &descriptorType);
    MPI_Type_commit(&descriptorType);


    Descriptor* queryDescriptors;


    if(world_rank == 0)   {
        struct stat finfo;

        if(stat(argv[1], &finfo) == 0) {
            cout<<"st_size "<<finfo.st_size<<" descriptor "<<sizeof(Descriptor)<< endl;
            querySize = finfo.st_size/sizeof(Descriptor);
            cout<<"querySize "<<querySize<<endl;
        }else{
            cout<<"stat error"<<endl;
        }

        {
            //read binary query
            queryDescriptors = new Descriptor[querySize];
            fstream qFile(argv[1], ios::in | ios::binary);
            qFile.read((char*)queryDescriptors, querySize*sizeof(Descriptor));
            qFile.close();

        }
    }

    MPI_Bcast((void*)&querySize, 1, MPI_INT, 0, MPI_COMM_WORLD);
    if (world_rank != 0)
    {
        queryDescriptors = new Descriptor[querySize];
    }
    MPI_Bcast((void*)queryDescriptors, querySize, descriptorType, 0, MPI_COMM_WORLD);

    cout<<"on rank "<<world_rank<<" read "<<queryDescriptors[querySize/2][12]<<" at position "<<querySize/2<<endl;

    delete[] queryDescriptors;

    MPI_Finalize();

    return 0;
}

6 Comments

I'm sorry I didn't mention it, but I do mostly the same: I use mpic++ and mpirun -np 3 main
I have added more code to my question. Vive la France!
I wonder if this is an issue because I am running this on a single node?
Traiasca Romania! It should not be an issue, unless the file is really big. Since the whole file is broadcast, there are as many copies as processes (maybe more for message passing). If your file is 20 times smaller than your RAM, it should not fail. What is that ratio? Moreover, if you use a cluster, you may need to specify a memory requirement to your job manager. For SLURM it could be #SBATCH --mem 1G. The default value for this limit is usually low, and specifying a higher value could help.
cat /proc/meminfo says 2052392 kB, and the file is 5888 bytes. I am not yet using a cluster, just mpirun and the host parameter.
