Skip to content

Commit fafbdcb

Browse files
authored
otel: add otel-specific fields to ingested docs (#45242)
* otel: add otelcol.component.id field to ingested docs * adjust otelcol field name * rename otelcol.component.id in beats event to otel.component.name * add otel.component.type * update metricbeatreceiver unit tests * add integration test coverage * update receiver name in test pipeline * use filestream as receiver name * revert to otelcol.component.id and otelcol.component.kind * fix tests * ignore fields * ignore otel fields in gcppubsub test * move assertion on otel fields to the up * fix linter issues * handle existing otelcol.component.id * prevent otelcol fields from being replaced if they exist * put otelcol fields nested inside agent property * fix E2E tests * use EventuallyWithTf
1 parent 6815104 commit fafbdcb

12 files changed

Lines changed: 143 additions & 23 deletions

File tree

libbeat/beat/info.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type Info struct {
4343
FIPSDistribution bool // If the beat was compiled as a FIPS distribution.
4444

4545
LogConsumer consumer.Logs // otel log consumer
46+
ComponentID string // otel component id from the collector config e.g. "filebeatreceiver/logs"
4647
UseDefaultProcessors bool // Whether to use the default processors
4748
Logger *logp.Logger
4849
}

libbeat/otelbeat/oteltest/oteltest.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package oteltest
2020

2121
import (
2222
"context"
23+
"strings"
2324
"sync"
2425
"testing"
2526
"time"
@@ -107,17 +108,17 @@ func CheckReceivers(params CheckReceiversParams) {
107108
require.NotEmpty(t, rc.Beat, "receiver beat must not be empty")
108109

109110
var receiverSettings receiver.Settings
111+
receiverSettings.ID = component.NewIDWithName(rc.Factory.Type(), rc.Name)
110112

111113
// Replicate the behavior of the collector logger
112114
receiverCore := core.
113115
With([]zapcore.Field{
114-
zap.String("otelcol.component.id", rc.Name),
116+
zap.String("otelcol.component.id", receiverSettings.ID.String()),
115117
zap.String("otelcol.component.kind", "receiver"),
116118
zap.String("otelcol.signal", "logs"),
117119
})
118120

119121
receiverSettings.Logger = zap.New(receiverCore)
120-
receiverSettings.ID = component.NewIDWithName(rc.Factory.Type(), rc.Name)
121122

122123
logConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
123124
for _, rl := range ld.ResourceLogs().All() {
@@ -160,9 +161,9 @@ func CheckReceivers(params CheckReceiversParams) {
160161
}
161162
})
162163

163-
beatForCompID := func(compID string) string {
164+
beatForCompName := func(compName string) string {
164165
for _, rec := range params.Receivers {
165-
if rec.Name == compID {
166+
if rec.Name == compName {
166167
return rec.Beat
167168
}
168169
}
@@ -183,8 +184,9 @@ func CheckReceivers(params CheckReceiversParams) {
183184
require.Contains(ct, zl.ContextMap(), "otelcol.component.id")
184185
compID, ok := zl.ContextMap()["otelcol.component.id"].(string)
185186
require.True(ct, ok, "otelcol.component.id should be a string")
187+
compName := strings.Split(compID, "/")[1]
186188
require.Contains(ct, zl.ContextMap(), "service.name")
187-
require.Equal(ct, beatForCompID(compID), zl.ContextMap()["service.name"])
189+
require.Equal(ct, beatForCompName(compName), zl.ContextMap()["service.name"])
188190
break
189191
}
190192
require.NotNil(ct, host.Evt, "expected not nil, got nil")

x-pack/filebeat/fbreceiver/factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.
4545
settings.ElasticLicensed = true
4646
settings.Initialize = append(settings.Initialize, include.InitializeModule)
4747

48-
b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.Logger.Core())
48+
b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core())
4949
if err != nil {
5050
return nil, fmt.Errorf("error creating %s: %w", Name, err)
5151
}

x-pack/filebeat/fbreceiver/receiver_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ func TestNewReceiver(t *testing.T) {
9292
AssertFunc: func(c *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) {
9393
_ = zapLogs
9494
require.Lenf(c, logs["r1"], 1, "expected 1 log, got %d", len(logs["r1"]))
95+
assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record")
96+
assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record")
9597
var lastError strings.Builder
9698
assert.Conditionf(c, func() bool {
9799
return getFromSocket(t, &lastError, monitorSocket, "stats")
@@ -232,6 +234,11 @@ func TestMultipleReceivers(t *testing.T) {
232234
require.Greater(c, len(logs["r1"]), 0, "receiver r1 does not have any logs")
233235
require.Greater(c, len(logs["r2"]), 0, "receiver r2 does not have any logs")
234236

237+
assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record")
238+
assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record")
239+
assert.Equal(c, "filebeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record")
240+
assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r2 log record")
241+
235242
// Make sure that each receiver has a separate logger
236243
// instance and does not interfere with others. Previously, the
237244
// logger in Beats was global, causing logger fields to be

x-pack/filebeat/input/gcppubsub/otel_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,9 @@ processors:
175175
"agent.ephemeral_id",
176176
"agent.id",
177177
"event.created",
178+
// only present in beats receivers
179+
"agent.otelcol.component.id",
180+
"agent.otelcol.component.kind",
178181
}
179182

180183
oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")

x-pack/filebeat/tests/integration/otel_test.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
2727
libbeattesting "github.com/elastic/beats/v7/libbeat/testing"
2828
"github.com/elastic/beats/v7/libbeat/tests/integration"
29+
"github.com/elastic/elastic-agent-libs/mapstr"
2930
"github.com/elastic/elastic-agent-libs/testing/estools"
3031
)
3132

@@ -120,18 +121,27 @@ http.port: %d
120121
},
121122
2*time.Minute, 1*time.Second, "expected at least %d events for both filebeat and otel", numEvents)
122123

123-
filebeatDoc := filebeatDocs.Hits.Hits[0].Source
124-
otelDoc := otelDocs.Hits.Hits[0].Source
124+
var filebeatDoc, otelDoc mapstr.M
125+
filebeatDoc = filebeatDocs.Hits.Hits[0].Source
126+
otelDoc = otelDocs.Hits.Hits[0].Source
125127
ignoredFields := []string{
126128
// Expected to change between agentDocs and OtelDocs
127129
"@timestamp",
128130
"agent.ephemeral_id",
129131
"agent.id",
130132
"log.file.inode",
131133
"log.file.path",
134+
// only present in beats receivers
135+
"agent.otelcol.component.id",
136+
"agent.otelcol.component.kind",
132137
}
133138

134139
oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")
140+
141+
assert.Equal(t, "filebeatreceiver", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record")
142+
assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record")
143+
assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in filebeat log record")
144+
assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in filebeat log record")
135145
assertMonitoring(t, otelMonitoringPort)
136146
}
137147

@@ -252,6 +262,9 @@ processors:
252262
"agent.ephemeral_id",
253263
"agent.id",
254264
"event.created",
265+
// only present in beats receivers
266+
"agent.otelcol.component.id",
267+
"agent.otelcol.component.kind",
255268
}
256269

257270
oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")
@@ -325,7 +338,7 @@ func TestFilebeatOTelReceiverE2E(t *testing.T) {
325338
}
326339

327340
cfg := `receivers:
328-
filebeatreceiver:
341+
filebeatreceiver/filestream:
329342
filebeat:
330343
inputs:
331344
- type: filestream
@@ -366,7 +379,7 @@ service:
366379
pipelines:
367380
logs:
368381
receivers:
369-
- filebeatreceiver
382+
- filebeatreceiver/filestream
370383
exporters:
371384
- elasticsearch/log
372385
- debug
@@ -452,18 +465,26 @@ http.port: %d
452465
},
453466
2*time.Minute, 1*time.Second, "expected at least %d events for both filebeat and otel", wantEvents)
454467

455-
filebeatDoc := filebeatDocs.Hits.Hits[0].Source
456-
otelDoc := otelDocs.Hits.Hits[0].Source
468+
var filebeatDoc, otelDoc mapstr.M
469+
filebeatDoc = filebeatDocs.Hits.Hits[0].Source
470+
otelDoc = otelDocs.Hits.Hits[0].Source
457471
ignoredFields := []string{
458472
// Expected to change between agentDocs and OtelDocs
459473
"@timestamp",
460474
"agent.ephemeral_id",
461475
"agent.id",
462476
"log.file.inode",
463477
"log.file.path",
478+
// only present in beats receivers
479+
"agent.otelcol.component.id",
480+
"agent.otelcol.component.kind",
464481
}
465482

466483
oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")
484+
assert.Equal(t, "filebeatreceiver/filestream", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record")
485+
assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record")
486+
assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in filebeat log record")
487+
assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in filebeat log record")
467488
assertMonitoring(t, otelConfig.MonitoringPort)
468489
assertMonitoring(t, filebeatMonitoringPort) // filebeat
469490
}

x-pack/libbeat/cmd/instance/beat.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333
)
3434

3535
// NewBeatForReceiver creates a Beat that will be used in the context of an otel receiver
36-
func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, useDefaultProcessors bool, consumer consumer.Logs, core zapcore.Core) (*instance.Beat, error) {
36+
func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, useDefaultProcessors bool, consumer consumer.Logs, componentID string, core zapcore.Core) (*instance.Beat, error) {
3737
b, err := instance.NewBeat(settings.Name,
3838
settings.IndexPrefix,
3939
settings.Version,
@@ -43,6 +43,7 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
4343
return nil, err
4444
}
4545

46+
b.Info.ComponentID = componentID
4647
b.Info.LogConsumer = consumer
4748

4849
// begin code similar to configure

x-pack/libbeat/outputs/otelconsumer/otelconsumer.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,12 @@ import (
3232
const (
3333
// esDocumentIDAttribute is the attribute key used to store the document ID in the log record.
3434
esDocumentIDAttribute = "elasticsearch.document_id"
35-
beatNameCtxKey = "beat_name"
36-
beatVersionCtxtKey = "beat_version"
35+
// otelComponentIDKey is the key used to store the Beat receiver's component id in the beat event.
36+
otelComponentIDKey = "otelcol.component.id"
37+
// otelComponentKindKey is the key used to store the Beat receiver's component kind in the beat event. This is always "receiver".
38+
otelComponentKindKey = "otelcol.component.kind"
39+
beatNameCtxKey = "beat_name"
40+
beatVersionCtxtKey = "beat_version"
3741
)
3842

3943
func init() {
@@ -145,6 +149,15 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)
145149
}
146150
logRecord.SetObservedTimestamp(observedTimestamp)
147151

152+
if agent, _ := beatEvent.GetValue("agent"); agent != nil {
153+
switch agent := agent.(type) {
154+
case mapstr.M:
155+
agent[otelComponentIDKey] = out.beatInfo.ComponentID
156+
agent[otelComponentKindKey] = "receiver"
157+
beatEvent["agent"] = agent
158+
}
159+
}
160+
148161
otelmap.ConvertNonPrimitive(beatEvent)
149162

150163
// if data_stream field is set on beatEvent. Add it to logrecord.Attributes to support dynamic indexing

x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,53 @@ func TestPublish(t *testing.T) {
254254
assert.Len(t, batch.Signals, 1)
255255
assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag)
256256
})
257+
t.Run("sets otel specific-fields", func(t *testing.T) {
258+
testCases := []struct {
259+
name string
260+
componentID string
261+
componentKind string
262+
expectedComponentID string
263+
expectedComponentKind string
264+
}{
265+
{
266+
name: "sets beat component ID",
267+
componentID: "filebeatreceiver/1",
268+
expectedComponentID: "filebeatreceiver/1",
269+
expectedComponentKind: "receiver",
270+
},
271+
}
257272

273+
for _, tc := range testCases {
274+
t.Run(tc.name, func(t *testing.T) {
275+
event := beat.Event{
276+
Fields: mapstr.M{
277+
"field": 1,
278+
"agent": mapstr.M{},
279+
},
280+
Meta: mapstr.M{
281+
"_id": "abc123",
282+
},
283+
}
284+
batch := outest.NewBatch(event)
285+
var countLogs int
286+
otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
287+
countLogs = countLogs + ld.LogRecordCount()
288+
return nil
289+
})
290+
otelConsumer.beatInfo.ComponentID = tc.componentID
291+
err := otelConsumer.Publish(ctx, batch)
292+
assert.NoError(t, err)
293+
assert.Len(t, batch.Signals, 1)
294+
assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag)
295+
assert.Equal(t, len(batch.Events()), countLogs, "all events should be consumed")
296+
for _, event := range batch.Events() {
297+
beatEvent := event.Content.Fields.Flatten()
298+
assert.Equal(t, tc.expectedComponentID, beatEvent["agent."+otelComponentIDKey], "expected agent.otelcol.component.id field in log record")
299+
assert.Equal(t, tc.expectedComponentKind, beatEvent["agent."+otelComponentKindKey], "expected agent.otelcol.component.kind field in log record")
300+
}
301+
})
302+
}
303+
})
258304
t.Run("sets the client context metadata with the beat info", func(t *testing.T) {
259305
batch := outest.NewBatch(event1)
260306
otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {

x-pack/metricbeat/mbreceiver/factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.
5151
settings.ElasticLicensed = true
5252
settings.Initialize = append(settings.Initialize, include.InitializeModule)
5353

54-
b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.Logger.Core())
54+
b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core())
5555
if err != nil {
5656
return nil, fmt.Errorf("error creating %s: %w", Name, err)
5757
}

0 commit comments

Comments
 (0)