TensorFlow - tf.data.Dataset reading large HDF5 files
Asked Answered

I am setting up a TensorFlow pipeline for reading large HDF5 files as input for my deep learning models. Each HDF5 file contains 100 videos of variable length, stored as a collection of compressed JPG images (to keep the size on disk manageable). Using tf.data.Dataset and a map to tf.py_func, reading examples from the HDF5 file with custom Python logic is quite easy. For example:

def read_examples_hdf5(filename, label):
    with h5py.File(filename, 'r') as hf:
        frames = ...  # read frames from HDF5 and decode them from JPG
    return frames, label

filenames = glob.glob(os.path.join(hdf5_data_path, "*.h5"))
labels = [0]*len(filenames) # ... can we do this more elegantly?

dataset = tf.data.Dataset.from_tensor_slices((filenames, labels))
dataset = dataset.map(
    lambda filename, label: tuple(tf.py_func(
        read_examples_hdf5, [filename, label], [tf.uint8, tf.int64]))
)

dataset = dataset.shuffle(1000 + 3 * BATCH_SIZE)
dataset = dataset.batch(BATCH_SIZE)
iterator = dataset.make_one_shot_iterator()
next_batch = iterator.get_next()

This example works; the problem, however, is that tf.py_func seems to be able to handle only one example at a time. As my HDF5 container stores 100 examples, this limitation causes significant overhead, since the files constantly need to be opened, read, closed and reopened. It would be much more efficient to read all 100 video examples into the dataset object and then move on to the next HDF5 file (preferably in multiple threads, each thread dealing with its own collection of HDF5 files).

So, what I would like is a number of threads running in the background, reading video frames from the HDF5 files, decoding them from JPG, and feeding them into the dataset object. Prior to the introduction of the tf.data.Dataset pipeline, this was quite easy using the RandomShuffleQueue and enqueue_many ops, but it seems there is currently no elegant way of doing this (or the documentation is lacking).

Does anyone know what would be the best way to achieve my goal? I have also looked into (and implemented) a pipeline using tfrecord files, but taking a random sample of video frames stored in a tfrecord file seems quite impossible (see here). Additionally, I have looked at the from_generator() input for tf.data.Dataset, but it does not seem like that will run in multiple threads. Any suggestions are more than welcome.

Twi answered 17/1, 2018 at 20:39 Comment(2)
Does the tf.data.Dataset.map(your_map_function, num_parallel_calls=N) do what you want? It will run N threads of your map function. The problem that I see with this is that you now have 6 threads each reading 1 HDF5 file, meaning you better have enough memory for all 6 full HDF5 files. I came to this question because of a related question I posted trying to resolve the issue of limited memory and large HDF5 files. #48349909Melesa
Memory with the HDF5 files is not an issue, as it does not read the entire file into memory. But it remains problematic to open-read-close-reopen ... the files all the time, and this will come at a significant speed penalty.Twi

I stumbled across this question while dealing with a similar issue. I came up with a solution based on using a Python generator, together with the TF dataset construction method from_generator. Because we use a generator, the HDF5 file should be opened for reading only once and kept open as long as there are entries to read. So it will not be opened, read, and then closed for every single call to get the next data element.

Generator definition

To allow the user to pass in the HDF5 filename as an argument, I wrote a class with a __call__ method, since from_generator requires the generator to be callable. This is the generator:

import h5py
import tensorflow as tf

class generator:
    def __init__(self, file):
        self.file = file

    def __call__(self):
        with h5py.File(self.file, 'r') as hf:
            for im in hf["train_img"]:
                yield im

Because it uses a generator, the code picks up where it left off at each call, from the point where it last returned a result, instead of running everything from the beginning again. In this case that is the next iteration of the inner for loop, so the file is not opened again for reading; it stays open as long as there is data to yield. For more on generators, see this excellent Q&A.
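As a quick illustration of that resumption behaviour, here is a plain-Python toy example (nothing HDF5-specific):

def count_up_to(n):
    for i in range(n):
        print("yielding", i)
        yield i

gen = count_up_to(3)
next(gen)  # prints "yielding 0" and returns 0
next(gen)  # resumes after the previous yield: prints "yielding 1" and returns 1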

Of course, you will have to replace anything inside the with block to match how your dataset is constructed and what outputs you want to obtain.

Usage example

ds = tf.data.Dataset.from_generator(
    generator(hdf5_path), 
    tf.uint8, 
    tf.TensorShape([427,561,3]))

value = ds.make_one_shot_iterator().get_next()

# Example of how to read elements
with tf.Session() as sess:
    while True:
        try:
            data = sess.run(value)
            print(data.shape)
        except tf.errors.OutOfRangeError:
            print('done.')
            break

Again, in my case I had stored uint8 images of height 427, width 561, and 3 color channels in my dataset, so you will need to modify these in the above call to match your use case.
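If your HDF5 file also stores labels, the same pattern extends naturally: the generator yields (image, label) tuples and from_generator receives a tuple of types and shapes. This is only a rough sketch; the dataset name "labels" and the tf.int64 label type are assumptions, not part of the original file layout:

class generator_with_labels:
    def __init__(self, file):
        self.file = file

    def __call__(self):
        with h5py.File(self.file, 'r') as hf:
            # assumes images and labels are stored as parallel HDF5 datasets
            for im, label in zip(hf["train_img"], hf["labels"]):
                yield im, label

ds = tf.data.Dataset.from_generator(
    generator_with_labels(hdf5_path),
    (tf.uint8, tf.int64),
    (tf.TensorShape([427, 561, 3]), tf.TensorShape([])))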

Handling multiple files

I have a proposed solution for handling multiple HDF5 files. The basic idea is to construct a Dataset from the filenames as usual, and then use the interleave method to process many input files concurrently, getting samples from each of them to form a batch, for example.

The idea is as follows:

ds = tf.data.Dataset.from_tensor_slices(filenames)
# You might want to shuffle() the filenames here depending on the application
ds = ds.interleave(lambda filename: tf.data.Dataset.from_generator(
        generator(filename), 
        tf.uint8, 
        tf.TensorShape([427,561,3])),
       cycle_length, block_length)

What this does is open cycle_length files concurrently and produce block_length items from each before moving on to the next file; see the interleave documentation for details. You can set these values to match what is appropriate for your application: e.g., whether you need to process one file at a time or several concurrently, whether you only want a single sample at a time from each file, and so on.
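As a toy illustration of that ordering (plain integer ranges instead of HDF5 data), with cycle_length=2 and block_length=2:

ds = tf.data.Dataset.range(1, 3).interleave(
    lambda x: tf.data.Dataset.from_tensors(x).repeat(4),
    cycle_length=2, block_length=2)
# elements come out as: 1, 1, 2, 2, 1, 1, 2, 2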

Edit: for a parallel version, take a look at tf.contrib.data.parallel_interleave!
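A rough sketch of what that could look like, assuming a TF 1.x version where both tf.contrib.data.parallel_interleave and the args argument of from_generator (used to forward the filename tensor, see the answer further down) are available; cycle_length=4 and sloppy=True are arbitrary placeholder choices:

import h5py
import tensorflow as tf

class file_generator:
    # the filename is received via args, so __call__ takes it as an argument
    def __call__(self, file):
        with h5py.File(file, 'r') as hf:
            for im in hf["train_img"]:
                yield im

# filenames: list of HDF5 file paths, as above
ds = tf.data.Dataset.from_tensor_slices(filenames)
ds = ds.apply(tf.contrib.data.parallel_interleave(
    lambda filename: tf.data.Dataset.from_generator(
        file_generator(),
        tf.uint8,
        tf.TensorShape([427, 561, 3]),
        args=(filename,)),
    cycle_length=4,   # number of files read in parallel
    sloppy=True))     # allow non-deterministic ordering for throughput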

Possible caveats

Be aware of the peculiarities of using from_generator if you decide to go with this solution. As of TensorFlow 1.6.0, the documentation of from_generator includes the two notes below.

It may be challenging to apply this across different environments or with distributed training:

NOTE: The current implementation of Dataset.from_generator() uses tf.py_func and inherits the same constraints. In particular, it requires the Dataset- and Iterator-related operations to be placed on a device in the same process as the Python program that called Dataset.from_generator(). The body of generator will not be serialized in a GraphDef, and you should not use this method if you need to serialize your model and restore it in a different environment.

Be careful if the generator depends on external state:

NOTE: If generator depends on mutable global variables or other external state, be aware that the runtime may invoke generator multiple times (in order to support repeating the Dataset) and at any time between the call to Dataset.from_generator() and the production of the first element from the generator. Mutating global variables or external state can cause undefined behavior, and we recommend that you explicitly cache any external state in generator before calling Dataset.from_generator().

Ascham answered 17/3, 2018 at 12:24 Comment(3)
Is there a way to load multiple datasets from the same hdf file? For example hf["train_img"] and hf["labels"].Nave
I have a follow-up question on this. The example of a single HDF5 file works correctly, but I have trouble getting the multi example to work with interleave. The problem is that tf.data.Dataset.from_tensor_slices(filenames) returns a collection of Tensor objects rather than Python strings and thus the generator can't deal with this. What is the correct way of dealing with this?Twi
maybe you want to add this to your caveats (as of TF 2.6.x): "Caution: While this is a convenient approach it has limited portability and scalability. It must run in the same python process that created the generator, and is still subject to the Python GIL." tensorflow.org/guide/data#consuming_python_generatorsComprehend

It took me a while to figure this out, so I thought I should record it here. Based on mikkola's answer, this is how to handle multiple files:

import h5py
import tensorflow as tf

class generator:
    def __call__(self, file):
        with h5py.File(file, 'r') as hf:
            for im in hf["train_img"]:
                yield im

ds = tf.data.Dataset.from_tensor_slices(filenames)
ds = ds.interleave(lambda filename: tf.data.Dataset.from_generator(
        generator(), 
        tf.uint8, 
        tf.TensorShape([427,561,3]),
        args=(filename,)),
       cycle_length, block_length)

The key is that you can't pass filename directly to the generator, since it is a Tensor. You have to pass it through args, which TensorFlow evaluates and converts into a regular Python value before calling the generator.
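One detail worth knowing: by the time the value reaches the generator it is a NumPy bytes object rather than a Python str. h5py generally accepts bytes filenames, but if yours complains you can decode it explicitly; a small optional tweak to the class above:

class generator:
    def __call__(self, file):
        # file arrives as bytes when forwarded through args; decode defensively
        if isinstance(file, bytes):
            file = file.decode('utf-8')
        with h5py.File(file, 'r') as hf:
            for im in hf["train_img"]:
                yield im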

Echelon answered 6/7, 2018 at 4:49 Comment(4)
I can confirm I needed this modification to get it to work.Lepidosiren
Hello, in the above generator you didn't return the label of the image. How do I return the label? What I want to do is: yield h5file[data_name], label. Does any change need to be made to the returned shape, tf.TensorShape([427,561,3])?Daveta
I regret not looking at the second answer (i.e. this one). It took me hours trying to get the original one to work!Synecious
@Daveta to get the label, what I'm doing is concatenating the label with the rest of the output in the generator, and then passing it through a tf.data.Dataset.map() that just slices it into two parts, the features and the labels.Synecious

The solution provided by @mikkola was last edited on Mar 19, 2018. Several changes have been made to TensorFlow since then, so you might have problems running it with one of the recent versions of TensorFlow. Here is my version of the solution for TensorFlow 2.10.1 (not the latest version, but I believe the modifications will be similar for later releases as well):

hdf5_path = ...  # define the path to hdf5 file

class generator:
    def __init__(self, file):
        self.file = file

    def __call__(self):
        with h5py.File(self.file, 'r') as hf:
            for im in hf["train_img"]:
                yield im

# Define the TensorFlow graph
graph = tf.Graph()
with graph.as_default():
    ds = tf.data.Dataset.from_generator(
       generator(hdf5_path), 
       tf.uint8, 
       tf.TensorShape([427,561,3]))

    value  = tf.compat.v1.data.make_one_shot_iterator(ds).get_next()

# Create a session and run the graph
with tf.compat.v1.Session(graph=graph) as sess:
    # Example on how to read elements
    while True:
        try:
            data = sess.run(value)
            print(data)
        except tf.errors.OutOfRangeError:
            print('done.')
            break
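If you do not need graph/session compatibility, the same dataset can also be consumed eagerly in TF 2.x, without building a Graph or creating a Session; a minimal sketch using the same generator class and image shape as above:

ds = tf.data.Dataset.from_generator(
    generator(hdf5_path),
    output_types=tf.uint8,
    output_shapes=tf.TensorShape([427, 561, 3]))

for data in ds:  # eager iteration over the dataset
    print(data.shape)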
Permanganate answered 5/3, 2023 at 10:15 Comment(0)
