How to parallelize stochastic gradient descent?

I have a fairly large training matrix (over 1 billion rows, two features per row). There are two classes (0 and 1). This is too large for a single machine, but fortunately I have about 200 MPI hosts at my disposal. Each is a modest dual-core workstation.

Feature generation is already successfully distributed.

The answers in Multiprocessing scikit-learn suggest it is possible to distribute the work of an SGDClassifier:

You can distribute the data sets across cores, do partial_fit, get the weight vectors, average them, distribute them to the estimators, do partial fit again.

Once I have run partial_fit a second time on each estimator, how do I get from there to a final aggregate estimator?

My best guess was to average the coefs and the intercepts again and make an estimator with those values. The resulting estimator gives a different result than an estimator constructed with fit() on the entire data set.

Details

Each host generates a local matrix and a local vector: n rows of the training set and the corresponding n target values.

Each host uses the local matrix and local vector to make an SGDClassifier and do a partial_fit. Each then sends the coef vector and the intercept to root. Root averages these and sends them back to the hosts. The hosts do another partial_fit and send the coef vector and the intercept to root.

Root constructs a new estimator with these values.

import numpy as np
from mpi4py import MPI
from sklearn import linear_model

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

local_matrix = get_local_matrix()
local_vector = get_local_vector()

# First local pass over this host's shard of the training data.
estimator = linear_model.SGDClassifier()
estimator.partial_fit(local_matrix, local_vector, classes=[0, 1])

average_coefs = None
avg_intercept = None

# Gather (coef_, intercept_) pairs at root and average them.
if rank > 0:
    comm.send((estimator.coef_, estimator.intercept_), dest=0, tag=rank)
else:
    pairs = [comm.recv(source=r, tag=r) for r in range(1, size)]
    pairs.append((estimator.coef_, estimator.intercept_))
    average_coefs = np.average([a[0] for a in pairs], axis=0)
    avg_intercept = np.average([a[1][0] for a in pairs])

# Broadcast the averaged parameters back to every host.
estimator.coef_ = comm.bcast(average_coefs, root=0)
estimator.intercept_ = np.array([comm.bcast(avg_intercept, root=0)])

# Second local pass, starting from the averaged parameters.
estimator.partial_fit(local_matrix, local_vector, classes=[0, 1])

# Average the second-pass parameters at root.
if rank > 0:
    comm.send((estimator.coef_, estimator.intercept_), dest=0, tag=rank)
else:
    pairs = [comm.recv(source=r, tag=r) for r in range(1, size)]
    pairs.append((estimator.coef_, estimator.intercept_))
    average_coefs = np.average([a[0] for a in pairs], axis=0)
    avg_intercept = np.average([a[1][0] for a in pairs])

    estimator.coef_ = average_coefs
    estimator.intercept_ = np.array([avg_intercept])
    print("The estimator at rank 0 should now be working")
Kilmer answered 10/1, 2014 at 18:47 Comment(0)

Training a linear model on a dataset with 1e9 samples and only 2 features is very likely to underfit, or to waste CPU / IO time if the data is actually linearly separable. Don't waste time thinking about parallelizing such a problem with a linear model:

  • either switch to a more complex class of models (e.g. train random forests on smaller partitions of the data that fit in memory and aggregate them)

  • or select random subsamples of your dataset of increasing size and train linear models. Measure the predictive accuracy on a held-out test set and stop when you see diminishing returns (probably after a couple of tens of thousands of samples of the minority class); see the sketch after this list.
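
As a rough sketch of that second option (not this answer's exact code; X and y here are placeholder names for a random slice of the data that fits on one host, and the subsample sizes are arbitrary):

import numpy as np
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# Hold out a test set once, then train on random subsamples of increasing size.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)

for n in (1_000, 10_000, 100_000, 1_000_000):
    idx = np.random.choice(len(X_train), size=n, replace=False)
    clf = SGDClassifier().fit(X_train[idx], y_train[idx])
    acc = accuracy_score(y_test, clf.predict(X_test))
    print(f"n={n}: held-out accuracy = {acc:.4f}")
    # stop growing n once the accuracy curve flattens out (diminishing returns)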

Electrolier answered 10/1, 2014 at 21:59 Comment(1)
The general consensus seems to be to downsample my data. I guess I was just overcomplicating it. Thank you. – Kilmer

What you are experiencing is normal and expected. First is the fact that using SGD means you are never going to hit an exact result. You will quickly converge towards the optimal solution (since this is a convex problem) and then hover around that area for the remainder. Different runs with the whole data set alone should produce slightly different results each time.

where do I go from there to get a final aggregate estimator?

In theory, you would just keep doing that over and over until you are happy with convergence. That is completely unnecessary for what you are doing. Other systems switch to more sophisticated methods (like L-BFGS) to converge towards the final solution once they have a good "warm start" on it. However, this will not get you any drastic gains in accuracy (think maybe a whole percentage point if you are lucky), so don't consider it make-or-break. Consider it for what it is: fine-tuning.
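
For concreteness, a minimal sketch of what that warm-started fine-tuning could look like, assuming avg_coef and avg_intercept are the averaged SGD parameters from the question and X_sample / y_sample are a subsample that fits on one machine (placeholder names; exactly how warm_start picks up pre-assigned coefficients may depend on the scikit-learn version):

import numpy as np
from sklearn.linear_model import LogisticRegression

# Seed an L-BFGS based solver with the averaged SGD solution and let it refine it.
lr = LogisticRegression(solver="lbfgs", warm_start=True, max_iter=200)
lr.coef_ = avg_coef.copy()                 # shape (1, n_features)
lr.intercept_ = np.array([avg_intercept])  # shape (1,)
lr.fit(X_sample, y_sample)                 # with warm_start=True, fit starts from the seeded parameters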

Second is the fact that linear models don't parallelize well. Despite the claims of vowpalwabbit and some other libraries, you are not going to get linear scaling out of training a linear model in parallel. Simply averaging the intermediate results is a bad way to parallelize such a system, and sadly that's about as good as it gets for training linear models in parallel.

The fact is, you only have 2 features. You should be able to easily train far more complicated models using only a smaller subset of your data. 1 billion rows is overkill for just 2 features.
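
For example, something along these lines is already a richer model than a linear separator and is cheap to fit with only 2 features (X_sample, y_sample, X_test, y_test are placeholder names for a subsample and a held-out slice that fit in memory):

from sklearn.ensemble import RandomForestClassifier

# A nonlinear model trained on a modest subsample instead of all 1e9 rows.
forest = RandomForestClassifier(n_estimators=100, n_jobs=-1)
forest.fit(X_sample, y_sample)
print(forest.score(X_test, y_test))  # accuracy on the held-out slice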

Lupitalupo answered 11/1, 2014 at 1:43 Comment(1)
Thank you very much for the detailed answer. I'm clearly overworking this unnecessarily. I will use much smaller subsets. – Kilmer
