How to gather arrays of unequal length with mpi4py

Desired behavior:

I am trying to take a number of lists of varied lengths on different nodes, collect them together in one node, and have that master node place them in a set. This list is named rout_array in each node. Note that the elements in rout_array are only integers, and non-unique across nodes.

Error:

Traceback (most recent call last):
  File "prout.py", line 160, in <module>
    main()
  File "prout.py", line 153, in main
    num = DetermineRoutingNumber(steps, goal, vertexSetSize)
  File "prout.py", line 129, in DetermineRoutingNumber
    comm.Gather(send_buffer, recv_buffer, root = 0)

  File "MPI\Comm.pyx", line 589, in mpi4py.MPI.Comm.Gather (c:\projects\mpi4py\src\mpi4py.MPI.c:97806)
  File "MPI\msgbuffer.pxi", line 525, in mpi4py.MPI._p_msg_cco.for_gather (c:\projects\mpi4py\src\mpi4py.MPI.c:34678)
  File "MPI\msgbuffer.pxi", line 446, in mpi4py.MPI._p_msg_cco.for_cco_send (c:\projects\mpi4py\src\mpi4py.MPI.c:33938)
  File "MPI\msgbuffer.pxi", line 148, in mpi4py.MPI.message_simple (c:\projects\mpi4py\src\mpi4py.MPI.c:30349)
  File "MPI\msgbuffer.pxi", line 93, in mpi4py.MPI.message_basic (c:\projects\mpi4py\src\mpi4py.MPI.c:29448)

  KeyError: 'O'

I have no idea how I am getting a KeyError for 'O' when there are no strings in my code. All lists contain integers, the numpy arrays contain integers, and the only dictionary active here has only integers for keys. Note that every node outputs this error.

Code:

import numpy, math
from mpi4py import MPI
from sympy.combinatorics import Permutation as Perm     

def GetEdges(size,file):
    """This function takes in a file of edges in a graph in the form 'u,v'
    without quotes, where u and v are vertices of the graph. It then
    generates a permutation that swaps those vertices, and returns these
    transpositions."""

    edgeFile = open(file, "r")
    edges = []
    for line in edgeFile:
        line = line.strip()
        line = line.split(",")
        for vertex in line:
            line[line.index(vertex)] = int(vertex)
        edges.append(Perm([line], size = size))

    edgeFile.close()
    edges.append(Perm([[size - 1]], size = size))

    return edges


def AreDisjoint(p1,p2):
    """This function determines whether or not two permutations move any
    common elements, and returns the appropriate boolean."""
    v1 = set(p1.support())
    v2 = set(p2.support())

    return len(v1 & v2) == 0


def GetMatchings(edges, maxMatching, size):
    """This function takes in a set of edges given by GetEdges(), and 
    generates all possible matchings in the given graph. It then converts
    each matching into its rank given by lexicographical order, and appends
    that rank to a set, which is then returned."""

    stepDict = {1:set(edges)}
    steps = set(edges)
    for i in range(1,maxMatching):
        temp = set()
        for p1 in stepDict[1]:
            for p2 in stepDict[i]:
                newPerm = p1 * p2
                if AreDisjoint(p1,p2) and newPerm not in steps:
                    temp.add(newPerm)
                    steps.add(newPerm)

        stepDict[i+1] = temp

    newSteps = set()
    for step in steps:
        newSteps.add(step.rank())
    return newSteps


def FromRank(rank,level):
    """This function takes in a rank and size of a permutation, then returns
    the permutation that lies at the rank according to lexicographical 
    ordering. """

    lst = list(range(level + 1))
    perm = []
    while lst:
        fact = math.factorial(len(lst) - 1)
        index, rank = divmod(rank, fact)
        perm.append(lst.pop(index))
    assert rank == 0 
    return perm


def SplitArrayBetweenNodes(rank, rem, length):
    """This function takes in the rank of a node and any remainder after
    dividing up an array between all the nodes. It then returns a starting
    and ending partition index unique to each node."""
    if rem != 0:
        if rank in list(range(rem)):
            if rank == 0:
                part_start = 0
                part_end = length
            else:
                part_start = rank * (length + 1)
                part_end = part_start + length
        else:
            part_start = rank * length + rem
            part_end = part_start + length - 1
    else:
        part_start = rank * length
        part_end = part_start + length - 1

    return part_start, part_end


def DetermineRoutingNumber(steps, goal, vertexSetSize):
    """This function takes in the matchings created by GetMatchings(), 
    and calculates all possible products between its own elements. It then
    takes all unique products, and calculates all possible products between
    the matching set and the previous output. This repeats until all 
    permutations of a given type are found. The level at which this occurs
    is then returned."""

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    length = len(steps)
    rem = length % size
    part_len = length // size

    part_start, part_end = SplitArrayBetweenNodes(rank,rem, part_len)

    permDict = {1: steps}
    i = 1
    while True:
        rout_array = set()
        work_array = set(list(permDict[i])[part_start:part_end + 1])

        #Calculate all possible products    
        for p1 in permDict[1]:
            for p2 in work_array:
                p2_perm = Perm(FromRank(p2,vertexSetSize - 1))
                p1_perm = Perm(FromRank(p1,vertexSetSize - 1))
                new = p2_perm * p1_perm

                if new(0) == 0 or new(0) == 1:
                    order = new.rank()
                    rout_array.add(order)

        #All nodes send their work to master node
        comm.Barrier()

        send_buffer = numpy.array(rout_array)
        sendcounts = numpy.array(comm.gather(len(rout_array), root = 0))

        if rank == 0:
            recv_buffer = numpy.empty(sum(sendcounts), dtype = int)
        else:
            recv_buffer = None

        comm.Gatherv(sendbuf = send_buffer, recvbuf = (recv_buffer, sendcounts), root = 0) 

        #Generate input for next level of the loop, and weed out repeats.
        permDict[i+1] = rout_array
        for j in range(1,i+1):
            permDict[i+1] = permDict[i+1] - permDict[j]


def main():
    file = "EdgesQ2.txt"
    maxMatching = 2
    vertexSetSize = 4

    edges = GetEdges(vertexSetSize, file)
    steps = GetMatchings(edges, maxMatching, vertexSetSize)
    goal = 2 * math.factorial(vertexSetSize-1)

    num = DetermineRoutingNumber(steps, goal, vertexSetSize)
    print(num)


main()

Test Cases:

EdgesQ2.txt:

Note that maxMatching = 2 and vertexSetSize = 4 in this example. Output should be 3.

0,1
1,2
2,3
0,3

EdgesQ3.txt:

Note that maxMatching = 4 and vertexSetSize = 8 in this example. Output should be 4.

0,1
0,3
0,4
1,2
1,5
2,3
2,6
3,7
4,5
4,7
5,6
6,7
Vasomotor answered 23/6, 2016 at 19:8 Comment(1)
Welcome to SO. I took the liberty to answer your underlying problem as opposed to the KeyError. Please keep in mind that in general it is necessary to post a minimal reproducible example to get debugging help. Your code is close, but rout_array, permDict, part_len are missing to be able to reproduce the issue. (Earthling)

If your lengths are different across processes, you need to use the vector variant Gatherv. With that function you provide an array containing the various lengths (recvcounts).

Unfortunately, the mpi4py documentation does not currently describe how to use Gatherv or any of the other vector variants. Here is a simple example:

#!/usr/bin/env python3

import numpy as np
from mpi4py import MPI
import random

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
root = 0

local_array = [rank] * random.randint(2, 5)
print("rank: {}, local_array: {}".format(rank, local_array))

sendbuf = np.array(local_array)

# Collect local array sizes using the high-level mpi4py gather
sendcounts = np.array(comm.gather(len(sendbuf), root))

if rank == root:
    print("sendcounts: {}, total: {}".format(sendcounts, sum(sendcounts)))
    recvbuf = np.empty(sum(sendcounts), dtype=int)
else:
    recvbuf = None

comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendcounts), root=root)
if rank == root:
    print("Gathered array: {}".format(recvbuf))

As you can see, mpi4py does not take the sendcounts or recvcounts as extra parameters, but as a tuple/list form of the recvbuf parameter. If you pass (recvbuf, sendcounts), it will derive the type from recvbuf. The displacements/offsets are chosen such that the data from all ranks is stored contiguously and ordered by rank.

Basically mpi4py wildly guesses what you might have meant with various forms of the recvbuf parameter. The complete and unambiguous form is (buffer, counts, displacements, type).
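
To make the explicit form concrete, here is a minimal sketch that spells out all four parts instead of letting mpi4py infer them. The helper names (displacements_from_counts, split_by_rank, gatherv_int64) are made up for illustration, and the sketch assumes the data is, or can be converted to, 64-bit integers so that MPI.INT64_T is the matching datatype.

```python
import numpy as np


def displacements_from_counts(counts):
    """Offset of each rank's block when blocks are packed contiguously in rank order."""
    counts = np.asarray(counts, dtype=np.int64)
    displs = np.zeros_like(counts)
    displs[1:] = np.cumsum(counts)[:-1]
    return displs


def split_by_rank(flat, counts):
    """Cut the gathered flat buffer back into one array per rank."""
    displs = displacements_from_counts(counts)
    return [flat[d:d + c] for d, c in zip(displs, counts)]


def gatherv_int64(comm, local_values, root=0):
    """Gather unequal-length int64 arrays on root using the explicit buffer form."""
    from mpi4py import MPI  # imported here so the helpers above work without MPI

    sendbuf = np.ascontiguousarray(local_values, dtype=np.int64)
    counts = comm.gather(sendbuf.size, root=root)  # list on root, None elsewhere

    if comm.Get_rank() == root:
        displs = displacements_from_counts(counts).tolist()
        recvbuf = np.empty(sum(counts), dtype=np.int64)
        # (buffer, counts, displacements, type), nothing left to guess
        recvspec = [recvbuf, counts, displs, MPI.INT64_T]
    else:
        recvbuf, recvspec = None, None

    comm.Gatherv(sendbuf=[sendbuf, MPI.INT64_T], recvbuf=recvspec, root=root)
    return (recvbuf, counts) if comm.Get_rank() == root else (None, None)
```

On the root, split_by_rank(recvbuf, counts) recovers the individual contributions. Forcing the dtype to np.int64 on every rank also sidesteps platforms where the default NumPy integer is not 64 bits wide.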

Edit Regarding the KeyError:

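The rather confusingly named rout_array is a set, which is not a valid input to numpy.array: a set is neither a sequence nor an object with an array interface. Unfortunately, instead of failing, numpy.array wraps the whole set in a zero-dimensional ndarray of dtype object. The type code of that dtype is 'O', and mpi4py has no MPI datatype for it, hence the KeyError: 'O'. Converting the set to a list first gives an ordinary integer array: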

send_buffer = numpy.array(list(rout_array))
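
A quick way to see the difference is to inspect what the two calls produce. The set below is a made-up stand-in for the one built in the question:

```python
import numpy as np

rout_array = {3, 1, 2}  # stand-in values, not taken from the question

from_set = np.array(rout_array)         # the set is stored as one opaque Python object
from_list = np.array(list(rout_array))  # an ordinary one-dimensional integer array

print(from_set.dtype, from_set.shape)         # object ()
print(from_set.dtype.char)                    # O  (the character seen in the KeyError)
print(from_list.dtype.kind, from_list.shape)  # i (3,)
```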

The collective then works, but the loop does not terminate, which is not surprising considering there is no return or break in the while True loop in DetermineRoutingNumber.
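
Since the stated goal is a single set on the master node, a different technique from Gatherv may also be worth considering: the lowercase comm.gather pickles arbitrary Python objects, so each rank can send its set directly, whatever its size. The sketch below assumes that approach; gather_union is a hypothetical helper name, and pickling is generally slower than the buffer-based Gatherv for large amounts of data.

```python
def gather_union(comm, local_set, root=0):
    """Collect one set per rank on root and merge them; other ranks get None."""
    pieces = comm.gather(local_set, root=root)  # list of sets on root, None elsewhere
    if pieces is None:
        return None
    return set().union(*pieces)
```

With mpi4py this would be called as gather_union(MPI.COMM_WORLD, rout_array) on every rank; only the root receives the merged set.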

Earthling answered 24/6, 2016 at 7:59 Comment(2)
I went ahead and implemented .Gatherv() into my code, and at first got the error len() of unsized object which pointed to len(sendbuf). Unfortunately, after changing this to len(rout_array), I now get KeyError: 'O'. (Vasomotor)
Okay, I pared down the code as much as possible to still give a working example, and added two test cases. Let me know if you need anything else. (Vasomotor)
