I have a CSV file containing feature values for items: each row is a triple (id_item, id_feature, value) representing the value of a specific feature for a specific item. The data is very sparse.
I need to compute two item distance matrices, one using the Pearson correlation as the metric and the other using the Jaccard index.
At the moment I have implemented an in-memory solution, and I do something like this:
import numpy as np
from numpy import genfromtxt
from scipy.sparse import coo_matrix
import scipy.spatial.distance as ds

# read the (id_item, id_feature, value) triples
my_data = genfromtxt('file.csv', delimiter=',')
i, j, value = my_data.T

# build a sparse matrix (coo_matrix needs integer index arrays)
m = coo_matrix((value, (i.astype(int), j.astype(int))))

# convert to a dense numpy array
m = m.toarray()

# create the distance matrix with pdist (it compares rows, hence m.T)
# and expand the condensed result into a square matrix
d = ds.pdist(m.T, 'correlation')
d = ds.squareform(d)
It works well and it's pretty fast, but it is not horizontally scalable. I would like to be able to increase performance just by adding nodes to a cluster, so that everything keeps working even in a big-data scenario, again just by adding nodes. I don't care if the process takes hours; the distances only need to be updated once a day.
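For the Jaccard matrix I would presumably do the same thing after binarizing the data, continuing from the snippet above (a minimal sketch, assuming that any stored value means the feature is present):
# hypothetical Jaccard variant: binarize, then reuse pdist
# ('jaccard' gives the Jaccard distance, i.e. 1 - Jaccard index)
b = m.T != 0
d_jaccard = ds.squareform(ds.pdist(b, 'jaccard'))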
What's the best approach?
1) Sklearn's pairwise_distances has an n_jobs parameter that makes it possible to take advantage of parallel computing (http://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.pairwise_distances.html), but as far as I know it only uses multiple cores on the same machine, not cluster computing. There is a related question, Easy way to use parallel options of scikit-learn functions on HPC, but I didn't understand what the best solution is in my specific case, or whether Joblib actually has issues.
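For reference, this is roughly how I imagine the single-machine parallel call would look, reusing the dense matrix m from the snippet above (an untested sketch; n_jobs=-1 just means all local cores):
from sklearn.metrics import pairwise_distances
# parallelizes over local cores only, not over a cluster
d = pairwise_distances(m.T, metric='correlation', n_jobs=-1)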
Also, the part that reads the CSV into memory would still be a bottleneck: I can store the CSV in HDFS and read it by doing something like:
import subprocess
# stream the file out of HDFS without copying it to local disk first
cat = subprocess.Popen(["hadoop", "fs", "-cat", "data.csv"], stdout=subprocess.PIPE)
and then loop through cat.stdout:
for line in cat.stdout:
    ....
but I am not sure it is a good solution.
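An alternative I have been considering (untested, and still single-node) is to feed the stream straight into genfromtxt instead of looping line by line, so the rest of the code stays unchanged:
import subprocess
from numpy import genfromtxt
# genfromtxt accepts a file-like object, so the HDFS stream can be parsed directly
cat = subprocess.Popen(["hadoop", "fs", "-cat", "data.csv"], stdout=subprocess.PIPE)
my_data = genfromtxt(cat.stdout, delimiter=',')
cat.stdout.close()
cat.wait()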
2) Store the data in HDFS, implement the computation in a MapReduce fashion, and run the job via mrjob.
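I have not written this yet, but a first job might look something like the hypothetical skeleton below (the class and names are made up): it groups the values of each feature and emits partial per-pair statistics, and a second job would still be needed to aggregate them into the correlation / Jaccard values.
from mrjob.job import MRJob

class ItemPairStats(MRJob):
    # hypothetical first step: group the values of each feature, then emit
    # one partial term per pair of items that share that feature
    def mapper(self, _, line):
        id_item, id_feature, value = line.strip().split(',')
        yield id_feature, (id_item, float(value))

    def reducer(self, id_feature, item_values):
        items = list(item_values)
        for idx, (item_a, val_a) in enumerate(items):
            for item_b, val_b in items[idx + 1:]:
                # co-occurrence count (for Jaccard) and dot-product term (for correlation)
                yield (item_a, item_b), (1, val_a * val_b)

if __name__ == '__main__':
    ItemPairStats.run()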
3) Store the data in HDFS, implement the computation in an SQL-like fashion (I don't know whether this is easy and feasible, I have to think about it), and run it using PyHive.
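So far I have not gone further than something like the sketch below, which only computes the co-occurrence counts (the Jaccard numerator) with a self-join; the host, table and column names are just placeholders for however the CSV would be loaded into Hive:
from pyhive import hive

# placeholder host and table name
cursor = hive.connect('my-hive-host').cursor()
# count, for every pair of items, how many features they have in common
cursor.execute("""
    SELECT a.id_item AS item_a, b.id_item AS item_b, COUNT(*) AS shared_features
    FROM item_features a
    JOIN item_features b ON a.id_feature = b.id_feature
    WHERE a.id_item < b.id_item
    GROUP BY a.id_item, b.id_item
""")
pairs = cursor.fetchall()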
Of course I would like to keep as much of the current code as possible, so a variant of solution 1) would be the best one for me.