Distribute data from `tf.data.Dataset` to multiple workers (e.g. for Horovod)

With Horovod, you basically run N independent instances (so it is a form of between-graph replication), which communicate only via special Horovod ops (essentially broadcast + reduce).
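To illustrate, a minimal sketch of that pattern with horovod.tensorflow (every instance runs the same script, launched e.g. via horovodrun -np N python train.py):

```python
import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()  # every process learns its own rank and the world size
print("instance", hvd.rank(), "of", hvd.size())

# The instances communicate only through Horovod collective ops, e.g.:
local_value = tf.constant(float(hvd.rank()))
averaged = hvd.allreduce(local_value)                # reduce across all instances
from_root = hvd.broadcast(local_value, root_rank=0)  # broadcast from instance 0
```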

Now let's say either instance 0, or some other external instance loads your data (via tf.data.Dataset). How would you distribute the iterator.get_next() to each instance? Using Horovod broadcast would be inefficient, as you would copy all the data to all instances.

Having the dataset in every instance, doing all the loading there, and then using shard on the dataset would also be inefficient, as you would load the data everywhere and then throw away (N-1)/N of it. So you would not want sharding either; instead, the dataset loading should happen only in a single (producer/dataset worker) instance, which then distributes the batches to all the train workers.

I guess the TF MultiDeviceIterator provides some similar functionality (or basically exactly that), but I'm not sure whether it works together with Horovod, and how you would set it up?

Or maybe you can make the distribution somehow via TF workers (guide)? (Maybe that is how you would configure MultiDeviceIterator as well?)

If possible, this should be via TensorFlow operations / functions (there are many related functions which might already give me this, but I might not know about them, or have misunderstood how it works). Or maybe the answer is that TensorFlow does not provide any such functionality yet? (This would still be useful to know. Then I would implement my own solution in C++, wrapped as a TensorFlow op. But before doing so, it would be good to know whether this is really necessary.)

(Related is also this Horovod issue.)

(This question is actually a bit more generic than just Horovod, although Horovod might be a good example. You might have this problem always for distributed TensorFlow?)

(I collected an overview of all the distributed TensorFlow terminology and aspects here, mostly for clarification.)

(Related are (maybe?) also this, this, this, this or this questions.)

Isola answered 23/5, 2020 at 17:18 Comment(5)
I don't think that is possible with Horovod, and most definitely not with plain TensorFlow. What you would need is a MPI scatter operation, but Horovod only implements all-reduce, all-gather and broadcast semantics. If you store your data as TFRecord files, your best bet might be to split them into many files and read a shard from each instance, as suggested in the answer...Harts
@jdehesa As you mention this, I added a feature request about adding MPI scatter support for Horovod.Isola
TensorFlow 2.3.0 is introducing a tf.data service for distributed training. I'm not sure if it solves your problem in particular, but it might be worth keeping an eye on it.Harts
@jdehesa Oh, thanks. That looks exactly like what I'm looking for when you use a shared job (setting job_name). Or at least almost, because the data will not be distributed evenly but on a first-come first-served basis (which is maybe ok). Interestingly, when the job is not shared, this amounts to another solution I have already implemented, namely having each worker just use a different random seed for the dataset shuffling. (A hedged sketch of the shared-job setup follows these comments.)Isola
Albert, take a look at fanstore. If you are interested, I can post an answer explaining how this works. github.com/TACC/FanStoreBoxthorn
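For reference, a hedged sketch of the consumer side of that tf.data service approach (TF >= 2.3, experimental API). The dispatcher address and file name are placeholders, and a tf.data dispatcher plus worker processes are assumed to be running already:

```python
import tensorflow as tf

DISPATCHER_ADDRESS = "grpc://dispatcher-host:5000"  # placeholder address

# Everything defined before distribute() is executed by the tf.data service
# workers, not by the training instances.
dataset = tf.data.TFRecordDataset(["train.tfrecord"])  # placeholder input
dataset = dataset.shuffle(10_000).batch(32)

# With a shared job_name, all training instances consume from one shared
# pipeline and receive disjoint batches (first-come, first-served).
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        processing_mode="parallel_epochs",
        service=DISPATCHER_ADDRESS,
        job_name="shared_job"))

for batch in dataset:
    pass  # training step goes here
```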

As you said, copying the data to each instance, or loading it everywhere and sharding it per instance, would be impractical.

One solution would then be to separate the data loading into a dedicated data process and have each training instance pull data from that process, as shown in the figure below. This communication can be established, for example, using a queue.

In such a system, the data process would load the dataset, preprocess it into batches, and push the batches into a queue. Each training instance would then pull batches from this queue, for example by passing the queue as a generator into the dataset API (see tf.data.Dataset.from_generator); a minimal sketch of this setup follows the figure below. Also, if batches are not produced fast enough, it is possible to create more data processes to increase the batch throughput.

Depending on your use case, the implementation specifics will vary. For more information, you can look up Networking and Interprocess communication and Multiprocessing pipes and queues.

                                                             Training        
                                                         +--------------+  ++
                                                         |              |   |
                                                    +----+  Instance 1  |   |
                                                    |    |              |   |
                                                    |    +--------------+   |
                                                    |                       |
                      Preprocessing                 |                       |
                  +--------------------+            +---->      X           |
                  |                    |            |                       |
             Load |                    | Batches    +           X           |
    Dataset+------>    Data Process    +--------->Queue                     |  N instances
                  |                    |            +           X           |  Distributed training
                  |                    |            |                       |  For example, using
                  +--------------------+            +---->      X           |  Horovod broadcast + reduce
                                                    |                       |
                                                    |        Training       |
                                                    |    +--------------+   |
                                                    |    |              |   |
                                                    +----+  Instance N  |   |
                                                         |              |   |
                                                         +--------------+  ++
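To make the figure concrete, here is a minimal, single-machine sketch of that design using Python's multiprocessing; the names data_process and make_train_dataset, the shapes, and the batch counts are purely illustrative. An mp.Queue only works between processes started from a common parent, so with independently launched Horovod workers you would replace it with a network channel (sockets, gRPC, ZeroMQ, ...).

```python
import multiprocessing as mp
import numpy as np
import tensorflow as tf

BATCH_SIZE = 32
NUM_BATCHES = 100


def data_process(queue):
    """The dedicated data process: loads/preprocesses data and pushes batches."""
    for _ in range(NUM_BATCHES):
        # Stand-in for the real loading + preprocessing (which could itself
        # be a tf.data pipeline); here we just fabricate random batches.
        batch = np.random.rand(BATCH_SIZE, 28, 28).astype("float32")
        queue.put(batch)
    queue.put(None)  # sentinel: no more batches


def make_train_dataset(queue):
    """Each training instance wraps the queue in a tf.data.Dataset."""
    def generator():
        while True:
            batch = queue.get()
            if batch is None:
                break
            yield batch

    return tf.data.Dataset.from_generator(
        generator,
        output_signature=tf.TensorSpec(shape=(BATCH_SIZE, 28, 28),
                                       dtype=tf.float32))  # TF >= 2.4


if __name__ == "__main__":
    queue = mp.Queue(maxsize=8)  # bounded queue provides backpressure
    producer = mp.Process(target=data_process, args=(queue,))
    producer.start()

    # In a real setup every training instance would consume from its own
    # channel to the data process; here one consumer simply drains the queue.
    for batch in make_train_dataset(queue):
        pass  # training step goes here

    producer.join()
```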

For a pure TensorFlow implementation, you could use tf.data.Dataset.shard with tf.data.TFRecordDataset.

The documentation addresses your inefficiency concern using TFRecords:

Important caveats:

  • Be sure to shard before you use any randomizing operator (such as shuffle).

  • Generally it is best if the shard operator is used early in the dataset pipeline. For example, when reading from a set of TFRecord files, shard before converting the dataset to input samples. This avoids reading every file on every worker. The following is an example of an efficient sharding strategy within a complete pipeline:

# From the tf.data.Dataset.shard documentation; pattern, num_workers,
# worker_index, num_epochs, etc. are placeholders.
d = Dataset.list_files(pattern)
d = d.shard(num_workers, worker_index)   # each worker keeps only its own files
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
Incessant answered 29/5, 2020 at 9:49 Comment(9)
Yes, this is basically what I want. But my question is rather: How to implement it (using TensorFlow). I already have the very initial dataset as a tf.data.Dataset (in your figure the very left dataset). The preprocessing part is quite simple in TF. But how would you organize this? I guess this would run on a dedicated TF worker (via distributed TF). But how to implement the queue (in TF)? This is my main question here. I think the Python functions are not so useful here. If you say this does not exist in TF, I would rather implement this part in C++.Isola
@Isola Thanks for your comment. I have updated the answer with more details on how to address this problem using TensorFlow.Incessant
But this code is exactly how I don't want to do it (using sharding). I want to have a single dataset pipeline and distribute it across all train workers (like I described, like you have it in your picture).Isola
@Isola You mentioned not wanting to do sharding because it "would also be inefficient, as you would load the data everywhere, and then throw away (N-1)/N of it". In the proposed solution, only the necessary data is loaded for each instance and nothing is thrown out. The shard solution is indeed different in terms of implementation but it should be equivalent in terms of results for uniform sampling.Incessant
Are you sure? So shard on a TFRecordDataset is specially implemented? Where can I see this? But anyway, my question is specifically about not doing it that way. I want to know how I can do what I asked for (and what you also outlined in your picture).Isola
Yes, the documentation of shard specifies "when reading from a set of TFRecord files, shard before converting the dataset to input samples. This avoids reading every file on every worker.". To my knowledge, tensorflow does not provide such a solution as shard provides the same results. For the exact implementation, it depends on your use case (one computer vs multiple computers, network topology, storage infrastructure etc.). As explained in the answer, the implementation can be done using processes and interprocess communication. It can be done in whichever language you prefer.Incessant
To be more precise, shard "loads the data everywhere, and then throw away (N-1)/N of it" only if you load the entire dataset samples and then use shard. In the case of a TFRecordDataset, it does not happen if you use shard right after initialization.Incessant
I think you refer to when you have multiple input files. In your example, this is not the case. But anyway, this is not how I want to do it (using shard). I want to do it like I described in my question, and like you have it in your picture. I want to do it in TensorFlow, and this is what my question is about: How to do exactly that in TensorFlow (not any other solution like IPC in Python).Isola
@Isola You are right, I inserted the wrong code sample. It is now fixed. Unfortunately, I do not know of any other ways to achieve what you want.Incessant

I would recommend taking a look at YogaDL. It allows you to cache your dataset such that during training (or re-training) you will only access the data you need on that shard, rather than throwing away (N-1)/N of your data reads.

Fogel answered 18/8, 2020 at 23:6 Comment(0)
