Out-of-core processing of sparse CSR arrays
Asked Answered
A

1

43

How can one apply some function in parallel on chunks of a sparse CSR array saved on disk using Python? Sequentially this could be done e.g. by saving the CSR array with joblib.dump opening it with joblib.load(.., mmap_mode="r") and processing the chunks of rows one by one. Could this be done more efficiently with dask?

In particular, assuming one doesn't need all the possible out of core operations on sparse arrays, but just the ability to load row chunks in parallel (each chunk is a CSR array) and apply some function to them (in my case it would be e.g. estimator.predict(X) from scikit-learn).

Besides, is there a file format on disk that would be suitable for this task? Joblib works but I'm not sure about the (parallel) performance of CSR arrays loaded as memory maps; spark.mllib appears to use either some custom sparse storage format (that doesn't seem to have a pure Python parser) or LIBSVM format (the parser in scikit-learn is, in my experience, much slower than joblib.dump)...

Note: I have read documentation, various issues about it on https://github.com/dask/dask/ but I'm still not sure how to best approach this problem.

Edit: to give a more practical example, below is the code that works in dask for dense arrays but fails when using sparse arrays with this error,

import numpy as np
import scipy.sparse

import joblib
import dask.array as da
from sklearn.utils import gen_batches

np.random.seed(42)
joblib.dump(np.random.rand(100000, 1000), 'X_dense.pkl')
joblib.dump(scipy.sparse.random(10000, 1000000, format='csr'), 'X_csr.pkl')

fh = joblib.load('X_dense.pkl', mmap_mode='r')

# computing the results without dask
results = np.vstack((fh[sl, :].sum(axis=1)) for sl in gen_batches(fh.shape[0], batch_size))

# computing the results with dask
x = da.from_array(fh, chunks=(2000))
results = x.sum(axis=1).compute()

Edit2: following the discussion below, the example below overcomes the previous error but gets ones about IndexError: tuple index out of range in dask/array/core.py:L3413,

import dask
# +imports from the example above
dask.set_options(get=dask.get)  # disable multiprocessing

fh = joblib.load('X_csr.pkl', mmap_mode='r')

def func(x):
    if x.ndim == 0:
        # dask does some heuristics with dummy data, if the x is a 0d array
        # the sum command would fail
        return x
    res = np.asarray(x.sum(axis=1, keepdims=True))
    return res

Xd = da.from_array(fh, chunks=(2000))
results_new = Xd.map_blocks(func).compute()
Abe answered 17/7, 2017 at 13:20 Comment(11)
It would depend on how joblib stores data on disk. I suspect that they store it as an opaque blob, in which case it would be difficult to read in chunks.Llovera
@Llovera Well yes, they have a NumpyPickler (github.com/joblib/joblib/blob/… ) that stores everything in a single blob. For sparse CSR arrays, I think, this should be fairly equivalent to applying np.save to X.data, X.indices and X.indptr arrays. In fact, previous versions of joblib.dump resulted in one file per numpy array. The advantage is that joblib.load("<sparse array pickled file>", mmap_mode="r")[slice, :] already loads only a single chunk of the array..Abe
In the latest release of scipy has a sparse.savenz. For csr format it uses np.savez to save dict(data=matrix.data, indices=matrix.indices, indptr=matrix.indptr). That is, the key attributes of the matrix are saved to separate zip archive files. A 'chunked' load will have to read from all 3 arrays.Maloy
@Maloy Thanks, np.savez could work too. My point though is that I'm still not sure how to make a memmap of a sparse array work with dask (see edited example above), and I'm looking for some suggestions, on the best approach to make it work. In particular, @Llovera would dask.pydata.org/en/latest/array-sparse.html be relevant in this use case?Abe
Looks like the error occurs in the fh[...].sum(), axis 1 is out of bounds for array of dimension 0 suggests that something, maybe fh is 0d array, possibly an object array wrapper around a sparse matrix. I think you need to examine fh before trying use it in calculations.Maloy
I have no idea what a memmap of a sparse array would look like.Maloy
@Maloy fh is a scipy.sparse.csr.csr_matrix were fh.data, fh.indices and fh.indptr are a np.memmap .If you prefer not to use joblib, I imagine, it's somewhat similar to take a csr array, save it with np.savez , then load it with np.load(..., memmap='r')... Well fh is not a 0d array, maybe it has something to do with not satisfying requirements in dask.pydata.org/en/latest/array-sparse.html#requirements (e.g. no concatenate function in scipy.sparse)...Abe
Sparse has vstack and hstack but they are very different from the numpy versions. They build a new matrix using coo attributes.Maloy
Maybe you are not doing it the correct way. I would use h5py for this task. This way, the data is compressed anyways..Selfdeceit
@sdgawerzswer It's possible to hdf5 for storage instead of numpy, this doesn't really change the problem that you need to be able to handle the sparse output in dask..Abe
np.load('test.npz',mmap_mode='r') does not raise an error, but the mmap_mode value is ignored when creating the NpzFile object, and thus doesn't do anything.Maloy
E
6

