Apply TensorFlow Transform to transform/scale features in production
Overview

I followed a guide to write TFRecords, in which I used tf.Transform to preprocess my features. Now I would like to deploy my model, for which I need to apply this preprocessing function to real live data.

My Approach

First, suppose I have 2 features:

features = ['amount', 'age']

I have the transform_fn from the Apache Beam job, residing in working_dir=gs://path-to-transform-fn/

Then I load the transform function using:

tf_transform_output = tft.TFTransformOutput(working_dir)

I thought that the easiest way to serve it in production was to get a numpy array of the processed data and call model.predict() (I am using a Keras model).

To do this, I thought the transform_raw_features() method was exactly what I needed.

However, when I construct the raw feature dictionary:

raw_features = {}
for k in features:
    raw_features.update({k: tf.constant(1)})

print(tf_transform_output.transform_raw_features(raw_features))

I get:

AttributeError: 'Tensor' object has no attribute 'indices'

Now, I am assuming this happens because I used tf.VarLenFeature() when I defined the schema used by my preprocessing_fn.
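(For context, a VarLenFeature is parsed into a tf.SparseTensor, which carries .indices, .values and .dense_shape; a plain tf.constant has none of those, which is exactly what the AttributeError complains about. A minimal illustration, with made-up values:)

import tensorflow as tf

# A VarLenFeature instance is a tf.SparseTensor, not a dense tensor.
sparse_amount = tf.SparseTensor(indices=[[0, 0]], values=[50.0], dense_shape=[1, 1])

For reference, my preprocessing_fn is: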

def preprocessing_fn(inputs):
    outputs = inputs.copy()

    for key in features:
        outputs[key] = tft.scale_to_z_score(outputs[key])

    return outputs

And I build the metadata using:

RAW_DATA_FEATURE_SPEC = {}
for key in features:
    RAW_DATA_FEATURE_SPEC[key] = tf.VarLenFeature(dtype=tf.float32)

RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(RAW_DATA_FEATURE_SPEC))

So, in short, given a dictionary

d = {'amount': [50], 'age': [32]}

I would like to apply this transform_fn and scale these values appropriately, so that they can be fed into my model for prediction. This dictionary is exactly the format of my PCollection before the data is processed by preprocessing_fn().
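For example, assuming (hypothetically) that the analyzed mean of amount over the training data was 40 with a standard deviation of 10, scale_to_z_score should map 50 to (50 - 40) / 10 = 1.0.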

Pipeline Structure:

class BeamProcess():

    def __init__(self):
        # init
        self.run()

    def run(self):

        def preprocessing_fn(inputs):
            # outputs = { 'id' : [list], 'amount': [list], 'age': [list] }
            return outputs

        with beam.Pipeline(options=self.pipe_opt) as p:
            with beam_impl.Context(temp_dir=self.google_cloud_options.temp_location):
                data = (p
                        | "read_table" >> beam.io.Read(table_bq)
                        | "create_data" >> beam.ParDo(ProcessFn()))

                transformed_dataset, transform_fn = (
                    (data, RAW_DATA_METADATA)
                    | beam_impl.AnalyzeAndTransformDataset(preprocessing_fn))

                transformed_data, transformed_metadata = transformed_dataset

                transformed_data | "WriteTrainTFRecords" >> tfrecordio.WriteToTFRecord(
                    file_path_prefix=self.JOB_DIR + '/train/data',
                    file_name_suffix='.tfrecord',
                    coder=example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))

                _ = (transform_fn
                     | 'WriteTransformFn' >>
                     transform_fn_io.WriteTransformFn(path=self.JOB_DIR + '/transform/'))

And finally, the DoFn used in the ParDo is:

class ProcessFn(beam.DoFn):

    def process(self, element):

        yield { 'id' : [list], 'amount': [list], 'age': [list] }
English answered 7/1, 2019 at 20:54 Comment(2)
Is the code part of the ParDos? Can you post the whole ParDo, and maybe the pipeline? Sometimes errors like this can happen when you return a single object from a ParDo instead of a list.Algae
@Algae Added my pipeline structure. My code is actually not part of the ParDos. It is separate, since at serving time I will not have access to Beam. Hence I wanted to load the transform and apply it to raw dictionaries that I feed in myself. Thank you!English

The problem is with the snippet

raw_features = {}
for k in features:
    raw_features.update({k: tf.constant(1)})

print(tf_transform_output.transform_raw_features(raw_features))

In this code you construct a dictionary whose values are dense tensors. As you said, this won't work for a VarLenFeature, which is fed as a tf.SparseTensor. Instead of using tf.constant, try using tf.placeholder for a FixedLenFeature and tf.sparse_placeholder for a VarLenFeature.
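As a rough sketch (assuming TF 1.x graph mode; the feature spec is recovered from the saved transform output rather than redeclared by hand):

import tensorflow as tf
import tensorflow_transform as tft

tf_transform_output = tft.TFTransformOutput('gs://path-to-transform-fn/')

# Recover the raw feature spec that was saved alongside the transform_fn.
raw_feature_spec = tf_transform_output.raw_feature_spec()

raw_features = {}
for name, spec in raw_feature_spec.items():
    if isinstance(spec, tf.VarLenFeature):
        # A VarLenFeature is fed as a SparseTensor, so use a sparse placeholder.
        raw_features[name] = tf.sparse_placeholder(dtype=spec.dtype, name=name)
    else:
        # A FixedLenFeature can be fed through a dense placeholder.
        raw_features[name] = tf.placeholder(
            dtype=spec.dtype, shape=[None] + list(spec.shape), name=name)

# The transform graph can now be applied on top of the placeholders.
transformed_features = tf_transform_output.transform_raw_features(raw_features)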

Spectrophotometer answered 25/1, 2019 at 20:25 Comment(1)
So the Beam input for preprocessing differs from the input at serving time? I get around the problem by feeding tf.sparse.SparseTensor(indices=[[0,0], [1,0]], values=[20,30], dense_shape=[2,1]) directly. However, when Beam applies the transform, I only feed the dictionary d = { 'age': [20, 30] }. But now I need to manually convert these dictionaries into sparse tensors, run the transform graph on them, and finally turn them into dense tensors that can be fed into the model. Also, I don't understand why the raw input into Beam is 1D while the sparse placeholders are 2D.English
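On the 1D-vs-2D point: parsing a batch of tf.Examples with a VarLenFeature yields a SparseTensor indexed by [example, value_position], so the placeholder is 2-D, whereas the Beam pipeline hands preprocessing_fn a batch whose values are plain per-feature lists. A sketch of the manual conversion described in this comment (assuming TF 1.x, and reusing raw_features and transformed_features from the snippet above):

import numpy as np
import tensorflow as tf

# Turn a 1-D list of raw values into the 2-D SparseTensorValue a
# VarLenFeature placeholder expects: row i holds the single value for
# example i, hence indices [i, 0] and dense_shape [batch_size, 1].
def to_sparse(values):
    return tf.SparseTensorValue(
        indices=[[i, 0] for i in range(len(values))],
        values=np.asarray(values, dtype=np.float32),
        dense_shape=[len(values), 1])

d = {'amount': [50], 'age': [32]}
feed_dict = {raw_features[name]: to_sparse(vals) for name, vals in d.items()}

# scale_to_z_score keeps sparse inputs sparse, so densify the outputs
# before stacking them into the numpy array that model.predict() expects.
dense_outputs = {name: tf.sparse.to_dense(t)
                 for name, t in transformed_features.items()}

with tf.Session() as sess:
    # Depending on the transform, table/asset initializers may be needed too.
    sess.run(tf.tables_initializer())
    scaled = sess.run(dense_outputs, feed_dict=feed_dict)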
