Compute a Pairwise Distance Matrix: is a scalable, big-data-ready approach available in Python?

I have a CSV file containing feature values for items: each row is a triple (id_item, id_feature, value) representing the value of a specific feature for a specific item. The data is very sparse.

I need to compute two item distance matrices, one using Pearson correlation as the metric and the other using the Jaccard index.

At the moment I have an in-memory solution that does something like this:

import numpy as np
from numpy import genfromtxt
from scipy.sparse import coo_matrix
import scipy.spatial.distance as ds

# read the (id_item, id_feature, value) triples
my_data = genfromtxt('file.csv', delimiter=',')
i, j, value = my_data.T

# create a sparse matrix (the indices must be integers)
m = coo_matrix((value, (i.astype(int), j.astype(int))))

# convert it to a dense numpy array, since pdist needs one
m = m.toarray()

# create the condensed distance matrix using pdist ...
d = ds.pdist(m.T, 'correlation')

# ... and expand it to a square matrix
d = ds.squareform(d)
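
For the Jaccard index the same pipeline could be reused; here is a minimal sketch, under the assumption that only the presence/absence of a feature matters, so the matrix is binarized first:

# Jaccard distance on the binarized (presence/absence) matrix
d_jaccard = ds.squareform(ds.pdist(m.T > 0, 'jaccard'))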

It works well and it's pretty fast, but it does not scale horizontally. I would like to be able to increase performance just by adding nodes to a cluster, so that everything keeps working in a big-data scenario, again just by adding nodes. I don't care if the process takes hours; the distances only need to be updated once a day.

What's the best approach?

1) Sklearn's pairwise_distances has an n_jobs parameter that allows it to take advantage of parallel computing (http://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.pairwise_distances.html), but as far as I know it only supports multiple cores on the same machine, not cluster computing. There is a related question, Easy way to use parallel options of scikit-learn functions on HPC, but I couldn't figure out what the best solution is in my specific case, or whether Joblib actually has issues.
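
For completeness, the single-machine variant of option 1) would only require swapping pdist for pairwise_distances; a minimal sketch, assuming the dense matrix m built above and that using all local cores is acceptable:

from sklearn.metrics.pairwise import pairwise_distances

# 'correlation' is handed off to scipy; n_jobs=-1 uses all cores of this machine,
# and the result is already a square matrix, so no squareform is needed
d = pairwise_distances(m.T, metric='correlation', n_jobs=-1)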

Also, the part that reads the CSV into memory would still be a bottleneck: I could store the CSV in HDFS and read it with something like:

import subprocess
cat = subprocess.Popen(["hadoop", "fs", "-cat", "data.csv"], stdout=subprocess.PIPE)

and then loop through cat.stdout:

for line in cat.stdout:
    ....

but I am not sure it is a good solution.
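
As an illustration of what that loop could look like (my own sketch, which only collects the triples in Python lists before building the sparse matrix):

import subprocess
from scipy.sparse import coo_matrix

cat = subprocess.Popen(["hadoop", "fs", "-cat", "data.csv"], stdout=subprocess.PIPE)

rows, cols, vals = [], [], []
for line in cat.stdout:
    # each streamed line is "id_item,id_feature,value"
    item_id, feature_id, value = line.decode().strip().split(",")
    rows.append(int(item_id))
    cols.append(int(feature_id))
    vals.append(float(value))

m = coo_matrix((vals, (rows, cols)))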

2) Store the data in HDFS, implement the computation in a MapReduce fashion, and run the job via mrjob.
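
A rough mrjob sketch of option 2) for the Jaccard case (my own assumption about how to structure it, not production code): the first step inverts the data to feature -> items, the second emits co-occurring item pairs and sums them; a further step with the per-item feature counts would still be needed to turn the intersection sizes into Jaccard indices, and popular features can make the pair generation expensive.

from itertools import combinations

from mrjob.job import MRJob
from mrjob.step import MRStep


class SharedFeatureCounts(MRJob):
    """Count, for every pair of items, how many features they share."""

    def steps(self):
        return [
            MRStep(mapper=self.mapper_invert, reducer=self.reducer_pairs),
            MRStep(reducer=self.reducer_sum),
        ]

    def mapper_invert(self, _, line):
        # each input line is "id_item,id_feature,value"
        item_id, feature_id, value = line.split(',')
        if float(value) != 0:
            yield feature_id, item_id

    def reducer_pairs(self, feature_id, item_ids):
        # emit every pair of items that share this feature
        for a, b in combinations(sorted(item_ids), 2):
            yield (a, b), 1

    def reducer_sum(self, pair, counts):
        # size of the feature-set intersection for this pair of items
        yield pair, sum(counts)


if __name__ == '__main__':
    SharedFeatureCounts.run()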

3) Store the data in HDFS, implement the computation in an SQL-like fashion (I don't know whether that is easy and feasible, I have to think about it), and run it using PyHive.
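
To give an idea of option 3), here is a hedged, untested sketch of the Jaccard computation through PyHive, assuming a Hive table named ratings with the schema (id_item, id_feature, value) already loaded from the CSV (the table name and schema are my assumptions): shared features are counted with a self-join on the feature, and per-item feature counts turn that intersection into the Jaccard index.

from pyhive import hive

conn = hive.connect(host='hive-server')  # hypothetical Hive host
cursor = conn.cursor()

# jaccard = intersection / (count_a + count_b - intersection)
cursor.execute("""
    SELECT a.id_item, b.id_item,
           COUNT(*) / (MAX(ca.cnt) + MAX(cb.cnt) - COUNT(*)) AS jaccard
    FROM ratings a
    JOIN ratings b ON a.id_feature = b.id_feature
    JOIN (SELECT id_item, COUNT(*) AS cnt FROM ratings GROUP BY id_item) ca
      ON ca.id_item = a.id_item
    JOIN (SELECT id_item, COUNT(*) AS cnt FROM ratings GROUP BY id_item) cb
      ON cb.id_item = b.id_item
    WHERE a.id_item < b.id_item
    GROUP BY a.id_item, b.id_item
""")

for id_a, id_b, jaccard in cursor.fetchall():
    print(id_a, id_b, jaccard)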

Of course I would like to keep as much of the current code as possible, so a variant of solution 1) would be the best one for me.

Foretoken answered 14/6, 2017 at 20:8 Comment(5)
I would try Intel's Python distribution and MPI for Python. You can get a glimpse of it in this GoParallel issue.Twocolor
What is the size of your data.csv file (number of lines, MB ...)?Meli
@Meli the problem is not the size of the CSV file now, but the ability to scale in the future.Foretoken
But it is important to have some visibility in order to answer. What size do you expect in the future? Is the order of magnitude GB, TB, 100 TB ...?Meli
For sure we are talking about at least TB, but I really don't know.Foretoken

To prototype:

I suggest you use Pyro4 and implement this with a divide-and-conquer paradigm: a master node and several slave nodes.

If you have n items you have n(n-1)/2 pairs; on each node you use sklearn's pairwise distances with the maximum number of jobs (the n_jobs parameter).

You split your set of pairs into a tasks, execute them on b nodes, and regroup the results on your master node.
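
A minimal Pyro4 sketch of that idea, assuming a Pyro4 name server is running and that each worker receives two blocks of rows of the dense matrix m from the question and returns the distances between them (the names DistanceWorker and compute_block are purely illustrative):

# worker.py - run one of these on each slave node
import numpy as np
import Pyro4
from sklearn.metrics.pairwise import pairwise_distances

@Pyro4.expose
class DistanceWorker(object):
    def compute_block(self, rows_a, rows_b):
        # rows arrive as plain lists so the default serializer can handle them
        d = pairwise_distances(np.array(rows_a), np.array(rows_b),
                               metric='correlation', n_jobs=-1)
        return d.tolist()

Pyro4.Daemon.serveSimple({DistanceWorker: "example.distworker"}, ns=True)

# master side: split the item matrix into row blocks and dispatch them
worker = Pyro4.Proxy("PYRONAME:example.distworker")
block = worker.compute_block(m[:100].tolist(), m[100:200].tolist())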

For production:

I advise you to use PySpark 2.1.1; MapReduce is becoming deprecated.
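
Spark does not ship a single call that returns a full pairwise distance matrix, but a sketch of the direction could look like this (my own assumptions: the CSV lives at a hypothetical HDFS path, and cosine similarity via columnSimilarities() is an acceptable starting point to adapt, it is not Pearson or Jaccard):

from pyspark.sql import SparkSession
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

spark = SparkSession.builder.appName("item-similarities").getOrCreate()

# hypothetical HDFS path; each line is "id_item,id_feature,value"
entries = (spark.sparkContext.textFile("hdfs:///data/file.csv")
           .map(lambda line: line.split(","))
           .map(lambda t: MatrixEntry(int(t[0]), int(t[1]), float(t[2]))))

# rows = items and columns = features, so transpose to make items the columns
mat = CoordinateMatrix(entries).transpose().toRowMatrix()

# upper-triangular CoordinateMatrix of cosine similarities between items
similarities = mat.columnSimilarities()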

Meli answered 22/6, 2017 at 21:14 Comment(1)
As far as I've seen, there isn't a simple method in Spark to just compute distances (which was my question); your answer, however, points me in the right direction, so I decided to accept it anyway.Foretoken
