How can one apply some function in parallel on chunks of a sparse CSR array saved on disk using Python? Sequentially this could be done e.g. by saving the CSR array with joblib.dump, opening it with joblib.load(.., mmap_mode="r") and processing the chunks of rows one by one. Could this be done more efficiently with dask?
In particular, assume one doesn't need all the possible out-of-core operations on sparse arrays, but just the ability to load row chunks in parallel (each chunk is a CSR array) and apply some function to them (in my case it would be e.g. estimator.predict(X) from scikit-learn).
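For concreteness, here is a minimal sketch of the sequential version I have in mind (the toy data and the per-chunk function are placeholders; in practice func would be something like estimator.predict):

import joblib
import numpy as np
import scipy.sparse
from sklearn.utils import gen_batches

# toy CSR array standing in for the real data
X_csr = scipy.sparse.random(10000, 1000, density=0.01, format='csr')
joblib.dump(X_csr, 'X_csr.pkl')

# reload with memory mapping: the .data/.indices/.indptr arrays are memmapped
fh = joblib.load('X_csr.pkl', mmap_mode='r')

def func(chunk):
    # placeholder for e.g. estimator.predict(chunk)
    return np.asarray(chunk.sum(axis=1)).ravel()

# process row chunks one by one
results = np.concatenate([func(fh[sl, :])
                          for sl in gen_batches(fh.shape[0], 2000)])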
Besides, is there a file format on disk that would be suitable for this task? Joblib works, but I'm not sure about the (parallel) performance of CSR arrays loaded as memory maps; spark.mllib appears to use either some custom sparse storage format (that doesn't seem to have a pure Python parser) or the LIBSVM format (the parser in scikit-learn is, in my experience, much slower than joblib.dump)...
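One alternative I have been considering (just a sketch, with made-up helper names, and I have not verified how it performs) is to store the three CSR components as separate .npy files and memory-map them individually:

import numpy as np
import scipy.sparse

def save_csr(path_prefix, X):
    # write each CSR component to its own .npy file
    np.save(path_prefix + '_data.npy', X.data)
    np.save(path_prefix + '_indices.npy', X.indices)
    np.save(path_prefix + '_indptr.npy', X.indptr)
    np.save(path_prefix + '_shape.npy', np.asarray(X.shape))

def load_csr_mmap(path_prefix):
    # reload the components as memory maps and rebuild the CSR matrix
    data = np.load(path_prefix + '_data.npy', mmap_mode='r')
    indices = np.load(path_prefix + '_indices.npy', mmap_mode='r')
    indptr = np.load(path_prefix + '_indptr.npy', mmap_mode='r')
    shape = tuple(np.load(path_prefix + '_shape.npy'))
    return scipy.sparse.csr_matrix((data, indices, indptr), shape=shape)

(I have not checked whether csr_matrix keeps the memory-mapped arrays as-is or copies them during construction.)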
Note: I have read the documentation and various issues about it on https://github.com/dask/dask/ but I'm still not sure how best to approach this problem.
Edit: to give a more practical example, below is code that works in dask for dense arrays but fails when using sparse arrays with this error:
import numpy as np
import scipy.sparse
import joblib
import dask.array as da
from sklearn.utils import gen_batches
np.random.seed(42)
joblib.dump(np.random.rand(100000, 1000), 'X_dense.pkl')
joblib.dump(scipy.sparse.random(10000, 1000000, format='csr'), 'X_csr.pkl')
fh = joblib.load('X_dense.pkl', mmap_mode='r')
batch_size = 2000
# computing the results without dask
results = np.vstack([fh[sl, :].sum(axis=1)
                     for sl in gen_batches(fh.shape[0], batch_size)])
# computing the results with dask
x = da.from_array(fh, chunks=(2000))
results = x.sum(axis=1).compute()
Edit2: following the discussion below, the example below overcomes the previous error but fails with IndexError: tuple index out of range in dask/array/core.py:L3413,
import dask
# +imports from the example above
dask.set_options(get=dask.get) # disable multiprocessing
fh = joblib.load('X_csr.pkl', mmap_mode='r')
def func(x):
    if x.ndim == 0:
        # dask does some heuristics with dummy data; if x is a 0d array
        # the sum call below would fail, so return it unchanged
        return x
    res = np.asarray(x.sum(axis=1, keepdims=True))
    return res
Xd = da.from_array(fh, chunks=(2000))
results_new = Xd.map_blocks(func).compute()
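In the meantime, one workaround I can think of (not sure it is idiomatic dask) is to bypass dask.array entirely and wrap each row chunk in dask.delayed, so that the chunks of the memmapped fh are processed as independent tasks; a rough sketch:

import dask
import numpy as np
from sklearn.utils import gen_batches

@dask.delayed
def process_chunk(sl):
    # slice the memmapped CSR matrix and apply the per-chunk computation
    # (in practice this could be estimator.predict(fh[sl, :]))
    return np.asarray(fh[sl, :].sum(axis=1))

tasks = [process_chunk(sl) for sl in gen_batches(fh.shape[0], 2000)]
results_delayed = np.vstack(dask.compute(*tasks))

(With dask.set_options(get=dask.get) from the snippet above still active, this runs synchronously; a parallel single-machine scheduler would need to be re-enabled for actual parallelism.)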
Comments:

joblib.dump uses NumpyPickler (github.com/joblib/joblib/blob/…) that stores everything in a single blob. For sparse CSR arrays, I think, this should be fairly equivalent to applying np.save to the X.data, X.indices and X.indptr arrays. In fact, previous versions of joblib.dump resulted in one file per numpy array. The advantage is that joblib.load("<sparse array pickled file>", mmap_mode="r")[slice, :] already loads only a single chunk of the array. – Abe

scipy has sparse.save_npz. For csr format it uses np.savez to save dict(data=matrix.data, indices=matrix.indices, indptr=matrix.indptr). That is, the key attributes of the matrix are saved to separate zip archive files. A 'chunked' load will have to read from all 3 arrays. – Maloy

np.savez could work too. My point though is that I'm still not sure how to make a memmap of a sparse array work with dask (see the edited example above), and I'm looking for suggestions on the best approach to make it work. In particular, @Llovera, would dask.pydata.org/en/latest/array-sparse.html be relevant in this use case? – Abe

The error from fh[...].sum(), axis 1 is out of bounds for array of dimension 0, suggests that something, maybe fh, is a 0d array, possibly an object array wrapper around a sparse matrix. I think you need to examine fh before trying to use it in calculations. – Maloy

I'm not sure what a memmap of a sparse array would look like. – Maloy

fh is a scipy.sparse.csr.csr_matrix where fh.data, fh.indices and fh.indptr are np.memmap arrays. If you prefer not to use joblib, I imagine it's somewhat similar to taking a csr array, saving it with np.savez, then loading it with np.load(..., mmap_mode='r')... Well, fh is not a 0d array; maybe it has something to do with not satisfying the requirements in dask.pydata.org/en/latest/array-sparse.html#requirements (e.g. no concatenate function in scipy.sparse)... – Abe

scipy.sparse does have vstack and hstack, but they are very different from the numpy versions. They build a new matrix using coo attributes. – Maloy

np.load('test.npz', mmap_mode='r') does not raise an error, but the mmap_mode value is ignored when creating the NpzFile object, and thus doesn't do anything. – Maloy