Upgrade to tf.dataset not working properly when parsing csv
Asked Answered
N

2

9

I have a GCMLE experiment and I am trying to upgrade my input_fn to use the new tf.data functionality. I have created the following input_fn based off of this sample

def input_fn(...):
    dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards) # shuffle up the list of input files
    dataset = dataset.interleave(lambda filename: # mix together records from cycle_length number of shards
                tf.data.TextLineDataset(filename).skip(1).map(lambda row: parse_csv(row, hparams)), cycle_length=5) 
    if shuffle:
      dataset = dataset.shuffle(buffer_size = 10000)
    dataset = dataset.repeat(num_epochs)
    dataset = dataset.batch(batch_size)
    iterator = dataset.make_one_shot_iterator()
    features = iterator.get_next()

    labels = features.pop(LABEL_COLUMN)

    return features, labels

my parse_csv is the same as what I used previously, but it is not currently working. I can fix some of the issues, but I don't fully understand why I am having these issues. Here is the start of my parse_csv() function

def parse_csv(..):
    columns = tf.decode_csv(rows, record_defaults=CSV_COLUMN_DEFAULTS)
    raw_features = dict(zip(FIELDNAMES, columns))

    words = tf.string_split(raw_features['sentences']) # splitting words
    vocab_table = tf.contrib.lookup.index_table_from_file(vocabulary_file = hparams.vocab_file,
                default_value = 0)

....
  1. Right away this tf.string_split() stops working and the error is ValueError: Shape must be rank 1 but is rank 0 for 'csv_preprocessing/input_sequence_generation/StringSplit' (op: 'StringSplit') with input shapes: [], []. -- this is easily solved by packing raw_features['sentences'] into a tensor via [raw_features['sentences']] but I do not understand why this is needed with the this dataset approach? How come in the old version this worked fine? For the shapes to match up with the rest of my model, I end up needing to remove this extra dimension at the end via words = tf.squeeze(words, 0) because I add this "unecessary" dimension to the tensor.

  2. For whatever reason, I am also getting an error that the table is not initialized tensorflow.python.framework.errors_impl.FailedPreconditionError: Table not initialized. however, this code works completely fine with my old input_fn() (see below) so I don't know why I would now need to initialize the tables? I have not figured out a solution to this part. Is there anything that I am missing to be able to use tf.contrib.lookup.index_table_from_file within my parse_csv function?

For reference, this is my old input_fn() that still does work:

def input_fn(...):
    filename_queue = tf.train.string_input_producer(tf.train.match_filenames_once(filenames), 
                num_epochs=num_epochs, shuffle=shuffle, capacity=32)
    reader = tf.TextLineReader(skip_header_lines=skip_header_lines)

    _, rows = reader.read_up_to(filename_queue, num_records=batch_size)

    features = parse_csv(rows, hparams)


        if shuffle:
            features = tf.train.shuffle_batch(
                features,
                batch_size,
                min_after_dequeue=2 * batch_size + 1,
                capacity=batch_size * 10,
                num_threads=multiprocessing.cpu_count(), 
                enqueue_many=True,
                allow_smaller_final_batch=True
            )
        else:
            features = tf.train.batch(
                features,
                batch_size,
                capacity=batch_size * 10,
                num_threads=multiprocessing.cpu_count(),
                enqueue_many=True,
                allow_smaller_final_batch=True
            )

labels = features.pop(LABEL_COLUMN)

return features, labels

UPDATE TF 1.7

I am revisiting this with TF 1.7 (which should have all of the TF 1.6 features mentioned in @mrry answer) but I'm still unable to replicate the behavior. For my old input_fn() I am able to gete around 13 steps/sec. The new function that I am using is as follows:

