@@ -23,6 +23,7 @@ import (
2323 "github.com/pkg/errors"
2424 "go.opentelemetry.io/collector/consumer/pdata"
2525 "go.opentelemetry.io/collector/receiver/otlpreceiver"
26+ "go.opentelemetry.io/collector/receiver/otlpreceiver/metrics"
2627 "go.opentelemetry.io/collector/receiver/otlpreceiver/trace"
2728 "google.golang.org/grpc"
2829
3839 request .IDRequestCount , request .IDResponseCount , request .IDResponseErrorsCount , request .IDResponseValidCount ,
3940 }
4041
41- gRPCConsumerRegistry = monitoring .Default .NewRegistry ("apm-server.otlp.grpc.consumer" )
42- gRPCConsumerMonitoringMap = request .MonitoringMapForRegistry (gRPCConsumerRegistry , monitoringKeys )
42+ gRPCMetricsRegistry = monitoring .Default .NewRegistry ("apm-server.otlp.grpc.metrics" )
43+ gRPCMetricsMonitoringMap = request .MonitoringMapForRegistry (gRPCMetricsRegistry , monitoringKeys )
44+ gRPCTracesRegistry = monitoring .Default .NewRegistry ("apm-server.otlp.grpc.traces" )
45+ gRPCTracesMonitoringMap = request .MonitoringMapForRegistry (gRPCTracesRegistry , monitoringKeys )
4346)
4447
4548// RegisterGRPCServices registers OTLP consumer services with the given gRPC server.
@@ -48,11 +51,24 @@ func RegisterGRPCServices(grpcServer *grpc.Server, reporter publish.Reporter, lo
4851 consumer : & otel.Consumer {Reporter : reporter },
4952 logger : logger ,
5053 }
51- // TODO(axw) add support for metrics to processer/otel.Consumer, and register a metrics receiver here.
54+
55+ // TODO(axw) rather than registering and unregistering monitoring callbacks
56+ // each time a new consumer is created, we should register one callback and
57+ // have it aggregate metrics from the dynamic consumers.
58+ //
59+ // For now, we take the easy way out: we only have one OTLP gRPC service
60+ // running at any time, so just unregister/register a new one.
61+ gRPCMetricsRegistry .Remove ("consumer" )
62+ monitoring .NewFunc (gRPCMetricsRegistry , "consumer" , consumer .collectMetricsMonitoring , monitoring .Report )
63+
5264 traceReceiver := trace .New ("otlp" , consumer )
65+ metricsReceiver := metrics .New ("otlp" , consumer )
5366 if err := otlpreceiver .RegisterTraceReceiver (context .Background (), traceReceiver , grpcServer , nil ); err != nil {
5467 return errors .Wrap (err , "failed to register OTLP trace receiver" )
5568 }
69+ if err := otlpreceiver .RegisterMetricsReceiver (context .Background (), metricsReceiver , grpcServer , nil ); err != nil {
70+ return errors .Wrap (err , "failed to register OTLP metrics receiver" )
71+ }
5672 return nil
5773}
5874
@@ -63,13 +79,36 @@ type monitoredConsumer struct {
6379
6480// ConsumeTraces consumes OpenTelemtry trace data.
6581func (c * monitoredConsumer ) ConsumeTraces (ctx context.Context , traces pdata.Traces ) error {
66- gRPCConsumerMonitoringMap [request .IDRequestCount ].Inc ()
67- defer gRPCConsumerMonitoringMap [request .IDResponseCount ].Inc ()
82+ gRPCTracesMonitoringMap [request .IDRequestCount ].Inc ()
83+ defer gRPCTracesMonitoringMap [request .IDResponseCount ].Inc ()
6884 if err := c .consumer .ConsumeTraces (ctx , traces ); err != nil {
69- gRPCConsumerMonitoringMap [request .IDResponseErrorsCount ].Inc ()
85+ gRPCTracesMonitoringMap [request .IDResponseErrorsCount ].Inc ()
7086 c .logger .With (logp .Error (err )).Error ("ConsumeTraces returned an error" )
7187 return err
7288 }
73- gRPCConsumerMonitoringMap [request .IDResponseValidCount ].Inc ()
89+ gRPCTracesMonitoringMap [request .IDResponseValidCount ].Inc ()
7490 return nil
7591}
92+
93+ // ConsumeMetrics consumes OpenTelemtry metrics data.
94+ func (c * monitoredConsumer ) ConsumeMetrics (ctx context.Context , metrics pdata.Metrics ) error {
95+ gRPCMetricsMonitoringMap [request .IDRequestCount ].Inc ()
96+ defer gRPCMetricsMonitoringMap [request .IDResponseCount ].Inc ()
97+ if err := c .consumer .ConsumeMetrics (ctx , metrics ); err != nil {
98+ gRPCMetricsMonitoringMap [request .IDResponseErrorsCount ].Inc ()
99+ c .logger .With (logp .Error (err )).Error ("ConsumeMetrics returned an error" )
100+ return err
101+ }
102+ gRPCMetricsMonitoringMap [request .IDResponseValidCount ].Inc ()
103+ return nil
104+ }
105+
106+ func (c * monitoredConsumer ) collectMetricsMonitoring (_ monitoring.Mode , V monitoring.Visitor ) {
107+ V .OnRegistryStart ()
108+ V .OnRegistryFinished ()
109+
110+ stats := c .consumer .Stats ()
111+ monitoring .ReportNamespace (V , "consumer" , func () {
112+ monitoring .ReportInt (V , "unsupported_dropped" , stats .UnsupportedMetricsDropped )
113+ })
114+ }
0 commit comments