Summing Python Objects with MPI's Allreduce

I am working with a sparse tensor array implementation that I built in Python using dictionaries and Counters, and I would like to make it usable in parallel. The bottom line is that I end up with a Counter on each node, and I would like to add them together using MPI.Allreduce (or another clean solution). For instance, with Counters one can do this:

A = Counter({'a': 1, 'b': 2, 'c': 3})
B = Counter({'b': 1, 'c': 2, 'd': 3})

such that

C = A + B   # Counter({'a': 1, 'b': 3, 'c': 5, 'd': 3})
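
For reference, here is the single-process behaviour the question wants to reproduce across nodes, as a small runnable sketch. One detail worth knowing for a sparse tensor: the + operator on Counters discards keys whose summed count is zero or negative, whereas Counter.update() adds counts in place and keeps them, so which one matches your tensor semantics depends on whether entries can cancel.

```python
from collections import Counter

A = Counter({'a': 1, 'b': 2, 'c': 3})
B = Counter({'b': 1, 'c': 2, 'd': 3})

# '+' merges the two Counters, summing counts for shared keys
C = A + B
print(C)  # counts: a=1, b=3, c=5, d=3

# '+' silently drops entries whose total is not positive...
D = Counter({'x': 2}) + Counter({'x': -2})
print(D)  # Counter()

# ...while update() adds in place and keeps zero or negative totals
E = Counter({'x': 2})
E.update(Counter({'x': -2}))
print(E)  # Counter({'x': 0})
```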

I would like to do the same operation across all the relevant nodes:

comm.Allreduce(send_counter, recv_counter, op=MPI.SUM)

However, MPI doesn't seem to support this operation on dictionaries/Counters; it throws an error saying it expects a buffer or a list/tuple. Is my best option a user-defined operation, or is there a way to get Allreduce to add Counters? Thanks,

EDIT (7/14/15): I have attempted to create a user-defined operation for dictionaries, but I have run into some problems. I wrote the following:

def dict_sum(dict1, dict2, datatype):
    for key in dict2:
        try:
            dict1[key] += dict2[key]
        except KeyError:
            dict1[key] = dict2[key]

and when I told MPI about the function I did this:

dictSumOp = MPI.Op.Create(dict_sum, commute=True)

and in the code I used it as

the_result = comm.allreduce(mydict, dictSumOp)

However, it threw an "unsupported operand '+' for type dict" error, so I wrote

the_result = comm.allreduce(mydict, op=dictSumOp)

and now it fails at dict1[key] += dict2[key] with TypeError: 'NoneType' object has no attribute '__getitem__'. So apparently it needs to know that these objects are dictionaries? How do I tell it that they are dictionaries?

Prefrontal asked 13/7, 2015 at 16:19
Can you turn the operation into a list comprehension? – Stonecutter

Neither MPI nor mpi4py knows anything about Counters in particular, so you need to create your own reduction operation for this to work; the same would be true for any other kind of Python object:

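#!/usr/bin/env python
from mpi4py import MPI
import collections

def addCounter(counter1, counter2, datatype):
    # Fold counter2 into counter1; missing keys in a Counter default to 0
    for item in counter2:
        counter1[item] += counter2[item]
    # The return value is the partial result that gets passed on
    return counter1

if __name__ == "__main__":

    comm = MPI.COMM_WORLD

    # Rank 0 holds one Counter; every other rank holds a different one
    if comm.rank == 0:
        myCounter = collections.Counter({'a': 1, 'b': 2, 'c': 3})
    else:
        myCounter = collections.Counter({'b': 1, 'c': 2, 'd': 3})

    # Wrap the Python function as an MPI reduction operation
    counterSumOp = MPI.Op.Create(addCounter, commute=True)

    # Lowercase allreduce pickles the Counters and combines them with counterSumOp
    totcounter = comm.allreduce(myCounter, op=counterSumOp)
    print(comm.rank, totcounter)

    counterSumOp.Free()  # release the user-defined operation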

Here we've taken a function which sums two Counter objects and created an MPI operator from it with MPI.Op.Create; mpi4py will unpickle the objects, run this function to combine them pairwise, then pickle the partial result and send it on to the next task.
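
To make the pairwise combination concrete without launching MPI, here is a minimal single-process sketch. The helper simulated_allreduce is hypothetical: it round-trips each "rank's" Counter through pickle, folds the copies together in rank order with the same addCounter function, and gives every rank its own copy of the result. It only illustrates the semantics; the real communication pattern (tree shape, which task combines which partial results) is up to mpi4py.

```python
import pickle
from collections import Counter

def addCounter(counter1, counter2, datatype):
    # Same reduction function as in the answer: fold counter2 into counter1
    for item in counter2:
        counter1[item] += counter2[item]
    return counter1

def simulated_allreduce(per_rank_objects, fn):
    """Hypothetical stand-in for comm.allreduce(obj, op=...) in one process."""
    # Round-trip each object through pickle, as if it had been sent between tasks
    received = [pickle.loads(pickle.dumps(obj)) for obj in per_rank_objects]
    result = received[0]
    for obj in received[1:]:
        # Combine pairwise; datatype is None because nothing uses it here
        result = fn(result, obj, None)
    # Every "rank" receives its own copy of the combined result
    return [pickle.loads(pickle.dumps(result)) for _ in per_rank_objects]

# Mirror a 4-process run: rank 0 has one Counter, ranks 1-3 have the other
ranks = [Counter({'a': 1, 'b': 2, 'c': 3})] + [Counter({'b': 1, 'c': 2, 'd': 3}) for _ in range(3)]
for rank, total in enumerate(simulated_allreduce(ranks, addCounter)):
    print(rank, total)
```

The totals (a=1, b=5, c=9, d=9) should match the 4-process mpirun output shown in the answer.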

Note too that we're using (lowercase) allreduce, which works on arbitrary python objects, rather than (uppercase) Allreduce, which works on numpy arrays or their moral equivalents (buffers, which map onto the Fortran/C arrays that the MPI API is designed on).

Running gives:

$ mpirun -np 2 python ./counter_reduce.py 
0 Counter({'c': 5, 'b': 3, 'd': 3, 'a': 1})
1 Counter({'c': 5, 'b': 3, 'd': 3, 'a': 1})

$ mpirun -np 4 python ./counter_reduce.py 
0 Counter({'c': 9, 'd': 9, 'b': 5, 'a': 1})
2 Counter({'c': 9, 'd': 9, 'b': 5, 'a': 1})
1 Counter({'c': 9, 'd': 9, 'b': 5, 'a': 1})
3 Counter({'c': 9, 'd': 9, 'b': 5, 'a': 1})
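
Regarding the uppercase Allreduce from the question: if you would rather use the buffer-based call with the built-in MPI.SUM (no pickling, no user-defined op), one option is to lay each Counter out as a dense numeric array over a key ordering that every rank shares. This is a sketch under assumptions: counter_to_dense and dense_to_counter are hypothetical helpers, the fixed key list is assumed to be identical on all ranks (in practice you might first assemble it from every rank's keys, e.g. with the lowercase allgather), counts become floats, and the MPI call appears only in a comment so the sketch runs without MPI. For a sparse tensor this only pays off if the union of keys is not much larger than each rank's Counter.

```python
from array import array
from collections import Counter

def counter_to_dense(counter, keys):
    """Hypothetical helper: counts laid out in the shared key order as C doubles."""
    return array('d', (counter.get(k, 0) for k in keys))

def dense_to_counter(values, keys):
    """Hypothetical helper: rebuild a Counter from a summed buffer, dropping zeros."""
    return Counter({k: v for k, v in zip(keys, values) if v != 0})

# Assumption: every rank holds this same key list in the same order
keys = ['a', 'b', 'c', 'd']
send0 = counter_to_dense(Counter({'a': 1, 'b': 2, 'c': 3}), keys)
send1 = counter_to_dense(Counter({'b': 1, 'c': 2, 'd': 3}), keys)

# Under MPI each rank would instead do something like:
#   recv = array('d', [0.0] * len(keys))
#   comm.Allreduce([send, MPI.DOUBLE], [recv, MPI.DOUBLE], op=MPI.SUM)
# Here an elementwise sum of the two buffers stands in for that call.
summed = array('d', (x + y for x, y in zip(send0, send1)))
print(dense_to_counter(summed, keys))
```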

With only modest changes, the same approach works with a generic dictionary:

#!/usr/bin/env python
from mpi4py import MPI

def addCounter(counter1, counter2, datatype):
    # Merge two plain dicts: '+' the values of shared keys, copy the rest
    for item in counter2:
        if item in counter1:
            counter1[item] += counter2[item]
        else:
            counter1[item] = counter2[item]
    return counter1

if __name__ == "__main__":

    comm = MPI.COMM_WORLD

    if comm.rank == 0:
        myDict = {'a': 1, 'c': "Hello "}
    else:
        myDict = {'c': "World!", 'd': 3}

    # String '+' is not commutative, so declare commute=False (rank order)
    counterSumOp = MPI.Op.Create(addCounter, commute=False)

    totDict = comm.allreduce(myDict, op=counterSumOp)
    print(comm.rank, totDict)

    counterSumOp.Free()  # release the user-defined operation

Running it gives:

$ mpirun -np 2 python dict_reduce.py 
0 {'a': 1, 'c': 'Hello World!', 'd': 3}
1 {'a': 1, 'c': 'Hello World!', 'd': 3}
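
One caveat with this dictionary version: string concatenation is associative but not commutative, so the merged value depends on the order in which partial results are combined. In the MPI standard, operands are combined in ascending rank order only for operations created with commute=False; with commute=True the implementation may reorder them. Whether mpi4py's object-level allreduce actually reorders may depend on the version, so commute=False is the safer declaration. The single-process sketch below (no MPI involved) just shows that swapping the operands changes the result:

```python
def addCounter(counter1, counter2, datatype):
    # Generic-dict version from the answer: '+' on shared keys, copy otherwise
    for item in counter2:
        if item in counter1:
            counter1[item] += counter2[item]
        else:
            counter1[item] = counter2[item]
    return counter1

rank0 = {'a': 1, 'c': "Hello "}
rank1 = {'c': "World!", 'd': 3}

# Combine in rank order (0 then 1); copies keep the inputs intact
in_order = addCounter(dict(rank0), dict(rank1), None)
# Combine with the operands swapped, as a commutative op would permit
swapped = addCounter(dict(rank1), dict(rank0), None)

print(repr(in_order['c']))  # 'Hello World!'
print(repr(swapped['c']))   # 'World!Hello '
```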
Bowsprit answered 13/7, 2015 at 16:54
I can't see how the datatype argument is used. – Veronicaveronika
It isn't; it's part of the MPI API, but it isn't easy to use meaningfully with mpi4py's pickling of Python objects. In principle you could use it to perform different operations on different MPI data types. – Bowsprit
There have been new developments; please see the edit above. Thank you for your help so far. – Veronicaveronika
@Prefrontal - Your update is now a whole separate question; one way or another you're not passing a dictionary (or a Counter, which is what your question was about) to the function, and we can't diagnose why without seeing the code. When you ran my provided example, did it work? – Bowsprit
Your code works as written, but if I write totcounter = comm.allreduce(myCounter, op=counterSumOp) it throws the same NoneType error. Are you sure your code is using your created function and not the op=SUM default? – Veronicaveronika
@Prefrontal - Oops, in an edit I deleted the return statement of addCounter, so it returned None, which (of course) doesn't have a __getitem__ method, leading to what you saw. Sorry. The above works, and yes: if op isn't specified it just uses sum, which is why it seemed to work before. – Bowsprit
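
The failure discussed in the last comments is easy to reproduce without MPI: a reduction function that merges into its first argument but never returns it evaluates to None, and the next pairwise step then tries to index None. The sketch below folds three hypothetical per-rank dictionaries with functools.reduce as a stand-in for the reduction (this is not mpi4py's actual scheduling); dict_sum_broken mirrors the question's dict_sum, and dict_sum_fixed only adds the return.

```python
from functools import reduce

def dict_sum_broken(dict1, dict2, datatype):
    # Mirrors the question's dict_sum: merges in place but has no return,
    # so every call evaluates to None
    for key in dict2:
        try:
            dict1[key] += dict2[key]
        except KeyError:
            dict1[key] = dict2[key]

def dict_sum_fixed(dict1, dict2, datatype):
    for key in dict2:
        try:
            dict1[key] += dict2[key]
        except KeyError:
            dict1[key] = dict2[key]
    return dict1  # hand the merged partial result to the next step

per_rank = [{'a': 1}, {'a': 2, 'b': 1}, {'b': 4}]

# reduce() calls fn(partial, next_item); the datatype argument is unused here
print(reduce(lambda x, y: dict_sum_fixed(x, y, None), [dict(d) for d in per_rank]))

try:
    reduce(lambda x, y: dict_sum_broken(x, y, None), [dict(d) for d in per_rank])
except TypeError as exc:
    # Python 3 wording; Python 2 reports "... has no attribute '__getitem__'"
    print("broken version fails:", exc)
```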