def input_fn(...):
    files = tf.data.Dataset.list_files(filenames).shuffle(num_shards)
    dataset = files.apply(tf.contrib.data.parallel_interleave(lambda filename: tf.data.TextLineDataset(filename).skip(1), cycle_length=num_shards))
    dataset = dataset.apply(tf.contrib.data.map_and_batch(lambda row:
            parse_csv_dataset(row, hparams = hparams), 
            batch_size = batch_size, 
            num_parallel_batches = multiprocessing.cpu_count())) 
    dataset = dataset.prefetch(1)
    if shuffle:
        dataset = dataset.shuffle(buffer_size = 10000)
    dataset = dataset.repeat(num_epochs)
    iterator = dataset.make_initializable_iterator()
    features = iterator.get_next()
    tf.add_to_collection(tf.GraphKeys.TABLE_INITIALIZERS, iterator.initializer)

    labels = {key: features.pop(key) for key in LABEL_COLUMNS}

    return features, labels 

I believe that I am following all of the performance guildines such as 1) use prefetch 2) use map_and_batch with num_parallel_batches = cores 3) use parallel_interleave 4) applying shuffle before the repeat. The only steps I am not using is the cache suggestion, but would expect that to really only help for epochs beyond the first one as well as "applying interleave, prefetch and shuffle first." -- however I found that having prefetch and shuffle after the map_and_batch was ~10% speedup.

BUFFER ISSUE The first performance issue that I am noticing is with my old input_fn() it took me about 13 wall clock minutes to get through 20k steps, and yet even with the buffer_size of 10,000 (which I take to mean we are waiting until we have 10,000 batches processed) I am still waiting more than 40 minutes for the buffer to get full . Does it make sense to take this long? If I know that my sharded .csv's on GCS are already randomized, is it acceptable to have this shuffle/buffer size smaller? I am trying to replicate the behavior from tf.train.shuffle_batch() -- however, it seems that at worst it should take the same 13 mins that it took to reach 10k steps in order to fill up the buffer?

STEPS/SEC

Even once the buffer has filled up, the global steps/sec tops out around 3 steps/sec (often as low as 2 steps/sec) on the same model with the previous input_fn() that is getting ~13 steps/sec.

SLOPPY INTERLEAVE I finall tried to replace parallel_interleave() with sloppy_interleave() as this is another suggestion from @mrry. When I switched to sloppy_interleave I got 14 steps/sec! I know this means that it is not deterministic, but that should really just mean it is not deterministic from one run (or epoch) to the next? Or are there larger implications for this? Should I be concerned about any real difference between the old shuffle_batch() method and sloppy_interleave? Does the fact that this results in a 4-5x improvement suggest what the previous blocking factor was?

Nich answered 14/2, 2018 at 3:43 Comment(0)
N
2

In TF 1.4 (which is currently the latest version of TF that works with GCMLE) you will not be able to use make_one_shot_iterator() with the lookup tables (see relevant post) you will need to use Dataset.make_initializable_iterator() and then initialize iterator.initalizer with your default TABLES_INITIALIZER (from this post). Here is what the input_fn() should look like:

def input_fn(...):
  dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards)

  # Define `vocab_table` outside the map function and use it in `parse_csv()`.
  vocab_table = tf.contrib.lookup.index_table_from_file(
      vocabulary_file=hparams.vocab_file, default_value=0)

  dataset = dataset.interleave(
      lambda filename: (tf.data.TextLineDataset(filename)
                        .skip(1)
                        .map(lambda row: parse_csv(row, hparams),
                             num_parallel_calls=multiprocessing.cpu_count())),
      cycle_length=5) 

  if shuffle:
    dataset = dataset.shuffle(buffer_size=10000)
  dataset = dataset.repeat(num_epochs)
  dataset = dataset.batch(batch_size)
  iterator = dataset.make_initializable_iterator()
  features = iterator.get_next()

  # add iterator.intializer to be handled by default table initializers
  tf.add_to_collection(tf.GraphKeys.TABLE_INITIALIZERS, iterator.initializer) 

  labels = features.pop(LABEL_COLUMN)

  return features, labels
