-
Notifications
You must be signed in to change notification settings - Fork 906
Closed
Labels
Description
I would like to add support for the gRPC AsyncIO API server.
I think it should be relatively straightforward to implement. We just need to subclass the grpc.aio.ServerInterceptor instead of grpc.ServerInterceptor and add some async/await logic.
Something like this seems to work well.
class AsyncOpenTelemetryServerInterceptor(grpc.aio.ServerInterceptor, OpenTelemetryServerInterceptor):
"""
An AsyncIO gRPC server interceptor, to add OpenTelemetry.
Usage::
tracer = some OpenTelemetry tracer
interceptors = [
AsyncOpenTelemetryServerInterceptor(tracer),
]
server = aio.server(
futures.ThreadPoolExecutor(max_workers=concurrency),
interceptors = (interceptors,))
"""
async def intercept_service(self, continuation, handler_call_details):
def telemetry_wrapper(behavior, request_streaming, response_streaming):
async def telemetry_interceptor(request_or_iterator, context):
# handle streaming responses specially
if response_streaming:
return self._intercept_server_stream(
behavior,
handler_call_details,
request_or_iterator,
context,
)
with self._set_remote_context(context):
with self._start_span(
handler_call_details,
context,
set_status_on_exception=False,
) as span:
# wrap the context
context = _OpenTelemetryServicerContext(context, span)
# And now we run the actual RPC.
try:
return await behavior(request_or_iterator, context)
except Exception as error:
# Bare exceptions are likely to be gRPC aborts, which
# we handle in our context wrapper.
# Here, we're interested in uncaught exceptions.
# pylint:disable=unidiomatic-typecheck
if type(error) != Exception:
span.record_exception(error)
raise error
return telemetry_interceptor
next_handler = await continuation(handler_call_details)
return _wrap_rpc_behavior(
next_handler, telemetry_wrapper
)
I am unsure about intercepting the streaming response. Thoughts?
Reactions are currently unavailable