MPI python-Open-MPI
Asked Answered
L

1

9

I have 20,000*515 numpy matrix,representing biological datas. I need to find the correlation coefficient of the biological data,meaning as a result I would have a 20,000*20,000 matrix with correlation value. Then I populate a numpy array with 1's and 0's if each correlation coefficient is greater than a threshold value.

I used numpy.corrcoef to find the correlation coefficient and it works well in my machine.

Then I wanted to put in the cluster(having 10 computers and node varying from 2 to 8). when I tried putting it in the cluster, each node generating (40)random numbers and getting those 40 random columns from the biological data resulting in 20,000*40 matrix, I ran into memory issue saying.

mpirun noticed that process rank # with PID # on node name exited on signal 9 (Killed).

Then I tried rewriting the program like getting each rows finding the correlation coefficient and if the value is greater than the threshold then store 1 in the matrix or else 0 rather than creating a correlation matrix. But it takes 1.30 hrs to run this program.I need to run it 100 times.

Can anyone please suggest a better way of doing this, like how to solve the memory issue by allocating jobs once each core has finished it's job. I am new to MPI. Below is my code.

If you need any more information please let me know. Thank you

    import numpy
    from mpi4py import MPI
    import time


    Size=MPI.COMM_WORLD.Get_size();
    Rank=MPI.COMM_WORLD.Get_rank();
    Name=MPI.Get_processor_name();

    RandomNumbers={};





    rndm_indx=numpy.random.choice(range(515),40,replace=False)
    rndm_indx=numpy.sort(rndm_indx)


    Data=numpy.genfromtxt('MyData.csv',usecols=rndm_indx);




    RandomNumbers[Rank]=rndm_indx;
    CORR_CR=numpy.zeros((Data.shape[0],Data.shape[0]));


    start=time.time();
    for i in range(0,Data.shape[0]):
        Data[i]=Data[i]-np.mean(Data[i]);
        alpha1=1./np.linalg.norm(Data[i]);
        for j in range(i,Data.shape[0]):
           if(i==j):
               CORR_CR[i][j]=1;
           else:
               Data[j]=Data[j]-np.mean(Data[j]);
               alpha2=1./np.linalg.norm(Data[j]);
               corr=np.inner(Data[i],Data[j])*(alpha1*alpha2);
               corr=int(np.absolute(corrcoef)>=0.9)
               CORR_CR[i][j]=CORR_CR[j][i]=corr
    end=time.time();           

    CORR_CR=CORR_CR-numpy.eye(CORR_CR.shape[0]);  


    elapsed=(end-start)
    print('Total Time',elapsed)
Lashonda answered 1/11, 2016 at 15:21 Comment(6)
Welcome to Stackoverflow! It's a very interesting problem. In MPI, it's as if all the code were executed by all processes by default. As-is, if you run mpirun -np 10 python main.py, the file will be read 10 times, the correlation matrix will be computed 10 times and the total time will be printed 10 times. To overcome this difficulty and reduce the memory footprint, you will need to allot the computations to the processes, use MPI functions to communicate the required data to the processes needing it... See mpi4py.scipy.org/docs/usrman/tutorial.html about these functions.Liew
Thank you francis. I will look into MPI commands meantime do yo have a better idea of finding the correlation coeffiecient so as to reduce the execution time . Now it is taking almost 2 hrs.Lashonda
The operation Data1 - numpy.mean(Data1), diff_Data2 = Data2 - numpy.mean(Data2), do not need to be performed at each correlation point: you can replace Data[i]=Data[i]- numpy.mean(Data[i]) once for all, before any call to corr_coef. Moreover, storing alpha[:]=1./numpy.linalg.norm(Data[i]) is light and would avoid any call to numpy.linalg.norm() during corr_coef(). In the end, corr_coef(i,j) would just write numpy.inner(Data[i], Data[j])*alpha[i]*alpha[j].Liew
There might be a way to use nditer instead of the for loops... It often prooves faster than for loops for numpy arrays. See these examples, in particular those using the flag multi_index.Liew
Lastly, if high correlation is rare, using a sparce matrix will drastically reduce your memory requirements. See scipy.sparse.csr_matrix and #15897088 for instance.Liew
Thank you Francis. I did update my code but still uses the for loop. I am new to python let me try using nditer. Thanks againLashonda
L
11

The execution time of the program you posted is about 96s on my computer. Let's optimize a couple of things before exploring parallel computations.

  • Let's store the norms of the vectors to avoid computing it each time the norm is needed. Getting alpha1=1./numpy.linalg.norm(Data[i]); out of the second loop is a good starting point. Since the vectors do not change during computations, their norms can be computed in advance:

    alpha=numpy.zeros(Data.shape[0])
    for i in range(0,Data.shape[0]):
      Data[i]=Data[i]-numpy.mean(Data[i])
      alpha[i]=1./numpy.linalg.norm(Data[i])
    
    for i in range(0,Data.shape[0]):
      for j in range(i,Data.shape[0]):
        if(i==j):
           CORR_CR[i][j]=1;
        else:
           corr=numpy.inner(Data[i],Data[j])*(alpha[i]*alpha[j]);
           corr=int(numpy.absolute(corr)>=0.9)
           CORR_CR[i][j]=CORR_CR[j][i]=corr
    

