Overview
I followed a guide to write TFRecords, in which I used tf.Transform to preprocess my features. Now I would like to deploy my model, for which I need to apply this preprocessing function to real live data.
My Approach
First, suppose I have 2 features:
features = ['amount', 'age']
I have the transform_fn produced by the Apache Beam pipeline, residing in working_dir=gs://path-to-transform-fn/
Then I load the transform function using:
tf_transform_output = tft.TFTransformOutput(working_dir)
I thought that the easiest way to serve it in production was to get a numpy array of processed data and call model.predict() (I am using a Keras model).
To do this, I thought the transform_raw_features() method was exactly what I needed.
However, it seems that after building the schema:
raw_features = {}
for k in features:
    raw_features.update({k: tf.constant(1)})
print(tf_transform_output.transform_raw_features(raw_features))
I get:
AttributeError: 'Tensor' object has no attribute 'indices'
Now, I am assuming this happens because I used tf.VarLenFeature() when I defined the schema for my preprocessing_fn.
def preprocessing_fn(inputs):
    outputs = inputs.copy()
    for _ in features:
        outputs[_] = tft.scale_to_z_score(outputs[_])
    return outputs
And I build the metadata using:
RAW_DATA_FEATURE_SPEC = {}
for _ in features:
    RAW_DATA_FEATURE_SPEC[_] = tf.VarLenFeature(dtype=tf.float32)
RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(RAW_DATA_FEATURE_SPEC))
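If the tf.VarLenFeature spec is indeed the cause, transform_raw_features() would expect each raw feature as a sparse tensor (an object carrying indices, values and dense_shape) rather than a dense tf.constant. Below is a minimal sketch of that conversion, assuming each feature arrives as a list of rows. The names to_coo and to_sparse_tensors are hypothetical helpers, and the TensorFlow part has not been run against this pipeline.

```python
def to_coo(rows):
    """Convert a list of variable-length rows into COO components.

    Returns (indices, values, dense_shape), the three pieces a sparse
    tensor carries; a dense tensor exposes none of them.
    """
    indices, values = [], []
    for r, row in enumerate(rows):
        for c, v in enumerate(row):
            indices.append([r, c])
            values.append(float(v))
    width = max((len(row) for row in rows), default=0)
    return indices, values, [len(rows), width]


def to_sparse_tensors(raw):
    """Wrap each feature of `raw` (name -> list of rows) in a tf.SparseTensor."""
    import tensorflow as tf  # imported lazily so to_coo works without TensorFlow

    sparse = {}
    for name, rows in raw.items():
        indices, values, dense_shape = to_coo(rows)
        sparse[name] = tf.SparseTensor(
            indices=indices, values=values, dense_shape=dense_shape)
    return sparse


# One request holding a single value for one feature.
print(to_coo([[50]]))  # ([[0, 0]], [50.0], [1, 1])
```

The dictionary returned by to_sparse_tensors({'amount': [[50]], 'age': [[32]]}) could then be tried in place of raw_features when calling transform_raw_features().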
So in short, given a dictionary:
d = {'amount': [50], 'age': [32]}
I would like to apply this transform_fn and scale these values appropriately so that I can feed them into my model for prediction. This dictionary is exactly the format of the elements of my PCollection before the data is processed by preprocessing_fn.
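For intuition about what the saved transform_fn has to reproduce at serving time: tft.scale_to_z_score standardises a value as (x - mean) / standard deviation, with the statistics computed over the training dataset during the analyze phase rather than over the live request. Below is a minimal pure-Python sketch of that idea. The statistics in TRAIN_STATS are made up for illustration, and scale_with_train_stats is a hypothetical helper, not part of tf.Transform.

```python
import math

# Hypothetical training-time statistics (illustrative values only).
TRAIN_STATS = {
    'amount': {'mean': 40.0, 'var': 25.0},
    'age': {'mean': 30.0, 'var': 16.0},
}


def scale_with_train_stats(record, stats):
    """Apply z-score scaling to one raw record using fixed statistics."""
    scaled = {}
    for name, values in record.items():
        mean = stats[name]['mean']
        std = math.sqrt(stats[name]['var'])
        # Zero-variance guard: a choice made for this sketch only.
        scaled[name] = [(v - mean) / std if std > 0 else 0.0 for v in values]
    return scaled


d = {'amount': [50], 'age': [32]}
print(scale_with_train_stats(d, TRAIN_STATS))
# {'amount': [2.0], 'age': [0.5]}
```

The point of the sketch is that a single live record cannot be scaled from its own mean and variance; the values have to come from the analysis stored alongside the transform_fn.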
Pipeline Structure:
class BeamProccess():
    def __init__(self):
        # init
        self.run()

    def run(self):
        def preprocessing_fn(inputs):
            # outputs = { 'id' : [list], 'amount': [list], 'age': [list] }
            return outputs

        with beam.Pipeline(options=self.pipe_opt) as p:
            with beam_impl.Context(temp_dir=self.google_cloud_options.temp_location):
                data = p | "read_table" >> beam.io.Read(table_bq) \
                    | "create_data" >> beam.ParDo(ProcessFn())
                transformed_dataset, transform_fn = (
                    (data, RAW_DATA_METADATA) | beam_impl.AnalyzeAndTransformDataset(
                        preprocessing_fn))
                transformed_data, transformed_metadata = transformed_dataset
                transformed_data | "WriteTrainTFRecords" >> tfrecordio.WriteToTFRecord(
                    file_path_prefix=self.JOB_DIR + '/train/data',
                    file_name_suffix='.tfrecord',
                    coder=example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))
                _ = (
                    transform_fn
                    | 'WriteTransformFn' >>
                    transform_fn_io.WriteTransformFn(path=self.JOB_DIR + '/transform/'))
And finally the ParDo() is:
class ProcessFn(beam.DoFn):
    def process(self, element):
        yield { 'id' : [list], 'amount': [list], 'age': [list] }
ParDos? Can you post the whole ParDo, and maybe the pipeline? Sometimes errors like this can happen when you return a single object from a ParDo instead of a list. – Algae
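To illustrate the comment above: the result of a DoFn's process() is consumed as an iterable of output elements, so a bare dict is not treated as one record. Below is a pure-Python sketch of that difference, with no Beam involved. Here emit is a hypothetical stand-in for how a runner iterates over the result, and actual Beam behaviour (including whether it raises an error) may vary by version.

```python
def emit(result):
    """Stand-in for a runner: every item of the iterable becomes one element."""
    return list(result)


def process_yield(element):
    # Yielding produces a generator that emits the record itself.
    yield {'id': [element], 'amount': [50.0], 'age': [32.0]}


def process_return_list(element):
    # Returning a one-item list is equivalent to the yield above.
    return [{'id': [element], 'amount': [50.0], 'age': [32.0]}]


def process_return_bare(element):
    # Returning the dict directly: iterating over it walks its keys.
    return {'id': [element], 'amount': [50.0], 'age': [32.0]}


print(emit(process_yield(1)))        # a list holding one dict
print(emit(process_return_list(1)))  # a list holding one dict
print(emit(process_return_bare(1)))  # ['id', 'amount', 'age']
```

The ProcessFn shown in the question uses yield, so it corresponds to the first case.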