How to efficiently save a Pandas Dataframe into one/more TFRecord file?

First, some quick background: what I eventually want to achieve is to train a fully connected neural network for a multi-class classification problem under the TensorFlow framework.

The challenge is that the training data is huge (~2 TB). For training to work with limited memory, I want to save the training set into small files and train the model with mini-batch gradient descent, loading only one or a few files into memory at a time.

Say I already have two data frames of processed data: X_train (7 million rows * 200 features, with column names) and training_y (7 million rows * 1 label). How can I efficiently save these into TFRecord files, keeping the column names, row index, etc., with each file containing, say, 100,000 rows? I know that once everything is in TFRecord format I can use the neat shuffling and batching functionality implemented in TensorFlow. I probably need a very efficient way to write such records, because later on I will need to write 2 TB of data into this format.

I tried searching "how to write a pandas data frame to TFRecords" on Google but didn't have any luck finding good examples. Most examples ask me to build a tf.train.Example column by column, row by row, and write it to a TFRecord file using tf.python_io.TFRecordWriter. I just want to confirm that this is the best I can do here.

If you have other suggestions for the problem I am trying to solve, they would be much appreciated too!

Educatee answered 11/10, 2017 at 3:36 Comment(0)

You can use the pandas-tfrecords package to write a pandas DataFrame to TFRecord (and read it back).

Install pandas-tfrecords:

pip install pandas-tfrecords

Then try:

import pandas as pd
from pandas_tfrecords import pd2tf, tf2pd

df = pd.DataFrame({'A': [1, 2, 3], 'B': ['a', 'b', 'c'], 'C': [[1, 2], [3, 4], [5, 6]]})

# local
pd2tf(df, './tfrecords')
my_df = tf2pd('./tfrecords')

Hope this helps.

Glanville answered 21/5, 2020 at 7:38 Comment(1)
Everything would be good about this package if it actually inherited the types from the pandas frame. The get_schema method has a bug. – Firman

Turning a file into TFRecords is (unfortunately) quite involved if you are only using tensorflow and pandas. Since other answers have given clever ways to avoid this, I will show how to do the conversion using only tensorflow and pandas, if only for completeness' sake.

TRIGGER WARNING: lots of TF boilerplate. You have been warned.

import sys

import pandas as pd
import tensorflow as tf

#Creating fake data for demonstration
X_train = pd.DataFrame({'feat1':[1,2,3], 
                  'feat2':['one','two','three']})
training_y = pd.DataFrame({'target': [3.4, 11.67, 44444.1]})

X_train.to_csv('X_train.csv')
training_y.to_csv('training_y.csv')

#TFRecords boilerplate
def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy() # BytesList won't unpack a string from an EagerTensor.
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
    """Returns a float_list from a float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def serialize_example(index, feat1, feat2, target):
    """
    Creates a tf.train.Example message ready to be written to a file.
    """
    # Create a dictionary mapping the feature name to the tf.train.Example-compatible
    # data type.
    feature = {
      'index': _int64_feature(index),
      'feat1': _int64_feature(feat1),
      'feat2': _bytes_feature(feat2),
      'target': _float_feature(target)
    }
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()

#Loading the data into chunks of size 2.  Change this to 1e5 in your code
CHUNKSIZE = 2
train = pd.read_csv('X_train.csv', chunksize=CHUNKSIZE)
y = pd.read_csv('training_y.csv', chunksize=CHUNKSIZE)

file_num = 0
while 1:
    try:
        print(f'{file_num}')
        #Getting the data from the two files 
        df = pd.concat([train.get_chunk(), y.get_chunk()], axis=1)
        
        #Writing the TFRecord
        with tf.io.TFRecordWriter(f'Record_{file_num}.tfrec') as writer:
            for k in range(df.shape[0]):
                row = df.iloc[k,:]
                example = serialize_example(
                    df.index[k],
                    row['feat1'],
                    str.encode(row['feat2']), #Note the str.encode to make tf play nice with strings
                    row['target']) 
                writer.write(example)    
        file_num += 1
    except StopIteration:
        #get_chunk() raises StopIteration once the csv readers are exhausted
        break
    except Exception:
        print(f'ERROR: {sys.exc_info()[0]}')
        break

The code above loads the files in chunks using the chunksize parameter of pandas.read_csv. If your files are not csv, check whether the corresponding pandas read_* function has a chunksize parameter.
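For reference, the same chunked writing loop can also be written without the try/except by treating the two readers as plain iterators. This is just an alternative sketch of the loop above, reusing the serialize_example helper and the same file and column names from this answer:

import pandas as pd
import tensorflow as tf

CHUNKSIZE = 100_000  # roughly 1e5 rows per TFRecord file, as in the question

train_chunks = pd.read_csv('X_train.csv', chunksize=CHUNKSIZE)
y_chunks = pd.read_csv('training_y.csv', chunksize=CHUNKSIZE)

# zip stops cleanly once either reader runs out of chunks
for file_num, (x_chunk, y_chunk) in enumerate(zip(train_chunks, y_chunks)):
    df = pd.concat([x_chunk, y_chunk], axis=1)
    with tf.io.TFRecordWriter(f'Record_{file_num}.tfrec') as writer:
        for k in range(df.shape[0]):
            row = df.iloc[k, :]
            writer.write(serialize_example(
                df.index[k],
                row['feat1'],
                str.encode(row['feat2']),
                row['target']))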

In writing this, I leaned heavily on Chris Deotte's How to Create TFRecords kernel. I tried the official documentation, but it fails to mention things like how to get tf.io to read strings coming from pandas, which made life significantly harder.

If, for whatever reason, you feel the need to check inside the TFRecords to make sure that the data is correct, you will need even more boilerplate. Enjoy.

#Reading the TFRecord
def read_tfrecord(example):
    LABELED_TFREC_FORMAT = {
        "index": tf.io.FixedLenFeature([], tf.int64), 
        "feat1": tf.io.FixedLenFeature([], tf.int64),
        "feat2": tf.io.FixedLenFeature([], tf.string),
        "target": tf.io.FixedLenFeature([], tf.float32)
    }
    
    example = tf.io.parse_single_example(example, LABELED_TFREC_FORMAT)
    index = example['index']
    feat1 = example['feat1']
    feat2 = example['feat2']
    target = example['target']
    return index, feat1, feat2, target 

def load_dataset(filenames, labeled=True, ordered=False):
    # Read from TFRecords. For optimal performance, reading from multiple files at once and
    # disregarding data order. Order does not matter since we will be shuffling the data anyway.

    ignore_order = tf.data.Options()
    if not ordered:
        ignore_order.experimental_deterministic = False # disable order, increase speed

    dataset = tf.data.TFRecordDataset(filenames, num_parallel_reads=AUTO) # automatically interleaves reads from multiple files
    dataset = dataset.with_options(ignore_order) # uses data as soon as it streams in, rather than in its original order
    dataset = dataset.map(read_tfrecord)
    # returns a dataset of (index, feat1, feat2, target) tuples
    return dataset

AUTO = tf.data.experimental.AUTOTUNE
def get_training_dataset(filenames, batch_size=2):
    dataset = load_dataset(filenames, labeled=True)
    dataset = dataset.repeat() # the training dataset must repeat for several epochs
    #dataset = dataset.shuffle(2048)
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(AUTO) # prefetch next batch while training (autotune prefetch buffer size)
    return dataset

training_dataset = get_training_dataset(filenames= ['Record_0.tfrec', 'Record_1.tfrec'])
#training_dataset = training_dataset.unbatch().batch(20)
next(iter(training_dataset))

(<tf.Tensor: shape=(2,), dtype=int64, numpy=array([0, 2])>,
 <tf.Tensor: shape=(2,), dtype=int64, numpy=array([1, 3])>,
 <tf.Tensor: shape=(2,), dtype=int64, numpy=array([11, 33])>,
 <tf.Tensor: shape=(2,), dtype=string, numpy=array([b'one', b'three'], dtype=object)>,
 <tf.Tensor: shape=(2,), dtype=float32, numpy=array([3.40000e+00, 4.44441e+04], dtype=float32)>)
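To actually train on this, you would typically map the (index, feat1, feat2, target) tuples into (features, label) pairs before handing the dataset to Keras. A minimal sketch, assuming a hypothetical compiled Keras model named model whose inputs are named 'feat1' and 'feat2' (steps_per_epoch is needed because the dataset repeats indefinitely):

# Drop the index and regroup into (features_dict, label) pairs.
# Note that a string feature like feat2 would still need a lookup/embedding layer in the model.
train_ds = training_dataset.map(
    lambda index, feat1, feat2, target: ({'feat1': feat1, 'feat2': feat2}, target))

# model is a hypothetical compiled Keras model; pick steps_per_epoch to match your data size
model.fit(train_ds, steps_per_epoch=1000, epochs=10)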

Triceratops answered 28/10, 2021 at 21:11 Comment(2)
LABELED_TFREC_FORMAT and serialize_example have the list of the dataframe columns hardcoded. Is there a method that would allow this to be done dynamically? If my dataframe has 20 columns instead of 3, extending this answer for many columns would be prohibitive. And if I had multiple dataframes to process, the wheels fall off the bus. Is this possible? Thanks – Heterosexual
It is certainly possible! Here is the sketch for turning the csv to tfrecords: make serialize_example accept the index and the row, and build the feature dict with a dictionary comprehension, roughly: ``` def pd_to_tf(value): if pd.api.types.is_integer(value): return _int64_feature(value) elif pd.api.types.is_float(value): return _float_feature(value) else: return _bytes_feature(str.encode(str(value))); feature = {col: pd_to_tf(row[col]) for col in row.index} ``` – Triceratops
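A fuller version of that comment's sketch might look like the following. It is a hypothetical generalisation of serialize_example that reuses the _int64_feature / _float_feature / _bytes_feature helpers from the answer above; the dtype dispatch and the choice to serialise everything else as a UTF-8 string are assumptions, not part of the original answer:

import pandas as pd
import tensorflow as tf

def value_to_feature(value):
    # Dispatch on the scalar type of each cell
    if pd.api.types.is_integer(value):
        return _int64_feature(value)
    if pd.api.types.is_float(value):
        return _float_feature(value)
    # Fall back to a UTF-8 encoded string for anything else (an assumption)
    return _bytes_feature(str.encode(str(value)))

def serialize_row(index, row):
    # Build the feature dict from whatever columns the row happens to have
    feature = {'index': _int64_feature(index)}
    feature.update({col: value_to_feature(row[col]) for col in row.index})
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()

# Usage inside the writing loop from the answer above:
# writer.write(serialize_row(df.index[k], df.iloc[k, :]))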

A workaround that might work is to export the pandas dataframe to Parquet files. This is one of the most efficient ways to store data, since the data can be partitioned into several files.

You can even decide which column to use for the partitions, so that each unique value of that column goes into its own file. More info in the to_parquet pandas docs.

Then you can do the batch processing using those partitions; see the sketch below.
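A minimal sketch of what that could look like, using the X_train/training_y frames from the question. The partition column part and the directory name dataset are made up for illustration, and to_parquet with partition_cols needs pyarrow (or fastparquet) installed:

import pandas as pd

# Hypothetical: assign each row to one of ~70 shards of 100,000 rows
df = X_train.join(training_y)
df['part'] = df.index // 100_000

# Writes one sub-directory per unique value of 'part', e.g. dataset/part=0/...
df.to_parquet('dataset', partition_cols=['part'])

# Later, load a single partition at a time for a mini-batch training pass
chunk = pd.read_parquet('dataset/part=0')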

Dolph answered 7/2, 2020 at 11:58 Comment(0)