Nich answered 14/2, 2018 at 14:34 Comment(12)
@mrry this approach appears to be slowing down my model quite significantly. Before this change (using read_up_to()) I was my global steps/sec were around ~120 and now they are down around 6. I'm assuming that this means my current setup is now input/output constrained, but I'm not sure how to check or what I should change. When I used shuffle_batch it automatically output some I/O statistics to tensorboard like fraction_of_N_full which I took to mean that the buffers/queues were all full and everything was running smoothly. Could the interleave/shuffling be creating that much slowdown?Nich
There are some suggestions in the Input Pipeline Performance Guide. The simplest are: (1) try adding dataset = dataset.prefetch(1) just before you create the iterator, and (2) try using tf.contrib.data.parallel_interleave() instead of dataset.interleave() (which parallelizes the I/O and might be important if you're reading from GCS).Marismarisa
Would there be any benefit to prefect buffer_size > 1? Also I see the pipeline performance guide suggests map_and_batch() -- is there any particular reason that you left this out from your suggestion? I guess I currently have map() -> repeat(num_epochs) -> batch(). Would it be appropriate to use map_and_batch() -> repeat(num_epochs) as an alternative?Nich
@Marismarisa it looks like parallel_interleave(), map_and_batch() are also only something that is available for TF 1.5+. Are there any performance optimizations for TF1.4 (seems like prefetch is still available)Nich
Prefetch is the single most useful optimization. Setting a larger buffer can be useful to mask variability in the input pipeline, but might not be important if the CSV data is homogeneous. In TF 1.4 you can use tf.contrib.data.sloppy_interleave(), which has many of the same benefits as its replacement parallel_interleave(), but it is "sloppy" in that it might reorder elements. (The queue-based pipeline is always "sloppy" in this sense, so that might not be a problem for your application.)Marismarisa
Interesting - it just seems like something is pretty significantly off because I can achieve 32 steps/sec on my local CPU with the old queue based readers, but then even when I added prefetch(1) to the new method I was still only achieving 6 steps/sec on a standard_p100. I'm just alternating between the two input functions in my question above. Could there be anything else that is slowing things down? I even have with tf.device('cpu:0') for all of the preprocessing stuff to tie it to the CPU (which helped a lot in my old input_fn)Nich
Are you measuring the time to fill the shuffle buffer, perhaps? The buffer_size=10000 in the tf.data version will need to process 10000 elements before it returns the first one. Your queue-based version has a min_after_dequeue=2 * batch_size + 1, which is much smaller and will start returning data faster.Marismarisa
@Marismarisa I am just looking at global steps/sec once the model is ramped up and has started going. I recognize there can be startup costs, so this is at step say 1,000 and beyond (or just looking at the logs of steps once it has begun).Nich
could it be related to the capacity: capacity=batch_size * 10?Nich
Possibly. A larger capacity has the effect of acting like a larger prefetch buffer, so it might be masking variability in the upstream part of the pipeline. Without concrete values for batch_size and the overall data size, it's difficult to say with any certainty.Marismarisa
batch_size in this example was 32 (purposely going small to see how it would perform) and the individual record data size I would estimate at ~180k records on each 300MB shards which would suggest ~1.67KB per record or about 50KB/0.05MB per batch (pretty lightweight)Nich
@Marismarisa with TF 1.6 and using sloppy_interleave() I was finally able to achieve better performance than the old queue! I'm still not sure exactly how the "sloppy" reordering could have a negative effect on training? Does that fact that this led to a 4-5x speed up suggest anything else about the previous bottleneck? Is it just that some batches took much longer to be processed?Nich
M
2
  1. When you use tf.data.TextLineDataset, each element is a scalar string. In this respect, it is more similar to using tf.TextLineReader.read(), rather than the batch version tf.TextLineReader.read_up_to(), which returns a vector of strings. Unfortunately the tf.string_split() op demands a vector input (although this could potentially be changed in future), so the shape manipulation is currently necessary.

  2. Lookup tables interact a little differently with the functions in tf.data. The intuition is that you should declare the lookup table once outside the Dataset.map() call (so that it will be initialized once) and then capture it inside the parse_csv() function to call vocab_table.lookup(). Something like the following should work:

    def input_fn(...):
      dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards)
    
      # Define `vocab_table` outside the map function and use it in `parse_csv()`.
      vocab_table = tf.contrib.lookup.index_table_from_file(
          vocabulary_file=hparams.vocab_file, default_value=0)
    
      def parse_csv(...):
        columns = tf.decode_csv(rows, record_defaults=CSV_COLUMN_DEFAULTS)
        raw_features = dict(zip(FIELDNAMES, columns))
        words = tf.string_split([raw_features['sentences']]) # splitting words
    
        # Use the captured `vocab_table` here.
        word_indices = vocab_table.lookup(words)
    
        # ...    
        features = ...
    
        # NOTE: Structure the output here so that you can simply return
        # the dataset from `input_fn()`.
        labels = features.pop(LABEL_COLUMN)
        return features, labels
    
      # NOTE: Consider using `tf.contrib.data.parallel_interleave()` to perform
      # the reads in parallel.
      dataset = dataset.interleave(
          lambda filename: (tf.data.TextLineDataset(filename)
                            .skip(1)
                            .map(lambda row: parse_csv(row, hparams),
                                 num_parallel_calls=multiprocessing.cpu_count())),
          cycle_length=5) 
    
      if shuffle:
        dataset = dataset.shuffle(buffer_size=10000)
      dataset = dataset.repeat(num_epochs)
      dataset = dataset.batch(batch_size)
    
      # NOTE: Add prefetching here to run the input pipeline in the background.
      dataset = dataset.prefetch(1)
    
      # NOTE: This requires TensorFlow 1.5 or later, but this change simplifies the
      # initialization of the lookup table.
      return dataset
    
