Skip to content

Commit afc53c0

Browse files
Logstash Exporter implements custom publisher batch (#46336)
Logstash exporter implements the `publisher.Batch` interface that wraps a `plog.Logs`, converting it to beats events and handling all the interface's functions it currently supports. With that done, we can fully reuse the beats/logstash clients, leveraging the existing clients backoff implementations, for example. --------- Co-authored-by: Edmo Vamerlatti Costa <edmo.vamerlatticosta@elastic.co>
1 parent 4873a73 commit afc53c0

File tree

9 files changed

+1209
-21
lines changed

9 files changed

+1209
-21
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package otelctx
19+
20+
import (
21+
"context"
22+
23+
"go.opentelemetry.io/collector/client"
24+
25+
"github.com/elastic/beats/v7/libbeat/beat"
26+
)
27+
28+
const (
29+
BeatNameCtxKey = "beat_name"
30+
BeatVersionCtxKey = "beat_version"
31+
)
32+
33+
// NewConsumerContext creates a new context.Context adding the beats metadata
34+
// to the client.Info. This is used to pass the beat name and version to the
35+
// Collector, so it can be used by the components to access that data.
36+
func NewConsumerContext(ctx context.Context, beatInfo beat.Info) context.Context {
37+
clientInfo := client.Info{
38+
Metadata: client.NewMetadata(map[string][]string{
39+
BeatNameCtxKey: {beatInfo.Beat},
40+
BeatVersionCtxKey: {beatInfo.Version},
41+
}),
42+
}
43+
return client.NewContext(ctx, clientInfo)
44+
}
45+
46+
// GetBeatName retrieves the beat name from the context metadata
47+
// If the name is not found, it returns an empty string.
48+
func GetBeatName(ctx context.Context) string {
49+
clientInfo := client.FromContext(ctx)
50+
if values := clientInfo.Metadata.Get(BeatNameCtxKey); len(values) > 0 {
51+
return values[0]
52+
}
53+
return ""
54+
}
55+
56+
// GetBeatVersion retrieves the version of the beat from the context metadata
57+
// If the version is not found, it returns an empty string.
58+
func GetBeatVersion(ctx context.Context) string {
59+
clientInfo := client.FromContext(ctx)
60+
if values := clientInfo.Metadata.Get(BeatVersionCtxKey); len(values) > 0 {
61+
return values[0]
62+
}
63+
return ""
64+
}
65+
66+
// GetBeatEventMeta gives beat.Event.Meta from the context metadata
67+
func GetBeatEventMeta(ctx context.Context) map[string]any {
68+
ctxData := client.FromContext(ctx)
69+
var beatName, beatVersion string
70+
if v := ctxData.Metadata.Get(BeatNameCtxKey); len(v) > 0 {
71+
beatName = v[0]
72+
}
73+
if v := ctxData.Metadata.Get(BeatVersionCtxKey); len(v) > 0 {
74+
beatVersion = v[0]
75+
}
76+
return map[string]any{
77+
"beat": beatName,
78+
"version": beatVersion,
79+
}
80+
}
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package otelctx
19+
20+
import (
21+
"context"
22+
"testing"
23+
24+
"github.com/stretchr/testify/assert"
25+
"go.opentelemetry.io/collector/client"
26+
)
27+
28+
func TestGetBeatEventMeta(t *testing.T) {
29+
tests := []struct {
30+
name string
31+
setupCtx func() context.Context
32+
expected map[string]any
33+
}{
34+
{
35+
name: "complete metadata",
36+
setupCtx: func() context.Context {
37+
ctx := t.Context()
38+
info := client.Info{
39+
Metadata: client.NewMetadata(map[string][]string{
40+
BeatNameCtxKey: {"filebeat"},
41+
BeatVersionCtxKey: {"8.0.0"},
42+
}),
43+
}
44+
return client.NewContext(ctx, info)
45+
},
46+
expected: map[string]any{
47+
"beat": "filebeat",
48+
"version": "8.0.0",
49+
},
50+
},
51+
{
52+
name: "missing beat name",
53+
setupCtx: func() context.Context {
54+
ctx := t.Context()
55+
info := client.Info{
56+
Metadata: client.NewMetadata(map[string][]string{
57+
BeatVersionCtxKey: {"8.0.0"},
58+
}),
59+
}
60+
return client.NewContext(ctx, info)
61+
},
62+
expected: map[string]any{
63+
"beat": "",
64+
"version": "8.0.0",
65+
},
66+
},
67+
{
68+
name: "missing beat version",
69+
setupCtx: func() context.Context {
70+
ctx := t.Context()
71+
info := client.Info{
72+
Metadata: client.NewMetadata(map[string][]string{
73+
BeatNameCtxKey: {"filebeat"},
74+
}),
75+
}
76+
return client.NewContext(ctx, info)
77+
},
78+
expected: map[string]any{
79+
"beat": "filebeat",
80+
"version": "",
81+
},
82+
},
83+
{
84+
name: "no metadata",
85+
setupCtx: func() context.Context {
86+
ctx := t.Context()
87+
info := client.Info{
88+
Metadata: client.NewMetadata(map[string][]string{}),
89+
}
90+
return client.NewContext(ctx, info)
91+
},
92+
expected: map[string]any{
93+
"beat": "",
94+
"version": "",
95+
},
96+
},
97+
{
98+
name: "no client info in context",
99+
setupCtx: func() context.Context {
100+
return t.Context()
101+
},
102+
expected: map[string]any{
103+
"beat": "",
104+
"version": "",
105+
},
106+
},
107+
}
108+
109+
for _, tt := range tests {
110+
t.Run(tt.name, func(t *testing.T) {
111+
ctx := tt.setupCtx()
112+
113+
metadata := GetBeatEventMeta(ctx)
114+
115+
assert.Equal(t, tt.expected, metadata)
116+
})
117+
}
118+
}
119+
120+
func TestGetBeatVersion(t *testing.T) {
121+
tests := []struct {
122+
name string
123+
setupCtx func() context.Context
124+
expected string
125+
}{
126+
{
127+
name: "version exists",
128+
setupCtx: func() context.Context {
129+
ctx := t.Context()
130+
info := client.Info{
131+
Metadata: client.NewMetadata(map[string][]string{
132+
BeatNameCtxKey: {"filebeat"},
133+
BeatVersionCtxKey: {"8.0.0"},
134+
}),
135+
}
136+
return client.NewContext(ctx, info)
137+
},
138+
expected: "8.0.0",
139+
},
140+
{
141+
name: "version missing",
142+
setupCtx: func() context.Context {
143+
ctx := t.Context()
144+
info := client.Info{
145+
Metadata: client.NewMetadata(map[string][]string{
146+
BeatNameCtxKey: {"filebeat"},
147+
}),
148+
}
149+
return client.NewContext(ctx, info)
150+
},
151+
expected: "",
152+
},
153+
{
154+
name: "no client info",
155+
setupCtx: func() context.Context {
156+
return t.Context()
157+
},
158+
expected: "",
159+
},
160+
}
161+
162+
for _, tt := range tests {
163+
t.Run(tt.name, func(t *testing.T) {
164+
ctx := tt.setupCtx()
165+
166+
version := GetBeatVersion(ctx)
167+
168+
assert.Equal(t, tt.expected, version)
169+
})
170+
}
171+
}
172+
173+
func TestGetBeatName(t *testing.T) {
174+
tests := []struct {
175+
name string
176+
setupCtx func() context.Context
177+
expected string
178+
}{
179+
{
180+
name: "beat name exists",
181+
setupCtx: func() context.Context {
182+
ctx := t.Context()
183+
info := client.Info{
184+
Metadata: client.NewMetadata(map[string][]string{
185+
BeatNameCtxKey: {"filebeat"},
186+
}),
187+
}
188+
return client.NewContext(ctx, info)
189+
},
190+
expected: "filebeat",
191+
},
192+
{
193+
name: "beat name missing",
194+
setupCtx: func() context.Context {
195+
ctx := t.Context()
196+
info := client.Info{
197+
Metadata: client.NewMetadata(map[string][]string{}),
198+
}
199+
return client.NewContext(ctx, info)
200+
},
201+
expected: "",
202+
},
203+
{
204+
name: "no client info",
205+
setupCtx: func() context.Context {
206+
return t.Context()
207+
},
208+
expected: "",
209+
},
210+
}
211+
212+
for _, tt := range tests {
213+
t.Run(tt.name, func(t *testing.T) {
214+
ctx := tt.setupCtx()
215+
name := GetBeatName(ctx)
216+
assert.Equal(t, tt.expected, name)
217+
})
218+
}
219+
}

x-pack/libbeat/outputs/otelconsumer/otelconsumer.go

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,9 @@ import (
1111
"runtime"
1212
"time"
1313

14-
"go.opentelemetry.io/collector/client"
15-
1614
"github.com/elastic/beats/v7/libbeat/beat"
1715
"github.com/elastic/beats/v7/libbeat/common"
16+
"github.com/elastic/beats/v7/libbeat/otelbeat/otelctx"
1817
"github.com/elastic/beats/v7/libbeat/otelbeat/otelmap"
1918
"github.com/elastic/beats/v7/libbeat/outputs"
2019
"github.com/elastic/beats/v7/libbeat/publisher"
@@ -36,8 +35,6 @@ const (
3635
otelComponentIDKey = "otelcol.component.id"
3736
// otelComponentKindKey is the key used to store the Beat receiver's component kind in the beat event. This is always "receiver".
3837
otelComponentKindKey = "otelcol.component.kind"
39-
beatNameCtxKey = "beat_name"
40-
beatVersionCtxtKey = "beat_version"
4138
)
4239

4340
func init() {
@@ -180,7 +177,7 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)
180177
}
181178
}
182179