So I don't know anything about joblib or dask, let alone your application specific data format. But it is actually possible to read sparse matrices from disk in chunks while retaining the sparse data structure.

While the Wikipedia article for the CSR format does a great job explaining how it works, I'll give a short recap:

Some sparse Matrix, e.g.:

1 0 2
0 0 3
4 5 6

is stored by remembering each nonzero-value and the column it resides in:

sparse.data    = 1 2 3 4 5 6  # acutal value
sparse.indices = 0 2 2 0 1 2  # number of column (0-indexed)

Now we are still missing the rows. The compressed format just stores how many non-zero values there are in each row, instead of storing every single values row.

Note that the non-zero count is also accumulated, so the following array contains the number of non-zero values up until and including this row. To complicate things even further, the array always starts with a 0 and thus contains num_rows+1 entries:

sparse.indptr = 0 2 3 6

so up until and including the second row there are 3 nonzero values, namely 1, 2 and 3.

Since we got this sorted out, we can start 'slicing' the matrix. The goal is to construct the data, indices and indptr arrays for some chunks. Assume the original huge matrix is stored in three binary files, which we will incrementally read. We use a generator to repeatedly yield some chunk.

For this we need to know how many non-zero values are in each chunk, and read the according amount of values and column-indices. The non-zero count can be conveniently read from the indptr array. This is achieved by reading some amount of entries from the huge indptr file that corresponds to the desired chunk size. The last entry of that portion of the indptr file minus the number of non-zero values before gives the number of non-zeros in that chunk. So the chunks data and indices arrays are just sliced from the big data and indices files. The indptr array needs to be prepended artificially with a zero (that's what the format wants, don't ask me :D).

Then we can just construct a sparse matrix with the chunk data, indices and indptr to get a new sparse matrix.

It has to be noted that the actual matrix size cannot be directly reconstructed from the three arrays alone. It is either the maximum column index of the matrix, or if you are unlucky and there is no data in the chunk undetermined. So we also need to pass the column count.

I probably explained things in a rather complicated way, so just read this just as opaque piece of code, that implements such a generator:

import numpy as np
import scipy.sparse


def gen_batches(batch_size, sparse_data_path, sparse_indices_path, 
                sparse_indptr_path, dtype=np.float32, column_size=None):
    data_item_size = dtype().itemsize

    with open(sparse_data_path, 'rb') as data_file, \
            open(sparse_indices_path, 'rb') as indices_file, \
            open(sparse_indptr_path, 'rb') as indptr_file:
        nnz_before = np.fromstring(indptr_file.read(4), dtype=np.int32)

        while True:
            indptr_batch = np.frombuffer(nnz_before.tobytes() +
                              indptr_file.read(4*batch_size), dtype=np.int32)

            if len(indptr_batch) == 1:
                break

            batch_indptr = indptr_batch - nnz_before
            nnz_before = indptr_batch[-1]
            batch_nnz = np.asscalar(batch_indptr[-1])

            batch_data = np.frombuffer(data_file.read(
                                       data_item_size * batch_nnz), dtype=dtype)
            batch_indices = np.frombuffer(indices_file.read(
                                          4 * batch_nnz), dtype=np.int32)

            dimensions = (len(indptr_batch)-1, column_size)

            matrix = scipy.sparse.csr_matrix((batch_data, 
                           batch_indices, batch_indptr), shape=dimensions)

            yield matrix


if __name__ == '__main__':
    sparse = scipy.sparse.random(5, 4, density=0.1, format='csr', dtype=np.float32)

    sparse.data.tofile('sparse.data')        # dtype as specified above  ^^^^^^^^^^
    sparse.indices.tofile('sparse.indices')  # dtype=int32
    sparse.indptr.tofile('sparse.indptr')    # dtype=int32

    print(sparse.toarray())
    print('========')

    for batch in gen_batches(2, 'sparse.data', 'sparse.indices', 
                             'sparse.indptr', column_size=4):
        print(batch.toarray())

the numpy.ndarray.tofile() just stores binary arrays, so you need to remember the data format. scipy.sparse represents the indices and indptr as int32, so that's a limitation for the total matrix size.

Also I benchmarked the code and found that the scipy csr matrix constructor is the bottleneck for small matrices. Your mileage might vary tho, this is just a 'proof of principle'.

If there is need for a more sophisticated implementation, or something is too obstruse, just hit me up :)

Eyehole answered 19/6, 2018 at 20:53 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.