I have a fairly large training matrix (over 1 billion rows, two features per row). There are two classes (0 and 1). This is too large for a single machine, but fortunately I have about 200 MPI hosts at my disposal. Each is a modest dual-core workstation.
Feature generation is already successfully distributed.
The answers in Multiprocessing scikit-learn suggest it is possible to distribute the work of an SGDClassifier:
You can distribute the data sets across cores, do partial_fit, get the weight vectors, average them, distribute them to the estimators, do partial fit again.
When I have run partial_fit for the second time on each estimator, where do I go from there to get a final aggregate estimator?
My best guess was to average the coefficients and the intercepts again and construct an estimator from those values. However, the resulting estimator gives different predictions than an estimator trained with fit() on the entire data set.
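To make that aggregation step concrete, here is a minimal local sketch (no MPI) of what I mean; the aggregate() helper and its estimators argument are my own names, and assigning coef_, intercept_ and classes_ by hand relies on SGDClassifier internals rather than any documented API:

import numpy as np
from sklearn import linear_model

def aggregate(estimators):
    # Average the learned parameters of several partially fitted classifiers.
    avg_coef = np.average([e.coef_ for e in estimators], axis=0)
    avg_intercept = np.average([e.intercept_ for e in estimators], axis=0)

    # Copy the averages onto a fresh estimator so predict() can be called.
    agg = linear_model.SGDClassifier()
    agg.coef_ = avg_coef
    agg.intercept_ = avg_intercept
    agg.classes_ = np.array([0, 1])
    return agg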
Details
Each host generates a local matrix and a local vector. These hold n rows of the training set and the corresponding n target values.
Each host uses its local matrix and vector to create an SGDClassifier and do a partial_fit. Each host then sends its coef_ vector and intercept to root. Root averages these and broadcasts them back to the hosts. The hosts do another partial_fit and again send the coef_ vector and intercept to root.
Root constructs a new estimator with these values.
import numpy as np
from mpi4py import MPI
from sklearn import linear_model

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

local_matrix = get_local_matrix()
local_vector = get_local_vector()

# First local pass; the classes must be given up front for partial_fit.
estimator = linear_model.SGDClassifier()
estimator.partial_fit(local_matrix, local_vector, classes=[0, 1])

# Send (coef_, intercept_) to root and average there.
average_coefs = None
avg_intercept = None
if rank > 0:
    comm.send((estimator.coef_, estimator.intercept_), dest=0, tag=rank)
else:
    pairs = [comm.recv(source=r, tag=r) for r in range(1, size)]
    pairs.append((estimator.coef_, estimator.intercept_))
    average_coefs = np.average([p[0] for p in pairs], axis=0)
    avg_intercept = np.average([p[1][0] for p in pairs])

# Broadcast the averages and overwrite the local model parameters.
estimator.coef_ = comm.bcast(average_coefs, root=0)
estimator.intercept_ = np.array([comm.bcast(avg_intercept, root=0)])

# Second local pass, starting from the averaged parameters.
estimator.partial_fit(local_matrix, local_vector, classes=[0, 1])

# Average again at root and set the result on root's estimator.
if rank > 0:
    comm.send((estimator.coef_, estimator.intercept_), dest=0, tag=rank)
else:
    pairs = [comm.recv(source=r, tag=r) for r in range(1, size)]
    pairs.append((estimator.coef_, estimator.intercept_))
    average_coefs = np.average([p[0] for p in pairs], axis=0)
    avg_intercept = np.average([p[1][0] for p in pairs])
    estimator.coef_ = average_coefs
    estimator.intercept_ = np.array([avg_intercept])
    print("The estimator at rank 0 should now be working")