183-
err := out.logsConsumer.ConsumeLogs(out.newConsumerContext(ctx), pLogs)
180+
err := out.logsConsumer.ConsumeLogs(otelctx.NewConsumerContext(ctx, out.beatInfo), pLogs)
184181
if err != nil {
185182
// Permanent errors shouldn't be retried. This tipically means
186183
// the data cannot be serialized by the exporter that is attached
@@ -206,19 +203,6 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)
206203
return nil
207204
}
208205

209-
// newConsumerContext creates a new context.Context adding the beats metadata
210-
// to the client.Info. This is used to pass the beat name and version to the
211-
// Collector, so it can be used by the components to access that data.
212-
func (out *otelConsumer) newConsumerContext(ctx context.Context) context.Context {
213-
clientInfo := client.Info{
214-
Metadata: client.NewMetadata(map[string][]string{
215-
beatNameCtxKey: {out.beatInfo.Beat},
216-
beatVersionCtxtKey: {out.beatInfo.Version},
217-
}),
218-
}
219-
return client.NewContext(ctx, clientInfo)
220-
}
221-
222206
func (out *otelConsumer) String() string {
223207
return "otelconsumer"
224208
}

x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"go.opentelemetry.io/collector/receiver/receivertest"
2323

2424
"github.com/elastic/beats/v7/libbeat/beat"
25+
"github.com/elastic/beats/v7/libbeat/otelbeat/otelctx"
2526
"github.com/elastic/beats/v7/libbeat/outputs"
2627
"github.com/elastic/beats/v7/libbeat/outputs/outest"
2728
"github.com/elastic/elastic-agent-libs/logp/logptest"
@@ -305,8 +306,8 @@ func TestPublish(t *testing.T) {
305306
batch := outest.NewBatch(event1)
306307
otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
307308
cm := client.FromContext(ctx).Metadata
308-
assert.Equal(t, beatInfo.Beat, cm.Get(beatNameCtxKey)[0])
309-
assert.Equal(t, beatInfo.Version, cm.Get(beatVersionCtxtKey)[0])
309+
assert.Equal(t, beatInfo.Beat, cm.Get(otelctx.BeatNameCtxKey)[0])
310+
assert.Equal(t, beatInfo.Version, cm.Get(otelctx.BeatVersionCtxKey)[0])
310311
return nil
311312
})
312313

x-pack/otel/exporter/logstashexporter/factory.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"github.com/elastic/beats/v7/libbeat/outputs"
1717
"github.com/elastic/beats/v7/libbeat/outputs/logstash"
18+
"github.com/elastic/beats/v7/x-pack/otel/exporter/logstashexporter/internal"
1819
"github.com/elastic/elastic-agent-libs/config"
1920
)
2021

@@ -73,6 +74,11 @@ func createLogsExporter(
7374
)
7475
}
7576

76-
func (l *logstashExporter) pushLogData(context.Context, plog.Logs) error {
77+
func (l *logstashExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
78+
_, err := internal.NewLogBatch(ctx, ld)
79+
if err != nil {
80+
return err
81+
}
82+
7783
return nil
7884
}

0 commit comments

Comments
 (0)