The execution time of the program you posted is about 96s on my computer. Let's optimize a couple of things before exploring parallel computations.
Let's store the norms of the vectors so that they are not recomputed each time they are needed. Moving alpha1=1./numpy.linalg.norm(Data[i]); out of the second loop is a good starting point. Since the vectors do not change during the computations, their norms can be computed in advance:
alpha=numpy.zeros(Data.shape[0])
for i in range(0,Data.shape[0]):
    Data[i]=Data[i]-numpy.mean(Data[i])
    alpha[i]=1./numpy.linalg.norm(Data[i])

for i in range(0,Data.shape[0]):
    for j in range(i,Data.shape[0]):
        if(i==j):
            CORR_CR[i][j]=1;
        else:
            corr=numpy.inner(Data[i],Data[j])*(alpha[i]*alpha[j]);
            corr=int(numpy.absolute(corr)>=0.9)
            CORR_CR[i][j]=CORR_CR[j][i]=corr
The computation time is already down to 17s.
Assuming that the vectors are not highly correlated, most of the correlation coefficients will be rounded to zero. Hence, the matrix is likely to be sparse (full of zeros). Let's use the scipy.sparse.coo_matrix sparse matrix format, which is very easy to populate: the non-null items and their coordinates i,j are stored in lists.
data=[]
ii=[]
jj=[]
...
            if(corr!=0):
                data.append(corr)
                ii.append(i)
                jj.append(j)
                data.append(corr)
                ii.append(j)
                jj.append(i)
...
CORR_CR=scipy.sparse.coo_matrix((data,(ii,jj)), shape=(Data.shape[0],Data.shape[0]))
The computation time is down to 13s (a negligible improvement?) but the memory footprint is greatly reduced. It would be a major improvement if larger datasets were to be considered.
for loops in Python are pretty inefficient; see for instance "for loop in python is 10x slower than matlab". But there are plenty of ways around this, such as using vectorized operations or optimized iterators like those provided by numpy.nditer. One of the reasons for for loops being inefficient is that Python is an interpreted language: no compilation occurs in the process. Hence, to overcome this problem, the trickiest part of the code can be compiled by using a tool like Cython.
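Before turning to Cython, here is what the vectorized route mentioned above could look like. This is only a sketch with dummy data of the same shape as in the rest of the answer; the names normalized and CORR are mine:

import numpy

# dummy data playing the role of Data in the rest of the answer
Data = numpy.random.rand(2000, 515)
Data -= Data.mean(axis=1, keepdims=True)        # remove the mean of each row
alpha = 1. / numpy.linalg.norm(Data, axis=1)    # inverse norm of each row

# one matrix product gives all the inner products at once
normalized = Data * alpha[:, numpy.newaxis]     # rows now have unit norm
CORR = (numpy.absolute(normalized.dot(normalized.T)) >= 0.9).astype(numpy.int8)

This is fast, but it builds the full dense 2000x2000 matrix; the rest of the answer keeps the loop-based version, which can be compiled and distributed over processes.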
The workflow is as follows:
- The critical part of the code is written in Cython, in a dedicated file correlator.pyx.
- This file is turned into a correlator.c file by Cython.
- This file is compiled by your favorite C compiler (gcc) to build a shared library correlator.so.
- The optimized function can be used in your program after import correlator.
The content of correlator.pyx, adapted from Numpy vs Cython speed, looks like:
import numpy
cimport numpy
cimport scipy.linalg.cython_blas
ctypedef numpy.float64_t DTYPE_t
cimport cython

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
def process(numpy.ndarray[DTYPE_t, ndim=2] array,numpy.ndarray[DTYPE_t, ndim=1] alpha,int imin,int imax):
    cdef unsigned int rows = array.shape[0]
    cdef int cols = array.shape[1]
    cdef unsigned int row, row2
    cdef int one=1
    ii=[]
    jj=[]
    data=[]

    for row in range(imin,imax):
        for row2 in range(row,rows):
            if row==row2:
                data.append(0)
                ii.append(row)
                jj.append(row2)
            else:
                corr=scipy.linalg.cython_blas.ddot(&cols,&array[row,0],&one,&array[row2,0],&one)*alpha[row]*alpha[row2]
                corr=int(numpy.absolute(corr)>=0.9)
                if(corr!=0):
                    data.append(corr)
                    ii.append(row)
                    jj.append(row2)
                    data.append(corr)
                    ii.append(row2)
                    jj.append(row)
    return ii,jj,data
where scipy.linalg.cython_blas.ddot() performs the inner product.
To cythonize and compile the .pyx file, the following makefile can be used (I hope you are using Linux...)
all: correlator correlatorb

correlator: correlator.pyx
	cython -a correlator.pyx

correlatorb: correlator.c
	gcc -shared -pthread -fPIC -fwrapv -O2 -Wall -fno-strict-aliasing -I/usr/include/python2.7 -o correlator.so correlator.c
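If you prefer not to call gcc by hand, the same shared library can be built with a small setup.py (this is an assumption on my side, not part of the makefile above) and python setup.py build_ext --inplace:

# setup.py -- alternative to the makefile above (a sketch)
from distutils.core import setup
from distutils.extension import Extension
from Cython.Build import cythonize
import numpy

setup(
    ext_modules=cythonize([Extension("correlator", ["correlator.pyx"],
                                     include_dirs=[numpy.get_include()])])
)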
The new correlation function is called in the main python file by:
import correlator
ii,jj,data=correlator.process(Data,alpha,0,Data.shape[0])
By using a compiled loop, the time is down to 5.4s! That is already more than ten times faster. Moreover, these computations are performed on a single process!
Let's introduce parallel computations.
Please notice that two arguments were added to the function process: imin and imax. These arguments specify the range of rows of CORR_CR to be computed, in anticipation of parallel computation. Indeed, a straightforward way to parallelize the program is to split the outer for loop (index i) among the different processes. Each process performs the outer for loop for a particular range of the index i, chosen so as to balance the workload between the processes.
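To make the balancing concrete, here is a small standalone sketch (plain Python, no MPI) of the splitting performed in the full program below: row i of the outer loop costs Data.shape[0]-i inner iterations, so later ranks receive more rows.

# standalone sketch of the row-range splitting used in the MPI code below
N = 2000              # number of samples, i.e. Data.shape[0]
Size = 4              # number of MPI processes

ilimits = [0] * (Size + 1)
nbtaskperprocess = N * N / (2 * Size)    # target number of pairs per process
icurr = 0
for p in range(Size):
    nbjob = 0
    while nbjob < nbtaskperprocess and icurr <= N:
        nbjob += N - icurr
        icurr += 1
    ilimits[p + 1] = icurr
ilimits[Size] = N

for p in range(Size):
    pairs = sum(N - i for i in range(ilimits[p], ilimits[p + 1]))
    print('rank %d: rows %d to %d, about %d pairs' % (p, ilimits[p], ilimits[p + 1] - 1, pairs))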
The program has to complete the following operations:
- Process 0 (the "root process") reads the vectors Data from the file.
- Data is broadcast to all processes by using the MPI function bcast().
- The range of indexes i to be considered by each process is computed.
- The correlation coefficients are computed by each process for the corresponding indexes. The non-null terms of the matrix are stored in the lists data, ii, jj on each process.
- These lists are gathered on the root process by calling the MPI function gather(). It produces three lists of Size lists each, which are concatenated to get the three lists required to create the sparse adjacency matrix.
Here goes the code:
import numpy
from mpi4py import MPI
import time
import scipy.sparse
import warnings
warnings.simplefilter('ignore',scipy.sparse.SparseEfficiencyWarning)

Size=MPI.COMM_WORLD.Get_size();
Rank=MPI.COMM_WORLD.Get_rank();
Name=MPI.Get_processor_name();

#a dummy set of data is created here.
#Samples such that (i-j)%10==0 are highly correlated.
RandomNumbers={};
rndm_indx=numpy.random.choice(range(515),40,replace=False)
rndm_indx=numpy.sort(rndm_indx)

Data=numpy.ascontiguousarray(numpy.zeros((2000,515),dtype=numpy.float64))
if Rank==0:
    #Data=numpy.genfromtxt('MyData.csv',usecols=rndm_indx);
    Data=numpy.ascontiguousarray(numpy.random.rand(2000,515))
    lin=numpy.linspace(0.,1.,515)
    for i in range(Data.shape[0]):
        Data[i]+=numpy.sin((1+i%10)*10*lin)*100

start=time.time();

#broadcasting the matrix
Data=MPI.COMM_WORLD.bcast(Data, root=0)
RandomNumbers[Rank]=rndm_indx;
print Data.shape[0]

#an array to store the inverse of the norm of each sample
alpha=numpy.zeros(Data.shape[0],dtype=numpy.float64)
for i in range(0,Data.shape[0]):
    Data[i]=Data[i]-numpy.mean(Data[i])
    if numpy.linalg.norm(Data[i])==0:
        print "process "+str(Rank)+" is facing a big problem"
    else:
        alpha[i]=1./numpy.linalg.norm(Data[i])

#balancing the computational load between the processes.
#Each process must perform about Data.shape[0]*Data.shape[0]/(2*Size) correlations.
#Each process cares for a set of rows.
#Of course, the last rank must care about more rows than the first one.
ilimits=numpy.zeros(Size+1,numpy.int32)
if Rank==0:
    nbtaskperprocess=Data.shape[0]*Data.shape[0]/(2*Size)
    icurr=0
    for i in range(Size):
        nbjob=0
        while(nbjob<nbtaskperprocess and icurr<=Data.shape[0]):
            nbjob+=(Data.shape[0]-icurr)
            icurr+=1
        ilimits[i+1]=icurr
    ilimits[Size]=Data.shape[0]
ilimits=MPI.COMM_WORLD.bcast(ilimits, root=0)

#the "local" job has been cythonized in file correlator.pyx
import correlator
ii,jj,data=correlator.process(Data,alpha,ilimits[Rank],ilimits[Rank+1])

#gathering the matrix inputs from every process on the root process.
data = MPI.COMM_WORLD.gather(data, root=0)
ii = MPI.COMM_WORLD.gather(ii, root=0)
jj = MPI.COMM_WORLD.gather(jj, root=0)

if Rank==0:
    #concatenate the lists
    data2=sum(data,[])
    ii2=sum(ii,[])
    jj2=sum(jj,[])
    #create the adjacency matrix
    CORR_CR=scipy.sparse.coo_matrix((data2,(ii2,jj2)), shape=(Data.shape[0],Data.shape[0]))
    print CORR_CR

end=time.time();
elapsed=(end-start)
print('Total Time',elapsed)
By running mpirun -np 4 python main.py, the computation time is 3.4s. It's not 4 times faster... This is likely due to the fact that the bottleneck of the computation is the computation of scalar products, which requires a large memory bandwidth, and my personal computer is really limited regarding memory bandwidth...
Last comment: there are plenty of possibilities for improvements.
- The vectors in Data are copied to every process... This affects the memory required by the program. Dispatching the computation in a different way and trading memory against communications could overcome this problem...
- Each process still computes the norms of all the vectors... It could be improved by having each process compute the norms of some of the vectors and using the MPI function allreduce() on alpha, as in the sketch after this list...
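For the second point, a possible drop-in replacement for the norm loop of main.py could look like this (a sketch, assuming Data, Rank and Size are defined as above; every process still removes the means, but each norm is computed by only one process and the partial arrays are combined with Allreduce, the buffer variant of allreduce()):

#each process computes the norms of its own rows only
alpha_local=numpy.zeros(Data.shape[0],dtype=numpy.float64)
for i in range(0,Data.shape[0]):
    Data[i]=Data[i]-numpy.mean(Data[i])
    if i%Size==Rank:
        alpha_local[i]=1./numpy.linalg.norm(Data[i])

#each entry is non-zero on exactly one process, so summing recombines them
alpha=numpy.zeros_like(alpha_local)
MPI.COMM_WORLD.Allreduce(alpha_local,alpha,op=MPI.SUM)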
What to do with this adjacency matrix?
I think you already know the answer to this question, but you can provide this adjacency matrix to sparse.csgraph routines such as connected_components() or laplacian(). Indeed, you are not very far from spectral clustering!
For instance, if the clusters are obvious, using connected_components() is sufficient:
if Rank==0:
    # coo to csr format
    S=CORR_CR.tocsr()
    print S

    import scipy.sparse.csgraph   # the csgraph subpackage must be imported explicitly
    n_components, labels=scipy.sparse.csgraph.connected_components(S, directed=False, connection='weak', return_labels=True)
    print "number of different families "+str(n_components)
    numpy.set_printoptions(threshold=numpy.nan)
    print labels
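If the clusters are less obvious, the laplacian() route mentioned above could look like the following sketch on the root process (an assumption on my side, not something tuned for your data): compute the graph Laplacian of S and feed a few of its smallest eigenvectors to a clustering routine.

import scipy.sparse.csgraph
import scipy.sparse.linalg

#graph Laplacian of the adjacency matrix (cast to float for the eigensolver)
L=scipy.sparse.csgraph.laplacian(S.astype(numpy.float64),normed=True)
#a few eigenvectors associated with the smallest eigenvalues
vals,vecs=scipy.sparse.linalg.eigsh(L,k=5,which='SM')
#vecs (shape (n_samples,5)) can then be fed to e.g. scipy.cluster.vq.kmeans2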