Marismarisa answered 14/2, 2018 at 5:37 Comment(6)
Thanks! So which part requires TF 1.5? Is it just the initialization of the lookup table outside of the parse_csv mapping function? GCMLE currently only works with TF 1.4, so how would the initialization need to be handled differently? I am working through this right now, but it doesn't seem to be working well. I actually have a few tables (for vocab and label files) and it seems like I will need to pass all of these to the parse_csv function? or did you put the parse_csv function within the input_fn purposely so that it would be able to use those tables without passing them along?Nich
I am getting an error when trying to pass the vocab_table (and similar) through parse_csv: ValueError: Cannot capture a stateful node (name:create_lookup_tables/string_to_index/hash_table, type:HashTableV2) by value. and am guessing that this is tied to the TF 1.4 initialization issue. The only difference is that I currently have my parse_csv function defined outside of the input_fn as I also use it as a serving_fnNich
Based on your answer to this question: #44374583 I am guessing that in TF 1.4 I would need to switch to Dataset.make_initializable_iterator() and then call iterator.initializer somewhere? What would be the proper location for calling iterator.initializer?Nich
Think that I was able to post a solution for TF 1.4 below -- let me know if this is the best way for 1.4 :)Nich
I have started a new SO question because I am still having some strouble with TF 1.7 & GCMLE (currently getting issues where the input_fn errors out if the last batch is is a different size). Would you be able to take a look? #50264485Nich
I am trying to follow your guidance in GCMLE with TF 1.10 but it is not clear to me where I should be handling the initialization of the lookup tables and creation of the iterator (if still necessary) while still returning a dataset. I have added a new SO question here: #52934454Nich
N
2

In TF 1.4 (which is currently the latest version of TF that works with GCMLE) you will not be able to use make_one_shot_iterator() with the lookup tables (see relevant post) you will need to use Dataset.make_initializable_iterator() and then initialize iterator.initalizer with your default TABLES_INITIALIZER (from this post). Here is what the input_fn() should look like:

def input_fn(...):
  dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards)

  # Define `vocab_table` outside the map function and use it in `parse_csv()`.
  vocab_table = tf.contrib.lookup.index_table_from_file(
      vocabulary_file=hparams.vocab_file, default_value=0)

  dataset = dataset.interleave(
      lambda filename: (tf.data.TextLineDataset(filename)
                        .skip(1)
                        .map(lambda row: parse_csv(row, hparams),
                             num_parallel_calls=multiprocessing.cpu_count())),
      cycle_length=5) 

  if shuffle:
    dataset = dataset.shuffle(buffer_size=10000)
  dataset = dataset.repeat(num_epochs)
  dataset = dataset.batch(batch_size)
  iterator = dataset.make_initializable_iterator()
  features = iterator.get_next()

  # add iterator.intializer to be handled by default table initializers
  tf.add_to_collection(tf.GraphKeys.TABLE_INITIALIZERS, iterator.initializer) 

  labels = features.pop(LABEL_COLUMN)

  return features, labels