The computation time is already down to 17s.

  • Assuming that the vectors are not highly correlated, most of the correlation coefficients will be rounded to zero. Hence, the matrix is likely to be sparce (full of zeros). Let's use the scipy.sparse.coo_matrix sparce matrix format, which is very easy to populate: the non-null items and their coordinates i,j are to be stored in lists.

    data=[]
    ii=[]
    jj=[]
    ...
      if(corr!=0):
               data.append(corr)
               ii.append(i)
               jj.append(j)
               data.append(corr)
               ii.append(j)
               jj.append(i)
    ...
    CORR_CR=scipy.sparse.coo_matrix((data,(ii,jj)), shape=(Data.shape[0],Data.shape[0]))
    

The computation time is down to 13s (negligeable improvement ?) and the memory footprint is greatly reduced. It would be a major improvement if larger datasets were to be considered.

  • for loops in python are pretty unefficient. See for loop in python is 10x slower than matlab for instance. But there are plenty of ways around, such as using vectorized operations or optimized iterators such as those provided by numpy.nditer. One of the reasons for for loops being unefficient is that python is a interpreted language: not compilation occurs in the process. Hence, to overcome this problem, the most tricky part of the code can be compiled by using a tool like Cython.

    1. The critical part of the code are written in Cython, in a dedicated file correlator.pyx.

    2. This file is turned into a correlator.c file by Cython

    3. This file is compiled by your favorite c compiler gcc to build a shared library correlator.so

    4. The optimized function can be used in your program after import correlator.

The content of correlator.pyx, starting from Numpy vs Cython speed , looks like:

import numpy

cimport numpy
cimport scipy.linalg.cython_blas
ctypedef numpy.float64_t DTYPE_t
cimport cython

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
def process(numpy.ndarray[DTYPE_t, ndim=2] array,numpy.ndarray[DTYPE_t, ndim=1] alpha,int imin,int imax):

    cdef unsigned int rows = array.shape[0]
    cdef  int cols = array.shape[1]
    cdef unsigned int row, row2
    cdef  int one=1
    ii=[]
    jj=[]
    data=[]

    for row in range(imin,imax):
        for row2 in range(row,rows):
            if row==row2:
               data.append(0)
               ii.append(row)
               jj.append(row2)
            else:
                corr=scipy.linalg.cython_blas.ddot(&cols,&array[row,0],&one,&array[row2,0],&one)*alpha[row]*alpha[row2]
                corr=int(numpy.absolute(corr)>=0.9)
                if(corr!=0):
                    data.append(corr)
                    ii.append(row)
                    jj.append(row2)

                    data.append(corr)
                    ii.append(row2)
                    jj.append(row)

    return ii,jj,data

where scipy.linalg.cython_blas.ddot() will perform the inner product.

To cythonize and compile the .pyx file, the following makefile can be used (I hope you are using Linux...)

all: correlator correlatorb


correlator: correlator.pyx
    cython -a correlator.pyx

correlatorb: correlator.c
    gcc -shared -pthread -fPIC -fwrapv -O2 -Wall -fno-strict-aliasing -I/usr/include/python2.7 -o correlator.so correlator.c

The new correlation function is called in the main python file by:

import correlator
ii,jj,data=correlator.process(Data,alpha,0,Data.shape[0])

By using a compiled loop, the time is down to 5.4s! It's already ten times faster. Moreover, this computations are performed on a single process!

Let's introduce parallel computations. Please notice that two arguments are added to the function process : imin and imax. These arguments signals the range of rows of CORR_CR that are computed. It is performed so as to anticipate the use of parallel computation. Indeed, a straightforward way to parallelize the program is to split the outer for loop (index i) to the different processes.

Each processes will perform the outer for loop for a particular range of the index i which is computed so as to balance the workload between the processes.

The program has to complete the following operations:

  1. Process 0 ("root process") reads the vectors Data in the file.
  2. The Data is broadcast to all processes by using the MPI function bcast().
  3. The range of indexes i to be considered by each process is computed.
  4. The correlation coefficient are computed by each process for the corresponding indexes. The non-null terms of the matrix are stored in lists data,ii,jj on each process.
  5. These lists are gathered on the root process by calling the MPI function gather(). It produces three lists of Size lists which are concatenated to get 3 lists required to create the sparce adjacency matrix.

Here goes the code:

import numpy
from mpi4py import MPI
import time
import scipy.sparse

import warnings
warnings.simplefilter('ignore',scipy.sparse.SparseEfficiencyWarning)

Size=MPI.COMM_WORLD.Get_size();
Rank=MPI.COMM_WORLD.Get_rank();
Name=MPI.Get_processor_name();

