Skip to content

Support for gRPC aio #1099

@BalazsHoranyi

Description

@BalazsHoranyi

I would like to add support for the gRPC AsyncIO API server.
I think it should be relatively straightforward to implement. We just need to subclass the grpc.aio.ServerInterceptor instead of grpc.ServerInterceptor and add some async/await logic.
Something like this seems to work well.

class AsyncOpenTelemetryServerInterceptor(grpc.aio.ServerInterceptor, OpenTelemetryServerInterceptor):
    """
    An AsyncIO gRPC server interceptor, to add OpenTelemetry.
    Usage::
        tracer = some OpenTelemetry tracer
        interceptors = [
            AsyncOpenTelemetryServerInterceptor(tracer),
        ]
        server = aio.server(
            futures.ThreadPoolExecutor(max_workers=concurrency),
            interceptors = (interceptors,))
    """


    async def intercept_service(self, continuation, handler_call_details):
        def telemetry_wrapper(behavior, request_streaming, response_streaming):
            async def telemetry_interceptor(request_or_iterator, context):

                # handle streaming responses specially
                if response_streaming:
                    return self._intercept_server_stream(
                        behavior,
                        handler_call_details,
                        request_or_iterator,
                        context,
                    )

                with self._set_remote_context(context):
                    with self._start_span(
                        handler_call_details,
                        context,
                        set_status_on_exception=False,
                    ) as span:
                        # wrap the context
                        context = _OpenTelemetryServicerContext(context, span)

                        # And now we run the actual RPC.
                        try:
                            return await behavior(request_or_iterator, context)

                        except Exception as error:
                            # Bare exceptions are likely to be gRPC aborts, which
                            # we handle in our context wrapper.
                            # Here, we're interested in uncaught exceptions.
                            # pylint:disable=unidiomatic-typecheck
                            if type(error) != Exception:
                                span.record_exception(error)
                            raise error

            return telemetry_interceptor

        next_handler = await continuation(handler_call_details)


        return _wrap_rpc_behavior(
            next_handler, telemetry_wrapper
        )

I am unsure about intercepting the streaming response. Thoughts?

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions