I want to write a custom TensorFlow op in Python and register it in the protobuf op registry, as explained here. The registration is key because I will not be using this op directly from Python; but if it is registered like a C++ op and loaded into the Python runtime, I can run it in my environment.
I would expect the code to look something like:
import tensorflow as tf
from google.protobuf import json_format
from tensorflow.python.ops.data_flow_ops import QueueBase, _as_type_list, _as_shape_list, _as_name_list
""" Missing the Python equivalent of,
class HDF5QueueOp : public ResourceOpKernel<QueueInterface> {
public:
// Implementation
};
REGISTER_OP("HDF5Queue")
.Output("handle: resource")
.Attr("filename: string")
.Attr("datasets: list(string)")
.Attr("overwrite: bool = false")
.Attr("component_types: list(type) >= 0 = []")
.Attr("shapes: list(shape) >= 0 = []")
.Attr("shared_name: string = ''")
.Attr("container: string = ''")
.Attr("capacity: int = -1")
.SetIsStateful()
.SetShapeFn(TwoElementOutput);
"""
class HDF5Queue(QueueBase):
    def __init__(self, stream_id, stream_columns=None, dtypes=None,
                 capacity=100, shapes=None, names=None, name="hdf5_queue"):
        if not dtypes:
            dtypes = [tf.int64, tf.float32]
        if not shapes:
            shapes = [[1], [1]]
        dtypes = _as_type_list(dtypes)
        shapes = _as_shape_list(shapes, dtypes)
        names = _as_name_list(names, dtypes)
        queue_ref = _op_def_lib.apply_op("HDF5Queue", stream_id=stream_id,
                                         stream_columns=stream_columns,
                                         capacity=capacity,
                                         component_types=dtypes, shapes=shapes,
                                         name=name, container=None,
                                         shared_name=None)
        super(HDF5Queue, self).__init__(dtypes, shapes, names, queue_ref)
The above is pretty standard in TF. You can see the same pattern, for example, with FIFOQueue: Python wrapper, protobuf registration, C++ implementation. There is a Python wrapper generated during compilation that I can't link to, but you can see where it's used by running grep -A 10 -B 10 -n FIFO $(find /usr/local -name "*gen_data_flow*.py") /dev/null
The code below dumps the protobuf message for the TF graph in JSON format. I would expect the dump to include a block for the HDF5Queue operation, just as it does when I write C++ operations.
with tf.Session() as sess:
    queue = HDF5Queue(stream_id=0xa)
    write = queue.enqueue([[1], [1.2]])
    read = queue.dequeue()
    print(json_format.MessageToJson(tf.train.export_meta_graph()))
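For reference, this is roughly the block I'd expect for the queue node in that JSON dump. The name/op/attr layout follows the NodeDef proto (proto3 JSON renders int64 attrs as strings); the node name and the attr values shown are my guesses, hand-written here for illustration:

```python
import json

# Hypothetical NodeDef fragment, hand-written to show the layout I expect
# MessageToJson to produce for the HDF5Queue node. The node name and attr
# values are assumptions, not output from a real run.
expected_fragment = """
{
  "name": "hdf5_queue",
  "op": "HDF5Queue",
  "attr": {
    "capacity": {"i": "100"},
    "component_types": {"list": {"type": ["DT_INT64", "DT_FLOAT"]}}
  }
}
"""

node = json.loads(expected_fragment)
print(node["op"])  # HDF5Queue
```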