Skip to content

Commit 32b770e

Browse files
authored
Merge branch 'master' into tailsampling-require-default-policy
2 parents ed8e204 + 6d8ad81 commit 32b770e

9 files changed

Lines changed: 698 additions & 12 deletions

File tree

beater/otlp/grpc.go

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/pkg/errors"
2424
"go.opentelemetry.io/collector/consumer/pdata"
2525
"go.opentelemetry.io/collector/receiver/otlpreceiver"
26+
"go.opentelemetry.io/collector/receiver/otlpreceiver/metrics"
2627
"go.opentelemetry.io/collector/receiver/otlpreceiver/trace"
2728
"google.golang.org/grpc"
2829

@@ -38,8 +39,10 @@ var (
3839
request.IDRequestCount, request.IDResponseCount, request.IDResponseErrorsCount, request.IDResponseValidCount,
3940
}
4041

41-
gRPCConsumerRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.consumer")
42-
gRPCConsumerMonitoringMap = request.MonitoringMapForRegistry(gRPCConsumerRegistry, monitoringKeys)
42+
gRPCMetricsRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.metrics")
43+
gRPCMetricsMonitoringMap = request.MonitoringMapForRegistry(gRPCMetricsRegistry, monitoringKeys)
44+
gRPCTracesRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.traces")
45+
gRPCTracesMonitoringMap = request.MonitoringMapForRegistry(gRPCTracesRegistry, monitoringKeys)
4346
)
4447

4548
// RegisterGRPCServices registers OTLP consumer services with the given gRPC server.
@@ -48,11 +51,24 @@ func RegisterGRPCServices(grpcServer *grpc.Server, reporter publish.Reporter, lo
4851
consumer: &otel.Consumer{Reporter: reporter},
4952
logger: logger,
5053
}
51-
// TODO(axw) add support for metrics to processor/otel.Consumer, and register a metrics receiver here.
54+
55+
// TODO(axw) rather than registering and unregistering monitoring callbacks
56+
// each time a new consumer is created, we should register one callback and
57+
// have it aggregate metrics from the dynamic consumers.
58+
//
59+
// For now, we take the easy way out: we only have one OTLP gRPC service
60+
// running at any time, so just unregister/register a new one.
61+
gRPCMetricsRegistry.Remove("consumer")
62+
monitoring.NewFunc(gRPCMetricsRegistry, "consumer", consumer.collectMetricsMonitoring, monitoring.Report)
63+
5264
traceReceiver := trace.New("otlp", consumer)
65+
metricsReceiver := metrics.New("otlp", consumer)
5366
if err := otlpreceiver.RegisterTraceReceiver(context.Background(), traceReceiver, grpcServer, nil); err != nil {
5467
return errors.Wrap(err, "failed to register OTLP trace receiver")
5568
}
69+
if err := otlpreceiver.RegisterMetricsReceiver(context.Background(), metricsReceiver, grpcServer, nil); err != nil {
70+
return errors.Wrap(err, "failed to register OTLP metrics receiver")
71+
}
5672
return nil
5773
}
5874

