Is it possible to attach a callback to be executed on a request completion?
In MPI it is possible to start an asynchronous message-passing routine (e.g. a receive, with MPI_Irecv). Is it possible to attach a callback function to be executed as soon as the request completes? For example, to process the received data.

This is an example of what I am looking for:

#include "mpi.h"
#include <stdio.h>

void mycallback(void* data){
    *(int*)data += 1; // add one to the received data
}

int main(int argc, char *argv[]){
    int myid, numprocs, left, right;
    int buffer[10], buffer2[10];
    MPI_Request request;
    MPI_Status status;

    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    right = (myid + 1) % numprocs;
    left = myid - 1;
    if (left < 0)
        left = numprocs - 1;

    MPI_Irecv(buffer, 10, MPI_INT, left, 123, MPI_COMM_WORLD, &request);
 // Attach_Callback(request, &mycallback); // somewhere after this point the recv completes and mycallback is executed
    MPI_Send(buffer2, 10, MPI_INT, right, 123, MPI_COMM_WORLD);
    MPI_Wait(&request, &status); //the recv and the callback must have been called at this point
    MPI_Finalize();
    return 0;
}

I found that there are MPI_Grequest_start and MPI_Grequest_complete functions, but they seem to be intended for something else: the request they create is not tied to a particular message-passing operation.

Perhaps I have to implement a generalized request (Grequest) whose callback consists of an MPI_Recv (not MPI_Irecv). Is that the idea?

