Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import logging
- from starlette.requests import Request
- from google.rpc import code_pb2
- import sys
- from opencensus.trace import span as span_module
- from opencensus.trace import stack_trace, status
- from opencensus.trace import tracer as tracer_module
- from opencensus.trace import utils
- from opencensus.trace.propagation import trace_context_http_header_format
- from opencensus.ext.azure.trace_exporter import AzureExporter
- from opencensus.ext.azure.log_exporter import AzureLogHandler
- from opencensus.common import configuration
- from opencensus.trace import (
- attributes_helper,
- execution_context,
- print_exporter,
- samplers,
- )
- HTTP_HOST = attributes_helper.COMMON_ATTRIBUTES['HTTP_HOST']
- HTTP_METHOD = attributes_helper.COMMON_ATTRIBUTES['HTTP_METHOD']
- HTTP_PATH = attributes_helper.COMMON_ATTRIBUTES['HTTP_PATH']
- HTTP_ROUTE = attributes_helper.COMMON_ATTRIBUTES['HTTP_ROUTE']
- HTTP_URL = attributes_helper.COMMON_ATTRIBUTES['HTTP_URL']
- HTTP_STATUS_CODE = attributes_helper.COMMON_ATTRIBUTES['HTTP_STATUS_CODE']
- BLACKLIST_PATHS = 'BLACKLIST_PATHS'
- BLACKLIST_HOSTNAMES = 'BLACKLIST_HOSTNAMES'
- connection_string = 'InstrumentationKey=##############################################'
- # Logs are used to send error reports
- logger = logging.getLogger(__name__)
- logger.addHandler(AzureLogHandler(
- connection_string=connection_string))
- sampler = samplers.AlwaysOnSampler()
- exporter = AzureExporter(
- connection_string=connection_string)
- propagator = trace_context_http_header_format.TraceContextPropagator()
- class OpencensusMiddleware(object):
- def __init__(self, sampler=None, exporter=None, propagator=None):
- self.sampler = sampler
- self.exporter = exporter
- self.propagator = propagator
- self.tracer_instance = None
- self.span_instance = None
- self.init_app()
- def init_app(self):
- if self.sampler is None:
- self.sampler = samplers.ProbabilitySampler()
- if self.exporter is None:
- self.exporter = print_exporter.PrintExporter()
- if self.propagator is None:
- self.propagator = trace_context_http_header_format.TraceContextPropagator()
- def _before_request(self, request: Request):
- """
- This method sets the Tracer and creates the request Span.
- Args:
- ---------
- request: Request
- An object with request's headers, values and body.
- Return:
- ---------
- """
- # Do not trace if the url is blacklisted
- # --------------------- TODO review this later ---------------------
- try:
- span_context = self.propagator.from_headers(request.headers)
- self.tracer_instance = tracer_module.Tracer(
- span_context=span_context,
- sampler=self.sampler,
- exporter=self.exporter,
- propagator=self.propagator)
- self.span_instance = self.tracer_instance.start_span()
- self.span_instance.span_kind = span_module.SpanKind.SERVER
- # Set the span name as the name of the current module name
- self.span_instance.name = '[{}]{}'.format(
- request.method,
- request.url)
- self.tracer_instance.add_attribute_to_current_span(
- HTTP_HOST, request.url.hostname
- )
- self.tracer_instance.add_attribute_to_current_span(
- HTTP_METHOD, request.method
- )
- self.tracer_instance.add_attribute_to_current_span(
- HTTP_PATH, request.url.path
- )
- self.tracer_instance.add_attribute_to_current_span(
- HTTP_URL, str(request.url)
- )
- except Exception: # pragma: NO COVER
- logger.error('Failed to trace request', exc_info=True)
- def _after_request(self, response):
- """
- Sets the status code into the current Span.
- Args:
- ---------
- response: Response
- An object with status code and prediction results.
- Return:
- ---------
- """
- try:
- self.tracer_instance.add_attribute_to_current_span(
- HTTP_STATUS_CODE,
- response.status_code
- )
- except Exception: # pragma: NO COVER
- logger.error('Failed to trace request', exc_info=True)
- finally:
- return response
- def _teardown_request(self, exception):
- """
- Inserts the response code and finishes the Span.
- Args:
- ---------
- exception: Exception
- An object with request's headers, values and body.
- Return:
- ---------
- """
- try:
- self._throw_exceptions(
- span=self.span_instance, exception=exception)
- self.tracer_instance.end_span()
- self.tracer_instance.finish()
- except Exception: # pragma: NO COVER
- logger.error('Failed to trace request', exc_info=True)
- def _throw_exceptions(self, span, exception):
- """
- Verify if exists an exception, assign it to a span and throws to Azure.
- Args:
- ---------
- span: Span
- A time-span that threw an exception.
- exception: Exception / HTTPException
- Some error that needs to be thrown.
- Return:
- ---------
- """
- if exception is not None:
- logger.exception(exception)
- if span is not None:
- span.status = status.Status(
- code=code_pb2.UNKNOWN,
- message=str(exception)
- )
- # try attaching the stack trace to the span, only populated
- # if the app has 'PROPAGATE_EXCEPTIONS', 'DEBUG', or
- # 'TESTING' enabled
- exc_type, _, exc_traceback = sys.exc_info()
- if exc_traceback is not None:
- span.stack_trace = (
- stack_trace.StackTrace.from_traceback(
- exc_traceback
- )
- )
- opencensus_instance = OpencensusMiddleware(
- sampler=sampler, exporter=exporter, propagator=propagator)
Add Comment
Please, Sign In to add comment