Nich answered 14/2, 2018 at 14:34 Comment(12)
@mrry this approach appears to be slowing down my model quite significantly. Before this change (using read_up_to()) I was my global steps/sec were around ~120 and now they are down around 6. I'm assuming that this means my current setup is now input/output constrained, but I'm not sure how to check or what I should change. When I used shuffle_batch it automatically output some I/O statistics to tensorboard like fraction_of_N_full which I took to mean that the buffers/queues were all full and everything was running smoothly. Could the interleave/shuffling be creating that much slowdown?Nich
There are some suggestions in the Input Pipeline Performance Guide. The simplest are: (1) try adding dataset = dataset.prefetch(1) just before you create the iterator, and (2) try using tf.contrib.data.parallel_interleave() instead of dataset.interleave() (which parallelizes the I/O and might be important if you're reading from GCS).Marismarisa
Would there be any benefit to prefect buffer_size > 1? Also I see the pipeline performance guide suggests map_and_batch() -- is there any particular reason that you left this out from your suggestion? I guess I currently have map() -> repeat(num_epochs) -> batch(). Would it be appropriate to use map_and_batch() -> repeat(num_epochs) as an alternative?Nich
@Marismarisa it looks like parallel_interleave(), map_and_batch() are also only something that is available for TF 1.5+. Are there any performance optimizations for TF1.4 (seems like prefetch is still available)Nich
Prefetch is the single most useful optimization. Setting a larger buffer can be useful to mask variability in the input pipeline, but might not be important if the CSV data is homogeneous. In TF 1.4 you can use tf.contrib.data.sloppy_interleave(), which has many of the same benefits as its replacement parallel_interleave(), but it is "sloppy" in that it might reorder elements. (The queue-based pipeline is always "sloppy" in this sense, so that might not be a problem for your application.)Marismarisa
Interesting - it just seems like something is pretty significantly off because I can achieve 32 steps/sec on my local CPU with the old queue based readers, but then even when I added prefetch(1) to the new method I was still only achieving 6 steps/sec on a standard_p100. I'm just alternating between the two input functions in my question above. Could there be anything else that is slowing things down? I even have with tf.device('cpu:0') for all of the preprocessing stuff to tie it to the CPU (which helped a lot in my old input_fn)Nich
Are you measuring the time to fill the shuffle buffer, perhaps? The buffer_size=10000 in the tf.data version will need to process 10000 elements before it returns the first one. Your queue-based version has a min_after_dequeue=2 * batch_size + 1, which is much smaller and will start returning data faster.Marismarisa
@Marismarisa I am just looking at global steps/sec once the model is ramped up and has started going. I recognize there can be startup costs, so this is at step say 1,000 and beyond (or just looking at the logs of steps once it has begun).Nich
could it be related to the capacity: capacity=batch_size * 10?Nich
Possibly. A larger capacity has the effect of acting like a larger prefetch buffer, so it might be masking variability in the upstream part of the pipeline. Without concrete values for batch_size and the overall data size, it's difficult to say with any certainty.Marismarisa
batch_size in this example was 32 (purposely going small to see how it would perform) and the individual record data size I would estimate at ~180k records on each 300MB shards which would suggest ~1.67KB per record or about 50KB/0.05MB per batch (pretty lightweight)Nich
@Marismarisa with TF 1.6 and using sloppy_interleave() I was finally able to achieve better performance than the old queue! I'm still not sure exactly how the "sloppy" reordering could have a negative effect on training? Does that fact that this led to a 4-5x speed up suggest anything else about the previous bottleneck? Is it just that some batches took much longer to be processed?Nich

© 2022 - 2024 — McMap. All rights reserved.