Apply TensorFlow Transform to transform/scale feat

Posted 2020-08-26 04:32

Question:

Overview

I followed a guide to write TFRecords, using tf.Transform to preprocess my features. Now I would like to deploy my model, for which I need to apply the same preprocessing function to live data.

My Approach

First, suppose I have 2 features:

features = ['amount', 'age']

I have the transform_fn produced by the Apache Beam pipeline, residing in working_dir=gs://path-to-transform-fn/

Then I load the transform function using:

tf_transform_output = tft.TFTransformOutput(working_dir)

I thought that the easiest way to serve it in production was to get a numpy array of processed data and call model.predict() (I am using a Keras model).

To do this, I thought the transform_raw_features() method was exactly what I needed.

However, after building the raw feature dictionary:

raw_features = {}
for k in features:
    raw_features.update({k: tf.constant(1)})

print(tf_transform_output.transform_raw_features(raw_features))

I get:

AttributeError: 'Tensor' object has no attribute 'indices'

Now, I am assuming this happens because I used tf.VarLenFeature() when I defined the schema for the input to my preprocessing_fn.

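def preprocessing_fn(inputs):
    # Copy so the raw inputs dict is not mutated.
    outputs = inputs.copy()

    # Standardize each feature using dataset-wide statistics.
    for name in features:
        outputs[name] = tft.scale_to_z_score(outputs[name])

    return outputs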

And I build the metadata using:

RAW_DATA_FEATURE_SPEC = {}
for name in features:
    RAW_DATA_FEATURE_SPEC[name] = tf.VarLenFeature(dtype=tf.float32)

# Build the metadata once, after the feature spec is complete.
RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(RAW_DATA_FEATURE_SPEC))

So in short, given a dictionary:

d = {'amount': [50], 'age': [32]}

I would like to apply this transform_fn and scale these values appropriately, so that I can feed them into my model for prediction. This dictionary is exactly the format of the elements of my PCollection before the data is processed by preprocessing_fn().
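For reference, the scaling being asked for is ordinary z-score standardization: statistics are computed once over the training data (the analysis phase) and then reused on each live example (the transform phase). The following is a minimal pure-Python sketch of that idea, not tf.Transform itself. It assumes the population mean and standard deviation are used; the training values and the helper names fit_z_score and apply_z_score are hypothetical and chosen only for illustration.

```python
import math


def fit_z_score(values):
    """Analysis phase: mean and population standard deviation of a column."""
    mean = sum(values) / len(values)
    variance = sum((v - mean) ** 2 for v in values) / len(values)
    return mean, math.sqrt(variance)


def apply_z_score(value, mean, std):
    """Transform phase: scale one raw value with the stored statistics."""
    # A constant column has std == 0; only center the value in that case.
    return (value - mean) / std if std > 0 else value - mean


# Hypothetical training columns (not taken from the question).
train = {'amount': [10.0, 50.0], 'age': [32.0, 48.0]}
stats = {name: fit_z_score(column) for name, column in train.items()}

# The live example from the question.
d = {'amount': [50], 'age': [32]}
scaled = {
    name: [apply_z_score(v, *stats[name]) for v in values]
    for name, values in d.items()
}
print(scaled)  # {'amount': [1.0], 'age': [-1.0]}
```

The point of the sketch is that serving needs only the stored statistics, which is the role the saved transform_fn plays.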

Pipeline Structure:

class BeamProccess():

    def __init__(self):

        # init

        self.run()

    def run(self):

        def preprocessing_fn(inputs):

            # outputs = { 'id' : [list], 'amount': [list], 'age': [list] }
            return outputs

        with beam.Pipeline(options=self.pipe_opt) as p:
            with beam_impl.Context(temp_dir=self.google_cloud_options.temp_location):
                data = p | "read_table" >> beam.io.Read(table_bq) \
                    | "create_data" >> beam.ParDo(ProcessFn())

                transformed_dataset, transform_fn = (
                    (train, RAW_DATA_METADATA)
                    | beam_impl.AnalyzeAndTransformDataset(preprocessing_fn))

                transformed_data, transformed_metadata = transformed_dataset

                transformed_data | "WriteTrainTFRecords" >> tfrecordio.WriteToTFRecord(
                    file_path_prefix=self.JOB_DIR + '/train/data',
                    file_name_suffix='.tfrecord',
                    coder=example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))

                _ = (
                    transform_fn
                    | 'WriteTransformFn' >>
                    transform_fn_io.WriteTransformFn(path=self.JOB_DIR + '/transform/'))

And finally the ParDo() is:

class ProcessFn(beam.DoFn):

    def process(self, element):

        yield { 'id' : [list], 'amount': [list], 'age': [list] }

Answer 1:

The problem is with the snippet

raw_features = {}
for k in features:
    raw_features.update({k: tf.constant(1)})

print(tf_transform_output.transform_raw_features(raw_features))

In this code you construct a dictionary whose values are dense tensors. As you said, this won't work for a VarLenFeature, which is represented as a SparseTensor. Instead of tf.constant, try using tf.placeholder for a FixedLenFeature and tf.sparse_placeholder for a VarLenFeature.
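To make the difference concrete: a sparse tensor carries three components (indices, values and dense_shape), while tf.constant(1) is a dense tensor with none of them, which is consistent with the AttributeError about 'indices'. The following is a minimal pure-Python sketch of building those three components for the dictionary d from the question. The helper name to_sparse_components is made up for illustration and is not part of TensorFlow or tf.Transform.

```python
def to_sparse_components(batch):
    """Turn a batch of variable-length rows into (indices, values, dense_shape)."""
    indices, values = [], []
    for row, items in enumerate(batch):
        for col, item in enumerate(items):
            indices.append([row, col])   # position of each present value
            values.append(float(item))   # the value itself, as float32-like data
    # The dense shape is [batch size, longest row].
    max_len = max((len(items) for items in batch), default=0)
    return indices, values, [len(batch), max_len]


# The live example from the question: one row per feature.
d = {'amount': [50], 'age': [32]}
sparse_inputs = {
    name: to_sparse_components([values]) for name, values in d.items()
}
print(sparse_inputs['amount'])  # ([[0, 0]], [50.0], [1, 1])
```

Each triple could then be wrapped in a tf.SparseTensor, or fed as the value of a tf.sparse_placeholder in TF 1.x. Whether transform_raw_features accepts it unchanged likely depends on the tf.Transform version in use, so treat this as a starting point rather than a verified recipe.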