#a dummy set of data is created here. 
#Samples such that (i-j)%10==0 are highly correlated.
RandomNumbers={};
rndm_indx=numpy.random.choice(range(515),40,replace=False)
rndm_indx=numpy.sort(rndm_indx)

Data=numpy.ascontiguousarray(numpy.zeros((2000,515),dtype=numpy.float64))
if Rank==0:
    #Data=numpy.genfromtxt('MyData.csv',usecols=rndm_indx);
    Data=numpy.ascontiguousarray(numpy.random.rand(2000,515))
    lin=numpy.linspace(0.,1.,515)
    for i in range(Data.shape[0]):
         Data[i]+=numpy.sin((1+i%10)*10*lin)*100

start=time.time();

#braodcasting the matrix
Data=MPI.COMM_WORLD.bcast(Data, root=0)

RandomNumbers[Rank]=rndm_indx;
print Data.shape[0]

#an array to store the inverse of the norm of each sample
alpha=numpy.zeros(Data.shape[0],dtype=numpy.float64)
for i in range(0,Data.shape[0]):
        Data[i]=Data[i]-numpy.mean(Data[i])
        if numpy.linalg.norm(Data[i])==0:
            print "process "+str(Rank)+" is facing a big problem"
        else:
            alpha[i]=1./numpy.linalg.norm(Data[i])


#balancing the computational load between the processes. 
#Each process must perform about Data.shape[0]*Data.shape[0]/(2*Size) correlation.
#each process cares for a set of rows. 
#Of course, the last rank must care about more rows than the first one.

ilimits=numpy.zeros(Size+1,numpy.int32)
if Rank==0:
    nbtaskperprocess=Data.shape[0]*Data.shape[0]/(2*Size)
    icurr=0
    for i in range(Size):
        nbjob=0
        while(nbjob<nbtaskperprocess and icurr<=Data.shape[0]):
            nbjob+=(Data.shape[0]-icurr)
            icurr+=1
        ilimits[i+1]=icurr
    ilimits[Size]=Data.shape[0]
ilimits=MPI.COMM_WORLD.bcast(ilimits, root=0)          

#the "local" job has been cythonized in file main2.pyx
import correlator
ii,jj,data=correlator.process(Data,alpha,ilimits[Rank],ilimits[Rank+1])

#gathering the matrix inputs from every processes on the root process.
data = MPI.COMM_WORLD.gather(data, root=0)
ii = MPI.COMM_WORLD.gather(ii, root=0)
jj = MPI.COMM_WORLD.gather(jj, root=0)

if Rank==0:
   #concatenate the lists
   data2=sum(data,[])
   ii2=sum(ii,[])
   jj2=sum(jj,[])
   #create the adjency matrix
   CORR_CR=scipy.sparse.coo_matrix((data2,(ii2,jj2)), shape=(Data.shape[0],Data.shape[0]))

   print CORR_CR

end=time.time();           

elapsed=(end-start)
print('Total Time',elapsed)

By running mpirun -np 4 main.py, the computation time is 3.4s. It's not 4 time faster... This is likely due to the fact that the bottleneck of the computation is the computations of scalar products, which requires a large memory bandwidth. And my personnal computer is really limited regarding the memory bandwidth...

Last comment: there are plenty of possibilities for improvements. - The vectors in Data are copied to every processes... This affects the memory required by the program. Dispatching the computation in a different way and trading memory against communications could overcome this problem... - Each process still computes the norms of all the vectors...It could be improved by having each process computing the norms of some vector and using the MPI function allreduce() on alpha...

What to do with this adjacency matrix ?

I think you already know the answer to this question, but you can provide this adjacency matrix to sparse.csgraph routines such as connected_components() or laplacian(). Indeed, you are not very far from spectral clustering!

For instance, if the clusters are obvious, using connected_components() is sufficient:

if Rank==0:
    # coo to csr format
    S=CORR_CR.tocsr()
    print S
    n_components, labels=scipy.sparse.csgraph.connected_components(S, directed=False, connection='weak', return_labels=True)

    print "number of different famillies "+str(n_components)

    numpy.set_printoptions(threshold=numpy.nan)
    print labels
Liew answered 3/11, 2016 at 19:59 Comment(5)
Thank you Francis. I really appreciate it.Lashonda
Where can I find more about scipy.linalg.cython_blas.ddot()? Thank you.Lashonda
The documentation of scipy.linalg.cython_blas.ddot()is not very detailed... It just states that scipy.linalg.cython_blas.ddot is a Raw function pointers (Fortran-style pointer arguments) corresponding to the function ddot() of BLAS.Liew
Moreover, for 1D arrays, numpy.inner() is similar to numpy.dot(). #11034073 Lastly, numpy.dot() often dispatches to BLAS's ddot() #19840039Liew
Thankyou for such a detailed and well articulated answer! This has greatly helped me in optimising my code as well.Strephonn

© 2022 - 2024 — McMap. All rights reserved.