Im trying to catch any exception that is raised in any servicer so I can make sure that I only propagate known exceptions and not unexpected ones like ValueError, TypeError etc.
I'd like to be able to catch any raised error, and format them or convert them to other errors to better control the info that is exposed.
I don't want to have to enclose every servicer method with try/except.
I've tried with an interceptor, but im not able to catch the errors there.
Is there a way of specifying an error handler for the grpc Server? like what you do with flask or any other http server?
gRPC Python currently don't support server-side global error handler. The interceptor won't execute the server handler inside the intercept_service
function, so there is no way to try/except.
Also, I found the gRPC Python server interceptor implementation is different from what they proposed original at L13-Python-Interceptors.md#server-interceptors. If the implementation stick to the original design, we can use interceptor as global error handler easily with handler
and request
/request_iterator
.
# Current Implementation
intercept_service(self, continuation, handler_call_details)
# Original Design
intercept_unary_unary_handler(self, handler, method, request, servicer_context)
intercept_unary_stream_handler(self, handler, method, request, servicer_context)
intercept_stream_unary_handler(self, handler, method, request_iterator, servicer_context)
intercept_stream_stream_handler(self, handler, method, request_iterator, servicer_context)
Please submit a feature request issue to https://github.com/grpc/grpc/issues.
Maybe this will help you :)
def _wrap_rpc_behavior(handler, fn):
if handler is None:
return None
if handler.request_streaming and handler.response_streaming:
behavior_fn = handler.stream_stream
handler_factory = grpc.stream_stream_rpc_method_handler
elif handler.request_streaming and not handler.response_streaming:
behavior_fn = handler.stream_unary
handler_factory = grpc.stream_unary_rpc_method_handler
elif not handler.request_streaming and handler.response_streaming:
behavior_fn = handler.unary_stream
handler_factory = grpc.unary_stream_rpc_method_handler
else:
behavior_fn = handler.unary_unary
handler_factory = grpc.unary_unary_rpc_method_handler
return handler_factory(fn(behavior_fn,
handler.request_streaming,
handler.response_streaming),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer)
class TracebackLoggerInterceptor(grpc.ServerInterceptor):
def intercept_service(self, continuation, handler_call_details):
def latency_wrapper(behavior, request_streaming, response_streaming):
def new_behavior(request_or_iterator, servicer_context):
try:
return behavior(request_or_iterator, servicer_context)
except Exception as err:
logger.exception(err, exc_info=True)
return new_behavior
return _wrap_rpc_behavior(continuation(handler_call_details), latency_wrapper)
As some of the previous comments suggested, I tried the meta-class approach which works quite well.
Attached is a simple example to demonstrate how to intercept the grpc calls.
You could extend this by providing the metaclass a list of decorators which you could apply on each function.
Also, it would be wise to be more selective regarding the methods you apply the wrapper to. A good option would be to list the methods of the autogenerated base class and only wrap those.
from types import FunctionType
from functools import wraps
def wrapper(method):
@wraps(method)
def wrapped(*args, **kwargs):
# do stuff here
return method(*args, **kwargs)
return wrapped
class ServicerMiddlewareClass(type):
def __new__(meta, classname, bases, class_dict):
new_class_dict = {}
for attribute_name, attribute in class_dict.items():
if isinstance(attribute, FunctionType):
# replace it with a wrapped version
attribute = wrapper(attribute)
new_class_dict[attribute_name] = attribute
return type.__new__(meta, classname, bases, new_class_dict)
# In order to use
class MyGrpcService(grpc.MyGrpcServicer, metaclass=ServicerMiddlewareClass):
...