@@ -63,13 +79,36 @@ type monitoredConsumer struct {
6379

6480
// ConsumeTraces consumes OpenTelemetry trace data.
6581
func (c *monitoredConsumer) ConsumeTraces(ctx context.Context, traces pdata.Traces) error {
66-
gRPCConsumerMonitoringMap[request.IDRequestCount].Inc()
67-
defer gRPCConsumerMonitoringMap[request.IDResponseCount].Inc()
82+
gRPCTracesMonitoringMap[request.IDRequestCount].Inc()
83+
defer gRPCTracesMonitoringMap[request.IDResponseCount].Inc()
6884
if err := c.consumer.ConsumeTraces(ctx, traces); err != nil {
69-
gRPCConsumerMonitoringMap[request.IDResponseErrorsCount].Inc()
85+
gRPCTracesMonitoringMap[request.IDResponseErrorsCount].Inc()
7086
c.logger.With(logp.Error(err)).Error("ConsumeTraces returned an error")
7187
return err
7288
}
73-
gRPCConsumerMonitoringMap[request.IDResponseValidCount].Inc()
89+
gRPCTracesMonitoringMap[request.IDResponseValidCount].Inc()
7490
return nil
7591
}
92+
93+
// ConsumeMetrics consumes OpenTelemetry metrics data.
94+
func (c *monitoredConsumer) ConsumeMetrics(ctx context.Context, metrics pdata.Metrics) error {
95+
gRPCMetricsMonitoringMap[request.IDRequestCount].Inc()
96+
defer gRPCMetricsMonitoringMap[request.IDResponseCount].Inc()
97+
if err := c.consumer.ConsumeMetrics(ctx, metrics); err != nil {
98+
gRPCMetricsMonitoringMap[request.IDResponseErrorsCount].Inc()
99+
c.logger.With(logp.Error(err)).Error("ConsumeMetrics returned an error")
100+
return err
101+
}
102+
gRPCMetricsMonitoringMap[request.IDResponseValidCount].Inc()
103+
return nil
104+
}
105+
106+
func (c *monitoredConsumer) collectMetricsMonitoring(_ monitoring.Mode, V monitoring.Visitor) {
107+
V.OnRegistryStart()
108+
V.OnRegistryFinished()
109+
110+
stats := c.consumer.Stats()
111+
monitoring.ReportNamespace(V, "consumer", func() {
112+
monitoring.ReportInt(V, "unsupported_dropped", stats.UnsupportedMetricsDropped)
113+
})
114+
}

beater/otlp/grpc_test.go

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,10 @@ import (
4040
)
4141

4242
var (
43-
exportTraceServiceRequestType = proto.MessageType("opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest")
44-
exportTraceServiceResponseType = proto.MessageType("opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse")
43+
exportMetricsServiceRequestType = proto.MessageType("opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest")
44+
exportMetricsServiceResponseType = proto.MessageType("opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse")
45+
exportTraceServiceRequestType = proto.MessageType("opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest")
46+
exportTraceServiceResponseType = proto.MessageType("opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse")
4547
)
4648

4749
func TestConsumeTraces(t *testing.T) {
@@ -93,7 +95,7 @@ func TestConsumeTraces(t *testing.T) {
9395
assert.Len(t, events, 2)
9496

9597
actual := map[string]interface{}{}
96-
monitoring.GetRegistry("apm-server.otlp.grpc.consumer").Do(monitoring.Full, func(key string, value interface{}) {
98+
monitoring.GetRegistry("apm-server.otlp.grpc.traces").Do(monitoring.Full, func(key string, value interface{}) {
9799
actual[key] = value
98100
})
99101
assert.Equal(t, map[string]interface{}{
@@ -104,6 +106,65 @@ func TestConsumeTraces(t *testing.T) {
104106
}, actual)
105107
}
106108

109+
func TestConsumeMetrics(t *testing.T) {
110+
var reportError error
111+
report := func(ctx context.Context, req publish.PendingReq) error {
112+
return reportError
113+
}
114+
115+
// Send a minimal metric to verify that everything is connected properly.
116+
//
117+
// We intentionally do not check the published event contents; those are
118+
// tested in processor/otel.
119+
cannedRequest := jsonExportMetricsServiceRequest(`{
120+
"resource_metrics": [
121+
{
122+
"instrumentation_library_metrics": [
123+
{
124+
"metrics": [
125+
{
126+
"name": "metric_name"
127+
}
128+
]
129+
}
130+
]
131+
}
132+
]
133+
}`)
134+
135+
conn := newServer(t, report)
136+
err := conn.Invoke(
137+
context.Background(), "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export",
138+
cannedRequest, newExportMetricsServiceResponse(),
139+
)
140+
assert.NoError(t, err)
141+
142+
reportError = errors.New("failed to publish events")
143+
err = conn.Invoke(
144+
context.Background(), "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export",
145+
cannedRequest, newExportMetricsServiceResponse(),
146+
)
147+
assert.Error(t, err)
148+
errStatus := status.Convert(err)
149+
assert.Equal(t, "failed to publish events", errStatus.Message())
150+
151+
actual := map[string]interface{}{}
152+
monitoring.GetRegistry("apm-server.otlp.grpc.metrics").Do(monitoring.Full, func(key string, value interface{}) {
153+
actual[key] = value
154+
})
155+
assert.Equal(t, map[string]interface{}{
156+
// In both of the requests we send above,
157+
// the metrics do not have a type and so
158+
// we treat them as unsupported metrics.
159+
"consumer.unsupported_dropped": int64(2),
160+
161+
"request.count": int64(2),
162+
"response.count": int64(2),
163+
"response.errors.count": int64(1),
164+
"response.valid.count": int64(1),
165+
}, actual)
166+
}
167+
107168
func jsonExportTraceServiceRequest(j string) interface{} {
108169
request := reflect.New(exportTraceServiceRequestType.Elem()).Interface()
109170
decoder := json.NewDecoder(strings.NewReader(j))
@@ -118,6 +179,20 @@ func newExportTraceServiceResponse() interface{} {
118179
return reflect.New(exportTraceServiceResponseType.Elem()).Interface()
119180
}
120181

182+
func jsonExportMetricsServiceRequest(j string) interface{} {
183+
request := reflect.New(exportMetricsServiceRequestType.Elem()).Interface()
184+
decoder := json.NewDecoder(strings.NewReader(j))
185+
decoder.DisallowUnknownFields()
186+
if err := decoder.Decode(request); err != nil {
187+
panic(err)
188+
}
189+
return request
190+
}
191+
192+
func newExportMetricsServiceResponse() interface{} {
193+
return reflect.New(exportMetricsServiceResponseType.Elem()).Interface()
194+
}
195+
121196
func newServer(t *testing.T, report publish.Reporter) *grpc.ClientConn {
122197
lis, err := net.Listen("tcp", "localhost:0")
123198
require.NoError(t, err)

changelogs/head.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ https://github.com/elastic/apm-server/compare/7.11\...master[View commits]
2424
* Support for reloading config in Fleet mode, gracefully stopping the HTTP server and starting a new one {pull}4623[4623]
2525
* Add a `_doc_count` field to transaction histogram docs {pull}4647[4647]
2626
* Upgrade Go to 1.15.7 {pull}4663[4663]
27-
* OpenTelemetry Protocol (OTLP) over gRPC is now supported on the standard endpoint (8200) {pull}4677[4677]
27+
* OpenTelemetry Protocol (OTLP) over gRPC is now supported on the standard endpoint (8200) {pull}4677[4677] {pull}4722[4722]
2828
* Add initial support for APM central config and sourcemaps when running under Fleet {pull}4670[4670]
2929
* Data stream and ILM policy for tail-based sampling {pull}4707[4707]
3030
* When tail-sampling is enabled, a default policy must be defined {pull}4729[4729]

processor/otel/consumer.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"net/url"
4242
"strconv"
4343
"strings"
44+
"sync/atomic"
4445

4546
"go.opentelemetry.io/collector/consumer/pdata"
4647
"go.opentelemetry.io/collector/translator/conventions"
@@ -65,9 +66,31 @@ const (
6566

6667
// Consumer transforms open-telemetry data to be compatible with elastic APM data
6768
type Consumer struct {
69+
stats consumerStats
70+
6871
Reporter publish.Reporter
6972
}
7073

74+
// ConsumerStats holds a snapshot of statistics about data consumption.
75+
type ConsumerStats struct {
76+
// UnsupportedMetricsDropped records the number of unsupported metrics
77+
// that have been dropped by the consumer.
78+
UnsupportedMetricsDropped int64
79+
}
80+
81+
// consumerStats holds the current statistics, which must be accessed and
82+
// modified using atomic operations.
83+
type consumerStats struct {
84+
unsupportedMetricsDropped int64
85+
}
86+
87+
// Stats returns a snapshot of the current statistics about data consumption.
88+
func (c *Consumer) Stats() ConsumerStats {
89+
return ConsumerStats{
90+
UnsupportedMetricsDropped: atomic.LoadInt64(&c.stats.unsupportedMetricsDropped),
91+
}
92+
}
93+
7194
// ConsumeTraces consumes OpenTelemetry trace data,
7295
// converting into Elastic APM events and reporting to the Elastic APM schema.
7396
func (c *Consumer) ConsumeTraces(ctx context.Context, traces pdata.Traces) error {

0 commit comments

Comments
 (0)