dask bag not using all cores? alternatives?

I have a Python script that does the following:

1. takes an input file of data (usually nested JSON format),
2. passes the data line by line to another function, which manipulates the data into the desired format,
3. and finally writes the output to a file.

Here is my current simple Python code that does this...

import ujson

def manipulate(line):
    # a pure python function which transforms the data
    # ...
    return manipulated_json

components = []
for line in f:  # f is the (decompressed) input file, opened elsewhere
    components.append(manipulate(ujson.loads(line)))

write_to_csv(components)

This works, but with the Python GIL limiting it to one core on the server, it's painfully slow, especially with large amounts of data.

The data I normally deal with is around 4 GB gzip-compressed, but occasionally I have to process data that is hundreds of GB gzip-compressed. It's not necessarily Big Data, but it still can't all be processed in memory, and with Python's GIL it is very slow to process.

While searching for a way to speed up our data processing, I came across dask. PySpark seemed like the obvious solution at the time, but the promises of dask and its simplicity won me over, so I decided to give it a try.

After a lot of research into dask and how to use it, I put together a very small script to replicate my current process. The script looks like this:

import dask.bag as bag
import json

(bag.from_filenames('input.json.gz')
    .map(json.loads)
    .map(manipulate)
    .concat()
    .to_dataframe()
    .to_csv('output.csv.gz'))

This works and produces the same results as my original non-dask script but it still only uses one CPU on the server. So, it didn't help at all. In fact, it's slower.

What am I doing wrong? Am I missing something? I'm still fairly new to dask so let me know if I've overlooked something or if I should be doing something different altogether.

Also, are there any alternatives to dask for using the full capacity of the server (i.e. all CPUs) for what I need to do?

Thanks,

T

Woolgathering answered 3/12, 2015 at 19:9 Comment(3)
Hmm, never heard of dask, really interesting, thanks. Have you looked at the bog-standard multiprocessing module? It's simple(istic), but it works (a rough sketch follows these comments).Crafty
You might want to ask on the Blaze mailing list. Dask is relatively new and in flux and, from what I see, there have only been 20 StackOverflow questions about it ever, so there may not be very many people who see your question here and know enough to help.Consubstantiate
FWIW, I subscribe to this tag, so there is always someone watching it. Stackoverflow is a great place for such questions.Nisbet
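To make that multiprocessing suggestion concrete, a minimal sketch might look like the following. This is only a sketch, assuming manipulate is a picklable, module-level function (a stub stands in for the question's version) and that each manipulated record is a flat sequence that becomes one CSV row; file names are illustrative.

import gzip
import multiprocessing as mp

import ujson


def manipulate(record):
    # stub standing in for the question's pure-Python transform
    return record


def process_line(line):
    # same per-line work as the original loop: parse, then transform
    return manipulate(ujson.loads(line))


if __name__ == '__main__':
    with gzip.open('input.json.gz', 'rt') as f, open('output.csv', 'w') as out:
        with mp.Pool() as pool:  # defaults to one worker per CPU core
            # imap streams results in input order, so the whole file
            # never has to fit in memory at once
            for record in pool.imap(process_line, f, chunksize=1000):
                out.write(','.join(map(str, record)) + '\n')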

The problem here is with dask.dataframe.to_csv, which forces you into single-core mode.

I recommend using dask.bag to do your reading and manipulation and then dump down to a bunch of CSV files in parallel. Dumping to many CSV files is a lot easier to coordinate than dumping to a single CSV file.

import dask.bag as bag
import json
b = bag.from_filenames('input.json.gz').map(json.loads).map(manipulate).concat()
b.map(lambda t: ','.join(map(str, t))).to_textfiles('out.*.csv')

There might also be an issue with trying to read a single GZIP file in parallel, but the above should get you started.
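One possible workaround for the single-GZIP limitation (and roughly what the comment below ended up doing) is to pre-split the input into several smaller gzip files and hand the bag a glob, so each file becomes its own partition. A rough sketch, with illustrative chunk size and file names:

import gzip
from itertools import islice

LINES_PER_CHUNK = 100000  # illustrative; tune to your data

with gzip.open('input.json.gz', 'rt') as src:
    part = 0
    while True:
        chunk = list(islice(src, LINES_PER_CHUNK))
        if not chunk:
            break
        # each chunk becomes one gzip file, and later one bag partition
        with gzip.open('input-part-%04d.json.gz' % part, 'wt') as dst:
            dst.writelines(chunk)
        part += 1

# then bag.from_filenames('input-part-*.json.gz') gives one partition per file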

Nisbet answered 3/12, 2015 at 21:14 Comment(2)
Thanks @MRocklin! It didn't work. Lol... But then I split the input file into multiple chunks and it worked. It seems it only uses as many CPUs as there are input files. Any plans to make this dynamic, so you can pass in one input file and bag will split it and process it in parallel under the hood?Woolgathering
dask.bag does this now, just imperfectly. One possible issue here is that GZIP has poor support for random access.Nisbet

It seems that bags are only as parallel as the number of partitions they have.

For me, running

mybag = bag.from_filenames(filename, chunkbytes=1e7)
mybag.npartitions

yields

1746

which solved the problem and made the processing fully parallelizable.
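Putting it together, a sketch of the full pipeline with this chunkbytes trick might look like the following, reusing the older from_filenames API from above (newer dask releases spell this dask.bag.read_text(..., blocksize=...)). Byte-based chunking generally assumes an uncompressed input file (see the GZIP caveat in the earlier answer), and filename and manipulate are the question's own.

import json
import dask.bag as bag

# `filename` (uncompressed) and `manipulate` come from the question;
# chunkbytes is the knob from this answer.
mybag = bag.from_filenames(filename, chunkbytes=1e7)
print(mybag.npartitions)  # many partitions -> many tasks -> many cores

(mybag.map(json.loads)
      .map(manipulate)
      .map(lambda t: ','.join(map(str, t)))
      .to_textfiles('out.*.csv'))  # one output file per partition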

Shoffner answered 5/1, 2016 at 12:25 Comment(0)

If you provide a glob-based filename, e.g. MyFiles-*.csv, to the dask dataframe's to_csv(), you should be able to output the dataframe to disk. It will create multiple files instead of one large CSV file. See this thread for more: https://groups.google.com/a/continuum.io/forum/#!searchin/blaze-dev/to_csv/blaze-dev/NCQfCoOWEcI/S7fwuCfeCgAJ

MyFiles-0001.csv  
MyFiles-0002.csv 
....
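A small sketch of the glob-style export, with a hypothetical ddf standing in for whatever dask dataframe you end up with (for example, the bag pipeline's .to_dataframe()); the exact numbering and padding of the output files depends on the dask version and can be customized via to_csv's name_function argument.

import dask.dataframe as dd

# hypothetical source; any dask dataframe works here
ddf = dd.read_csv('some-input-*.csv')

# the '*' is replaced per partition, giving MyFiles-0.csv, MyFiles-1.csv, ...
ddf.to_csv('MyFiles-*.csv', index=False)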
Ananna answered 26/8, 2016 at 18:36 Comment(1)
With ddf.to_csv('mydata/myfile-*.csv', index=False) it exports at the same speed as a single-file export.Jolin