Lorentz answered 10/4, 2018 at 22:19 Comment(4)
MPI_Irecv is non-blocking and MPI_Wait will wait for the request to complete, so why not put the call to mycallback after the MPI_Wait? The Send will only start once the Recv has finished (but the request will be marked complete as soon as that's done)... You may have to change the MPI_Send to MPI_Isend. Heterogeneous
That is a possibility, but the point is that the callback would then be executed at the point of MPI_Wait, whereas it could have been executed earlier by the MPI thread itself. Lorentz
Ah, looking more closely at what you want to achieve, MPI_Grequest_start is the correct semantic, and you have to use extra_state to maintain any message-specific state you want to keep. The MPI folks seem to be avoiding callback support like this (see github.com/mpi-forum/mpi-forum-historic/issues/26) and as of 3.0.1, no joy. Heterogeneous
@AhmedMasud Thanks for the link. Seems along the same lines. As a workaround, does it mean that if I want something like this I have to put MPI_Recv in a callback that emulates MPI_Irecv + "tasks after receive"? I don't quite understand in which of the three functions I should put the completion code. Lorentz
There is no such thing in the standard.

As said by @AhmedMasud, you can find a way around it using generalized requests: http://mpi-forum.org/docs/mpi-3.1/mpi31-report/node297.htm#Node297

As you can read there, the standard will probably never include a callback for Irecv, for reasons I agree with (the split of work between MPI threads and your program's threads).

What you are attempting is not trivial and is linked to quite a lot of portability issues. The question you should ask yourself is: will I really get any benefit from having the callback executed in the MPI thread? I suspect the benefit you have in mind is efficiency, but for efficiency reasons one should avoid Irecv and Isend. Non-blocking communication is a nice feature, but it should be used only if you have no other choice (for example an output server, where you definitely do not want to waste the compute clients' time; yet even in that case, buffered send is usually better and leads to larger bandwidth and smaller latency).

What is the real structure of the communication you need? If it is 0->1, 1->2, ..., n-1->n, n->0, this code works well (and will be faster than your solution), and you can then define a callback easily in your favorite way (time to solution will be much smaller, debugging infinitely easier :-)):

template<class Type> 
void Parallel::sendUp(Type& bufferSend,
                      Type& bufferRec, 
                      long len)
{
    if(this->rank()%2==0)
    {
        if(this->rank()!=this->size()-1)
        {
            this->send(bufferSend,len,this->rank()+1);
        }
        if(this->rank()!= 0)
        {
            this->receive(bufferRec,len,this->rank()-1);
        }
        else if(this->size()%2==0)
        {
            this->receive(bufferRec,len,this->size()-1);
        }
    }
    else
    {
        this->receive( bufferRec, len , this->rank()-1);
        if(this->rank()!=this->size()-1)
        {
            this->send(bufferSend,len,this->rank()+1);
        }
        else
        {
            this->send( bufferSend, len , 0);
        }
    }

    if(this->size()%2!=0)
    {
        if(this->rank()==this->size()-1)
        {
            this->send( bufferSend, len , 0);
        }
        if(this->rank()==0)
        {
            this->receive(bufferRec, len , this->size()-1);
        }
    }
}

In that code, the Parallel object is just a thin wrapper around some MPI calls, to simplify them:

parallel.rank() = rank in the comm
parallel.size() = size of the comm
parallel.send()/receive() are defined as follows

template<class Type> 
void Parallel::send(Type* array, int len, int to)
{
    MPI_Send(array, len*sizeof(Type), MPI_BYTE, to, 0,comm_);
}

template<class Type> 
void Parallel::rec(Type* array, int len, int from)
{
    MPI_Recv(array, len*sizeof(Type), MPI_BYTE, from, 0, comm_, MPI_STATUS_IGNORE);
}

template<class Type>
MPI_Status Parallel::receive(Type& array, int len, int from)
{
    MPI_Status  status;
    MPI_Recv( &array, len*sizeof(Type), MPI_BYTE, from, 0,comm_,&status);
    return status;
}

Hope it helps.

Bathtub answered 11/4, 2018 at 1:22 Comment(9)
I agree, MPI threads shouldn't be used for non-communication tasks. BUT, in this case I want it to unpack an MPI message into the real objects in my program. This is almost an MPI communication task, except that the types are not basic types (or MPI custom types). Thank you for the code; I am not trying to avoid Irecv, I am trying to generalize it to send and receive data that requires some simple interpretation at the end points. I think the example I was looking for doesn't exist anywhere, that is, how to simulate Irecv using only Grequest and MPI_Recv. I still hope it is possible. Lorentz
That's an interesting problem :-). I faced it a couple of times; usually my solution was to pack the data into a contiguous array of char and add an encoder/decoder method to the class. It's a bit crappy but works fine. Might be quite tricky to do properly, but it would be useful. If you solve it I am interested :-). Bathtub
If you have a way to identify messages that need some post-processing (based on tag and/or type and/or communicator, for example), then a simple option is to wrap MPI_Wait() and friends and do the post-processing there. You can also wrap MPI_Send() and friends if some pre-processing is needed. Panier
@DavidDaverio, there is yet another problem: if the size of the message cannot be known in advance, there is no way to make the communication completely asynchronous, because a small message has to be exchanged initially with the message size. Lorentz
Hi, I had the same issue with message sizes for an output server. You do not need to send another message: in MPI the size of the message is encapsulated in the message metadata. You can use MPI_Iprobe, which returns the status of a message with a given tag if one exists in the queue, and then MPI_Get_count. Bathtub
Then a funny fact that most people ignore: in MPI_Recv (and MPI_Irecv) the buffer size does not need to match the size of the message. Funny, no? In fact the buffer must be at least the size of the message, but if it is bigger it should not crash. This allows using recv with a maximal message size. But there are some issues with doing so, which means it is better to probe and get the size from the status :-). Bathtub
Oh, and I myself usually use some Iprobe coupled to Recv. You test in a non-blocking way whether there is a message, then you get the message in a blocking way. That way you mimic a bit of non-blockingness, meaning you only call recv when you have a message to get. It is useful when your computation uses all threads. If you have a thread reserved for MPI (what a waste!) then Irecv is better. Bathtub
@DavidDaverio, ok, thanks for the insight. I agree with all that you say, but in the end they are all partial solutions. It is not that you can run Iprobe(...); Irecv(..., size); because you have to wait for Iprobe to pass the argument to Irecv. I think I found a solution, but by means of Grequest. The problem is that, as you say, I have to spawn a thread that does the normal probe and receive. This thread is not even from MPI. Which shows two things: 1) threads are wasted and 2) Irecv is useless in the first place, because it can easily be implemented with std::async or pthreads. Lorentz
Agreed :-). I would be interested to see your solution with Grequest :-).Bathtub
