Creating a counter that stays synchronized across MPI processes
I have quite a bit of experience using the basic comm and group MPI2 methods, and do quite a bit of embarrassingly parallel simulation work using MPI. Up until now, I have structured my code to have a dispatch node, and a bunch of worker nodes. The dispatch node has a list of parameter files that will be run with the simulator. It seeds each worker node with a parameter file. The worker nodes run their simulation, then request another parameter file, which the dispatch node provides. Once all parameter files have been run, the dispatch node shuts down each worker node, before shutting itself down.

The parameter files are typically named "Par_N.txt", where N is the identifying integer (e.g., N = 1 to 1000). So I was thinking: if I could create a counter and keep it synchronized across all of my nodes, I could eliminate the need for a dispatch node and make the system a bit simpler. As simple as this sounds in theory, I suspect it is more difficult in practice, as I'd need to ensure the counter is locked while being changed, etc. I thought there might be a built-in way for MPI to handle this. Any thoughts? Am I overthinking this?

Weisbart answered 9/2, 2011 at 18:15 Comment(3)
Could you explain what benefits you're hoping to derive from eliminating the dispatcher? - Jadajadd
@aix - Sure. On some of our larger runs (say, a run with np = 10k nodes), I've noticed that the dispatch node gets saturated with communication. To get around this, I've started allowing multiple dispatch nodes, where each dispatch node takes a sub-group. However, this leads to more complex (i.e., harder to maintain) code. So it is mostly a matter of trying to simplify things (if that can be done simply). - Weisbart
Additionally, on smaller runs (say 5-10 nodes), which are done more often, it would be nice not to hand an entire node over to be a dispatch node. Our sysadmin is very much against overloading cores, and has set the job scheduler not to allow jobs where the number of processes > number of requested cores. - Weisbart
U
10

Implementing a shared counter isn't trivial, but once you do it and have it in a library somewhere you can do a lot with it.

In the Using MPI-2 book, which you should have to hand if you're going to implement this stuff, one of the examples (the code is available online) is a shared counter. The "non-scalable" one should work well out to several dozen processes: the counter is an array of integers indexed 0..size-1, one per rank, and the "get next work item #" operation consists of locking the window, reading everyone else's contribution to the counter (in this case, how many items they've taken), updating your own (++), unlocking the window, and calculating the total. This is all done with passive one-sided operations. (The better-scaling one just uses a tree rather than a 1-D array.)

So in use, you would have, say, rank 0 host the counter, and everyone keeps doing work units and updating the counter to get the next one until there's no more work; then you wait at a barrier or something and finalize.

Once you have something like this (using a shared value to get the next available work unit) working, you can generalize to a more sophisticated approach. As suszterpatt suggested, everyone taking "their share" of work units at the start works great, but what do you do if some finish faster than others? The usual answer now is work stealing: everyone keeps their list of work units in a deque, and when one rank runs out of work, it steals work units from the other end of someone else's deque, until there's no more work left. This is really the completely distributed version of master-worker, where there is no longer a single master partitioning the work. Once you have a single shared counter working, you can build mutexes from it, and from those you can implement the deque. But if the simple shared counter works well enough, you may not need to go there.

Update: OK, so here's a hacky attempt at the shared counter, my version of the simple one in the MPI-2 book. It seems to work, but I wouldn't say anything much stronger than that (I haven't played with this stuff for a long time). There's a simple counter implementation (corresponding to the non-scaling version in the MPI-2 book) with two simple tests, one corresponding roughly to your work case: each rank updates the counter to get a work item, then does the "work" (sleeps for a random amount of time). At the end of each test, the counter data structure is printed out, which is the number of increments each rank has done.

#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>

struct mpi_counter_t {
    MPI_Win win;
    int  hostrank ;
    int  myval;
    int *data;
    int rank, size;
};

struct mpi_counter_t *create_counter(int hostrank) {
    struct mpi_counter_t *count;

    count = (struct mpi_counter_t *)malloc(sizeof(struct mpi_counter_t));
    count->hostrank = hostrank;
    MPI_Comm_rank(MPI_COMM_WORLD, &(count->rank));
    MPI_Comm_size(MPI_COMM_WORLD, &(count->size));

    if (count->rank == hostrank) {
        MPI_Alloc_mem(count->size * sizeof(int), MPI_INFO_NULL, &(count->data));
        for (int i=0; i<count->size; i++) count->data[i] = 0;
        MPI_Win_create(count->data, count->size * sizeof(int), sizeof(int),
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
    } else {
        count->data = NULL;
        MPI_Win_create(count->data, 0, 1,
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
    }
    count -> myval = 0;

    return count;
}

int increment_counter(struct mpi_counter_t *count, int increment) {
    int *vals = (int *)malloc( count->size * sizeof(int) );
    int val;

    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);

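    /* All one-sided calls must target the rank that hosts the window,
       which is the same rank the lock above was taken on. */
    for (int i=0; i<count->size; i++) {
        if (i == count->rank) {
            /* Add our increment to our own slot on the host. */
            MPI_Accumulate(&increment, 1, MPI_INT, count->hostrank, i, 1, MPI_INT,
                           MPI_SUM, count->win);
        } else {
            /* Read how many items every other rank has taken so far. */
            MPI_Get(&vals[i], 1, MPI_INT, count->hostrank, i, 1, MPI_INT, count->win);
        }
    }

    MPI_Win_unlock(count->hostrank, count->win);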
    count->myval += increment;

    vals[count->rank] = count->myval;
    val = 0;
    for (int i=0; i<count->size; i++)
        val += vals[i];

    free(vals);
    return val;
}

void delete_counter(struct mpi_counter_t **count) {
    if ((*count)->rank == (*count)->hostrank) {
        MPI_Free_mem((*count)->data);
    }
    MPI_Win_free(&((*count)->win));
    free((*count));
    *count = NULL;

    return;
}

void print_counter(struct mpi_counter_t *count) {
    if (count->rank == count->hostrank) {
        for (int i=0; i<count->size; i++) {
            printf("%2d ", count->data[i]);
        }
        puts("");
    }
}

void test1(void) {
    struct mpi_counter_t *c;
    int rank;
    int result;

    c = create_counter(0);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    result = increment_counter(c, 1);
    printf("%d got counter %d\n", rank, result);

    MPI_Barrier(MPI_COMM_WORLD);
    print_counter(c);
    delete_counter(&c);
}


void test2(void) {
    const int WORKITEMS=50;

    struct mpi_counter_t *c;
    int rank;
    int result = 0;

    c = create_counter(0);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    srandom(rank);

    while (result < WORKITEMS) {
        result = increment_counter(c, 1);
        if (result <= WORKITEMS) {
             printf("%d working on item %d...\n", rank, result);
             sleep(random() % 10);
         } else {
             printf("%d done\n", rank);
         }
    }

    MPI_Barrier(MPI_COMM_WORLD);
    print_counter(c);
    delete_counter(&c);
}

int main(int argc, char **argv) {

    MPI_Init(&argc, &argv);

    test1();
    test2();

    MPI_Finalize();
}
Underdrawers answered 10/2, 2011 at 18:15 Comment(1)
MPI_Fetch_and_op in MPI-3 greatly simplifies this code. - Socle
C
3

I can't think of any built-in mechanism to solve that problem; you'd have to implement it manually. Judging by your comments, you want to decentralize the program, in which case each process (or at least each group of processes) would have to keep its own value of the counter and keep it synchronized. This could probably be done with clever use of non-blocking sends/receives, but the semantics of those are not trivial.

Instead, I'd resolve the saturation issue by simply issuing several files at once to worker processes. This would reduce network traffic and allow you to keep your simple single dispatcher setup.
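As a rough sketch of the bookkeeping this implies on the dispatcher side (the dispatch_state type and take_batch function are hypothetical names for this example, and it assumes the files are numbered consecutively from 1 to total):

```c
#include <assert.h>

/* Hypothetical dispatcher state: files are numbered 1..total and
   `next` is the first file number not yet handed out. */
typedef struct {
    int next;
    int total;
} dispatch_state;

/* Hand out up to batch_size consecutive file numbers.
   Stores the first number in *first and returns how many were granted;
   returns 0 (and leaves *first untouched) once everything is handed out. */
static int take_batch(dispatch_state *d, int batch_size, int *first) {
    assert(batch_size > 0);
    int remaining = d->total - d->next + 1;
    if (remaining <= 0)
        return 0;
    int count = remaining < batch_size ? remaining : batch_size;
    *first = d->next;
    d->next += count;
    return count;
}
```

The dispatcher would answer each worker request with the pair (first, count) in a single message, so the number of requests it has to serve drops by roughly a factor of batch_size. A return value of 0 is the signal to shut the worker down.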

Crandall answered 9/2, 2011 at 18:45 Comment(2)
@suszterpatt - I've been thinking a bit about what you mention: given a process id and the total number of processes, I can easily grab a "chunk" of the work to be done by each process. My concern here, however, is that the simulations have widely varying computation times (2+ orders of magnitude, depending on convergence rates), and I can see a situation arising where a single node is given a high number of long-running simulations, and load balancing would become a problem. - Weisbart
@MarkD: Theoretically, that's certainly possible. However, it sounds like you're processing truly massive amounts of data, so the chances of it happening may not actually be that great. Still, a possible workaround could be to let your dispatcher "undispatch" files that a worker node hasn't started to process yet, and dispatch them to a currently idle worker instead. I'd still consider this approach to be simpler than implementing a shared variable. - Crandall
S
0

It seems like you are using your dispatch node to do dynamic load balancing (assigning work to processors when they become available). A shared counter that doesn't require all of the processors to stop will not do that. I would recommend staying with what you have now, or doing what suszterpatt suggests: sending out batches of files at a time.

Sorrell answered 10/2, 2011 at 2:27 Comment(0)
C
0

It's not clear whether the files need to be processed in strict order. If not, why not just have each node i handle all files where N % total_workers == i, that is, a cyclic distribution of work?
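A minimal sketch of that cyclic split, assuming ranks are numbered 0..total_workers-1 and files are named Par_N.txt as in the question (owns_file and run_my_files are hypothetical helper names, and the printf stands in for the actual simulation):

```c
#include <assert.h>
#include <stdio.h>

/* Does worker `rank` own file Par_<n>.txt under a cyclic distribution? */
static int owns_file(int n, int rank, int total_workers) {
    assert(total_workers > 0 && rank >= 0 && rank < total_workers);
    return n % total_workers == rank;
}

/* Walk file numbers first_n..last_n and handle only the ones this rank
   owns; returns how many files this rank handled. */
static int run_my_files(int rank, int total_workers, int first_n, int last_n) {
    int handled = 0;
    for (int n = first_n; n <= last_n; n++) {
        if (!owns_file(n, rank, total_workers))
            continue;
        char name[64];
        snprintf(name, sizeof name, "Par_%d.txt", n);
        /* A real program would run the simulation on `name` here. */
        printf("rank %d would run %s\n", rank, name);
        handled++;
    }
    return handled;
}
```

No communication is needed, since every rank can compute its own share from its rank and the communicator size. The trade-off is that the assignment is fixed up front, so it does not rebalance when some files take far longer than others, which is the concern raised in the comments above.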

Cherokee answered 10/2, 2011 at 20:35 Comment(0)
